Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 62bc491

Browse files
added a circuit breaker error manager for redis cache.
1 parent b1e62f3 commit 62bc491

File tree

2 files changed

+64
-11
lines changed

2 files changed

+64
-11
lines changed

‎src/cache/custom.py

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,50 @@
1818
EXPIRATION_TIME = config_instance().CACHE_SETTINGS.CACHE_DEFAULT_TIMEOUT
1919

2020

21+
class RedisErrorManager:
22+
def __init__(self, use_redis: bool = True):
23+
self.use_redis: bool = use_redis
24+
self._permanent_off = use_redis
25+
26+
self.cache_errors: int = 0
27+
28+
self.error_threshold: int = 10
29+
self.min_error_threshold: int = 5
30+
self.initial_off_time: int = 60
31+
self.max_off_time: int = 3600
32+
self.time_since_last_error: int = 0
33+
34+
async def turn_off_redis(self, off_time: int):
35+
self.use_redis = False
36+
self.time_since_last_error = 0
37+
# additional code to shut down Redis or perform other tasks
38+
if off_time == 0:
39+
self._permanent_off = False
40+
return
41+
42+
await asyncio.sleep(off_time)
43+
44+
async def turn_on_redis(self):
45+
self.use_redis = True
46+
# additional code to initialize Redis or perform other tasks
47+
48+
async def check_error_threshold(self):
49+
if self.cache_errors >= self.error_threshold and self.time_since_last_error <= self.max_off_time:
50+
off_time = self.initial_off_time * 2 ** (self.cache_errors - self.min_error_threshold)
51+
off_time = min(off_time, self.max_off_time)
52+
await self.turn_off_redis(off_time)
53+
elif self.cache_errors < self.min_error_threshold and not self.use_redis:
54+
await self.turn_on_redis()
55+
else:
56+
self.time_since_last_error += 1
57+
58+
async def increment_cache_errors(self):
59+
self.cache_errors += 1
60+
61+
async def can_use_redis(self):
62+
return self.use_redis and self._permanent_off
63+
64+
2165
class Cache:
2266
"""
2367
A class to handle caching of data, both in-memory and in Redis.
@@ -38,12 +82,13 @@ def __init__(self, cache_name: str = "mem", max_size: int = MEM_CACHE_SIZE, expi
3882
self.max_size = max_size
3983
self.expiration_time = expiration_time
4084
self._cache_name = cache_name
85+
self.redis_errors = RedisErrorManager(use_redis=use_redis)
4186
self._cache = {}
4287
self._cache_lock = threading.Lock()
43-
self._use_redis=use_redis
88+
4489
self._logger = init_logger(camel_to_snake(self.__class__.__name__))
4590

46-
if self._use_redis:
91+
if self.redis_errors.use_redis:
4792
redis_host = config_instance().REDIS_CACHE.CACHE_REDIS_HOST
4893
redis_port = config_instance().REDIS_CACHE.CACHE_REDIS_PORT
4994
password = config_instance().REDIS_CACHE.REDIS_PASSWORD
@@ -58,14 +103,11 @@ def __init__(self, cache_name: str = "mem", max_size: int = MEM_CACHE_SIZE, expi
58103
config_instance().DEBUG and self._logger.info("Cache -- Redis connected")
59104
except (ConnectionError, AuthenticationError):
60105
config_instance().DEBUG and self._logger.error(msg="Redis failed to connect....")
61-
self.turn_off_redis()
106+
self.redis_errors.turn_off_redis(off_time=0)
62107

63108
@property
64-
async def can_use_redis(self):
65-
return self._use_redis
66-
67-
async def turn_off_redis(self):
68-
self._use_redis = False
109+
async def can_use_redis(self) -> bool:
110+
return await self.redis_errors.can_use_redis()
69111

70112
async def on_delete(self):
71113
"""
@@ -87,7 +129,7 @@ async def _serialize_value(self, value: Any, default=None) -> str:
87129
except (JSONDecodeError, pickle.PicklingError):
88130
config_instance().DEBUG and self._logger.error(f"Serializer Error")
89131
return default
90-
except TypeErrorase:
132+
except TypeError:
91133
config_instance().DEBUG and self._logger.error(f"Serializer Error")
92134
return default
93135

@@ -142,10 +184,11 @@ async def set(self, key: str, value: Any, ttl: int = 0):
142184
await self._remove_oldest_entry()
143185

144186
try:
145-
if self._use_redis:
187+
if awaitself.redis_errors.can_use_redis():
146188
self._redis_client.set(key, value, ex=exp_time)
147189
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError):
148190
# TODO -- keep a count of redis errors if they pass a thresh-hold then switch-off redis
191+
await self.redis_errors.increment_cache_errors()
149192
pass
150193
try:
151194
await self._set_mem_cache(key=key, value=value, ttl=exp_time)
@@ -187,20 +230,23 @@ async def _async_redis_get(get: Callable, _key: str):
187230
value = await asyncio.wait_for(self._get_memcache(key=key), timeout=timeout)
188231
except (asyncio.TimeoutError, KeyError):
189232
# Timed out waiting for the memcache lookup, or KeyError - as a result of cache eviction
233+
await self.redis_errors.increment_cache_errors()
190234
value = None
191235

192236
# will only try and return a value in redis if memcache value does not exist
193-
if self._use_redis and (value is None):
237+
if awaitself.redis_errors.can_use_redis() and (value is None):
194238
try:
195239
# Wait for the result of the redis lookup with a timeout
196240
redis_get = functools.partial(_async_redis_get, get=self._redis_client.get)
197241
value = await asyncio.wait_for(redis_get(_key=key), timeout=timeout)
198242
except (redis.exceptions.TimeoutError, asyncio.TimeoutError):
199243
# Timed out waiting for the redis lookup
200244
config_instance().DEBUG and self._logger.error("Timeout Error Reading from redis")
245+
await self.redis_errors.increment_cache_errors()
201246
value = None
202247
except redis.exceptions.ConnectionError:
203248
config_instance().DEBUG and self._logger.error("ConnectionError Reading from redis")
249+
await self.redis_errors.increment_cache_errors()
204250
value = None
205251

206252
return await self._deserialize_value(value, value) if value else None

‎src/main/main.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,12 @@ async def clean_up_memcache():
400400
app_logger.info(f"Cleaned Up {total_cleaned} Expired Mem Cache Values")
401401
await asyncio.sleep(delay=60 * 30)
402402

403+
async def check_redis_errors():
404+
"""will check if redis is making too many errors then switch redis off if it's the case"""
405+
while True:
406+
await redis_cache.redis_errors.check_error_threshold()
407+
await asyncio.sleep(delay=60*10)
408+
403409
async def monitor_servers():
404410
"""will prioritize servers which are responsive and also available"""
405411
while True:
@@ -414,6 +420,7 @@ async def monitor_servers():
414420
asyncio.create_task(email_process.process_message_queues())
415421
asyncio.create_task(clean_up_memcache())
416422
asyncio.create_task(monitor_servers())
423+
asyncio.create_task(check_redis_errors())
417424

418425

419426
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /