Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 75eee91

Browse files
PYTHON-5505 Prototype system overload retry loop for all operations (#2497)
All commands that fail with the "Retryable" error label will be retried up to 3 times. When the error includes the "SystemOverloaded" error label we apply exponential backoff with jitter before attempting a retry.
1 parent cf7a1aa commit 75eee91

File tree

11 files changed

+491
-34
lines changed

11 files changed

+491
-34
lines changed

‎pymongo/asynchronous/collection.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
AsyncCursor,
5959
AsyncRawBatchCursor,
6060
)
61+
from pymongo.asynchronous.helpers import _retry_overload
6162
from pymongo.collation import validate_collation_or_none
6263
from pymongo.common import _ecoc_coll_name, _esc_coll_name
6364
from pymongo.errors import (
@@ -2227,6 +2228,7 @@ async def create_indexes(
22272228
return await self._create_indexes(indexes, session, **kwargs)
22282229

22292230
@_csot.apply
2231+
@_retry_overload
22302232
async def _create_indexes(
22312233
self, indexes: Sequence[IndexModel], session: Optional[AsyncClientSession], **kwargs: Any
22322234
) -> list[str]:
@@ -2422,7 +2424,6 @@ async def drop_indexes(
24222424
kwargs["comment"] = comment
24232425
await self._drop_index("*", session=session, **kwargs)
24242426

2425-
@_csot.apply
24262427
async def drop_index(
24272428
self,
24282429
index_or_name: _IndexKeyHint,
@@ -2472,6 +2473,7 @@ async def drop_index(
24722473
await self._drop_index(index_or_name, session, comment, **kwargs)
24732474

24742475
@_csot.apply
2476+
@_retry_overload
24752477
async def _drop_index(
24762478
self,
24772479
index_or_name: _IndexKeyHint,
@@ -3079,6 +3081,7 @@ async def aggregate_raw_batches(
30793081
)
30803082

30813083
@_csot.apply
3084+
@_retry_overload
30823085
async def rename(
30833086
self,
30843087
new_name: str,

‎pymongo/asynchronous/database.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from pymongo.asynchronous.change_stream import AsyncDatabaseChangeStream
3939
from pymongo.asynchronous.collection import AsyncCollection
4040
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
41+
from pymongo.asynchronous.helpers import _retry_overload
4142
from pymongo.common import _ecoc_coll_name, _esc_coll_name
4243
from pymongo.database_shared import _check_name, _CodecDocumentType
4344
from pymongo.errors import CollectionInvalid, InvalidOperation
@@ -477,6 +478,7 @@ async def watch(
477478
return change_stream
478479

479480
@_csot.apply
481+
@_retry_overload
480482
async def create_collection(
481483
self,
482484
name: str,
@@ -816,6 +818,7 @@ async def command(
816818
...
817819

818820
@_csot.apply
821+
@_retry_overload
819822
async def command(
820823
self,
821824
command: Union[str, MutableMapping[str, Any]],
@@ -947,6 +950,7 @@ async def command(
947950
)
948951

949952
@_csot.apply
953+
@_retry_overload
950954
async def cursor_command(
951955
self,
952956
command: Union[str, MutableMapping[str, Any]],
@@ -1264,6 +1268,7 @@ async def _drop_helper(
12641268
)
12651269

12661270
@_csot.apply
1271+
@_retry_overload
12671272
async def drop_collection(
12681273
self,
12691274
name_or_collection: Union[str, AsyncCollection[_DocumentTypeArg]],

‎pymongo/asynchronous/helpers.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
import asyncio
1919
import builtins
20+
import functools
21+
import random
2022
import socket
2123
import sys
24+
import time
2225
from typing import (
2326
Any,
2427
Callable,
@@ -28,6 +31,7 @@
2831

2932
from pymongo.errors import (
3033
OperationFailure,
34+
PyMongoError,
3135
)
3236
from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE
3337

@@ -38,6 +42,7 @@
3842

3943

4044
def _handle_reauth(func: F) -> F:
45+
@functools.wraps(func)
4146
async def inner(*args: Any, **kwargs: Any) -> Any:
4247
no_reauth = kwargs.pop("no_reauth", False)
4348
from pymongo.asynchronous.pool import AsyncConnection
@@ -70,6 +75,42 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
7075
return cast(F, inner)
7176

7277

78+
_MAX_RETRIES = 3
79+
_BACKOFF_INITIAL = 0.05
80+
_BACKOFF_MAX = 10
81+
_TIME = time
82+
83+
84+
async def _backoff(
85+
attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX
86+
) -> None:
87+
jitter = random.random() # noqa: S311
88+
backoff = jitter * min(initial_delay * (2**attempt), max_delay)
89+
await asyncio.sleep(backoff)
90+
91+
92+
def _retry_overload(func: F) -> F:
93+
@functools.wraps(func)
94+
async def inner(*args: Any, **kwargs: Any) -> Any:
95+
attempt = 0
96+
while True:
97+
try:
98+
return await func(*args, **kwargs)
99+
except PyMongoError as exc:
100+
if not exc.has_error_label("Retryable"):
101+
raise
102+
attempt += 1
103+
if attempt > _MAX_RETRIES:
104+
raise
105+
106+
# Implement exponential backoff on retry.
107+
if exc.has_error_label("SystemOverloaded"):
108+
await _backoff(attempt)
109+
continue
110+
111+
return cast(F, inner)
112+
113+
73114
async def _getaddrinfo(
74115
host: Any, port: Any, **kwargs: Any
75116
) -> list[

‎pymongo/asynchronous/mongo_client.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
from pymongo.asynchronous.client_bulk import _AsyncClientBulk
6868
from pymongo.asynchronous.client_session import _EmptyServerSession
6969
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
70+
from pymongo.asynchronous.helpers import _MAX_RETRIES, _backoff, _retry_overload
7071
from pymongo.asynchronous.settings import TopologySettings
7172
from pymongo.asynchronous.topology import Topology, _ErrorContext
7273
from pymongo.client_options import ClientOptions
@@ -2398,6 +2399,7 @@ async def list_database_names(
23982399
return [doc["name"] async for doc in res]
23992400

24002401
@_csot.apply
2402+
@_retry_overload
24012403
async def drop_database(
24022404
self,
24032405
name_or_database: Union[str, database.AsyncDatabase[_DocumentTypeArg]],
@@ -2735,6 +2737,7 @@ def __init__(
27352737
):
27362738
self._last_error: Optional[Exception] = None
27372739
self._retrying = False
2740+
self._always_retryable = False
27382741
self._multiple_retries = _csot.get_timeout() is not None
27392742
self._client = mongo_client
27402743

@@ -2783,14 +2786,22 @@ async def run(self) -> T:
27832786
# most likely be a waste of time.
27842787
raise
27852788
except PyMongoError as exc:
2789+
always_retryable = False
2790+
overloaded = False
2791+
exc_to_check = exc
27862792
# Execute specialized catch on read
27872793
if self._is_read:
27882794
if isinstance(exc, (ConnectionFailure, OperationFailure)):
27892795
# ConnectionFailures do not supply a code property
27902796
exc_code = getattr(exc, "code", None)
2791-
if self._is_not_eligible_for_retry() or (
2792-
isinstance(exc, OperationFailure)
2793-
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
2797+
always_retryable = exc.has_error_label("Retryable")
2798+
overloaded = exc.has_error_label("SystemOverloaded")
2799+
if not always_retryable and (
2800+
self._is_not_eligible_for_retry()
2801+
or (
2802+
isinstance(exc, OperationFailure)
2803+
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
2804+
)
27942805
):
27952806
raise
27962807
self._retrying = True
@@ -2801,19 +2812,22 @@ async def run(self) -> T:
28012812

28022813
# Specialized catch on write operation
28032814
if not self._is_read:
2804-
if not self._retryable:
2815+
if isinstance(exc, ClientBulkWriteException) and isinstance(
2816+
exc.error, PyMongoError
2817+
):
2818+
exc_to_check = exc.error
2819+
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
2820+
always_retryable = exc_to_check.has_error_label("Retryable")
2821+
overloaded = exc_to_check.has_error_label("SystemOverloaded")
2822+
if not self._retryable and not always_retryable:
28052823
raise
2806-
if isinstance(exc, ClientBulkWriteException) and exc.error:
2807-
retryable_write_error_exc = isinstance(
2808-
exc.error, PyMongoError
2809-
) and exc.error.has_error_label("RetryableWriteError")
2810-
else:
2811-
retryable_write_error_exc = exc.has_error_label("RetryableWriteError")
2812-
if retryable_write_error_exc:
2824+
if retryable_write_label or always_retryable:
28132825
assert self._session
28142826
await self._session._unpin()
2815-
if not retryable_write_error_exc or self._is_not_eligible_for_retry():
2816-
if exc.has_error_label("NoWritesPerformed") and self._last_error:
2827+
if not always_retryable and (
2828+
not retryable_write_label or self._is_not_eligible_for_retry()
2829+
):
2830+
if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error:
28172831
raise self._last_error from exc
28182832
else:
28192833
raise
@@ -2822,14 +2836,24 @@ async def run(self) -> T:
28222836
self._bulk.retrying = True
28232837
else:
28242838
self._retrying = True
2825-
if not exc.has_error_label("NoWritesPerformed"):
2839+
if not exc_to_check.has_error_label("NoWritesPerformed"):
28262840
self._last_error = exc
28272841
if self._last_error is None:
28282842
self._last_error = exc
28292843

28302844
if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded:
28312845
self._deprioritized_servers.append(self._server)
28322846

2847+
self._always_retryable = always_retryable
2848+
if always_retryable:
2849+
if self._attempt_number > _MAX_RETRIES:
2850+
if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error:
2851+
raise self._last_error from exc
2852+
else:
2853+
raise
2854+
if overloaded:
2855+
await _backoff(self._attempt_number)
2856+
28332857
def _is_not_eligible_for_retry(self) -> bool:
28342858
"""Checks if the exchange is not eligible for retry"""
28352859
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
@@ -2891,7 +2915,7 @@ async def _write(self) -> T:
28912915
and conn.supports_sessions
28922916
)
28932917
is_mongos = conn.is_mongos
2894-
if not sessions_supported:
2918+
if not self._always_retryableandnotsessions_supported:
28952919
# A retry is not possible because this server does
28962920
# not support sessions raise the last error.
28972921
self._check_last_error()
@@ -2923,7 +2947,7 @@ async def _read(self) -> T:
29232947
conn,
29242948
read_pref,
29252949
):
2926-
if self._retrying and not self._retryable:
2950+
if self._retrying and not self._retryableandnotself._always_retryable:
29272951
self._check_last_error()
29282952
if self._retrying:
29292953
_debug_log(

‎pymongo/synchronous/collection.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
Cursor,
9090
RawBatchCursor,
9191
)
92+
from pymongo.synchronous.helpers import _retry_overload
9293
from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline
9394
from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern, validate_boolean
9495

@@ -2224,6 +2225,7 @@ def create_indexes(
22242225
return self._create_indexes(indexes, session, **kwargs)
22252226

22262227
@_csot.apply
2228+
@_retry_overload
22272229
def _create_indexes(
22282230
self, indexes: Sequence[IndexModel], session: Optional[ClientSession], **kwargs: Any
22292231
) -> list[str]:
@@ -2419,7 +2421,6 @@ def drop_indexes(
24192421
kwargs["comment"] = comment
24202422
self._drop_index("*", session=session, **kwargs)
24212423

2422-
@_csot.apply
24232424
def drop_index(
24242425
self,
24252426
index_or_name: _IndexKeyHint,
@@ -2469,6 +2470,7 @@ def drop_index(
24692470
self._drop_index(index_or_name, session, comment, **kwargs)
24702471

24712472
@_csot.apply
2473+
@_retry_overload
24722474
def _drop_index(
24732475
self,
24742476
index_or_name: _IndexKeyHint,
@@ -3072,6 +3074,7 @@ def aggregate_raw_batches(
30723074
)
30733075

30743076
@_csot.apply
3077+
@_retry_overload
30753078
def rename(
30763079
self,
30773080
new_name: str,

‎pymongo/synchronous/database.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from pymongo.synchronous.change_stream import DatabaseChangeStream
4444
from pymongo.synchronous.collection import Collection
4545
from pymongo.synchronous.command_cursor import CommandCursor
46+
from pymongo.synchronous.helpers import _retry_overload
4647
from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline
4748

4849
if TYPE_CHECKING:
@@ -477,6 +478,7 @@ def watch(
477478
return change_stream
478479

479480
@_csot.apply
481+
@_retry_overload
480482
def create_collection(
481483
self,
482484
name: str,
@@ -816,6 +818,7 @@ def command(
816818
...
817819

818820
@_csot.apply
821+
@_retry_overload
819822
def command(
820823
self,
821824
command: Union[str, MutableMapping[str, Any]],
@@ -945,6 +948,7 @@ def command(
945948
)
946949

947950
@_csot.apply
951+
@_retry_overload
948952
def cursor_command(
949953
self,
950954
command: Union[str, MutableMapping[str, Any]],
@@ -1257,6 +1261,7 @@ def _drop_helper(
12571261
)
12581262

12591263
@_csot.apply
1264+
@_retry_overload
12601265
def drop_collection(
12611266
self,
12621267
name_or_collection: Union[str, Collection[_DocumentTypeArg]],

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /