API to stream records from a SELECT query #913

adriangb started this conversation in Ideas

For copying data into a table we have 2 streaming options:

  • Stream bytes (copy_to_table)
  • Stream records (copy_records_to_table, which accepts an async iterable)

But for getting data back from the database, we only have 1 streaming option:

  • Stream bytes (copy_from_table, which accepts a (bytes) -> Awaitable coroutine)
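
For reference, the callback style looks roughly like the sketch below. It assumes `conn` is an `asyncpg.Connection`; `collect_copy_output` and `on_chunk` are made-up names for illustration. Collecting every chunk into memory defeats the point of streaming and is only there to show the shape of the callback.

```python
from typing import Any, List


async def collect_copy_output(conn: Any, table_name: str) -> bytes:
    """Gather the COPY output of `table_name` through an async callback."""
    chunks: List[bytes] = []

    async def on_chunk(data: bytes) -> None:
        # Awaited once per chunk of COPY data handed to the `output` sink.
        chunks.append(data)

    # `conn` is assumed to be an asyncpg.Connection; `output` is keyword-only.
    await conn.copy_from_table(table_name, output=on_chunk, format="csv")
    return b"".join(chunks)
```

Note that the chunks are raw bytes in the chosen COPY format, so turning them into records is left to the caller.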

I think it would be nice to get 2 convenience features:

  • Stream records from the database (currently not possible as far as I can tell; I'm not sure whether this is a protocol limitation)
  • Streaming as async iterables (bytes or records)*

* You can currently do this via queues and tasks. Here's something I wrote today for a quick script (so there may well be a better way):

async def get_records(pool: asyncpg.Pool) -> AsyncIterable[Tuple[str, str]]:
    send, rcv = anyio.create_memory_object_stream(0, bytes)

    async def copy() -> None:
        async with send:
            await pool.copy_from_query(
                ...
                output=send.send,
            )

    def parse_records(data: bytes) -> Iterable[MyRecord]:
        lines = data.decode().splitlines()
        for line in lines:
            ...  # could be complicated depending on data

    async with anyio.create_task_group() as tg:  # or equivalent asyncio tasks + gather
        tg.start_soon(copy)
        async for data in rcv:
            for record in parse_records(data):
                yield record
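
The asyncio equivalent mentioned in the code comment above could look something like the sketch below. `stream_chunks`, `Sink` and `_Done` are hypothetical names, not asyncpg or anyio APIs. The `producer` is assumed to be any coroutine function that takes a sink and awaits it once per chunk, for example a small wrapper that passes the sink as `output=` to `copy_from_query`.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Optional

Sink = Callable[[bytes], Awaitable[None]]


class _Done:
    """Hypothetical end-of-stream marker; carries the producer's error, if any."""

    def __init__(self, error: Optional[Exception] = None) -> None:
        self.error = error


async def stream_chunks(
    producer: Callable[[Sink], Awaitable[None]],
) -> AsyncIterator[bytes]:
    """Expose a callback-style producer as an async iterator of byte chunks."""
    # maxsize=1 gives backpressure; asyncio.Queue treats maxsize=0 as unbounded.
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)

    async def run() -> None:
        error: Optional[Exception] = None
        try:
            await producer(queue.put)
        except Exception as exc:  # cancellation is a BaseException and passes through
            error = exc
        await queue.put(_Done(error))

    task = asyncio.create_task(run())
    try:
        while True:
            item = await queue.get()
            if isinstance(item, _Done):
                if item.error is not None:
                    raise item.error
                return
            yield item
    finally:
        # If the consumer stops early, stop the producer too.
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)
```

The bounded queue means the producer only runs about one chunk ahead of the consumer, which is the closest asyncio gets to the zero-capacity stream in the anyio version. Parsing chunks into records would still sit on top of this.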

Replies: 2 comments 1 reply


I'm just now finding existing issues (I swear I searched first). It looks like cursors are well suited to this use case; I didn't realize asyncpg exposed them. I'll report back with what that looks like.

1 reply

Trivial:

async def get_records(pool: asyncpg.Pool) -> AsyncIterable[MyRecord]:
    conn: asyncpg.Connection
    async with pool.acquire() as conn:
        async with conn.transaction():
            cur = conn.cursor(...)
            async for record in cur:
                yield record
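
A slightly more general version of the same idea, as a sketch: `stream_records` is a hypothetical helper name, and the query and arguments are whatever the caller passes in. It relies on `Connection.cursor()` accepting a `prefetch` keyword to control how many rows are fetched per round trip, and on the cursor being iterated inside a transaction, as in the snippet above.

```python
from typing import Any, AsyncIterator


async def stream_records(
    pool: Any, query: str, *args: Any, prefetch: int = 100
) -> AsyncIterator[Any]:
    """Yield rows of `query` one at a time through a server-side cursor."""
    # `pool` is assumed to be an asyncpg.Pool.
    async with pool.acquire() as conn:
        # asyncpg cursors only work inside a transaction.
        async with conn.transaction():
            async for record in conn.cursor(query, *args, prefetch=prefetch):
                yield record
```

One thing to keep in mind with this shape: the connection stays checked out of the pool and the transaction stays open for as long as the caller keeps iterating, so a slow consumer holds both.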

Great rubber ducking everyone 😄
