I'm using `asyncpg` in a multi-host configuration to implement a zero-downtime Postgres upgrade. I already have this working for `psycopg2` (we currently use both drivers in our project), but I'm having issues with how `asyncpg` handles connect timeouts.
I'm using the `connect_timeout` option to specify a timeout of 2 seconds, and I expect the driver to fail over automatically when the first host in the list is unavailable. Instead, the failover never happens.
I'll try to describe our setup:
1. we have two database hosts: `host-old` and `host-new`
2. before we switch over to `host-new`, we suspend the connection pool: we close any open connections and prevent it from creating new ones
3. we set up a firewall (AWS Security Group) to prevent any new connections to `host-old`
4. we release the connection pool
It's important to understand that after step 3, `host-old` can still be resolved, but any connection attempt to it will fail.
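The suspend and release steps above could be sketched roughly as follows. This is only an illustration: `PoolGate` and its methods are hypothetical names, not part of `asyncpg` or of our actual code, and closing the already-open connections is left out.

```python
import asyncio


class PoolGate:
    """Hypothetical gate that blocks new connection attempts while suspended."""

    def __init__(self) -> None:
        self._open = asyncio.Event()
        self._open.set()  # start in the "released" state

    def suspend(self) -> None:
        # Step 2: stop handing out new connections.
        self._open.clear()

    def release(self) -> None:
        # Step 4: let waiting callers connect again.
        self._open.set()

    async def acquire(self, connect):
        # Callers wait here while the gate is suspended, then connect.
        await self._open.wait()
        return await connect()


async def demo():
    gate = PoolGate()
    gate.suspend()

    async def connect():
        return "conn"  # stand-in for a real connection object

    task = asyncio.ensure_future(gate.acquire(connect))
    await asyncio.sleep(0.01)
    blocked = not task.done()  # still waiting on the gate
    gate.release()
    return blocked, await task


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the gate is that every connection made after `release()` is a fresh attempt, which is why the multi-host failover has to work on that first attempt.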
Now we set `connect_timeout` to 2 seconds and we get a connection timeout regardless. This happens because `connect_timeout` seems to be an overall connection timeout, not a per-host connection timeout.
While debugging this I noticed that when `__connect_addr` is called, it checks whether the timeout is less than or equal to zero and, if so, raises a timeout error. This is exactly what happens in my case: the connection attempt to `host-old` uses up all 2 seconds of the timeout, and before we retry with `host-new` the code updates the `timeout` here, so the value becomes zero or less and the retry against the next host is doomed to fail.
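To make the effect concrete, here is a small simulation that does not use `asyncpg` at all. It is a sketch of the behaviour described above, not the library's code: `fake_connect`, `connect_shared_budget` and `connect_per_host` are made-up names, and the delays stand in for an unreachable first host and a healthy second one.

```python
import asyncio
import time


async def fake_connect(delay: float) -> str:
    """Stand-in for one connection attempt that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return "connected"


async def connect_shared_budget(delays, timeout):
    """Try hosts in order, charging elapsed time against ONE shared timeout."""
    last_error = None
    for delay in delays:
        before = time.monotonic()
        try:
            if timeout <= 0:
                # No budget left: fail immediately, as described above.
                raise asyncio.TimeoutError
            return await asyncio.wait_for(fake_connect(delay), timeout)
        except asyncio.TimeoutError as ex:
            last_error = ex
        finally:
            # The first (dead) host eats the whole budget here.
            timeout -= time.monotonic() - before
    if last_error is None:
        raise ValueError("no hosts given")
    raise last_error


async def connect_per_host(delays, timeout):
    """Try hosts in order, giving EACH host the full timeout."""
    last_error = None
    for delay in delays:
        try:
            return await asyncio.wait_for(fake_connect(delay), timeout)
        except asyncio.TimeoutError as ex:
            last_error = ex
    if last_error is None:
        raise ValueError("no hosts given")
    raise last_error


if __name__ == "__main__":
    hosts = [10.0, 0.05]  # first host hangs, second answers quickly
    try:
        asyncio.run(connect_shared_budget(hosts, 0.2))
    except asyncio.TimeoutError:
        print("shared budget: timed out before the second host got a chance")
    print("per host:", asyncio.run(connect_per_host(hosts, 0.2)))
```

With a shared budget the second host never gets a real attempt, while the per-host variant fails over after one timeout period.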
I'm not sure whether this is the desired behaviour, or whether I missed some other configuration that would make this case work, but at first glance it seems that `connect_timeout` should be per host, or that there needs to be another configuration option.
I've made a reproducer that can be run locally with minor changes:
```python
import asyncio
import time

import asyncpg
from asyncpg import Connection


async def connect() -> Connection:
    return await asyncpg.connect(
        "postgresql://",
        user="postgres",
        password="",
        host=[
            "10.255.255.1",  # Non-routable IP address to simulate connect timeout
            "host-new",
        ],
        port=[5432, 5432],
        database="postgres",
        server_settings={
            "search_path": "public",
            "statement_timeout": "60000",
        },
        timeout=2,
    )


async def main():
    start = time.time()
    connection = await connect()
    end = time.time()
    # We expect this timeout to be a bit over 2 seconds,
    # because we need to connect to another host
    print(end - start)
    result = await connection.fetchval("SELECT version() as version;")
    print(result)
    await connection.close()


asyncio.run(main())
```
Not sure what would be the proper solution here.
Replies: 1 comment
To mitigate this, we have manually patched the library like this:
```python
import asyncio
import logging

import asyncpg

logger = logging.getLogger(__name__)


async def _patched_connect(*, loop, timeout, connection_class, record_class, **kwargs):
    if loop is None:
        loop = asyncio.get_event_loop()

    addrs, params, config = asyncpg.connect_utils._parse_connect_arguments(timeout=timeout, **kwargs)

    # Calculating an overall connection timeout to be the sum of all timeouts for each host
    total_timeout = len(addrs) * timeout

    return await asyncpg.compat.wait_for(
        _patched_connect_internal(
            addrs=addrs,
            params=params,
            config=config,
            loop=loop,
            timeout=timeout,
            connection_class=connection_class,
            record_class=record_class,
        ),
        timeout=total_timeout,
    )


async def _patched_connect_internal(*, addrs, params, config, loop, timeout, connection_class, record_class):
    last_error: Exception | None = None
    for addr in addrs:
        try:
            return await asyncpg.connect_utils._connect_addr(
                addr=addr,
                loop=loop,
                timeout=timeout,
                params=params,
                config=config,
                connection_class=connection_class,
                record_class=record_class,
            )
        except (OSError, asyncio.TimeoutError, ConnectionError) as ex:
            last_error = ex

    if last_error:
        raise last_error


asyncpg.connect_utils._connect = _patched_connect
```
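A possible alternative that avoids touching private functions is to loop over the hosts in application code and call the public `asyncpg.connect()` once per host, each with its own timeout. The sketch below is untested against a real cluster. `connect_first_available` and `connect_asyncpg` are hypothetical helper names, and this approach bypasses any multi-host logic the driver applies itself.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional, Sequence, Tuple

Address = Tuple[str, int]


async def connect_first_available(
    addrs: Sequence[Address],
    connect_one: Callable[[str, int], Awaitable[Any]],
    per_host_timeout: float,
) -> Any:
    """Return the first successful connection; every host gets a full timeout."""
    if not addrs:
        raise ValueError("at least one address is required")
    last_error: Optional[Exception] = None
    for host, port in addrs:
        try:
            return await asyncio.wait_for(connect_one(host, port), per_host_timeout)
        except (OSError, asyncio.TimeoutError) as ex:
            # ConnectionError is a subclass of OSError, so it is covered too.
            last_error = ex
    # addrs is non-empty, so at least one attempt failed and set last_error.
    raise last_error


async def connect_asyncpg(addrs: Sequence[Address], per_host_timeout: float = 2.0, **kwargs):
    """Hypothetical wrapper that plugs asyncpg.connect() into the helper above."""
    import asyncpg  # imported lazily so the generic helper has no hard dependency

    async def connect_one(host: str, port: int):
        return await asyncpg.connect(host=host, port=port, timeout=per_host_timeout, **kwargs)

    return await connect_first_available(addrs, connect_one, per_host_timeout)
```

Usage would look like `await connect_asyncpg([("host-old", 5432), ("host-new", 5432)], user="postgres", database="postgres")`. In the worst case this takes `len(addrs) * per_host_timeout`, which matches the `total_timeout` in the patch above.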