Async Example/Tour
This shows how to use APSW in async mode.
asyncio, trio (note), and anyio (note) are supported. A different one is
used for each block, although they all work.
Note
You don’t have to make all connections exclusively sync or async, and can mix and match as needed. SQLite is fast, and in many cases there may not be a benefit to concurrency with async.
#!/usr/bin/env python3

# This code uses Python's optional typing annotations.  You can
# ignore them and do not need to use them.  If you do use them
# then you must include this future annotations line first.
from __future__ import annotations

import contextlib
import io
import time
import sys
from pprint import pprint

import apsw
import apsw.aio
import apsw.bestpractice
import apsw.ext
import apsw.fts5
import apsw.shell

# all the popular async frameworks are supported
import asyncio
import anyio
import trio
Basics
Use Connection.as_async() to get an async connection, and
async for to iterate results. We also apply best
practice and use contextlib.aclosing() to
ensure the database is closed.
apsw.bestpractice.apply(apsw.bestpractice.recommended)


async def basics():
    """Tour the basic async API: execute, iterate, ``.get``, pragma, transactions.

    Opens an in-memory async connection, creates and queries a small
    table, then shows nested transactions.  The connection is closed
    when the ``aclosing`` block exits.
    """
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        # create some rows
        await db.execute(
            " CREATE TABLE numbers(value, name);"
            " INSERT INTO numbers VALUES (1, 'one'), (20, 'twenty'),"
            " (3, 'three'), (10, 'ten'); "
        )

        # query - note we have to await to get the cursor before
        # iterating in the for loop
        async for value, name in await db.execute(
            "SELECT value, name FROM numbers ORDER BY value DESC"
        ):
            # the space keeps the two debug fields apart in the output
            print(f"{value=} {name=}")

        # .get is great if you expect only a single value or
        # row.  Let's get the number of registered functions
        count = await (
            await db.execute(
                "SELECT COUNT(DISTINCT(name)) FROM pragma_function_list"
            )
        ).get
        print(f"There are {count} functions")

        # a pragma
        print(f"journal_mode={await db.pragma('journal_mode')}")

        # You should always use a transaction - use async with
        async with db:
            await db.execute("INSERT INTO numbers VALUES(7, 'seven')")

            # nested transactions are supported via savepoints
            async with db:
                await db.execute("DROP TABLE numbers")

            # any exception in the async with block
            # will rollback that block, while successful
            # completion commits the changes


asyncio.run(basics())
value=20 name='twenty' value=10 name='ten' value=3 name='three' value=1 name='one' There are 174 functions journal_mode=memory
Functions and callbacks
Any function/callback can be sync or async.
# an async scalar function
async def a_add(one, two):
    """Async scalar function: announce the call and return ``one + two``."""
    print("async scalar called")
    return one + two


# and a sync scalar function
def s_add(one, two):
    """Sync scalar function: announce the call and return ``one + two``."""
    print("sync scalar called")
    return one + two


# how about an async update hook?
async def my_hook(op: int, dbname: str, table: str, rowid: int):
    """Async update hook that prints the four values it is called with."""
    # spaces keep the debug fields apart in the output
    print(f"update {op=} {dbname=} {table=} {rowid=}")


async def callbacks():
    """Register async and sync callbacks on one connection and trigger them."""
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await db.create_scalar_function("a_add", a_add)
        await db.create_scalar_function("s_add", s_add)

        # use them both in the same query.  we need to await execute
        # to get the cursor, and then await the fetchall on the
        # cursor.
        print(
            await (
                await db.execute("SELECT a_add(1, 2), s_add(3, 4)")
            ).fetchall()
        )

        # A regular callback
        await db.set_update_hook(my_hook)

        await db.execute(
            "CREATE TABLE x(y); INSERT INTO x VALUES(42)"
        )


# use anyio this time
anyio.run(callbacks)
async scalar called sync scalar called [(3, 7)] update op=18 dbname='main' table='x' rowid=1
Cancellation
Often a group of tasks are run at the same time. The frameworks provide a way to group tasks, wait until all are complete, and if any fail, then cancel uncompleted ones, and raise the resulting exceptions.
See also
except * and
ExceptionGroup - Python syntax for catching multiple exceptions, such as those from a group of tasks
This example shows asyncio, but the principles are the same across all the frameworks.
async defcancellation(): # this will block SQLite for the sleep duration async defsleep(duration): await asyncio.sleep(duration) return duration async defdeliberate_error(): 1 / 0 return 3 db = await apsw.Connection.as_async(":memory:") # always close database async with contextlib.aclosing(db): await db.create_scalar_function("sleep", sleep) start = time.monotonic() # create some tasks in a task group that will all # run simultaneously try: async with asyncio.TaskGroup() as tg: # this query will sleep for an hour task1 = tg.create_task( db.execute("SELECT sleep(3600)") ) # this query will only run after the hour sleep query # finishes because we can only do one SQLite query at # a time task2 = tg.create_task( db.execute("SELECT * FROM sqlite_schema") ) # this will also sleep for an hour task3 = tg.create_task(asyncio.sleep(3600)) # this will have an error task4 = tg.create_task(deliberate_error()) # the TaskGroup with block will now run all the tasks # to completion before exiting the block # note the * after except which is how you do exception groups except* ZeroDivisionError: print( f"got zero division error after {time.monotonic()-start:.6f} seconds" ) # Lets see what happened to all the tasks. Note how they are # all done (complete) and how all except the deliberate error # got cancelled. print(f"{task1.done()=}{task1.cancelled()=}") print(f"{task2.done()=}{task2.cancelled()=}") print(f"{task3.done()=}{task3.cancelled()=}") print(f"{task4.done()=}{task4.cancelled()=}") # Lets verify SQLite is not still waiting for an hour start = time.monotonic() functions = await ( await db.execute( "SELECT COUNT(*) FROM pragma_function_list" ) ).get print( f"After {time.monotonic()-start:.6f} seconds, " f"there are {functions} registered SQLite functions" ) asyncio.run(cancellation())
got zero division error after 0.000128 seconds task1.done()=True task1.cancelled()=True task2.done()=True task2.cancelled()=True task3.done()=True task3.cancelled()=True task4.done()=True task4.cancelled()=False After 0.000198 seconds, there are 212 registered SQLite functions
Timeouts
This demonstrates timeouts for both async and sync code. The sync
SQL is the outlandish fractal
but with the 28 changed to 800_000, and would take days to
run to completion.
The deadline for async functions is enforced by the async event loop
and tends to be accurate. The deadline for sync functions is based
on SQLite periodically calling the progress handler.
There is a dedicated apsw.aio.deadline which takes priority
for all frameworks. For trio (note) and anyio (note) their native timeouts are
also supported if apsw.aio.deadline has not been set. The
deadline documentation has more details on setting
deadlines for each framework, getting their current time, and
exceptions raised on timeout.
See also
# The query is not reproduced here but is used when running this
# example.
fractal_sql = "outlandish fractal"


async def timeouts():
    """Time out a slow async SQL function, then a slow pure-SQL query.

    The first block uses ``apsw.aio.deadline``; the second uses trio's
    native ``fail_after``.  Each block prints the time that elapsed
    before ``trio.TooSlowError`` was raised.
    """

    async def sleep(amount):
        await trio.sleep(amount)
        return 42

    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await db.create_scalar_function("sleep", sleep)

        try:
            # This will work with every framework.  Half a second from now.
            with apsw.aio.contextvar_set(
                apsw.aio.deadline, trio.current_time() + 0.5
            ):
                start = trio.current_time()
                await (
                    await db.execute("SELECT sleep(3600)")
                ).fetchall()
        except trio.TooSlowError:
            end = trio.current_time()
            print(
                f"Got async function TooSlowError after {end-start:.6f} seconds"
            )

        # With trio and anyio we can use the native framework timeouts.
        # Restart the clock so the reported time covers only this
        # block, not the half second already spent above.
        start = trio.current_time()
        try:
            with trio.fail_after(0.5):
                await (await db.execute(fractal_sql)).fetchall()
        except trio.TooSlowError:
            end = trio.current_time()
            print(
                f"Got sync function TooSlowError after {end-start:.6f} seconds"
            )


trio.run(timeouts)
Got async function TooSlowError after 0.501438 seconds Got sync function TooSlowError after 1.001749 seconds
Worker thread
Async connections work by running the SQLite operations in a
dedicated background thread. You can also run your own code there
which is especially useful if it does many calls before returning a
final result. Use Connection.async_run().
Examples shown include Managing and updating your schema and getting a text dump.
In the worker thread, the connection is a regular sync connection.
def schema_upgrade(db: apsw.Connection):
    """Bring the schema up to version 2, one ``user_version`` step at a time.

    The user_version is a great way of tracking and upgrading the
    schema.  Because this is run in the worker thread it is the
    normal sync approach.
    """
    # (version the step applies to, SQL that moves to the next version)
    upgrade_steps = (
        (
            # database fresh state
            0,
            " CREATE TABLE products(id, name, sku, price);"
            " CREATE TABLE orders(id, product_id, quantity);"
            " pragma user_version = 1; ",
        ),
        (
            1,
            " ALTER TABLE products ADD COLUMN description;"
            " CREATE INDEX orders_idx ON orders(id, product_id);"
            " pragma user_version = 2; ",
        ),
    )

    # Do everything in a single transaction
    with db:
        for applies_to, script in upgrade_steps:
            # re-read the version each time so an earlier step's bump
            # lets the next step run in the same pass
            if db.pragma("user_version") == applies_to:
                db.execute(script)

    # we could raise an exception here if user_version > 2 because it
    # means a more recent schema is present than this code understands.
    # Perhaps a version downgrade happened?


async def worker_thread():
    """Run sync code against the connection via ``Connection.async_run``."""
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        # Do the upgrade
        await db.async_run(schema_upgrade, db)

        # This is doing one operation
        insert_sql = "INSERT INTO products(id, name) VALUES(?,?)"
        await db.async_run(db.execute, insert_sql, (37, "Banana"))

        # Getting a result
        def count_products():
            return db.execute("SELECT COUNT(*) FROM products").get

        rows = await db.async_run(count_products)
        print(f"{rows=}")

        # get a dump - to a memory file here, but you'd want to use
        # a real file
        dump_buffer = io.StringIO()

        def write_dump():
            shell = apsw.shell.Shell(db=db, stdout=dump_buffer)
            return shell.process_command(".dump")

        await db.async_run(write_dump)

        dump_text = dump_buffer.getvalue()
        print(f"Dump is {len(dump_text)} chars starting {dump_text!r:.40}")

        # Some stuff from apsw.ext
        usage = await db.async_run(apsw.ext.analyze_pages, db, 2)
        details = await db.async_run(
            apsw.ext.query_info, db, "SELECT * FROM sqlite_schema"
        )


anyio.run(worker_thread)
rows=1 Dump is 747 chars starting "-- SQLite dump (by APSW 3.53.3.0)\n-- S
Virtual tables
Virtual tables are a very good match for
async, especially if networking is involved. You can do your own
full implementation using sync and async methods
as needed, but will find it easier to start with
apsw.ext.make_virtual_module() that turns any Python function
(sync or async) into a virtual table accepting positional and keyword
arguments. The sync example is here
with the async below being pretty much the same thing.
async def data_table(flags, server="example.com"):
    """Async generator used as a virtual table; yields a fixed set of books.

    One positional and one keyword argument.  Note you can't change
    the number of columns or their order based on the arguments but
    you can change what rows are returned and their contents.
    """
    print(f"data_table called with {flags=} and {server=}")

    # you would do real work here - we just provide some books
    books = (
        ("The Great Gatsby", 1925, 9.2),
        ("To Kill a Mockingbird", 1960, 9.5),
        ("1984", 1949, 9.8),
        ("The Catcher in the Rye", 1951, 8.4),
        ("The Hobbit", 1937, 9.6),
    )
    for book in books:
        yield book


# Tell make_virtual_module about the columns
data_table.columns = ("title", "year", "review")
# ... and how to extract them from each row
data_table.column_access = apsw.ext.VTColumnAccess.By_Index


async def virtual_tables():
    """Register ``data_table`` as the ``books`` module and query it twice."""
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await apsw.ext.make_virtual_module(db, "books", data_table)

        queries = (
            # regular query
            (
                "SELECT * FROM books WHERE flags=94 AND server=?",
                ("example2.com",),
            ),
            # SQLite will figure out the review and order work
            (
                "SELECT * FROM books WHERE server=? AND flags=?AND review > 9.55 "
                " ORDER BY year DESC",
                ("orange", -2),
            ),
        )

        for sql, bindings in queries:
            async for row in await db.execute(sql, bindings):
                print(row)


trio.run(virtual_tables)
data_table called with flags=94 and server='example2.com' ('The Great Gatsby', 1925, 9.2) ('To Kill a Mockingbird', 1960, 9.5) ('1984', 1949, 9.8) ('The Catcher in the Rye', 1951, 8.4) ('The Hobbit', 1937, 9.6) data_table called with flags=-2 and server='orange' ('1984', 1949, 9.8) ('The Hobbit', 1937, 9.6)
Tracing in a block
This is the same as sync tracing in a block
adapted to use async with for apsw.ext.Trace and
transaction control.
async def tracing():
    """Run a series of statements inside an ``apsw.ext.Trace`` block."""
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        trace_options = dict(
            db=db,
            vtable=True,
            updates=True,
            transaction=True,
        )

        # Use None instead of stdout and no information is printed or gathered
        async with apsw.ext.Trace(sys.stdout, **trace_options):
            # APSW does a savepoint behind the scenes to wrap the block
            async with db:
                # Some regular SQL
                await db.execute("create table multi(x)")

                # executemany runs the same statement repeatedly
                await db.executemany(
                    "insert into multi values(?)",
                    ((number,) for number in range(5)),
                )

                # See how many rows were processed
                limited = await db.execute("select * from multi limit 2")
                await limited.fetchall()

                # You can also see how many rows were changed
                await db.execute("delete from multi where x < 4")

            # pragma functions are virtual tables - see how many rows
            # this processes even though only one has 'pow'
            pow_cursor = await db.execute(
                "SELECT narg FROM pragma_function_list WHERE name='pow'"
            )
            await pow_cursor.get

            # trigger that causes rollback
            trigger_sql = (
                " create trigger error after insert on multi begin"
                " update multi set rowid=100+new.rowid where rowid=new.rowid;"
                " select raise(rollback, 'nope');"
                " end; "
            )
            await db.execute(trigger_sql)

            # the ConstraintError from this insert is deliberately ignored
            with contextlib.suppress(apsw.ConstraintError):
                await db.execute("insert into multi values(54)")


asyncio.run(tracing())
> BEGIN DEFERRED Time: 0.000 !BEGIN > create table multi(x) Time: 0.000 > insert into multi values(?) INS 1 (0) Time: 0.000 Changes: 1 > insert into multi values(?) INS 2 (1) Time: 0.000 Changes: 1 > insert into multi values(?) INS 3 (2) Time: 0.000 Changes: 1 > insert into multi values(?) INS 4 (3) Time: 0.000 Changes: 1 > insert into multi values(?) INS 5 (4) Time: 0.000 Changes: 1 > select * from multi limit 2 Time: 0.000 Rows: 3 > delete from multi where x < 4 DEL 1 (0) DEL 2 (1) DEL 3 (2) DEL 4 (3) Time: 0.000 Changes: 4 > COMMIT !COMMIT Time: 0.000 > SELECT narg FROM pragma_function_list WHERE name='pow' V PRAGMA function_list Time: 0.000 Rows: 211 VmStep: 1,481 Mem: 76.8KB < SELECT narg FROM pragma_function_list WHERE name='pow' Time: 0.000 Rows: 1 VmStep: 644 !BEGIN > create trigger error after insert on multi begin ... !COMMIT Time: 0.000 !BEGIN > insert into multi values(54) INS 6 (54) T TRIGGER error UPD 6>106 (...) !ROLLBACK < insert into multi values(54) Time: 0.001 Changes: 1
Resource usage in a block
The async equivalent of the sync example.
# The standard outlandish fractal example which is used when running
# but not reproduced here.
query = "fractal"


async def resource_usage():
    """Report resource usage for the same work at thread and process scope."""
    db = await apsw.Connection.as_async(":memory:")

    # (heading to print, scope passed to ShowResourceUsage)
    sections = (
        ("thread (async event loop)", "thread"),
        ("\nprocess (including background SQLite worker)", "process"),
    )

    # always close database
    async with contextlib.aclosing(db):
        for heading, scope in sections:
            print(heading)
            async with apsw.ext.ShowResourceUsage(
                sys.stdout, db=db, scope=scope
            ):
                # some SQLite work
                cursor = await db.execute(query)
                await cursor.get
                # and take some wall clock time
                await trio.sleep(0.5)


trio.run(resource_usage)
thread (async event loop) Process CPU consumption 0.025 Wall clock 0.526 Involuntary context switches 6 Voluntary context switches 3 SQLite full table scan 1,365 SQLite sort operations 2 SQLite vm operations 1,015,353 SQLite statements completed 1 SQLite lookaside slots used 11 SQLite allocations using lookaside 17,414 SQLite allocations too big for lookaside 1 SQLite allocations lookaside full 282 SQLite statement memory 17,080 process (including background SQLite worker) Process CPU consumption 0.025 Wall clock 0.526 Involuntary context switches 12 Voluntary context switches 9 CPU time in user mode 0.025 SQLite full table scan 1,365 SQLite sort operations 2 SQLite vm operations 1,015,353 SQLite statements completed 1 SQLite allocations using lookaside 17,272
Blob
Async Blob Input/Output runs in the SQLite worker thread. See the sync example, of which this is a direct translation to async.
async def blob():
    """Create zero-filled blobs, then read and write one incrementally."""
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await db.execute("create table blobby(x,y)")

        # Add a blob we will fill in later
        await db.execute(
            "insert into blobby values(1, zeroblob(10000))"
        )

        # Or as a binding
        placeholder = apsw.zeroblob(20000)
        await db.execute(
            "insert into blobby values(2, ?)", (placeholder,)
        )

        # Open a blob for writing.  We need to know the rowid
        rowid_cursor = await db.execute(
            "select ROWID from blobby where x=1"
        )
        rowid = await rowid_cursor.get

        handle = await db.blob_open("main", "blobby", "y", rowid, True)
        await handle.write(b"hello world")

        # seeking is immediate (no await)
        handle.seek(2000)
        await handle.read(24)

        # seek relative to the end
        handle.seek(-32, 2)
        await handle.write(b"hello world, again")

        # it will be automatically closed when the connection is
        # closed, but explicitly closing chooses transaction
        # boundaries
        await handle.aclose()


anyio.run(blob)
Backup
Backups run in the SQLite worker thread of the async
destination database. The source can be a sync or async database.
You do backups by getting the backup object from the destination
database, telling it about the source using
Connection.backup().
If the destination is sync and you are working with an async source, you can run the backup in the async source thread as demonstrated below.
async def backup():
    """Back up between sync and async connections in three combinations.

    All four connections are registered with an ``AsyncExitStack`` so
    they are closed even if a step raises part way through.
    """
    async with contextlib.AsyncExitStack() as cleanup:
        # Setup source and destinations
        async_source = await cleanup.enter_async_context(
            contextlib.aclosing(await apsw.Connection.as_async(""))
        )
        # ... and fill it with a large amount of data
        await async_source.execute(
            "CREATE TABLE x(y); INSERT INTO x VALUES(randomblob(250000))"
        )

        sync_source = cleanup.enter_context(
            contextlib.closing(apsw.Connection(""))
        )
        sync_source.execute(
            "CREATE TABLE x(y); INSERT INTO x VALUES(randomblob(250000))"
        )

        async_dest = await cleanup.enter_async_context(
            contextlib.aclosing(await apsw.Connection.as_async(""))
        )
        sync_dest = cleanup.enter_context(
            contextlib.closing(apsw.Connection(""))
        )

        print("async destination, async source")
        async with await async_dest.backup(
            "main", async_source, "main"
        ) as copier:
            while not copier.done:
                await copier.step(42)
                print(
                    f"page_count = {copier.page_count} remaining = {copier.remaining}"
                )

        print("async destination, sync source")
        async with await async_dest.backup(
            "main", sync_source, "main"
        ) as copier:
            while not copier.done:
                await copier.step(42)
                print(
                    f"page_count = {copier.page_count} remaining = {copier.remaining}"
                )

        print("sync destination, async source")

        # we will run this in the async source thread
        def do_backup():
            with sync_dest.backup("main", async_source, "main") as copier:
                while not copier.done:
                    copier.step(42)
                    print(
                        f"page_count = {copier.page_count} remaining = {copier.remaining}"
                    )

        await async_source.async_run(do_backup)

        # leaving the exit stack closes every connection, in reverse
        # order of creation


asyncio.run(backup())
async destination, async source page_count = 63 remaining = 21 page_count = 63 remaining = 0 async destination, sync source page_count = 63 remaining = 21 page_count = 63 remaining = 0 sync destination, async source page_count = 63 remaining = 21 page_count = 63 remaining = 0
Full Text Search
Table accesses the database for virtually all
methods and attributes, so using the worker thread is needed. A subset of the
Full Text Search Example/Tour is shown.
async def fts():
    """Create or open an FTS5 search table, then search and suggest.

    Every ``apsw.fts5.Table`` call goes through ``db.async_run`` so it
    runs in the worker thread.
    """
    db = await apsw.Connection.as_async("recipes.db")

    # always close database
    async with contextlib.aclosing(db):
        # declared once so both branches assign the same annotated name
        search_table: apsw.fts5.Table

        if not await db.table_exists("main", "search"):
            search_table = await db.async_run(
                apsw.fts5.Table.create,
                db,
                "search",
                content="recipes",
                columns=None,
                generate_triggers=True,
                # NOTE(review): "strip", "true" appears twice in this
                # list - confirm that is intended
                tokenize=[
                    "simplify",
                    "casefold",
                    "true",
                    "strip",
                    "true",
                    "strip",
                    "true",
                    "unicodewords",
                ],
            )
        else:
            search_table = await db.async_run(
                apsw.fts5.Table, db, "search"
            )

        # property access
        print(
            "row_count =",
            await db.async_run(getattr, search_table, "row_count"),
        )

        # we need to do search processing in the worker thread
        def search_processing(query: str, limit: int):
            matches = []
            for match in search_table.search(query):
                matches.append(match)
                if len(matches) >= limit:
                    break
            return matches

        matches = await db.async_run(
            search_processing, "lemon OR guava", 10
        )

        # Only the first match is shown.  Guard against an empty result
        # so there is no reference to a match that does not exist.
        if matches:
            first = matches[0]
            pprint(first)
            print(
                "First match name is",
                await db.async_run(
                    search_table.row_by_id, first.rowid, "name"
                ),
            )
        else:
            print("No matches")

        # query suggestion
        query = "nyme:(minced OR oyl NOT peenut)"
        print(
            query,
            "=>",
            await db.async_run(search_table.query_suggest, query),
        )


asyncio.run(fts())
row_count = 173278 MatchInfo(query_info=QueryInfo(phrases=(('lemon',), ('guava',))), rowid=1642796043941632884, column_size=[3, 24, 5], phrase_columns=[[1], [0, 2]]) First match name is P.L's Guava Jelly nyme:(minced OR oyl NOT peenut) => name: (minced OR oil NOT peanut)
Session
Use apsw.aio.make_session() to create the
Session object in async mode from an async
connection.
async def session_example():
    """Record changes with a Session, then invert and apply them as an undo."""
    db = await apsw.Connection.as_async(":memory:")

    # always close database
    async with contextlib.aclosing(db):
        await db.execute("CREATE TABLE x(y PRIMARY KEY, z)")

        session = await apsw.aio.make_session(db, "main")

        # We'd like size estimates
        session.config(apsw.SQLITE_SESSION_OBJCONFIG_SIZE, True)

        # all tables
        await session.attach()

        # add some data
        await db.executemany(
            "INSERT INTO x VALUES(?,?)",
            ((i, "a" * i) for i in range(200)),
        )

        # f-string so the value is printed rather than the literal braces.
        # NOTE(review): assumes changeset_size is a plain attribute in
        # async mode - if it turns out to be awaitable, await it first.
        print(f"Size estimate {session.changeset_size}")

        changeset = await session.changeset()

        print(f"Actual size {len(changeset)}")

        # Other than apply, changeset operations don't use a
        # Connection so we'll use trio's mechanism to do invert in a
        # background thread.
        undo = await trio.to_thread.run_sync(
            apsw.Changeset.invert, changeset
        )

        # Undo the changes
        await apsw.Changeset.apply(undo, db)


trio.run(session_example)