Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 4cdf082

Browse files
petyaslavovaelena-kolevska
andauthored
Support for maintenance push notifications handling during server upgrade or maintenance procedures. (#3756)
* Hitless upgrade: Support initial implementation for synchronous Redis client - no handshake, no failing over notifications support. (#3713) * Adding handling of FAILING_OVER and FAILED_OVER events/push notifications (#3716) * Hitless upgrade: Adding handshake command to enable the notifications after connection is established (#3735) * Hitless-Upgrade: Add handling of MOVING push notification with "null" host:port info. (#3751) --------- Co-authored-by: Elena Kolevska <elena-kolevska@users.noreply.github.com>
1 parent b5a8ef0 commit 4cdf082

File tree

9 files changed

+4926
-94
lines changed

9 files changed

+4926
-94
lines changed

‎redis/_parsers/base.py‎

Lines changed: 173 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
1+
import logging
12
import sys
23
from abc import ABC
34
from asyncio import IncompleteReadError, StreamReader, TimeoutError
4-
from typing import Callable, List, Optional, Protocol, Union
5+
from typing import Awaitable, Callable, List, Optional, Protocol, Union
6+
7+
from redis.maintenance_events import (
8+
MaintenanceEvent,
9+
NodeFailedOverEvent,
10+
NodeFailingOverEvent,
11+
NodeMigratedEvent,
12+
NodeMigratingEvent,
13+
NodeMovingEvent,
14+
)
515

616
if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
717
from asyncio import timeout as async_timeout
@@ -50,6 +60,8 @@
5060
"Client sent AUTH, but no password is set": AuthenticationError,
5161
}
5262

63+
logger = logging.getLogger(__name__)
64+
5365

5466
class BaseParser(ABC):
5567
EXCEPTION_CLASSES = {
@@ -158,48 +170,195 @@ async def read_response(
158170
raise NotImplementedError()
159171

160172

161-
_INVALIDATION_MESSAGE = [b"invalidate", "invalidate"]
173+
class MaintenanceNotificationsParser:
174+
"""Protocol defining maintenance push notification parsing functionality"""
175+
176+
@staticmethod
177+
def parse_maintenance_start_msg(response, notification_type):
178+
# Expected message format is: <event_type> <seq_number> <time>
179+
id = response[1]
180+
ttl = response[2]
181+
return notification_type(id, ttl)
182+
183+
@staticmethod
184+
def parse_maintenance_completed_msg(response, notification_type):
185+
# Expected message format is: <event_type> <seq_number>
186+
id = response[1]
187+
return notification_type(id)
188+
189+
@staticmethod
190+
def parse_moving_msg(response):
191+
# Expected message format is: MOVING <seq_number> <time> <endpoint>
192+
id = response[1]
193+
ttl = response[2]
194+
if response[3] in [b"null", "null"]:
195+
host, port = None, None
196+
else:
197+
value = response[3]
198+
if isinstance(value, bytes):
199+
value = value.decode()
200+
host, port = value.split(":")
201+
port = int(port) if port is not None else None
202+
203+
return NodeMovingEvent(id, host, port, ttl)
204+
205+
206+
_INVALIDATION_MESSAGE = "invalidate"
207+
_MOVING_MESSAGE = "MOVING"
208+
_MIGRATING_MESSAGE = "MIGRATING"
209+
_MIGRATED_MESSAGE = "MIGRATED"
210+
_FAILING_OVER_MESSAGE = "FAILING_OVER"
211+
_FAILED_OVER_MESSAGE = "FAILED_OVER"
212+
213+
_MAINTENANCE_MESSAGES = (
214+
_MIGRATING_MESSAGE,
215+
_MIGRATED_MESSAGE,
216+
_FAILING_OVER_MESSAGE,
217+
_FAILED_OVER_MESSAGE,
218+
)
219+
220+
MSG_TYPE_TO_EVENT_PARSER_MAPPING: dict[str, tuple[type[MaintenanceEvent], Callable]] = {
221+
_MIGRATING_MESSAGE: (
222+
NodeMigratingEvent,
223+
MaintenanceNotificationsParser.parse_maintenance_start_msg,
224+
),
225+
_MIGRATED_MESSAGE: (
226+
NodeMigratedEvent,
227+
MaintenanceNotificationsParser.parse_maintenance_completed_msg,
228+
),
229+
_FAILING_OVER_MESSAGE: (
230+
NodeFailingOverEvent,
231+
MaintenanceNotificationsParser.parse_maintenance_start_msg,
232+
),
233+
_FAILED_OVER_MESSAGE: (
234+
NodeFailedOverEvent,
235+
MaintenanceNotificationsParser.parse_maintenance_completed_msg,
236+
),
237+
_MOVING_MESSAGE: (
238+
NodeMovingEvent,
239+
MaintenanceNotificationsParser.parse_moving_msg,
240+
),
241+
}
162242

163243

164244
class PushNotificationsParser(Protocol):
165245
"""Protocol defining RESP3-specific parsing functionality"""
166246

167247
pubsub_push_handler_func: Callable
168248
invalidation_push_handler_func: Optional[Callable] = None
249+
node_moving_push_handler_func: Optional[Callable] = None
250+
maintenance_push_handler_func: Optional[Callable] = None
169251

170252
def handle_pubsub_push_response(self, response):
171253
"""Handle pubsub push responses"""
172254
raise NotImplementedError()
173255

174256
def handle_push_response(self, response, **kwargs):
175-
if response[0] not in _INVALIDATION_MESSAGE:
257+
msg_type = response[0]
258+
if isinstance(msg_type, bytes):
259+
msg_type = msg_type.decode()
260+
261+
if msg_type not in (
262+
_INVALIDATION_MESSAGE,
263+
*_MAINTENANCE_MESSAGES,
264+
_MOVING_MESSAGE,
265+
):
176266
return self.pubsub_push_handler_func(response)
177-
if self.invalidation_push_handler_func:
178-
return self.invalidation_push_handler_func(response)
267+
268+
try:
269+
if (
270+
msg_type == _INVALIDATION_MESSAGE
271+
and self.invalidation_push_handler_func
272+
):
273+
return self.invalidation_push_handler_func(response)
274+
275+
if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
276+
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
277+
278+
notification = parser_function(response)
279+
return self.node_moving_push_handler_func(notification)
280+
281+
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
282+
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
283+
notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][0]
284+
notification = parser_function(response, notification_type)
285+
286+
if notification is not None:
287+
return self.maintenance_push_handler_func(notification)
288+
except Exception as e:
289+
logger.error(
290+
"Error handling {} message ({}): {}".format(msg_type, response, e)
291+
)
292+
293+
return None
179294

180295
def set_pubsub_push_handler(self, pubsub_push_handler_func):
181296
self.pubsub_push_handler_func = pubsub_push_handler_func
182297

183298
def set_invalidation_push_handler(self, invalidation_push_handler_func):
184299
self.invalidation_push_handler_func = invalidation_push_handler_func
185300

301+
def set_node_moving_push_handler(self, node_moving_push_handler_func):
302+
self.node_moving_push_handler_func = node_moving_push_handler_func
303+
304+
def set_maintenance_push_handler(self, maintenance_push_handler_func):
305+
self.maintenance_push_handler_func = maintenance_push_handler_func
306+
186307

187308
class AsyncPushNotificationsParser(Protocol):
188309
"""Protocol defining async RESP3-specific parsing functionality"""
189310

190311
pubsub_push_handler_func: Callable
191312
invalidation_push_handler_func: Optional[Callable] = None
313+
node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
314+
maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None
192315

193316
async def handle_pubsub_push_response(self, response):
194317
"""Handle pubsub push responses asynchronously"""
195318
raise NotImplementedError()
196319

197320
async def handle_push_response(self, response, **kwargs):
198321
"""Handle push responses asynchronously"""
199-
if response[0] not in _INVALIDATION_MESSAGE:
322+
323+
msg_type = response[0]
324+
if isinstance(msg_type, bytes):
325+
msg_type = msg_type.decode()
326+
327+
if msg_type not in (
328+
_INVALIDATION_MESSAGE,
329+
*_MAINTENANCE_MESSAGES,
330+
_MOVING_MESSAGE,
331+
):
200332
return await self.pubsub_push_handler_func(response)
201-
if self.invalidation_push_handler_func:
202-
return await self.invalidation_push_handler_func(response)
333+
334+
try:
335+
if (
336+
msg_type == _INVALIDATION_MESSAGE
337+
and self.invalidation_push_handler_func
338+
):
339+
return await self.invalidation_push_handler_func(response)
340+
341+
if isinstance(msg_type, bytes):
342+
msg_type = msg_type.decode()
343+
344+
if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
345+
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
346+
notification = parser_function(response)
347+
return await self.node_moving_push_handler_func(notification)
348+
349+
if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
350+
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
351+
notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][0]
352+
notification = parser_function(response, notification_type)
353+
354+
if notification is not None:
355+
return await self.maintenance_push_handler_func(notification)
356+
except Exception as e:
357+
logger.error(
358+
"Error handling {} message ({}): {}".format(msg_type, response, e)
359+
)
360+
361+
return None
203362

204363
def set_pubsub_push_handler(self, pubsub_push_handler_func):
205364
"""Set the pubsub push handler function"""
@@ -209,6 +368,12 @@ def set_invalidation_push_handler(self, invalidation_push_handler_func):
209368
"""Set the invalidation push handler function"""
210369
self.invalidation_push_handler_func = invalidation_push_handler_func
211370

371+
def set_node_moving_push_handler(self, node_moving_push_handler_func):
372+
self.node_moving_push_handler_func = node_moving_push_handler_func
373+
374+
def set_maintenance_push_handler(self, maintenance_push_handler_func):
375+
self.maintenance_push_handler_func = maintenance_push_handler_func
376+
212377

213378
class _AsyncRESPBase(AsyncBaseParser):
214379
"""Base class for async resp parsing"""

‎redis/_parsers/hiredis.py‎

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ def __init__(self, socket_read_size):
4747
self.socket_read_size = socket_read_size
4848
self._buffer = bytearray(socket_read_size)
4949
self.pubsub_push_handler_func = self.handle_pubsub_push_response
50+
self.node_moving_push_handler_func = None
51+
self.maintenance_push_handler_func = None
5052
self.invalidation_push_handler_func = None
5153
self._hiredis_PushNotificationType = None
5254

@@ -141,12 +143,15 @@ def read_response(self, disable_decoding=False, push_request=False):
141143
response, self._hiredis_PushNotificationType
142144
):
143145
response = self.handle_push_response(response)
144-
if not push_request:
145-
return self.read_response(
146-
disable_decoding=disable_decoding, push_request=push_request
147-
)
148-
else:
146+
147+
# if this is a push request return the push response
148+
if push_request:
149149
return response
150+
151+
return self.read_response(
152+
disable_decoding=disable_decoding,
153+
push_request=push_request,
154+
)
150155
return response
151156

152157
if disable_decoding:
@@ -169,12 +174,13 @@ def read_response(self, disable_decoding=False, push_request=False):
169174
response, self._hiredis_PushNotificationType
170175
):
171176
response = self.handle_push_response(response)
172-
if not push_request:
173-
return self.read_response(
174-
disable_decoding=disable_decoding, push_request=push_request
175-
)
176-
else:
177+
if push_request:
177178
return response
179+
return self.read_response(
180+
disable_decoding=disable_decoding,
181+
push_request=push_request,
182+
)
183+
178184
elif (
179185
isinstance(response, list)
180186
and response

‎redis/_parsers/resp3.py‎

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ class _RESP3Parser(_RESPBase, PushNotificationsParser):
1818
def __init__(self, socket_read_size):
1919
super().__init__(socket_read_size)
2020
self.pubsub_push_handler_func = self.handle_pubsub_push_response
21+
self.node_moving_push_handler_func = None
22+
self.maintenance_push_handler_func = None
2123
self.invalidation_push_handler_func = None
2224

2325
def handle_pubsub_push_response(self, response):
@@ -117,17 +119,21 @@ def _read_response(self, disable_decoding=False, push_request=False):
117119
for _ in range(int(response))
118120
]
119121
response = self.handle_push_response(response)
120-
if not push_request:
121-
return self._read_response(
122-
disable_decoding=disable_decoding, push_request=push_request
123-
)
124-
else:
122+
123+
# if this is a push request return the push response
124+
if push_request:
125125
return response
126+
127+
return self._read_response(
128+
disable_decoding=disable_decoding,
129+
push_request=push_request,
130+
)
126131
else:
127132
raise InvalidResponse(f"Protocol Error: {raw!r}")
128133

129134
if isinstance(response, bytes) and disable_decoding is False:
130135
response = self.encoder.decode(response)
136+
131137
return response
132138

133139

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /