Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit d418b62

Browse files
added server health checking
1 parent 85017b6 commit d418b62

File tree

2 files changed

+86
-4
lines changed

2 files changed

+86
-4
lines changed

‎src/main/main.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from src.management_api.routes import admin_app
3030
from src.prefetch import prefetch_endpoints
3131
from src.ratelimit import ip_rate_limits, RateLimit
32-
from src.requests import requester
32+
from src.requests import requester, ServerMonitor
3333
from src.utils.my_logger import init_logger
3434
from src.utils.utils import is_development
3535

@@ -340,9 +340,14 @@ async def compare_tokens():
340340
return response
341341

342342

343+
#######################################################################################################################
344+
345+
343346
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
344347
# # # # # # # # # # # # # # STARTUP EVENTS
345348
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
349+
remote_servers = ServerMonitor()
350+
346351

347352
# On Start Up Run the following Tasks
348353
@app.on_event('startup')
@@ -390,18 +395,25 @@ async def backup_cf_firewall_data():
390395

391396
async def clean_up_memcache():
    """Background task: purge expired mem-cache entries every 30 minutes.

    Runs forever; each pass asks the redis cache layer to drop expired
    values and logs how many were removed.
    """
    while True:
        # Drop expired entries and report the count.
        cleaned_count = await redis_cache.memcache_ttl_cleaner()
        app_logger.info(f"Cleaned Up {cleaned_count} Expired Mem Cache Values")
        # Sleep half an hour between cleanup passes.
        await asyncio.sleep(delay=30 * 60)
397402

403+
async def monitor_servers():
    """Background task: re-rank API servers by responsiveness and availability.

    Every five minutes, asks the shared ServerMonitor to probe the backend
    servers and refresh its healthy-server ordering.
    """
    while True:
        # Re-order the healthy server list, most responsive first.
        await remote_servers.sort_api_servers_by_health()
        await asyncio.sleep(delay=5 * 60)
408+
398409
asyncio.create_task(setup_cf_firewall())
399410
asyncio.create_task(backup_cf_firewall_data())
400411
asyncio.create_task(update_api_keys_background_task())
401412
# asyncio.create_task(prefetch())
402413
asyncio.create_task(process_credit_queue())
403414
asyncio.create_task(email_process.process_message_queues())
404415
asyncio.create_task(clean_up_memcache())
416+
asyncio.create_task(monitor_servers())
405417

406418

407419
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
@@ -437,7 +449,7 @@ async def v1_gateway(request: Request, path: str):
437449
_path = f"/api/v1/{path}"
438450
await create_take_credit_args(api_key=api_key, path=_path)
439451

440-
api_urls = [f'{api_server_url}/api/v1/{path}' for api_server_url in api_server_urls]
452+
api_urls = [f'{api_server_url}/api/v1/{path}' for api_server_url in remote_servers.healthy_server_urls]
441453

442454
# Will Take at least six second on the cache if it finds nothing will return None
443455
# need an improved get timeout for the articles
@@ -450,7 +462,7 @@ async def v1_gateway(request: Request, path: str):
450462
return JSONResponse(content=response, status_code=200, headers={"Content-Type": "application/json"})
451463

452464
app_logger.info(msg="All cached responses not found- Must Be a Slow Day")
453-
for api_url in api_urls:
465+
for api_url in remote_servers.healthy_server_urls:
454466
try:
455467
# 5 minutes timeout on resource fetching from backend - some resources may take very long
456468
response = await requester(api_url=api_url, timeout=9600)

‎src/requests/__init__.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import asyncio
2+
import time
3+
14
import httpx
25
from src.config import config_instance
36
from src.utils.my_logger import init_logger
@@ -48,3 +51,70 @@ async def requester(api_url: str, timeout: int = 30):
4851
raise err
4952

5053
return response.json() if response.headers.get('Content-Type') == "application/json" else None
54+
55+
56+
class ServerMonitor:
    """Tracks backend API server health and keeps an ordered list of usable servers.

    ``healthy_server_urls`` always holds at least one URL: the servers that
    passed the most recent probe (fastest first), or the configured slave
    server as a fallback when none qualify.
    """

    def __init__(self):
        # Servers slower than this many milliseconds are excluded even if healthy.
        self.response_time_thresh_hold: int = 900  # 900 milliseconds
        # Start with the full configured server list until the first probe runs.
        self.healthy_server_urls: list[str] = config_instance().API_SERVERS.SERVERS_LIST.split(",")
        # Health check endpoint exposed by each server.
        self._server_monitor_endpoint = '/_ah/warmup'

    async def check_health(self, api_url: str) -> tuple[str, bool]:
        """Probe one server's warmup endpoint.

        :param api_url: base URL of the server to probe.
        :return: ``(api_url, True)`` when the endpoint answered 200,
                 otherwise ``(api_url, False)``.
        """
        health_check_url = f"{api_url}{self._server_monitor_endpoint}"
        request_logger.info(f"Server Health Probe: {health_check_url}")
        try:
            async with httpx.AsyncClient() as client:
                # BUG FIX: original called undefined ``async_client`` here.
                response = await client.get(url=health_check_url)
                if response.status_code == 200:
                    request_logger.info(f"server still healthy : {api_url}")
                    return api_url, True
                else:
                    request_logger.info(f"server not healthy : {api_url}")
                    request_logger.info(f"Response : {response.text}")
                    return api_url, False
        except (ConnectionError, TimeoutError):
            return api_url, False
        except httpx.HTTPError:
            return api_url, False

    async def measure_response_time(self, api_url: str) -> tuple[str, float | None]:
        """Measure the round-trip latency of a server's warmup endpoint.

        :param api_url: base URL of the server to time.
        :return: ``(api_url, latency_ms)`` on a 200 response,
                 otherwise ``(api_url, None)``.
        """
        try:
            check_url: str = f"{api_url}{self._server_monitor_endpoint}"
            request_logger.info(f"Server Health Probe: {check_url}")
            async with httpx.AsyncClient() as client:
                start_time = time.perf_counter()
                # BUG FIX: original called undefined ``async_client`` here.
                response = await client.get(url=check_url)
                if response.status_code == 200:
                    # Whole-millisecond latency is precise enough for ranking.
                    elapsed_time = int((time.perf_counter() - start_time) * 1000)
                    request_logger.info(f"server : {api_url} latency : {elapsed_time}")
                    return api_url, elapsed_time
                else:
                    request_logger.info(f"server : {api_url} Not healthy")
                    request_logger.info(f"Response : {response.text}")
                    return api_url, None
        except (ConnectionError, TimeoutError):
            return api_url, None
        except httpx.HTTPError:
            return api_url, None

    async def sort_api_servers_by_health(self) -> None:
        """Refresh ``healthy_server_urls``: healthy servers sorted fastest-first.

        Probes every configured server concurrently, keeps the ones that
        answered, ranks them by latency, and drops any above the latency
        threshold. Falls back to the configured slave server when no server
        is both healthy and fast enough.
        """
        # Check the health of each server concurrently.
        tasks = [self.check_health(api_url) for api_url in config_instance().API_SERVERS.SERVERS_LIST.split(",")]
        health_results = await asyncio.gather(*tasks)

        # Filter out the unhealthy servers.
        healthy_api_urls = [api_url for api_url, is_healthy in health_results if is_healthy]

        tasks = [self.measure_response_time(api_url) for api_url in healthy_api_urls]
        response_time_results = await asyncio.gather(*tasks)
        # BUG FIX: drop failed probes (latency None) before sorting/comparing;
        # the original sorted and compared None against int, raising TypeError.
        timed_results = [(api_url, latency) for api_url, latency in response_time_results if latency is not None]
        sorted_response_times = sorted(timed_results, key=lambda pair: pair[1])
        within_threshold = [api_url for api_url, response_time in sorted_response_times
                            if response_time < self.response_time_thresh_hold]
        self.healthy_server_urls = within_threshold if within_threshold else [config_instance().API_SERVERS.SLAVE_API_SERVER]
120+

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /