Retrying in SERIALIZABLE isolation level #1263
andrew222651 started this conversation in Show and tell
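The following snippets implement a retry loop for transactions at the SERIALIZABLE isolation level. When PostgreSQL aborts a transaction with a serialization failure, the transaction body is run again in a fresh transaction, up to max_retries times, after which a RuntimeError is raised. I'd be curious whether anyone has an alternative method.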
db.py:

    import json
    from contextlib import asynccontextmanager
    from typing import AsyncContextManager, AsyncGenerator

    import asyncpg
    from asyncpg.exceptions import SerializationError

    # create/close this at startup/shutdown
    db_pool: asyncpg.Pool | None = None


    async def get_db_conn() -> AsyncGenerator[asyncpg.Connection, None]:
        assert db_pool is not None
        async with db_pool.acquire() as conn:
            # encode/decode json columns with the standard json module
            await conn.set_type_codec(
                "json", encoder=json.dumps, decoder=json.loads, schema="pg_catalog"
            )
            yield conn


    get_db_conn_cm = asynccontextmanager(get_db_conn)


    async def get_db_tx_cms(
        max_retries: int = 10,
    ) -> AsyncGenerator[AsyncContextManager[asyncpg.Connection], None]:
        assert max_retries >= 0
        retry_count = 0
        failed = False
        async with get_db_conn_cm() as conn:

            @asynccontextmanager
            async def tx_cm() -> AsyncGenerator[asyncpg.Connection, None]:
                nonlocal failed
                # The try wraps the whole transaction block so that a
                # serialization failure raised at COMMIT is caught as well.
                try:
                    async with conn.transaction(isolation="serializable"):
                        yield conn
                except SerializationError:
                    failed = True  # swallow the error; the loop below retries

            # one initial attempt plus up to max_retries retries
            while True:
                failed = False
                yield tx_cm()
                if not failed:
                    break
                retry_count += 1
                if retry_count > max_retries:
                    raise RuntimeError("Repeated serialization anomalies")
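app.py:

    import db


    # wrapper function added so the snippet is valid Python; the name is illustrative
    async def main() -> None:
        # each iteration is one attempt; the body re-runs after a serialization failure
        async for tx_cm in db.get_db_tx_cms():
            async with tx_cm as tx:
                row = await tx.fetchrow(
                    """
                    select foo from bar where baz = $1
                    """,
                    1,
                )
                ...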
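
To see how the `async for` / `async with` pairing re-runs the body, here is a small database-free sketch of the same control flow. It is only an illustration under assumptions: `FakeSerializationError` is a stand-in for asyncpg's `SerializationError`, `get_tx_cms` and `run` are hypothetical names, and no connection or transaction is involved.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncContextManager, AsyncGenerator


class FakeSerializationError(Exception):
    """Hypothetical stand-in for asyncpg's SerializationError."""


async def get_tx_cms(
    max_retries: int = 10,
) -> AsyncGenerator[AsyncContextManager[None], None]:
    failed = False

    @asynccontextmanager
    async def tx_cm() -> AsyncGenerator[None, None]:
        nonlocal failed
        try:
            yield  # the caller's `async with` body runs here
        except FakeSerializationError:
            failed = True  # swallow the error so the outer loop can retry

    # one initial attempt plus up to max_retries retries
    for _ in range(max_retries + 1):
        failed = False
        yield tx_cm()
        if not failed:
            return
    raise RuntimeError("Repeated serialization anomalies")


async def run(fail_times: int, max_retries: int = 10) -> int:
    """Run a body that fails `fail_times` times; return the attempt count."""
    attempts = 0
    async for tx_cm in get_tx_cms(max_retries):
        async with tx_cm:
            attempts += 1
            if attempts <= fail_times:
                raise FakeSerializationError
    return attempts


if __name__ == "__main__":
    print(asyncio.run(run(fail_times=2)))  # the body runs three times
```

The context manager suppresses the simulated failure, so the `async with` block exits normally and the generator decides whether to hand out another attempt. Any other exception raised in the body propagates to the caller unchanged.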