Example/Tour

This code demonstrates usage of APSW. It gives you a good overview of all the things that can be done. Also included is output so you can see what gets printed when you run the code.

There are also specific examples in the classes, functions, and attribute documentation.

#!/usr/bin/env python3
# This code uses Python's optional typing annotations. You can
# ignore them and do not need to use them. If you do use them
# then you must include this future annotations line first.
from __future__ import annotations
from typing import Optional, Iterator, Any
import os
import sys
import time
import datetime
import apsw
import apsw.ext
import random
import re
import contextlib
from pathlib import Path
# pretty formatting
from pprint import pprint

Checking APSW and SQLite versions

# Where the extension module is on the filesystem
print(" Using APSW file", apsw.__file__)
# From the extension
print(" APSW version", apsw.apsw_version())
# From the sqlite header file at APSW compile time
print("SQLite header version", apsw.SQLITE_VERSION_NUMBER)
# The SQLite code running
print(" SQLite lib version", apsw.sqlite_lib_version())
# If True then SQLite is incorporated into the extension.
# If False then a shared library is being used, or static linking
print(" Using amalgamation", apsw.using_amalgamation)
 Using APSW file /space/apsw/apsw/__init__.cpython-314-x86_64-linux-gnu.so
 APSW version 3.51.1.0
SQLite header version 3051001
 SQLite lib version 3.51.1
 Using amalgamation True

Best Practice

Use apsw.bestpractice to prevent common SQLite mistakes and to get the best performance.

import apsw.bestpractice
apsw.bestpractice.apply(apsw.bestpractice.recommended)

Logging

It is a good idea to get SQLite’s logs as you will get more information about errors. Best practice also includes this. apsw.ext.log_sqlite() forwards SQLite’s log messages to the logging module.

apsw.ext.log_sqlite()
# You can also write to SQLite's log
apsw.log(apsw.SQLITE_ERROR, "A message from Python")

Opening the database

You open the database by using Connection

# Default will create the database if it doesn't exist
connection = apsw.Connection("dbfile")
# Open existing read-only
connection = apsw.Connection(
 "dbfile", flags=apsw.SQLITE_OPEN_READONLY
)
# Open existing read-write (exception if it doesn't exist)
connection = apsw.Connection(
 "dbfile", flags=apsw.SQLITE_OPEN_READWRITE
)

Executing SQL

Use Connection.execute() to execute SQL

connection.execute("create table point(x,y,z)")
connection.execute("insert into point values(1, 2, 3)")
# You can use multiple ; separated statements
connection.execute(
"""
 insert into point values(4, 5, 6);
 create table log(timestamp, event);
 create table foo(a, b, c);
 create table important(secret, data);
"""
)
# read rows
for row in connection.execute("select * from point"):
 print(row)
(1, 2, 3)
(4, 5, 6)

Why you use bindings to provide values

It is tempting to compose strings with the values in them, but it is easy to mangle the query, especially if values contain punctuation and unicode. Deliberately exploiting this is known as SQL injection. Bindings are the correct way to supply values to queries.

# a simple value
event = "system started"
# DO NOT DO THIS
query = f"insert into log values(0, '{event}')"
print("query:", query)
# BECAUSE ... a bad guy could provide a value like this
event = "bad guy here') ; drop table important; -- comment"
# which has effects like this
query = f"insert into log values(0, '{event}')"
print("bad guy:", query)
query: insert into log values(0, 'system started')
bad guy: insert into log values(0, 'bad guy here') ; drop table important; -- comment')

Bindings (sequence)

Bindings can be provided as a sequence such as with a tuple or list. Use ? to show where the values go.

query = "insert into log values(?, ?)"
data = (7, "transmission started")
connection.execute(query, data)
# You can also use numbers after the ? to select
# values from the sequence. Note that numbering
# starts at 1
query = "select ?1, ?3, ?2"
data = ("alpha", "beta", "gamma")
for row in connection.execute(query, data):
 print(row)
('alpha', 'gamma', 'beta')
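
As a minimal sketch of why bindings are safe, the hostile value from the previous section can be supplied as a binding, where it is stored as plain text instead of being executed. This sketch assumes Python's standard-library sqlite3 module as a stand-in for APSW so that it runs without extra installs; the ? placeholder is SQLite's own syntax and is the same in both.

```python
import sqlite3

# Standard-library sqlite3 is used here as a stand-in for APSW (assumption)
db = sqlite3.connect(":memory:")
db.execute("create table log(timestamp, event)")
db.execute("create table important(secret, data)")

# The hostile value from the injection example
event = "bad guy here') ; drop table important; -- comment"

# Supplied as a binding, the value never becomes part of the SQL text
db.execute("insert into log values(?, ?)", (0, event))

# The text comes back exactly as supplied
stored = db.execute("select event from log").fetchone()[0]

# Both tables are still present
tables = sorted(
    name
    for (name,) in db.execute(
        "select name from sqlite_master where type='table'"
    )
)
print(stored == event)
print(tables)
```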

Bindings (dict)

You can also supply bindings with a dictionary. Use :NAME, @NAME, or $NAME, to provide the key name in the query. Names are case sensitive.

query = "insert into point values(:x, @Y, $z)"
data = {"x": 7, "Y": 8, "z": 9}
connection.execute(query, data)

Transactions

By default each statement is its own transaction. A transaction finishes by flushing data to storage and waiting for the operating system to confirm it is permanently there (ie will survive a power failure) which takes a while.

# 3 separate transactions
connection.execute("insert into point values(2, 2, 2)")
connection.execute("insert into point values(3, 3, 3)")
connection.execute("insert into point values(4, 4, 4)")
# You can use BEGIN / COMMIT to manually make a transaction
connection.execute("BEGIN")
connection.execute("insert into point values(2, 2, 2)")
connection.execute("insert into point values(3, 3, 3)")
connection.execute("insert into point values(4, 4, 4)")
connection.execute("COMMIT")
# Or use `with` that does it automatically
with connection:
 connection.execute("insert into point values(2, 2, 2)")
 connection.execute("insert into point values(3, 3, 3)")
 connection.execute("insert into point values(4, 4, 4)")
# Nested transactions are supported
with connection:
    connection.execute("insert into point values(2, 2, 2)")
    with connection:
        connection.execute("insert into point values(3, 3, 3)")
        connection.execute("insert into point values(4, 4, 4)")
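
Nested transactions in SQLite are built on savepoints. As an illustration only, the sketch below issues the savepoint SQL by hand to show an inner block being undone while the outer block still commits. It assumes the standard-library sqlite3 module in autocommit mode as a stand-in for APSW, and the savepoint names are made up for the example.

```python
import sqlite3

# isolation_level=None means sqlite3 does not start transactions for us
db = sqlite3.connect(":memory:", isolation_level=None)
db.execute("create table point(x, y, z)")

# Outer block
db.execute("SAVEPOINT outer_block")
db.execute("insert into point values(2, 2, 2)")

# Inner block, which we decide to undo
db.execute("SAVEPOINT inner_block")
db.execute("insert into point values(3, 3, 3)")
db.execute("ROLLBACK TO inner_block")
db.execute("RELEASE inner_block")

# Releasing the outermost savepoint commits what is left
db.execute("RELEASE outer_block")

rows = db.execute("select * from point").fetchall()
print(rows)
```

Only the row inserted in the outer block survives, which is the behaviour you would expect when an inner block fails and the outer one completes.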

executemany

You can execute the same SQL against a sequence using Connection.executemany()

data = (
 (1, 1, 1),
 (2, 2, 2),
 (3, 3, 3),
 (4, 4, 4),
 (5, 5, 5),
)
query = "insert into point values(?,?,?)"
# we do it in a transaction
with connection:
 # the query is run for each item in data
 connection.executemany(query, data)

Pragmas

SQLite has a wide variety of pragmas to control the database configuration and library behaviour. See the Tips for maintaining your schema.

# WAL mode is good for write performance
connection.pragma("journal_mode", "wal")
# Foreign keys are off by default, so turn them on
connection.pragma("foreign_keys", True)
# You can use this to see if any other connection (including other processes) has
# changed the database
connection.pragma("data_version")
# Useful at startup to detect some database corruption
check = connection.pragma("integrity_check")
if check != "ok":
 print("Integrity check errors", check)

Tracing execution

You can trace execution of SQL statements and their bindings. This involves code changes and is described in more detail here.

There are simpler convenient mechanisms for individual statement tracing, summarising a block of code, and SQLite’s interface which is used by them.

def my_tracer(
    cursor: apsw.Cursor,
    statement: str,
    bindings: Optional[apsw.Bindings],
) -> bool:
    "Called just before executing each statement"
    print("SQL:", statement.strip())
    print("Bindings:", bindings)
    return True  # if you return False then execution is aborted
# you can trace a single cursor
cursor = connection.cursor()
cursor.exec_trace = my_tracer
cursor.execute(
"""
 drop table if exists bar;
 create table bar(x,y,z);
 select * from point where x=?;
 """,
 (3,),
)
# if set on a connection then all cursors are traced
connection.exec_trace = my_tracer
# and clearing it
connection.exec_trace = None
SQL: drop table if exists bar;
Bindings: ()
SQL: create table bar(x,y,z);
Bindings: ()
SQL: select * from point where x=?;
Bindings: (3,)

Tracing returned rows

You can trace returned rows, including modifying what is returned or skipping it completely. See more about tracing.

def row_tracer(
    cursor: apsw.Cursor, row: apsw.SQLiteValues
) -> apsw.SQLiteValues:
    """Called with each row of results before they are handed off. You can return None to
    cause the row to be skipped or a different set of values to return"""
    print("Row:", row)
    return row
# you can trace a single cursor
cursor = connection.cursor()
cursor.row_trace = row_tracer
for row in cursor.execute("select x,y from point where x>4"):
 pass
# if set on a connection then all cursors are traced
connection.row_trace = row_tracer
# and clearing it
connection.row_trace = None
Row: (7, 8)
Row: (5, 5)

Defining scalar functions

Scalar functions take one or more values and return one value. They are registered by calling Connection.create_scalar_function().

def ilove7(*args: apsw.SQLiteValue) -> int:
 "A scalar function"
 print(f"ilove7 got {args} but I love 7")
 return 7
connection.create_scalar_function("seven", ilove7)
for row in connection.execute(
 "select seven(x,y) from point where x>4"
):
 print("row", row)
ilove7 got (7, 8) but I love 7
row (7,)
ilove7 got (5, 5) but I love 7
row (7,)

Defining aggregate functions

Aggregate functions are called multiple times with matching rows, and then provide a final value. An example is calculating an average. They are registered by calling Connection.create_aggregate_function().

class longest:
    # Find which value when represented as a string is
    # the longest
    def __init__(self) -> None:
        self.longest = ""
    def step(self, *args: apsw.SQLiteValue) -> None:
        # Called with each matching row
        for arg in args:
            if len(str(arg)) > len(self.longest):
                self.longest = str(arg)
    def final(self) -> str:
        # Called at the very end
        return self.longest
connection.create_aggregate_function("longest", longest)
print(connection.execute("select longest(event) from log").get)
transmission started
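
The call sequence described above (a step call per matching row, then a final call) can be imitated in plain Python. This is only a sketch of the protocol: run_aggregate is a hypothetical driver, not part of APSW, and the rows are made up.

```python
class longest:
    # Same shape as the aggregate class above
    def __init__(self) -> None:
        self.longest = ""

    def step(self, *args) -> None:
        for arg in args:
            if len(str(arg)) > len(self.longest):
                self.longest = str(arg)

    def final(self) -> str:
        return self.longest


def run_aggregate(factory, rows):
    # Hypothetical driver imitating the call sequence: one fresh
    # instance, step() for each matching row, then final() once
    instance = factory()
    for row in rows:
        instance.step(*row)
    return instance.final()


# Made-up rows standing in for "select event from log"
events = [("boot",), ("transmission started",), ("halt",)]
result = run_aggregate(longest, events)
print(result)
```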

Defining window functions

The input values for a window function come from a "window" around a row of interest. Four methods are called as the window moves: to add a value, remove a value, get the current value, and finalize.

An example is calculating an average of values in the window to compare to the row. They are registered by calling Connection.create_window_function().

This is the Python equivalent to the C based example in the SQLite documentation

class SumInt:
    def __init__(self):
        self.v = 0
    def step(self, arg):
        print("step", arg)
        self.v += arg
    def inverse(self, arg):
        print("inverse", arg)
        self.v -= arg
    def final(self):
        print("final", self.v)
        return self.v
    def value(self):
        print("value", self.v)
        return self.v
connection.create_window_function("sumint", SumInt)
for row in connection.execute(
"""
 CREATE TABLE t3(x, y);
 INSERT INTO t3 VALUES('a', 4),
 ('b', 5),
 ('c', 3),
 ('d', 8),
 ('e', 1);
 -- Use the window function
 SELECT x, sumint(y) OVER (
 ORDER BY x ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
 ) AS sum_y
 FROM t3 ORDER BY x;
 """
):
 print("ROW", row)
step 4
step 5
value 9
ROW ('a', 9)
step 3
value 12
ROW ('b', 12)
inverse 4
step 8
value 16
ROW ('c', 16)
inverse 5
step 1
value 12
ROW ('d', 12)
inverse 3
value 9
ROW ('e', 9)
final 9
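
The output above can be reproduced without a database by moving the window by hand. The sketch below is an illustration of the idea only: sliding is a hypothetical driver that removes values leaving the window with inverse, adds values entering it with step, and reads the running total with value. SQLite's real scheduling of these calls is an implementation detail.

```python
class SumInt:
    # Same methods as above, without the printing
    def __init__(self):
        self.v = 0

    def step(self, arg):
        self.v += arg

    def inverse(self, arg):
        self.v -= arg

    def value(self):
        return self.v

    def final(self):
        return self.v


def sliding(values, preceding=1, following=1):
    # Hypothetical driver for ROWS BETWEEN preceding AND following
    agg = SumInt()
    results = []
    lo = 0  # first index currently inside the window
    hi = 0  # one past the last index currently inside the window
    for i in range(len(values)):
        want_lo = max(0, i - preceding)
        want_hi = min(len(values), i + following + 1)
        while lo < want_lo:  # values that fell out of the window
            agg.inverse(values[lo])
            lo += 1
        while hi < want_hi:  # values that entered the window
            agg.step(values[hi])
            hi += 1
        results.append(agg.value())
    agg.final()
    return results


# The y column of t3 in x order
sums = sliding([4, 5, 3, 8, 1])
print(sums)
```

The incremental approach means each value is added once and removed once, instead of re-summing the whole window for every row.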

Defining collations (sorting)

How you sort can depend on the languages or values involved. You register a collation by calling Connection.create_collation().

# This example sorting mechanism understands some text followed by a
# number and ensures the number portion gets sorted correctly
connection.execute("create table names(name)")
connection.executemany(
 "insert into names values(?)",
 (
 ("file1",),
 ("file7",),
 ("file17",),
 ("file20",),
 ("file3",),
 ),
)
print("Standard sorting")
for row in connection.execute("select * from names order by name"):
 print(row)
def str_num_collate(
    s1: apsw.SQLiteValue, s2: apsw.SQLiteValue
) -> int:
    # return -1 if s1<s2, +1 if s1>s2 else 0 for equal
    def parts(s: str) -> list:
        "Converts str into list of alternating str and int parts"
        return [
            int(v) if v.isdigit() else v
            for v in re.split(r"(\d+)", s)
        ]
    ps1 = parts(str(s1))
    ps2 = parts(str(s2))
    # compare
    if ps1 < ps2:
        return -1
    if ps1 > ps2:
        return 1
    return 0
connection.create_collation("strnum", str_num_collate)
print("\nUsing strnum")
for row in connection.execute(
 "select * from names order by name collate strnum"
):
 print(row)
Standard sorting
('file1',)
('file17',)
('file20',)
('file3',)
('file7',)
Using strnum
('file1',)
('file3',)
('file7',)
('file17',)
('file20',)
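
The comparison logic can be checked in plain Python before registering it as a collation. This sketch repeats the parts helper from above and uses functools.cmp_to_key so that sorted calls a comparison function the same way a collation would be called. The name str_num_compare is made up for the example.

```python
import functools
import re


def parts(s):
    # Alternating str and int pieces: the capturing group in the
    # pattern keeps the digit runs in the result of re.split
    return [int(v) if v.isdigit() else v for v in re.split(r"(\d+)", s)]


def str_num_compare(s1, s2):
    # -1, 0 or +1, like the collation callback
    ps1, ps2 = parts(s1), parts(s2)
    return (ps1 > ps2) - (ps1 < ps2)


names = ["file1", "file7", "file17", "file20", "file3"]
pieces = parts("file17")
plain = sorted(names)
natural = sorted(names, key=functools.cmp_to_key(str_num_compare))
print(pieces)
print(plain)
print(natural)
```

Because the split always alternates text and digits, text pieces are only ever compared with text and numbers with numbers.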

Accessing results by column name

You can access results by column name using dataclasses. APSW provides apsw.ext.DataClassRowFactory for names.

import apsw.ext
connection.execute(
"""
 create table books(id, title, author, year);
 insert into books values(7, 'Animal Farm', 'George Orwell', 1945);
 insert into books values(37, 'The Picture of Dorian Gray', 'Oscar Wilde', 1890);
 """
)
# Normally you use column numbers
for row in connection.execute(
 "select title, id, year from books where author=?",
 ("Oscar Wilde",),
):
 # this is very fragile
 print("title", row[0])
 print("id", row[1])
 print("year", row[2])
# Turn on dataclasses - frozen makes them read-only
connection.row_trace = apsw.ext.DataClassRowFactory(
 dataclass_kwargs={"frozen": True}
)
print("\nNow with dataclasses\n")
# Same query - note using AS to set column name
for row in connection.execute(
"""SELECT title,
 id AS book_id,
 year AS book_year
 FROM books WHERE author = ?""",
 ("Oscar Wilde",),
):
 print("title", row.title)
 print("id", row.book_id)
 print("year", row.book_year)
# clear
connection.row_trace = None
title The Picture of Dorian Gray
id 37
year 1890
Now with dataclasses
title The Picture of Dorian Gray
id 37
year 1890

Type conversion into/out of database

You can use apsw.ext.TypesConverterCursorFactory to do conversion, both for types you define and for other types.

import apsw.ext
registrar = apsw.ext.TypesConverterCursorFactory()
connection.cursor_factory = registrar
# A type we define - deriving from SQLiteTypeAdapter automatically registers conversion
# to a SQLite value
class Point(apsw.ext.SQLiteTypeAdapter):
    def __init__(self, x, y):
        self.x = x
        self.y = y
    def __repr__(self) -> str:
        return f"Point({self.x}, {self.y})"
    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, type(self))
            and self.x == other.x
            and self.y == other.y
        )
    def to_sqlite_value(self) -> str:
        # called to convert Point into something SQLite supports
        return f"{self.x};{self.y}"
    # This converter will be registered
    @classmethod
    def convert_from_sqlite(cls, value: str) -> Point:
        return cls(*(float(part) for part in value.split(";")))
# Existing types
def complex_to_sqlite_value(c: complex) -> str:
    return f"{c.real}+{c.imag}"
def datetime_to_sqlite_value(dt: datetime.datetime) -> float:
    # Represent as floating point UTC value no matter
    # what timezone is used. Also consider other
    # formats like ISO8601.
    return dt.timestamp()
# ... require manual registration
registrar.register_adapter(complex, complex_to_sqlite_value)
registrar.register_adapter(
 datetime.datetime, datetime_to_sqlite_value
)
# conversion from a SQLite value requires registration
registrar.register_converter("POINT", Point.convert_from_sqlite)
# ... and for stdlib types
def sqlite_to_complex(v: str) -> complex:
    return complex(*(float(part) for part in v.split("+")))
def sqlite_to_datetime(v: float) -> datetime.datetime:
    # Keep the UTC values coming back from the database
    # as UTC
    return datetime.datetime.fromtimestamp(v, datetime.timezone.utc)
registrar.register_converter("COMPLEX", sqlite_to_complex)
registrar.register_converter("TIMESTAMP", sqlite_to_datetime)
# note that the type names are case sensitive and must match the
# registration
connection.execute(
 "create table conversion(p POINT, c COMPLEX, t TIMESTAMP)"
)
# convert going into database
test_data = (Point(5.2, 7.6), 3 + 4j, datetime.datetime.now())
connection.execute(
 "insert into conversion values(?, ?, ?)", test_data
)
print("inserted", test_data)
# and coming back out
print("querying data")
for row in connection.execute("select * from conversion"):
    for i, value in enumerate(row):
        print(f"column {i} = {value!r}")
# clear registrar
connection.cursor_factory = apsw.Cursor
inserted (Point(5.2, 7.6), (3+4j), datetime.datetime(2025, 11, 28, 10, 45, 51, 313381))
querying data
column 0 = Point(5.2, 7.6)
column 1 = (3+4j)
column 2 = datetime.datetime(2025, 11, 28, 18, 45, 51, 313381, tzinfo=datetime.timezone.utc)
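
The inserted time shows 10:45 while the value read back shows 18:45 with a UTC timezone. Both describe the same instant: timestamp() produces seconds since the epoch, and the converter rebuilds the value in UTC. The sketch below illustrates that round trip in plain Python. The UTC-8 offset is an assumption inferred from the sample output; the example above used a naive datetime.now(), which timestamp() interprets as local time.

```python
import datetime

# Assumption: the sample output was produced on a machine at UTC-8
local_tz = datetime.timezone(datetime.timedelta(hours=-8))
original = datetime.datetime(
    2025, 11, 28, 10, 45, 51, 313381, tzinfo=local_tz
)

# What the adapter stores: a float of seconds since the epoch
stored = original.timestamp()

# What the converter returns: the same instant expressed in UTC
restored = datetime.datetime.fromtimestamp(stored, datetime.timezone.utc)

# Aware datetimes subtract by instant, so the difference is tiny
same_instant = abs(restored - original) < datetime.timedelta(milliseconds=1)
print(restored.hour, restored.tzinfo)
print(same_instant)
```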

Runtime Python objects

While only 5 types can be stored, you can pass any Python objects to and from your functions at runtime.

# Python set which isn't a supported SQLite type
# containing items like a complex number and stdout which
# definitely aren't SQLite compatible
py_value = {1, 2, "three", 4 + 5j, sys.stdout}
# Trying to pass it as a value gives TypeError
try:
 print(connection.execute("select ?", (py_value,)).get)
except TypeError as exc:
 print(exc)
# Now wrap it and it works
print(
 "select ?",
 connection.execute("select ?", (apsw.pyobject(py_value),)).get,
)
# It is still null at the SQL level
print(
 "select typeof(?)",
 connection.execute(
 "select typeof(?)", (apsw.pyobject(py_value),)
 ).get,
)
# Lets make a set which SQLite knows nothing about
def make_set(*args):
    print(f"make_set got {args!r}")
    # this will return a set, so we also need to mark it
    return apsw.pyobject(set(args))
connection.create_scalar_function("make_set", make_set)
print(
 "select make_set(?, ?, ?)",
 connection.execute(
 "select make_set(?, ?, ?)",
 (
 # these aren't SQLite types
 apsw.pyobject(3 + 4j),
 apsw.pyobject(sys.stdin),
 # but a string is
 "hello",
 ),
 ).get,
)
Bad binding argument type supplied - argument #1: type set
select ? {1, 2, <_io.StringIO object at 0x753729983c70>, 'three', (4+5j)}
select typeof(?) null
make_set got ((3+4j), <_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>, 'hello')
select make_set(?, ?, ?) {'hello', <_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>, (3+4j)}

Query limiting

apsw.ext.query_limit() limits rows and time in a block across all the queries within the block

import apsw.ext
# Use this to make many (virtual) rows
apsw.ext.make_virtual_module(
 connection, "generate_series", apsw.ext.generate_series
)
rows = []
with apsw.ext.query_limit(connection, row_limit=20):
    # 11 rows will come from this
    for (number,) in connection.execute(
        "select * from generate_series(0, 10)"
    ):
        rows.append(number)
    # next query would be 1,000 but we will hit
    # the limit
    for (number,) in connection.execute(
        "select * from generate_series(0, 999)"
    ):
        rows.append(number)
# lets see what we got
print(f"{len(rows)=}")
# We can also time limit
start = time.monotonic()
with apsw.ext.query_limit(connection, timeout=0.2):
    for (number,) in connection.execute(
        "select * from generate_series(0, 1000000000)"
    ):
        pass
print(
 f"After {time.monotonic()-start:.3f} seconds, we hit {number=}"
)
# We used the default "no exception" exception. Lets have an explicit exception.
# with both row and time limits ...
try:
    with apsw.ext.query_limit(
        connection,
        row_limit=1000,
        timeout=1000,
        row_exception=IndexError,
        timeout_exception=TimeoutError,
    ):
        for (number,) in connection.execute(
            "select * from generate_series(0, 1000000000)"
        ):
            pass
except Exception as exc:
    print(f"{exc=}")
len(rows)=20
After 0.200 seconds, we hit number=307329
exc=IndexError('query row limit hit')

Query details

apsw.ext.query_info() can provide a lot of information about a query (without running it)

import apsw.ext
# test tables
connection.execute(
"""
 create table customers(
 id INTEGER PRIMARY KEY,
 name CHAR,
 address CHAR);
 create table orders(
 id INTEGER PRIMARY KEY,
 customer_id INTEGER,
 item MY_OWN_TYPE);
 create index cust_addr on customers(address);
"""
)
query = """
 SELECT * FROM orders
 JOIN customers ON orders.customer_id=customers.id
 WHERE address = ?;
 SELECT 7;"""
# ask for all information available
qd = apsw.ext.query_info(
 connection,
 query,
 actions=True, # which tables/views etc and how they are accessed
 explain=True, # shows low level VDBE
 explain_query_plan=True, # how SQLite solves the query
)
print("query", qd.query)
print("\nbindings_count", qd.bindings_count)
print("\nbindings_names", qd.bindings_names)
print("\nexpanded_sql", qd.expanded_sql)
print("\nfirst_query", qd.first_query)
print("\nquery_remaining", qd.query_remaining)
print("\nis_explain", qd.is_explain)
print("\nis_readonly", qd.is_readonly)
print("\ndescription")
pprint(qd.description)
if hasattr(qd, "description_full"):
 print("\ndescription_full")
 pprint(qd.description_full)
print("\nquery_plan")
pprint(qd.query_plan)
print("\nFirst 5 actions")
pprint(qd.actions[:5])
print("\nFirst 5 explain")
pprint(qd.explain[:5])
query
 SELECT * FROM orders
 JOIN customers ON orders.customer_id=customers.id
 WHERE address = ?;
 SELECT 7;
bindings_count 1
bindings_names (None,)
expanded_sql None
first_query
 SELECT * FROM orders
 JOIN customers ON orders.customer_id=customers.id
 WHERE address = ?;
query_remaining SELECT 7;
is_explain 0
is_readonly True
description
(('id', 'INTEGER'),
 ('customer_id', 'INTEGER'),
 ('item', 'MY_OWN_TYPE'),
 ('id', 'INTEGER'),
 ('name', 'CHAR'),
 ('address', 'CHAR'))
description_full
(('id', 'INTEGER', 'main', 'orders', 'id'),
 ('customer_id', 'INTEGER', 'main', 'orders', 'customer_id'),
 ('item', 'MY_OWN_TYPE', 'main', 'orders', 'item'),
 ('id', 'INTEGER', 'main', 'customers', 'id'),
 ('name', 'CHAR', 'main', 'customers', 'name'),
 ('address', 'CHAR', 'main', 'customers', 'address'))
query_plan
QueryPlan(detail='QUERY PLAN',
 sub=[QueryPlan(detail='SCAN orders', sub=None),
 QueryPlan(detail='SEARCH customers USING INTEGER PRIMARY KEY '
 '(rowid=?)',
 sub=None)])
First 5 actions
[QueryAction(action=21,
 action_name='SQLITE_SELECT',
 column_name=None,
 database_name=None,
 file_name=None,
 function_name=None,
 module_name=None,
 operation=None,
 pragma_name=None,
 pragma_value=None,
 table_name=None,
 trigger_name=None,
 trigger_or_view=None,
 view_name=None),
 QueryAction(action=20,
 action_name='SQLITE_READ',
 column_name='id',
 database_name='main',
 file_name=None,
 function_name=None,
 module_name=None,
 operation=None,
 pragma_name=None,
 pragma_value=None,
 table_name='orders',
 trigger_name=None,
 trigger_or_view=None,
 view_name=None),
 QueryAction(action=20,
 action_name='SQLITE_READ',
 column_name='customer_id',
 database_name='main',
 file_name=None,
 function_name=None,
 module_name=None,
 operation=None,
 pragma_name=None,
 pragma_value=None,
 table_name='orders',
 trigger_name=None,
 trigger_or_view=None,
 view_name=None),
 QueryAction(action=20,
 action_name='SQLITE_READ',
 column_name='item',
 database_name='main',
 file_name=None,
 function_name=None,
 module_name=None,
 operation=None,
 pragma_name=None,
 pragma_value=None,
 table_name='orders',
 trigger_name=None,
 trigger_or_view=None,
 view_name=None),
 QueryAction(action=20,
 action_name='SQLITE_READ',
 column_name='id',
 database_name='main',
 file_name=None,
 function_name=None,
 module_name=None,
 operation=None,
 pragma_name=None,
 pragma_value=None,
 table_name='customers',
 trigger_name=None,
 trigger_or_view=None,
 view_name=None)]
First 5 explain
[VDBEInstruction(addr=0,
 opcode='Init',
 comment=None,
 p1=0,
 p2=17,
 p3=0,
 p4=None,
 p5=0),
 VDBEInstruction(addr=1,
 opcode='OpenRead',
 comment=None,
 p1=0,
 p2=12,
 p3=0,
 p4='3',
 p5=0),
 VDBEInstruction(addr=2,
 opcode='OpenRead',
 comment=None,
 p1=1,
 p2=11,
 p3=0,
 p4='3',
 p5=0),
 VDBEInstruction(addr=3,
 opcode='Rewind',
 comment=None,
 p1=0,
 p2=16,
 p3=0,
 p4=None,
 p5=0),
 VDBEInstruction(addr=4,
 opcode='Column',
 comment=None,
 p1=0,
 p2=1,
 p3=1,
 p4=None,
 p5=0)]

Blob I/O

BLOBS (binary large objects) are supported by SQLite. Note that you cannot change the size of one, but you can allocate one filled with zeroes, and then later open it and read / write the contents similar to a file, without having the entire blob in memory. Use Connection.blob_open() to open a blob.

connection.execute("create table blobby(x,y)")
# Add a blob we will fill in later
connection.execute("insert into blobby values(1, zeroblob(10000))")
# Or as a binding
connection.execute(
 "insert into blobby values(2, ?)", (apsw.zeroblob(20000),)
)
# Open a blob for writing. We need to know the rowid
rowid = connection.execute("select ROWID from blobby where x=1").get
blob = connection.blob_open("main", "blobby", "y", rowid, True)
blob.write(b"hello world")
blob.seek(2000)
blob.read(24)
# seek relative to the end
blob.seek(-32, 2)
blob.write(b"hello world, again")
blob.close()

Backup an open database

You can backup a database that is open. The pages are copied in batches of your choosing and allow continued use of the source database.

# We will copy a disk database into this memory database
destination = apsw.Connection(":memory:")
# Copy into destination
with destination.backup("main", connection, "main") as backup:
    # The source database can change while doing the backup
    # and the backup will still pick up those changes
    while not backup.done:
        backup.step(7)  # copy up to 7 pages each time
        # monitor progress
        print(backup.remaining, backup.page_count)
15 22
8 22
1 22
0 22
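
The progress numbers follow directly from the batch size: 22 pages copied at most 7 at a time. The sketch below reproduces only that arithmetic. simulate_backup is a hypothetical helper, and it assumes the source database does not change during the copy, which a real backup does not require.

```python
def simulate_backup(page_count, pages_per_step):
    # Hypothetical helper: mimics only the page counting, and assumes
    # nothing modifies the source while the copy is in progress
    remaining = page_count
    progress = []
    while remaining > 0:
        remaining -= min(pages_per_step, remaining)
        progress.append((remaining, page_count))
    return progress


progress = simulate_backup(22, 7)
for remaining, page_count in progress:
    print(remaining, page_count)
```

A smaller batch size gives other users of the source database more opportunities to work between steps, at the cost of more steps overall.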

Authorizer (control what SQL can do)

You can allow, deny, or ignore what SQL does. Use Connection.authorizer to set an authorizer.

def auth(
    operation: int,
    p1: Optional[str],
    p2: Optional[str],
    db_name: Optional[str],
    trigger_or_view: Optional[str],
) -> int:
    """Called when each operation is prepared. We can return SQLITE_OK, SQLITE_DENY or
    SQLITE_IGNORE"""
    # find the operation name
    print(
        apsw.mapping_authorizer_function[operation],
        p1,
        p2,
        db_name,
        trigger_or_view,
    )
    if (
        operation == apsw.SQLITE_CREATE_TABLE
        and p1
        and p1.startswith("private")
    ):
        return apsw.SQLITE_DENY  # not allowed to create tables whose names start with private
    return apsw.SQLITE_OK  # always allow
connection.authorizer = auth
connection.execute("insert into names values('foo')")
connection.execute("select name from names limit 1")
try:
 connection.execute("create table private_stuff(secret)")
 print("Created secret table!")
except Exception as e:
 print(e)
# Clear authorizer
connection.authorizer = None
SQLITE_INSERT names None main None
SQLITE_SELECT None None None None
SQLITE_READ names name main None
SQLITE_INSERT sqlite_master None main None
SQLITE_CREATE_TABLE private_stuff None main None
not authorized

Progress handler

Some operations (eg joins, sorting) can take many operations to complete. Register a progress handler callback with Connection.set_progress_handler() which lets you provide feedback and allows cancelling.

# create a table with random numbers
with connection:
 connection.execute("create table numbers(x)")
 connection.executemany(
 "insert into numbers values(?)",
 ((random.randint(0, 9999999999),) for _ in range(100)),
 )
def progress_handler() -> bool:
    print("progress handler called")
    return False  # returning True aborts
# register handler every 50 vdbe instructions
connection.set_progress_handler(progress_handler, 50)
# Sorting the numbers to find the biggest
for max_num in connection.execute("select max(x) from numbers"):
 print(max_num)
# Clear handler
connection.set_progress_handler(None)
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
(9923950222,)

File Control

We can get/set low level information using the Connection.file_control() interface. In this example we get the data version. There is a pragma but it doesn’t change for commits on the same connection.

# We use ctypes to provide the correct C level data types and pointers
import ctypes
def get_data_version(db):
 # unsigned 32 bit integer
 data_version = ctypes.c_uint32(0)
 ok = db.file_control(
 "main", # or an attached database name
 apsw.SQLITE_FCNTL_DATA_VERSION, # code
 ctypes.addressof(data_version),
 ) # pass C level pointer
 assert ok, "SQLITE_FCNTL_DATA_VERSION was not understood!"
 return data_version.value
# Show starting values
print(
 "fcntl",
 get_data_version(connection),
 "pragma",
 connection.pragma("data_version"),
)
# See the fcntl value versus pragma value
for sql in (
 "create table fcntl_example(x)",
 "begin ; insert into fcntl_example values(3)",
 # we can see the version doesn't change inside a transaction
 "insert into fcntl_example values(4)",
 "commit",
 "pragma user_version=1234",
):
 print(sql)
 connection.execute(sql)
 print(
 "fcntl",
 get_data_version(connection),
 "pragma",
 connection.pragma("data_version"),
 )
fcntl 40 pragma 2
create table fcntl_example(x)
fcntl 41 pragma 2
begin ; insert into fcntl_example values(3)
fcntl 41 pragma 2
insert into fcntl_example values(4)
fcntl 41 pragma 2
commit
fcntl 42 pragma 2
pragma user_version=1234
fcntl 43 pragma 2

Commit hook

A commit hook can allow or veto commits. Register a commit hook with Connection.set_commit_hook().

def my_commit_hook() -> bool:
    print("in commit hook")
    hour = time.localtime()[3]
    if hour >= 8 and hour < 18:
        print("commits okay at this time")
        return False  # let commit go ahead
    print("no commits out of hours")
    return True  # abort commits outside of 8am through 6pm
connection.set_commit_hook(my_commit_hook)
try:
    with connection:
        connection.execute(
            """create table example(x,y,z);
            insert into example values (3,4,5)"""
        )
except apsw.ConstraintError as exc:
    print("commit was not allowed")
connection.set_commit_hook(None)
in commit hook
commits okay at this time

Update hook

Update hooks let you know that data has been added, changed, or removed. For example you could use this to discard cached information. Register a hook using Connection.set_update_hook().

def my_update_hook(
 type: int, db_name: str, table_name: str, rowid: int
) -> None:
 op: str = apsw.mapping_authorizer_function[type]
 print(
 f"Updated: {op} db {db_name}, table {table_name}, rowid {rowid}"
 )
connection.set_update_hook(my_update_hook)
connection.execute("insert into names values(?)", ("file93",))
connection.execute(
 "update names set name=? where name=?", ("file94", "file93")
)
connection.execute("delete from names where name=?", ("file94",))
# Clear the hook
connection.set_update_hook(None)
Updated: SQLITE_INSERT db main, table names, rowid 7
Updated: SQLITE_UPDATE db main, table names, rowid 7
Updated: SQLITE_DELETE db main, table names, rowid 7
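
The cache use case mentioned above could look something like the following sketch. The cache layout and the name invalidate_on_update are hypothetical, and the hook is called by hand here so the example runs without a database. With APSW it would be registered using connection.set_update_hook(invalidate_on_update).

```python
# Hypothetical cache keyed by (table name, rowid)
cache = {
    ("names", 7): "file93",
    ("names", 8): "file100",
}


def invalidate_on_update(
    type: int, db_name: str, table_name: str, rowid: int
) -> None:
    # Same parameters as my_update_hook. Whatever happened to the
    # row, the cached copy can no longer be trusted, so drop it
    cache.pop((table_name, rowid), None)


# Numeric value of SQLite's SQLITE_UPDATE constant, written out so
# the sketch does not need apsw imported
SQLITE_UPDATE = 23

# Simulate the call made after "update names ... " touches rowid 7
invalidate_on_update(SQLITE_UPDATE, "main", "names", 7)
print(cache)
```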

Virtual tables

Virtual tables let you provide data on demand as a SQLite table so you can use SQL queries against that data. Writing your own virtual table requires understanding how to return less than all the data via the BestIndex method.

You can export a Python function as a virtual table in 3 lines of code using apsw.ext.make_virtual_module(), being able to provide both positional and keyword arguments.

The first example below is for illustration. In practice you'll find apsw.ext.generate_series() more useful.

# Yield a row at a time
def table_range(start=1, stop=100, step=1):
    for i in range(start, stop + 1, step):
        yield (i,)
# set column names
table_range.columns = ("value",)
# set how to access what table_range returns
table_range.column_access = apsw.ext.VTColumnAccess.By_Index
# register it
apsw.ext.make_virtual_module(connection, "range", table_range)
# see it work. we can provide both positional and keyword
# arguments
query = "SELECT * FROM range(90) WHERE step=2"
print(apsw.ext.format_query_table(connection, query))
# the parameters are hidden columns so '*' doesn't select them
# but you can ask
query = "SELECT *, start, stop, step FROM range(89) WHERE step=3"
print(apsw.ext.format_query_table(connection, query))
# Expose the unicode database.
import unicodedata
# A more complex example exporting unicodedata module
# The methods we will call on each codepoint
unicode_methods = (
 "name",
 "decimal",
 "digit",
 "numeric",
 "category",
 "combining",
 "bidirectional",
 "east_asian_width",
 "mirrored",
 "decomposition",
)
# the function we will turn into a virtual table returning
# each row as a dict
def unicode_data(start=0, stop=sys.maxunicode):
    # some methods raise ValueError on some codepoints
    def call(meth: str, c: str):
        try:
            return getattr(unicodedata, meth)(c)
        except ValueError:
            return None
    for c in range(start, stop + 1):
        yield {k: call(k, chr(c)) for k in unicode_methods}
# setup column names and access
unicode_data.columns = unicode_methods
unicode_data.column_access = apsw.ext.VTColumnAccess.By_Name
# register
apsw.ext.make_virtual_module(connection, "unicode_data", unicode_data)
# how many codepoints are in each category?
query = """
 SELECT count(*), category FROM unicode_data
 WHERE stop = 0xffff -- BMP only
 GROUP BY category
 ORDER BY category
 LIMIT 10"""
print(apsw.ext.format_query_table(connection, query))
# A more complex example - given a list of directories return information
# about the files within them recursively
def get_files_info(
    directories: str,
    sep: str = os.pathsep,
    *,
    ignore_symlinks: bool = True,
) -> Iterator[dict[str, Any]]:
    for root in directories.split(sep):
        with os.scandir(root) as sd:
            for entry in sd:
                if entry.is_symlink() and ignore_symlinks:
                    continue
                if entry.is_dir():
                    yield from get_files_info(
                        os.path.join(root, entry.name),
                        ignore_symlinks=ignore_symlinks,
                    )
                elif entry.is_file():
                    s = entry.stat()
                    yield {
                        "directory": root,
                        "name": entry.name,
                        "extension": os.path.splitext(entry.name)[1],
                        **{
                            k: getattr(s, k)
                            for k in get_files_info.stat_columns
                        },
                    }
# which stat columns do we want?
get_files_info.stat_columns = tuple(
    n for n in dir(os.stat(".")) if n.startswith("st_")
)
# setup columns and access by providing an example of the first entry returned
(
    get_files_info.columns,
    get_files_info.column_access,
) = apsw.ext.get_column_names(next(get_files_info(".")))
apsw.ext.make_virtual_module(connection, "files_info", get_files_info)
# all the sys.path directories
bindings = (
    os.pathsep.join(
        p
        for p in sys.path
        if os.path.isdir(p)
        # except our current one
        and not os.path.samefile(p, ".")
    ),
)
# Find the 3 biggest files that aren't libraries
query = """SELECT st_size, directory, name
 FROM files_info(?)
 WHERE extension NOT IN ('.a', '.so')
 ORDER BY st_size DESC
 LIMIT 3"""
print(apsw.ext.format_query_table(connection, query, bindings))
# Find the 3 oldest Python files
query = """SELECT DATE(st_ctime, 'auto') AS date, directory, name
 FROM files_info(?)
 WHERE extension='.py'
 ORDER BY st_ctime
 LIMIT 3"""
print(apsw.ext.format_query_table(connection, query, bindings))
# find space used by filename extension
query = """SELECT extension, SUM(st_size) as total_size
 FROM files_info(?)
 GROUP BY extension
 ORDER BY extension"""
print(apsw.ext.format_query_table(connection, query, bindings))
# unregister a virtual table by passing None
connection.create_module("files_info", None)
┌───────┐
│ value │
│ 90 │
│ 92 │
│ 94 │
│ 96 │
│ 98 │
│ 100 │
└───────┘
┌───────┬───────┬──────┬──────┐
│ value │ start │ stop │ step │
│ 89 │ 89 │ 100 │ 3 │
│ 92 │ 89 │ 100 │ 3 │
│ 95 │ 89 │ 100 │ 3 │
│ 98 │ 89 │ 100 │ 3 │
└───────┴───────┴──────┴──────┘
┌──────────┬──────────┐
│ count(*) │ category │
│ 65 │ Cc │
│ 43 │ Cf │
│ 1432 │ Cn │
│ 6400 │ Co │
│ 2048 │ Cs │
│ 1448 │ Ll │
│ 236 │ Lm │
│ 46126 │ Lo │
│ 31 │ Lt │
│ 1132 │ Lu │
└──────────┴──────────┘
┌─────────┬────────────────────────────────────────────┬───────────────────────┐
│ st_size │ directory │ name │
├─────────┼────────────────────────────────────────────┼───────────────────────┤
│ 558634 │ /usr/lib/python3.14/pydoc_data/__pycache__ │ topics.cpython-314.p- │
│ │ │ yc │
├─────────┼────────────────────────────────────────────┼───────────────────────┤
│ 558057 │ /usr/lib/python3.14/pydoc_data │ topics.py │
├─────────┼────────────────────────────────────────────┼───────────────────────┤
│ 229038 │ /usr/lib/python3.14 │ _pydecimal.py │
└─────────┴────────────────────────────────────────────┴───────────────────────┘
┌────────────┬────────────────────────────────┬───────────────┐
│ date │ directory │ name │
│ 2025-10-31 │ /usr/lib/python3.14/pydoc_data │ topics.py │
│ 2025-10-31 │ /usr/lib/python3.14 │ _pydecimal.py │
│ 2025-10-31 │ /usr/lib/python3.14 │ turtle.py │
└────────────┴────────────────────────────────┴───────────────┘
┌────────────┬────────────┐
│ extension │ total_size │
│ │ 259613 │
│ .a │ 25842524 │
│ .allowlist │ 56 │
│ .bootstrap │ 1063 │
│ .c │ 5406 │
│ .cfg │ 341 │
│ .csh │ 937 │
│ .css │ 1325 │
│ .fish │ 2208 │
│ .in │ 1444 │
│ .ini │ 2037 │
│ .json │ 60181 │
│ .local │ 676 │
│ .o │ 11096 │
│ .patch │ 2682 │
│ .ps1 │ 9031 │
│ .py │ 11249525 │
│ .pyc │ 12223522 │
│ .rst │ 9561 │
│ .sh │ 2752 │
│ .so │ 8745952 │
│ .sql │ 9749 │
│ .stdlib │ 7244 │
│ .supp │ 70 │
│ .txt │ 13804 │
└────────────┴────────────┘
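The per-category counts in the unicode_data table above can be cross-checked without SQLite. This is a minimal sketch using only the standard library; counts for assigned categories depend on the Unicode version shipped with your Python, while the control (Cc), private use (Co) and surrogate (Cs) ranges in the BMP are fixed.

```python
import unicodedata
from collections import Counter

# Count general categories for the Basic Multilingual Plane (0 .. 0xFFFF),
# the same range the tour's query selects with stop = 0xffff
counts = Counter(unicodedata.category(chr(c)) for c in range(0x10000))

# Same shape as the SQL: ordered by category, first 10 only
for category in sorted(counts)[:10]:
    print(counts[category], category)
```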

VFS - Virtual File System

VFS lets you control how SQLite accesses storage. APSW makes it easy to "inherit" from an existing VFS and monitor or alter data as it flows through.

URIs are shown as a way to receive parameters when opening or creating a database file, and pragmas as a way to receive parameters once a database is open.
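To make the URI parameter idea concrete, here is a rough standard-library sketch of pulling typed values out of the query part of a URI. The helper names `uri_parameters`, `as_int` and `as_boolean` are hypothetical, and the boolean handling is a simplification (SQLite also accepts numeric values); the real parsing is done by SQLite and exposed through apsw.URIFilename.

```python
from urllib.parse import parse_qsl, urlsplit


def uri_parameters(uri: str) -> dict:
    # parameters come after the ? and are separated by &
    return dict(parse_qsl(urlsplit(uri).query))


def as_int(params: dict, name: str, default: int) -> int:
    # fall back to the default when missing or not a number
    try:
        return int(params[name])
    except (KeyError, ValueError):
        return default


def as_boolean(params: dict, name: str, default: bool) -> bool:
    # simplified: only the word forms are recognised here
    value = params.get(name, "").lower()
    if value in ("yes", "true", "on"):
        return True
    if value in ("no", "false", "off"):
        return False
    return default


params = uri_parameters(
    "file:myobfudb?fast=speed&level=7&warp=on&another=true"
)
print("fast is", params.get("fast"))
print("level is", as_int(params, "level", 3))
print("warp is", as_boolean(params, "warp", False))
print("notpresent is", params.get("notpresent"))
```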

# This example VFS obfuscates the database file contents by XORing all
# bytes with 0xa5.
def obfuscate(data: bytes):
    return bytes([x ^ 0xA5 for x in data])

# Inheriting from a base of "" means the default vfs
class ObfuscatedVFS(apsw.VFS):
    def __init__(self, vfsname="obfuscated", basevfs=""):
        self.vfs_name = vfsname
        self.base_vfs = basevfs
        super().__init__(self.vfs_name, self.base_vfs)

    # We want to return our own file implementation, but also
    # want it to inherit
    def xOpen(self, name, flags):
        in_flags = []
        for k, v in apsw.mapping_open_flags.items():
            if isinstance(k, int) and flags[0] & k:
                in_flags.append(v)
        print("xOpen flags", " | ".join(in_flags))
        if isinstance(name, apsw.URIFilename):
            print(" uri filename", name.filename())
            # We can look at uri parameters
            print(" fast is", name.uri_parameter("fast"))
            print(" level is", name.uri_int("level", 3))
            print(" warp is", name.uri_boolean("warp", False))
            print(
                " notpresent is", name.uri_parameter("notpresent")
            )
            # all of them
            print(" all uris", name.parameters)
        else:
            print(" filename", name)
        return ObfuscatedVFSFile(self.base_vfs, name, flags)

# The file implementation where we override xRead and xWrite to call our
# obfuscation routine
class ObfuscatedVFSFile(apsw.VFSFile):
    def __init__(self, inheritfromvfsname, filename, flags):
        super().__init__(inheritfromvfsname, filename, flags)

    def xRead(self, amount, offset):
        return obfuscate(super().xRead(amount, offset))

    def xWrite(self, data, offset):
        super().xWrite(obfuscate(data), offset)

    def xFileControl(self, op: int, ptr: int) -> bool:
        if op != apsw.SQLITE_FCNTL_PRAGMA:
            return super().xFileControl(op, ptr)
        # implement our own pragma
        p = apsw.VFSFcntlPragma(ptr)
        print(f"pragma received {p.name} = {p.value}")
        # what do we understand?
        if p.name == "my_custom_pragma":
            p.result = "orange"
            return True
        # We did not understand
        return False
# To register the VFS we just instantiate it
obfuvfs = ObfuscatedVFS()
# Let's see which VFS are now available
print("VFS available", apsw.vfs_names())
# Make an obfuscated db, passing in some URI parameters
# default open flags
open_flags = apsw.SQLITE_OPEN_READWRITE | apsw.SQLITE_OPEN_CREATE
# add in using URI parameters
open_flags |= apsw.SQLITE_OPEN_URI
# uri parameters are after the ? separated by &
obfudb = apsw.Connection(
    "file:myobfudb?fast=speed&level=7&warp=on&another=true",
    flags=open_flags,
    vfs=obfuvfs.vfs_name,
)
# Check it works
obfudb.execute("create table foo(x,y); insert into foo values(1,2)")
# Check it really is obfuscated on disk
print("What is on disk", repr(Path("myobfudb").read_bytes()[:20]))
# And unobfuscating it
print(
    "Unobfuscated disk",
    repr(obfuscate(Path("myobfudb").read_bytes()[:20])),
)
# Custom pragma
print(
    "pragma returned", obfudb.pragma("my_custom_pragma", "my value")
)
# Tidy up
obfudb.close()
os.remove("myobfudb")
VFS available ['unix', 'obfuscated', 'memdb', 'unix-excl', 'unix-dotfile', 'unix-none']
xOpen flags SQLITE_OPEN_CREATE | SQLITE_OPEN_MAIN_DB | SQLITE_OPEN_READWRITE | SQLITE_OPEN_URI
 uri filename /space/apsw/myobfudb
 fast is speed
 level is 7
 warp is True
 notpresent is None
 all uris ('fast', 'level', 'warp', 'another')
pragma received journal_mode = wal
xOpen flags SQLITE_OPEN_CREATE | SQLITE_OPEN_MAIN_JOURNAL | SQLITE_OPEN_READWRITE
 filename /space/apsw/myobfudb-journal
pragma received foreign_keys = ON
pragma received optimize = 65538
xOpen flags SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE | SQLITE_OPEN_WAL
 filename /space/apsw/myobfudb-wal
pragma received recursive_triggers = ON
What is on disk b'\xf6\xf4\xe9\xcc\xd1\xc0\x85\xc3\xca\xd7\xc8\xc4\xd1\x85\x96\xa5\xb5\xa5\xa7\xa7'
Unobfuscated disk b'SQLite format 3\x00\x10\x00\x02\x02'
pragma received my_custom_pragma = my value
pragma returned orange
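The "What is on disk" and "Unobfuscated disk" lines above are related by the fact that XOR with a fixed byte is its own inverse, which is why the same routine serves for both reading and writing. A small standalone check, using the bytes copied from the output above:

```python
def obfuscate(data: bytes) -> bytes:
    # XOR every byte with 0xA5; applying it twice restores the input
    return bytes(x ^ 0xA5 for x in data)


# first 20 bytes of the obfuscated file, copied from the output above
on_disk = (
    b"\xf6\xf4\xe9\xcc\xd1\xc0\x85\xc3\xca\xd7"
    b"\xc8\xc4\xd1\x85\x96\xa5\xb5\xa5\xa7\xa7"
)
plain = obfuscate(on_disk)
print(plain)
# and back again
print(obfuscate(plain) == on_disk)
```

This is obfuscation only. A single-byte XOR offers no real confidentiality.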

Limits

SQLite lets you see and update various limits via Connection.limit().

# Print some limits
for limit in ("LENGTH", "COLUMN", "ATTACHED"):
    name = "SQLITE_LIMIT_" + limit
    max_name = "SQLITE_MAX_" + limit  # compile time limit
    orig = connection.limit(getattr(apsw, name))
    print(name, orig)
    # To get the maximum, set to 0x7fffffff and then read value back
    connection.limit(getattr(apsw, name), 0x7FFFFFFF)
    max_value = connection.limit(getattr(apsw, name))
    print(max_name, " ", max_value)
# Set limit for size of a string
connection.execute("create table testlimit(s)")
connection.execute(
    "insert into testlimit values(?)", ("x" * 1024,)
)  # 1024 char string
connection.limit(apsw.SQLITE_LIMIT_LENGTH, 1023)  # limit is now 1023
try:
    connection.execute(
        "insert into testlimit values(?)", ("y" * 1024,)
    )
    print("string exceeding limit was inserted")
except apsw.TooBigError:
    print("Caught toobig exception")
# reset back to largest value
connection.limit(apsw.SQLITE_LIMIT_LENGTH, 0x7FFFFFFF)
SQLITE_LIMIT_LENGTH 1000000000
SQLITE_MAX_LENGTH 1000000000
SQLITE_LIMIT_COLUMN 2000
SQLITE_MAX_COLUMN 2000
SQLITE_LIMIT_ATTACHED 125
SQLITE_MAX_ATTACHED 125
Caught toobig exception

Shell

APSW includes a shell like the one in SQLite, which is also extensible from Python.

import apsw.shell
# Here we use the shell to do a csv export and then dump part of the
# database
# Export to a StringIO
import io
output = io.StringIO()
shell = apsw.shell.Shell(stdout=output, db=connection)
# How to execute a dot command
shell.process_command(".mode csv")
shell.process_command(".headers on")
# How to execute SQL
shell.process_sql(
"""
 create table csvtest(column1, column2 INTEGER);
 create index faster on csvtest(column1);
 insert into csvtest values(3, 4);
 insert into csvtest values('a b', NULL);
"""
)
# Or let the shell figure out SQL vs dot command
shell.process_complete_line("select * from csvtest")
# see the result
print(output.getvalue())
# reset output - rewind and discard what was written so far
output.seek(0)
output.truncate()
# make a dump of the same table
shell.process_command(".dump csvtest%")
# see the result
print("\nDump output\n")
print(output.getvalue())
column1,column2
3,4
a b,
Dump output
-- SQLite dump (by APSW 3.51.1.0)
-- SQLite version 3.51.1
-- Date: Fri Nov 28 10:45:51 2025
-- Tables like: csvtest%
-- Database: /space/apsw/dbfile
-- User: rogerb @ clamps
-- The values of various per-database settings
PRAGMA page_size=4096;
-- PRAGMA encoding='UTF-8';
-- PRAGMA auto_vacuum=NONE;
-- PRAGMA max_page_count=4294967294;
BEGIN TRANSACTION;
-- Table csvtest
DROP TABLE IF EXISTS csvtest;
CREATE TABLE csvtest(column1, column2 INTEGER);
INSERT INTO csvtest VALUES(3,4);
INSERT INTO csvtest VALUES('a b',NULL);
-- Triggers and indices on csvtest
CREATE INDEX faster on csvtest(column1);
-- Database header
pragma user_version=1234;
COMMIT TRANSACTION;
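The "reset output" step in the shell code above reuses an io.StringIO. As a small standard-library illustration of what rewinding does: seek(0) on its own only moves the write position, so earlier text survives wherever the new text is shorter, while adding truncate() empties the buffer.

```python
import io

buf = io.StringIO()
buf.write("first, longer text")

# Rewinding alone: new text overwrites from the start, the tail remains
buf.seek(0)
buf.write("new")
overwritten = buf.getvalue()
print(repr(overwritten))

# Rewinding and truncating: the buffer really is empty again
buf.seek(0)
buf.truncate()
buf.write("new")
cleared = buf.getvalue()
print(repr(cleared))
```

In the tour the dump is longer than the CSV text it replaces, so either approach produces the output shown.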

Statistics

SQLite provides process-wide statistics via apsw.status(). Use Connection.status() for per-connection statistics.

current_usage, max_usage = apsw.status(apsw.SQLITE_STATUS_MEMORY_USED)
print(f"SQLite memory usage {current_usage} max {max_usage}")
schema_used, _ = connection.status(apsw.SQLITE_DBSTATUS_SCHEMA_USED)
print(f"{schema_used} bytes used to store schema for this connection")
SQLite memory usage 522192 max 2499320
5360 bytes used to store schema for this connection

Tracing

This shows how to use Connection.trace_v2().

# From https://www.sqlite.org/lang_with.html
# Outlandish Recursive Query Examples
query = """WITH RECURSIVE
 xaxis(x) AS (VALUES(-2.0) UNION ALL SELECT x+0.05 FROM xaxis WHERE x<1.2),
 yaxis(y) AS (VALUES(-1.0) UNION ALL SELECT y+0.1 FROM yaxis WHERE y<1.0),
 m(iter, cx, cy, x, y) AS (
 SELECT 0, x, y, 0.0, 0.0 FROM xaxis, yaxis
 UNION ALL
 SELECT iter+1, cx, cy, x*x-y*y + cx, 2.0*x*y + cy FROM m
 WHERE (x*x + y*y) < 4.0 AND iter<28
 ),
 m2(iter, cx, cy) AS (
 SELECT max(iter), cx, cy FROM m GROUP BY cx, cy
 ),
 a(t) AS (
 SELECT group_concat( substr(' .+*#', 1+min(iter/7,4), 1), '')
 FROM m2 GROUP BY cy
 )
 SELECT group_concat(rtrim(t),x'0a') FROM a;"""
def trace_hook(trace: dict) -> None:
    # check the sql and connection are as expected and remove from trace
    # so we don't print them
    assert (
        trace.pop("sql", query) == query
        and trace.pop("connection") is connection
    )
    print("code is ", apsw.mapping_trace_codes[trace["code"]])
    pprint(trace)

connection.trace_v2(
    apsw.SQLITE_TRACE_STMT
    | apsw.SQLITE_TRACE_PROFILE
    | apsw.SQLITE_TRACE_ROW,
    trace_hook,
)
# We will get one each of the trace events
for _ in connection.execute(query):
    pass
# Turn off tracing
connection.trace_v2(0, None)
code is SQLITE_TRACE_STMT
{'code': 1,
 'explain': 0,
 'id': 393889112,
 'readonly': True,
 'total_changes': 146,
 'trigger': False}
code is SQLITE_TRACE_ROW
{'code': 4, 'id': 393889112}
code is SQLITE_TRACE_PROFILE
{'code': 2,
 'id': 393889112,
 'nanoseconds': 17000000,
 'stmt_status': {'SQLITE_STMTSTATUS_AUTOINDEX': 0,
 'SQLITE_STMTSTATUS_FILTER_HIT': 0,
 'SQLITE_STMTSTATUS_FILTER_MISS': 0,
 'SQLITE_STMTSTATUS_FULLSCAN_STEP': 1365,
 'SQLITE_STMTSTATUS_MEMUSED': 15784,
 'SQLITE_STMTSTATUS_REPREPARE': 0,
 'SQLITE_STMTSTATUS_RUN': 1,
 'SQLITE_STMTSTATUS_SORT': 2,
 'SQLITE_STMTSTATUS_VM_STEP': 1015353},
 'total_changes': 146}
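The 'code' values in the output above are bit flags, which is why the mask passed to trace_v2 is built with `|`. A minimal sketch of that idea using enum.IntFlag, with the numeric values taken from SQLite's sqlite3.h; the class name `TraceCode` is hypothetical and not part of APSW.

```python
import enum


class TraceCode(enum.IntFlag):
    # numeric values as defined in SQLite's sqlite3.h
    STMT = 0x01
    PROFILE = 0x02
    ROW = 0x04
    CLOSE = 0x08


# the same combination the tour asks for
mask = TraceCode.STMT | TraceCode.PROFILE | TraceCode.ROW

for code in (1, 4, 2, 8):
    event = TraceCode(code)
    wanted = "requested" if event & mask else "not requested"
    print(code, event.name, wanted)

# the profile event reports nanoseconds; 17000000 in the output above is
milliseconds = 17_000_000 / 1_000_000
print(milliseconds, "milliseconds")
```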

System and SQLite resource usage in a block

Use apsw.ext.ShowResourceUsage() to see what resources a block of code uses. We use the same query from above.

Only statistics that have changed are shown in the summary. There are 21 SQLite values tracked including caching, and 20 system values.
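The general pattern of measuring a block, before and after snapshots taken by a context manager, can be sketched with the standard library alone. This is not APSW's implementation and tracks only two values; the helper name `show_time_used` is hypothetical.

```python
import contextlib
import time


@contextlib.contextmanager
def show_time_used(report: dict):
    # Hypothetical helper: snapshot counters on entry, and store the
    # differences on exit even if the block raises
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    try:
        yield report
    finally:
        report["wall"] = time.perf_counter() - wall_start
        report["cpu"] = time.process_time() - cpu_start


usage = {}
with show_time_used(usage):
    # some CPU work
    sum(i * i for i in range(100_000))
    # and some wall clock time that uses almost no CPU
    time.sleep(0.05)

print(f"Wall clock {usage['wall']:.3f}")
print(f"CPU        {usage['cpu']:.3f}")
```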

with apsw.ext.ShowResourceUsage(
    sys.stdout, db=connection, scope="thread"
):
    # some SQLite work
    rows = connection.execute(query).get
    # and some non-SQLite work - the imports cause filesystem access
    import statistics, tokenize, uuid, fractions, pydoc, decimal
    # and take some wall clock time
    time.sleep(1.3)
 Total CPU consumption 0.024
 Wall clock 1.326
 Block input operations 8
 Page faults with I/O 1
 Maximum resident set size 1,380
 Page faults - no I/O 299
 Involuntary context switches 449
 Voluntary context switches 2
 Time in system mode 0.002
 Time in user mode 0.022
 SQLite full table scan 1,365
 SQLite sort operations 2
 SQLite vm operations 1,015,353
 SQLite statements completed 1
SQLite allocations lookaside full 17,272

SQL statement tracing in a block

Use apsw.ext.Trace() to see SQL statements inside a block of code. This also shows behind the scenes SQL.

# Use None instead of stdout and no information is printed or gathered
with apsw.ext.Trace(
    sys.stdout,
    db=connection,
    vtable=True,
    updates=True,
    transaction=True,
):
    # APSW does a savepoint behind the scenes to wrap the block
    with connection:
        # Some regular SQL
        connection.execute("create table multi(x)")
        # executemany runs the same statement repeatedly
        connection.executemany(
            "insert into multi values(?)", ((x,) for x in range(5))
        )
        # See how many rows were processed
        connection.execute("select * from multi limit 2").fetchall()
        # You can also see how many rows were changed
        connection.execute("delete from multi where x < 4")
    # pragma functions are virtual tables - see how many rows this processes even
    # though only one has 'pow'
    connection.execute(
        "SELECT narg FROM pragma_function_list WHERE name='pow'"
    ).get
    # trigger that causes rollback
    connection.execute("""
        create trigger error after insert on multi
        begin
            update multi set rowid=100+new.rowid where rowid=new.rowid;
            select raise(rollback, 'nope');
        end;
    """)
    with contextlib.suppress(apsw.ConstraintError):
        connection.execute("insert into multi values(54)")
> BEGIN DEFERRED
 Time: 0.000
 !BEGIN
> create table multi(x)
 Time: 0.000
> insert into multi values(?)
 INS 1 (0)
 Time: 0.000 Changes: 1
> insert into multi values(?)
 INS 2 (1)
 Time: 0.000 Changes: 1
> insert into multi values(?)
 INS 3 (2)
 Time: 0.001 Changes: 1
> insert into multi values(?)
 INS 4 (3)
 Time: 0.000 Changes: 1
> insert into multi values(?)
 INS 5 (4)
 Time: 0.000 Changes: 1
> select * from multi limit 2
 Time: 0.000 Rows: 2
> delete from multi where x < 4
 DEL 1 (0)
 DEL 2 (1)
 DEL 3 (2)
 DEL 4 (3)
 Time: 0.000 Changes: 4
> COMMIT
 !COMMIT
 Time: 0.007
> SELECT narg FROM pragma_function_list WHERE name='pow'
V PRAGMA function_list
 Time: 0.000 Rows: 215 VmStep: 1,509 Mem: 71.0KB
< SELECT narg FROM pragma_function_list WHERE name='pow'
 Time: 0.000 Rows: 2 VmStep: 656
 !BEGIN
> create trigger error after insert on multi
 begin
 update ...
 !COMMIT
 Time: 0.002
 !BEGIN
> insert into multi values(54)
 INS 6 (54)
T TRIGGER error
 UPD 6>106 (...)
 !ROLLBACK
< insert into multi values(54)
 Time: 0.000 Rows: 1 Changes: 1

Formatting query results table

apsw.ext.format_query_table() makes it easy to format the results of a query as an automatically adjusting table, with options for colour, string sanitizing, truncation, etc.

# Create a table with some dummy data
connection.execute(
"""CREATE TABLE dummy(quantity, [spaces in name], last);
 INSERT INTO dummy VALUES(3, 'some regular text to make this row interesting', x'030709');
 INSERT INTO dummy VALUES(3.14, 'Tiếng Việt', null);
 INSERT INTO dummy VALUES('', ?, ' ');
""",
 ("special \t\n\f\0 cha\\rs",),
)
query = "SELECT * FROM dummy"
# default
print(apsw.ext.format_query_table(connection, query))
# no unicode boxes and maximum sanitize the text
kwargs = {"use_unicode": False, "string_sanitize": 2}
print(apsw.ext.format_query_table(connection, query, **kwargs))
# let's have unicode boxes and make things narrow
kwargs = {
 "use_unicode": True,
 "string_sanitize": 0,
 "text_width": 30,
}
print(apsw.ext.format_query_table(connection, query, **kwargs))
# have the values in SQL syntax
kwargs = {"quote": True}
print(apsw.ext.format_query_table(connection, query, **kwargs))
┌──────────┬────────────────────────────────────────────────┬─────────────┐
│ quantity │ spaces in name │ last │
├──────────┼────────────────────────────────────────────────┼─────────────┤
│ 3 │ some regular text to make this row interesting │ [ 3 bytes ] │
├──────────┼────────────────────────────────────────────────┼─────────────┤
│ 3.14 │ Tiếng Việt │ (null) │
├──────────┼────────────────────────────────────────────────┼─────────────┤
│ │ special │ │
│ │ \0 cha\\rs │ │
└──────────┴────────────────────────────────────────────────┴─────────────┘
+----------+------------------------------------------------+-------------+
| quantity | spaces in name | last |
+----------+------------------------------------------------+-------------+
| 3 | some.regular.text.to.make.this.row.interesting | [ 3 bytes ] |
+----------+------------------------------------------------+-------------+
| 3.14 | Ti.ng.Vi.t | (null) |
+----------+------------------------------------------------+-------------+
| | special.. | . |
| | ...cha\rs | |
+----------+------------------------------------------------+-------------+
┌─────┬────────────────┬─────┐
│ qu- │ spaces in name │ la- │
│ an- │ │ st │
│ ti- │ │ │
│ ty │ │ │
├─────┼────────────────┼─────┤
│ 3 │ some regular │ [ 3 │
│ │ text to make │ by- │
│ │ this row │ te- │
│ │ interesting │ s ] │
├─────┼────────────────┼─────┤
│ 3.- │ Tiếng Việt │ (n- │
│ 14 │ │ ul- │
│ │ │ l) │
├─────┼────────────────┼─────┤
│ │ special │ │
│ │ \0 cha\\rs │ │
└─────┴────────────────┴─────┘
┌──────────┬──────────────────────────────────────────────────┬───────────┐
│ quantity │ spaces in name │ last │
├──────────┼──────────────────────────────────────────────────┼───────────┤
│ 3 │ 'some regular text to make this row interesting' │ X'030709' │
├──────────┼──────────────────────────────────────────────────┼───────────┤
│ 3.14 │ 'Tiếng Việt' │ NULL │
├──────────┼──────────────────────────────────────────────────┼───────────┤
│ '' │ 'special │ ' ' │
│ │ \0 cha\\rs' │ │
└──────────┴──────────────────────────────────────────────────┴───────────┘

Caching

SQLite has a builtin cache. If you do your own caching, you can find out whether it is invalid via pragma schema_version for schema changes and Connection.data_version() for table content changes. Your cache is invalid if a value differs from the one you saw earlier. There is no guarantee whether the values will go up or down.
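The comparison can be sketched with the standard library sqlite3 module so that it runs without APSW. Note the substitutions: this uses PRAGMA data_version where the tour uses Connection.data_version(), a temporary database in the default journal mode, and a hypothetical helper named `versions`. The point is only the pattern of remembering the values and treating any difference as "cache invalid".

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "cache-demo.db")
    # isolation_level=None gives autocommit so each statement commits
    main = sqlite3.connect(path, isolation_level=None)
    other = sqlite3.connect(path, isolation_level=None)
    main.execute("create table dummy(x)")

    def versions(con):
        # (schema_version, data_version) as seen by this connection
        schema = con.execute("pragma schema_version").fetchone()[0]
        data = con.execute("pragma data_version").fetchone()[0]
        return schema, data

    before = versions(main)
    # content change made by a different connection
    other.execute("insert into dummy values(1)")
    after_insert = versions(main)
    # schema change made by a different connection
    other.execute("create table more(x, y, z)")
    after_create = versions(main)

    main.close()
    other.close()

print("content changed:", before[1] != after_insert[1])
print("schema changed: ", after_insert[0] != after_create[0])
```

Only inequality is tested, in line with the point above that the values may go up or down.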

print(
 "SQLite cache =",
 connection.pragma("cache_size"),
 " page_size = ",
 connection.pragma("page_size"),
)
# Make a second connection to change the same database as the main
# connection. These also work if the changes were done in a different
# process.
con2 = apsw.Connection(connection.filename)
# See values before change
print("Before values")
print(f'{connection.pragma("schema_version")=}')
print(f"{connection.data_version()=}")
print("\nAfter values")
# add to table from previous section
con2.execute("insert into dummy values(1, 2, 3)")
print(f"{connection.data_version()=}")
# and add a table. changing an existing table definition etc also
# bump the schema version
con2.execute("create table more(x,y,z)")
print(f'{connection.pragma("schema_version")=}')
SQLite cache = -2000 page_size = 4096
Before values
connection.pragma("schema_version")=23
connection.data_version()=61
After values
connection.data_version()=61
connection.pragma("schema_version")=24

The CARRAY extension

The extension makes it easy to provide an array of numbers, strings, or binary blobs during a query. The array will be used without calling back into Python code or acquiring the GIL.

Arrays of numbers can come from binary data, array.array, numpy arrays etc. Arrays of str and blobs are supplied as tuples. All data in the array has to be the same type.

Use apsw.carray() to wrap your data, and provide it as a binding. Note that it has start and stop parameters so you can use a subset of the source data. The format of the data is detected, or an explicit flags parameter can be used.

# We'll use the array module
import array
# A packed array of 32 bit integers
ids = array.array("i", [1, 73, 9457, 62])
# Simple usage. You would normally use joins, IN etc
print(
 "ordered integers",
 connection.execute(
 "SELECT value FROM CARRAY(?) ORDER BY value",
 (apsw.carray(ids),),
 ).get
)
# Using strings and blobs is just as easy
strings = ("zero", "one", "two", "three", "four")
blobs = (b"\xf3\x72\x94", b"\xf4\x8f\xbf", b"\xf7\xbf\xbf\xbf")
print(
 "ordered strings",
 connection.execute(
 "SELECT value FROM CARRAY(?) ORDER BY value",
 (apsw.carray(strings),),
 ).get
)
# We'll use the start parameter to skip entries
print(
 "ordered strings start=2",
 connection.execute(
 "SELECT value FROM CARRAY(?) ORDER BY value",
 (apsw.carray(strings, start=2),),
 ).get
)
# Find the longest blob
print(
 "longest blob",
 connection.execute(
 "SELECT value FROM CARRAY(?) ORDER BY LENGTH(value) DESC LIMIT 1",
 (apsw.carray(blobs),),
 ).get
)
ordered integers [1, 62, 73, 9457]
ordered strings ['four', 'one', 'three', 'two', 'zero']
ordered strings start=2 ['four', 'three', 'two']
longest blob b'\xf7\xbf\xbf\xbf'
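For comparison, these are plain-Python equivalents of what the four queries above compute. It assumes that the start parameter behaves like the start of a slice, which is consistent with the output shown. The last line shows why array.array counts as packed data: the values sit in one contiguous buffer.

```python
import array

ids = array.array("i", [1, 73, 9457, 62])
strings = ("zero", "one", "two", "three", "four")
blobs = (b"\xf3\x72\x94", b"\xf4\x8f\xbf", b"\xf7\xbf\xbf\xbf")

# ORDER BY value
print("ordered integers", sorted(ids))
print("ordered strings", sorted(strings))
# start=2 skips the first two entries
print("ordered strings start=2", sorted(strings[2:]))
# ORDER BY LENGTH(value) DESC LIMIT 1
print("longest blob", max(blobs, key=len))
# one contiguous buffer of fixed size items
print("packed bytes", len(ids.tobytes()), "item size", ids.itemsize)
```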

Cleanup

As a general rule you do not need to do any cleanup. Standard Python garbage collection will take care of everything. Even if the process crashes with a connection in the middle of a transaction, the next time SQLite opens that database it will automatically roll back the incomplete transaction.

# You can close connections manually
connection.close()