For copying data into a table we have 2 streaming options:

- Stream bytes (`copy_to_table`)
- Stream records (`copy_records_to_table`, which accepts an async iterable)

But for getting data back from the database, we only have 1 streaming option:

- Stream bytes (`copy_from_table`, which accepts a `(bytes) -> Awaitable` coroutine; a rough sketch of all three calls is shown below)
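For reference, here is a minimal sketch (not part of the original post) of the three copy helpers. The table and column names (`items`, `name`, `value`) and the connection setup are placeholders; real code would pass a DSN or rely on environment variables.

```python
import asyncio
import asyncpg


async def demo() -> None:
    conn = await asyncpg.connect()  # assumes connection details come from the environment / a DSN

    # 1) Stream bytes into a table: `source` may be a path, a file-like object,
    #    or an async iterable yielding bytes.
    async def csv_chunks():
        yield b"a,1\n"
        yield b"b,2\n"

    await conn.copy_to_table("items", source=csv_chunks(), format="csv")

    # 2) Stream records into a table: an iterable (or async iterable) of row tuples.
    await conn.copy_records_to_table(
        "items", records=[("c", 3), ("d", 4)], columns=["name", "value"]
    )

    # 3) Stream bytes out of a table: `output` may be a path, a file-like object,
    #    or a coroutine function taking a single bytes argument.
    async def sink(data: bytes) -> None:
        print(data.decode(), end="")

    await conn.copy_from_table("items", output=sink, format="csv")

    await conn.close()


asyncio.run(demo())
```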
I think it would be nice to have 2 convenience features:

- Stream records from the database (currently not possible as far as I can tell; not sure whether this is a protocol limitation or not)
- Streaming as async iterables (bytes or records)*

\* You can currently do this via queues and tasks; here's something I wrote today for a quick script (i.e. maybe there's a better way):
```python
from typing import AsyncIterable, Iterable, Tuple

import anyio
import asyncpg


async def get_records(pool: asyncpg.Pool) -> AsyncIterable[Tuple[str, str]]:
    send, rcv = anyio.create_memory_object_stream(0, bytes)

    async def copy() -> None:
        async with send:
            await pool.copy_from_query(
                ...,  # query and arguments elided
                output=send.send,
            )

    def parse_records(data: bytes) -> Iterable[MyRecord]:
        lines = data.decode().splitlines()
        for line in lines:
            ...  # could be complicated depending on data

    async with anyio.create_task_group() as tg:  # or equivalent asyncio tasks + gather
        tg.start_soon(copy)
        async for data in rcv:
            for record in parse_records(data):
                yield record
```
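For completeness, consuming that generator might look something like the following; the pool setup is assumed to come from environment variables or a DSN, and `MyRecord` is whatever `parse_records` produces.

```python
import asyncio


async def main() -> None:
    pool = await asyncpg.create_pool()  # connection details assumed to be configured elsewhere
    try:
        async for record in get_records(pool):
            print(record)
    finally:
        await pool.close()


asyncio.run(main())
```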
-
I'm just now finding existing issues (I swear I searched first). It looks like this use case would be well served by cursors; I didn't realize asyncpg exposed cursors. I'll report back with what that looks like.
-
Trivial:
```python
async def get_records(pool: asyncpg.Pool) -> AsyncIterable[MyRecord]:
    conn: asyncpg.Connection
    async with pool.acquire() as conn:
        # asyncpg cursors can only be iterated inside a transaction
        async with conn.transaction():
            cur = conn.cursor(...)  # query and arguments elided
            async for record in cur:
                yield record
```
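One tuning knob worth noting for this pattern is the cursor's `prefetch` argument, which controls how many rows are fetched per round trip (it defaults to 50). A minimal sketch, with a made-up query:

```python
cur = conn.cursor("SELECT name, value FROM items", prefetch=500)
async for record in cur:
    ...
```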
Great rubber ducking everyone 😄