Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 4a5d365

Browse files
Add connection_holder_class parameter to Pool for customizable connection management
1 parent 5b14653 commit 4a5d365

File tree

1 file changed

+49
-25
lines changed

1 file changed

+49
-25
lines changed

‎asyncpg/pool.py

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from __future__ import annotations
88

99
import asyncio
10+
import typing
1011
from collections.abc import Awaitable, Callable
1112
import functools
1213
import inspect
@@ -341,7 +342,7 @@ class Pool:
341342
'_queue', '_loop', '_minsize', '_maxsize',
342343
'_init', '_connect', '_reset', '_connect_args', '_connect_kwargs',
343344
'_holders', '_initialized', '_initializing', '_closing',
344-
'_closed', '_connection__record_class', '_generation',
345+
'_closed', '_connection_class', '_connection_holder_class', '_record_class', '_generation',
345346
'_setup', '_max_queries', '_max_inactive_connection_lifetime'
346347
)
347348

@@ -356,6 +357,7 @@ def __init__(self, *connect_args,
356357
reset=None,
357358
loop,
358359
connection_class,
360+
connection_holder_class: typing.Type[PoolConnectionHolder] = PoolConnectionHolder,
359361
record_class,
360362
**connect_kwargs):
361363

@@ -408,6 +410,7 @@ def __init__(self, *connect_args,
408410
self._queue = None
409411

410412
self._connection_class = connection_class
413+
self._connection_holder_class = connection_holder_class
411414
self._record_class = record_class
412415

413416
self._closing = False
@@ -443,37 +446,48 @@ async def _async__init__(self):
443446
self._initialized = True
444447

445448
async def _initialize(self):
449+
self._initialize_connections_queue()
450+
if self._minsize:
451+
await self._initialize_connections()
452+
453+
def _initialize_connections_queue(self) -> None:
446454
self._queue = asyncio.LifoQueue(maxsize=self._maxsize)
447455
for _ in range(self._maxsize):
448-
ch = PoolConnectionHolder(
449-
self,
456+
ch = self._connection_holder_class(
457+
pool=self,
458+
setup=self._setup,
450459
max_queries=self._max_queries,
451460
max_inactive_time=self._max_inactive_connection_lifetime,
452-
setup=self._setup)
453-
461+
)
454462
self._holders.append(ch)
455463
self._queue.put_nowait(ch)
456464

457-
if self._minsize:
458-
# Since we use a LIFO queue, the first items in the queue will be
459-
# the last ones in `self._holders`. We want to pre-connect the
460-
# first few connections in the queue, therefore we want to walk
461-
# `self._holders` in reverse.
462-
463-
# Connect the first connection holder in the queue so that
464-
# any connection issues are visible early.
465-
first_ch = self._holders[-1] # type: PoolConnectionHolder
466-
await first_ch.connect()
467-
468-
if self._minsize > 1:
469-
connect_tasks = []
470-
for i, ch in enumerate(reversed(self._holders[:-1])):
471-
# `minsize - 1` because we already have first_ch
472-
if i >= self._minsize - 1:
473-
break
474-
connect_tasks.append(ch.connect())
475-
476-
await asyncio.gather(*connect_tasks)
465+
466+
async def _initialize_connections(self) -> None:
467+
468+
if not self._minsize:
469+
raise exceptions.InterfaceError(
470+
'pool is already initialized with min_size > 0')
471+
472+
# Since we use a LIFO queue, the first items in the queue will be
473+
# the last ones in `self._holders`. We want to pre-connect the
474+
# first few connections in the queue, therefore we want to walk
475+
# `self._holders` in reverse.
476+
477+
# Connect the first connection holder in the queue so that
478+
# any connection issues are visible early.
479+
first_ch = self._holders[-1] # type: PoolConnectionHolder
480+
await first_ch.connect()
481+
482+
if self._minsize > 1:
483+
connect_tasks = []
484+
for i, ch in enumerate(reversed(self._holders[:-1])):
485+
# `minsize - 1` because we already have first_ch
486+
if i >= self._minsize - 1:
487+
break
488+
connect_tasks.append(ch.connect())
489+
490+
await asyncio.gather(*connect_tasks)
477491

478492
def is_closing(self):
479493
"""Return ``True`` if the pool is closing or is closed.
@@ -1083,6 +1097,7 @@ def create_pool(dsn=None, *,
10831097
reset=None,
10841098
loop=None,
10851099
connection_class=connection.Connection,
1100+
connection_holder_class: typing.Type[PoolConnectionHolder] = PoolConnectionHolder,
10861101
record_class=protocol.Record,
10871102
**connect_kwargs):
10881103
r"""Create a connection pool.
@@ -1142,6 +1157,11 @@ def create_pool(dsn=None, *,
11421157
The class to use for connections. Must be a subclass of
11431158
:class:`~asyncpg.connection.Connection`.
11441159
1160+
:param PoolConnectionHolder connection_holder_class:
1161+
The class to use for connection holders. This class is used
1162+
to manage the connection lifecycle in the pool. Must be a subclass of
1163+
:class:`~asyncpg.pool.PoolConnectionHolder`
1164+
11451165
:param type record_class:
11461166
If specified, the class to use for records returned by queries on
11471167
the connections in this pool. Must be a subclass of
@@ -1230,10 +1250,14 @@ def create_pool(dsn=None, *,
12301250
12311251
.. versionchanged:: 0.30.0
12321252
Added the *connect* and *reset* parameters.
1253+
1254+
.. versionchanged:: 0.31.0
1255+
Added the *pool_connection_holder_class* parameter.
12331256
"""
12341257
return Pool(
12351258
dsn,
12361259
connection_class=connection_class,
1260+
connection_holder_class=connection_holder_class,
12371261
record_class=record_class,
12381262
min_size=min_size,
12391263
max_size=max_size,

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /