Example/Tour
This code demonstrates usage of APSW. It gives you a good overview of all the things that can be done. Also included is output so you can see what gets printed when you run the code.
There are also specific examples in the classes, functions, and attribute documentation.
#!/usr/bin/env python3

# This code uses Python's optional typing annotations.  You can
# ignore them and do not need to use them.  If you do use them
# then you must include this future annotations line first.
from __future__ import annotations

from typing import Optional, Iterator, Any

import os
import sys
import time
import datetime

import apsw
import apsw.ext

import random
import re
import contextlib
from pathlib import Path

# pretty formatting
from pprint import pprint
Checking APSW and SQLite versions
# Where the extension module is on the filesystem
print("      Using APSW file", apsw.__file__)

# From the extension
print("         APSW version", apsw.apsw_version())

# From the sqlite header file at APSW compile time
print("SQLite header version", apsw.SQLITE_VERSION_NUMBER)

# The SQLite code running
print("   SQLite lib version", apsw.sqlite_lib_version())

# If True then SQLite is incorporated into the extension.
# If False then a shared library is being used, or static linking
print("   Using amalgamation", apsw.using_amalgamation)
      Using APSW file /space/apsw/apsw/__init__.cpython-314-x86_64-linux-gnu.so
         APSW version 3.51.1.0
SQLite header version 3051001
   SQLite lib version 3.51.1
   Using amalgamation True
Best Practice
Ensure your SQLite usage avoids common mistakes and gets the best performance by applying apsw.bestpractice
import apsw.bestpractice

apsw.bestpractice.apply(apsw.bestpractice.recommended)
Logging
It is a good idea to get SQLite’s logs as you will get more
information about errors. Best practice also includes this.
apsw.ext.log_sqlite() forwards SQLite’s log messages to the
logging module.
apsw.ext.log_sqlite()

# You can also write to SQLite's log
apsw.log(apsw.SQLITE_ERROR, "A message from Python")
Opening the database
You open the database by using Connection
# Default will create the database if it doesn't exist
connection = apsw.Connection("dbfile")

# Open existing read-only
connection = apsw.Connection(
    "dbfile", flags=apsw.SQLITE_OPEN_READONLY
)

# Open existing read-write (exception if it doesn't exist)
connection = apsw.Connection(
    "dbfile", flags=apsw.SQLITE_OPEN_READWRITE
)
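Connections are not limited to files.  As a small sketch (":memory:" is standard SQLite, and is used again in the backup section below), you can open a throwaway in-memory database:

# a private in-memory database, discarded when closed
mem = apsw.Connection(":memory:")
mem.execute("create table scratch(x)")
mem.close()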
Executing SQL
Use Connection.execute() to execute SQL
connection.execute("create table point(x,y,z)") connection.execute("insert into point values(1, 2, 3)") # You can use multiple ; separated statements connection.execute( """ insert into point values(4, 5, 6); create table log(timestamp, event); create table foo(a, b, c); create table important(secret, data); """ ) # read rows for row in connection.execute("select * from point"): print(row)
(1, 2, 3)
(4, 5, 6)
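When you want the result values rather than an iterator, the get property used throughout this tour is convenient.  A minimal sketch against the table just created:

# get returns the least structure possible - here a single value
count = connection.execute("select count(*) from point").get
print("points:", count)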
Why you use bindings to provide values
It is tempting to compose query strings with the values in them, but it is easy to mangle the query, especially when values contain punctuation or unicode, and it exposes you to SQL injection attacks. Bindings are the correct way to supply values to queries.
# a simple value
event = "system started"

# DO NOT DO THIS
query = f"insert into log values(0, '{event}')"
print("query:", query)

# BECAUSE ... a bad guy could provide a value like this
event = "bad guy here') ; drop table important; -- comment"

# which has effects like this
query = f"insert into log values(0, '{event}')"
print("bad guy:", query)
query: insert into log values(0, 'system started')
bad guy: insert into log values(0, 'bad guy here') ; drop table important; -- comment')
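The fix is to use a binding so the value is never parsed as SQL, as covered in the next sections.  A minimal sketch of the safe version:

# the value is supplied separately and cannot alter the query
connection.execute("insert into log values(0, ?)", (event,))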
Bindings (sequence)
Bindings can be provided as a sequence such as with a tuple or list. Use ? to show where the values go.
query = "insert into log values(?, ?)" data = (7, "transmission started") connection.execute(query, data) # You can also use numbers after the ? to select # values from the sequence. Note that numbering # starts at 1 query = "select ?1, ?3, ?2" data = ("alpha", "beta", "gamma") for row in connection.execute(query, data): print(row)
('alpha', 'gamma', 'beta')
Bindings (dict)
You can also supply bindings with a dictionary. Use :NAME, @NAME, or $NAME in the query to reference the key name. Names are case sensitive.
query = "insert into point values(:x, @Y, $z)" data = {"x": 7, "Y": 8, "z": 9} connection.execute(query, data)
Transactions
By default each statement is its own transaction. A transaction finishes by flushing data to storage and waiting for the operating system to confirm it is permanently there (i.e. it will survive a power failure), which takes a while.
# 3 separate transactions
connection.execute("insert into point values(2, 2, 2)")
connection.execute("insert into point values(3, 3, 3)")
connection.execute("insert into point values(4, 4, 4)")

# You can use BEGIN / COMMIT to manually make a transaction
connection.execute("BEGIN")
connection.execute("insert into point values(2, 2, 2)")
connection.execute("insert into point values(3, 3, 3)")
connection.execute("insert into point values(4, 4, 4)")
connection.execute("COMMIT")

# Or use `with` that does it automatically
with connection:
    connection.execute("insert into point values(2, 2, 2)")
    connection.execute("insert into point values(3, 3, 3)")
    connection.execute("insert into point values(4, 4, 4)")

# Nested transactions are supported
with connection:
    connection.execute("insert into point values(2, 2, 2)")
    with connection:
        connection.execute("insert into point values(3, 3, 3)")
        connection.execute("insert into point values(4, 4, 4)")
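If an exception escapes a with connection block the transaction is rolled back rather than committed, which is the main reason to prefer it over manual BEGIN/COMMIT.  A sketch, where the ValueError is purely illustrative:

try:
    with connection:
        connection.execute("insert into point values(9, 9, 9)")
        raise ValueError("simulated failure")
except ValueError:
    # the insert above was rolled back with the transaction
    pass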
executemany
You can execute the same SQL against a sequence using
Connection.executemany()
data = (
    (1, 1, 1),
    (2, 2, 2),
    (3, 3, 3),
    (4, 4, 4),
    (5, 5, 5),
)

query = "insert into point values(?,?,?)"

# we do it in a transaction
with connection:
    # the query is run for each item in data
    connection.executemany(query, data)
Pragmas
SQLite has a wide variety of pragmas to control the database configuration and library behaviour. See the Tips for maintaining your schema.
# WAL mode is good for write performance
connection.pragma("journal_mode", "wal")

# Foreign keys are off by default, so turn them on
connection.pragma("foreign_keys", True)

# You can use this to see if any other connection (including
# other processes) has changed the database
connection.pragma("data_version")

# Useful at startup to detect some database corruption
check = connection.pragma("integrity_check")
if check != "ok":
    print("Integrity check errors", check)
Tracing execution
You can trace execution of SQL statements and their bindings. This involves code changes and is described in more detail in the execution tracing documentation.
There are also simpler, more convenient mechanisms for individual statement tracing and for summarising a block of code, both built on SQLite's tracing interface.
def my_tracer(
    cursor: apsw.Cursor,
    statement: str,
    bindings: Optional[apsw.Bindings],
) -> bool:
    "Called just before executing each statement"
    print("SQL:", statement.strip())
    print("Bindings:", bindings)
    return True  # if you return False then execution is aborted

# you can trace a single cursor
cursor = connection.cursor()
cursor.exec_trace = my_tracer
cursor.execute(
    """
    drop table if exists bar;
    create table bar(x,y,z);
    select * from point where x=?;
    """,
    (3,),
)

# if set on a connection then all cursors are traced
connection.exec_trace = my_tracer
# and clearing it
connection.exec_trace = None
SQL: drop table if exists bar;
Bindings: ()
SQL: create table bar(x,y,z);
Bindings: ()
SQL: select * from point where x=?;
Bindings: (3,)
Tracing returned rows
You can trace returned rows, including modifying what is returned or skipping it completely. See more about tracing.
def row_tracer(
    cursor: apsw.Cursor, row: apsw.SQLiteValues
) -> apsw.SQLiteValues:
    """Called with each row of results before they are handed off.

    You can return None to cause the row to be skipped or a
    different set of values to return"""
    print("Row:", row)
    return row

# you can trace a single cursor
cursor = connection.cursor()
cursor.row_trace = row_tracer
for row in cursor.execute("select x,y from point where x>4"):
    pass

# if set on a connection then all cursors are traced
connection.row_trace = row_tracer
# and clearing it
connection.row_trace = None
Row: (7, 8)
Row: (5, 5)
Defining scalar functions
Scalar functions take one or more values and return one value. They
are registered by calling Connection.create_scalar_function().
def ilove7(*args: apsw.SQLiteValue) -> int:
    "A scalar function"
    print(f"ilove7 got {args} but I love 7")
    return 7

connection.create_scalar_function("seven", ilove7)

for row in connection.execute(
    "select seven(x,y) from point where x>4"
):
    print("row", row)
ilove7 got (7, 8) but I love 7
row (7,)
ilove7 got (5, 5) but I love 7
row (7,)
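If a function always returns the same result for the same inputs you can declare it deterministic, which lets SQLite apply more optimisations.  A sketch assuming the deterministic keyword of create_scalar_function():

def double_value(x):
    # same input always gives the same output
    return x * 2

connection.create_scalar_function(
    "double", double_value, deterministic=True
)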
Defining aggregate functions
Aggregate functions are called multiple times with matching rows,
and then provide a final value. An example is calculating an
average. They are registered by calling
Connection.create_aggregate_function().
class longest:
    # Find which value when represented as a string is
    # the longest

    def __init__(self) -> None:
        self.longest = ""

    def step(self, *args: apsw.SQLiteValue) -> None:
        # Called with each matching row
        for arg in args:
            if len(str(arg)) > len(self.longest):
                self.longest = str(arg)

    def final(self) -> str:
        # Called at the very end
        return self.longest

connection.create_aggregate_function("longest", longest)

print(connection.execute("select longest(event) from log").get)
transmission started
Defining window functions
A window function's input values come from a "window" of rows around a row of interest. Four methods are called as the window moves to add and remove values, get the current value, and finalize.
An example is calculating an average of values in the window to
compare to the row. They are registered by calling
Connection.create_window_function().
This is the Python equivalent of the C-based example in the SQLite documentation.
class SumInt:
    def __init__(self):
        self.v = 0

    def step(self, arg):
        print("step", arg)
        self.v += arg

    def inverse(self, arg):
        print("inverse", arg)
        self.v -= arg

    def final(self):
        print("final", self.v)
        return self.v

    def value(self):
        print("value", self.v)
        return self.v

connection.create_window_function("sumint", SumInt)

for row in connection.execute(
    """
    CREATE TABLE t3(x, y);
    INSERT INTO t3 VALUES('a', 4),
                         ('b', 5),
                         ('c', 3),
                         ('d', 8),
                         ('e', 1);
    -- Use the window function
    SELECT x, sumint(y) OVER (
        ORDER BY x ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
    ) AS sum_y
    FROM t3 ORDER BY x;
    """
):
    print("ROW", row)
step 4
step 5
value 9
ROW ('a', 9)
step 3
value 12
ROW ('b', 12)
inverse 4
step 8
value 16
ROW ('c', 16)
inverse 5
step 1
value 12
ROW ('d', 12)
inverse 3
value 9
ROW ('e', 9)
final 9
Defining collations (sorting)
How you sort can depend on the languages or values involved. You
register a collation by calling Connection.create_collation().
# This example sorting mechanism understands some text followed by a
# number and ensures the number portion gets sorted correctly
connection.execute("create table names(name)")
connection.executemany(
    "insert into names values(?)",
    (
        ("file1",),
        ("file7",),
        ("file17",),
        ("file20",),
        ("file3",),
    ),
)

print("Standard sorting")
for row in connection.execute("select * from names order by name"):
    print(row)

def str_num_collate(
    s1: apsw.SQLiteValue, s2: apsw.SQLiteValue
) -> int:
    # return -1 if s1<s2, +1 if s1>s2 else 0 for equal

    def parts(s: str) -> list:
        "Converts str into list of alternating str and int parts"
        return [
            int(v) if v.isdigit() else v
            for v in re.split(r"(\d+)", s)
        ]

    ps1 = parts(str(s1))
    ps2 = parts(str(s2))

    # compare
    if ps1 < ps2:
        return -1
    if ps1 > ps2:
        return 1
    return 0

connection.create_collation("strnum", str_num_collate)

print("\nUsing strnum")
for row in connection.execute(
    "select * from names order by name collate strnum"
):
    print(row)
Standard sorting
('file1',)
('file17',)
('file20',)
('file3',)
('file7',)

Using strnum
('file1',)
('file3',)
('file7',)
('file17',)
('file20',)
Accessing results by column name
You can access results by column name using dataclasses.
APSW provides apsw.ext.DataClassRowFactory to do this.
import apsw.ext

connection.execute(
    """
    create table books(id, title, author, year);
    insert into books values(7, 'Animal Farm', 'George Orwell', 1945);
    insert into books values(37, 'The Picture of Dorian Gray',
                             'Oscar Wilde', 1890);
    """
)

# Normally you use column numbers
for row in connection.execute(
    "select title, id, year from books where author=?",
    ("Oscar Wilde",),
):
    # this is very fragile
    print("title", row[0])
    print("id", row[1])
    print("year", row[2])

# Turn on dataclasses - frozen makes them read-only
connection.row_trace = apsw.ext.DataClassRowFactory(
    dataclass_kwargs={"frozen": True}
)

print("\nNow with dataclasses\n")

# Same query - note using AS to set column name
for row in connection.execute(
    """SELECT title,
              id AS book_id,
              year AS book_year
       FROM books WHERE author = ?""",
    ("Oscar Wilde",),
):
    print("title", row.title)
    print("id", row.book_id)
    print("year", row.book_year)

# clear
connection.row_trace = None
title The Picture of Dorian Gray
id 37
year 1890

Now with dataclasses

title The Picture of Dorian Gray
id 37
year 1890
Type conversion into/out of database
You can use apsw.ext.TypesConverterCursorFactory to do
conversion, both for types you define and for other types.
import apsw.ext

registrar = apsw.ext.TypesConverterCursorFactory()
connection.cursor_factory = registrar

# A type we define - deriving from SQLiteTypeAdapter automatically
# registers conversion to a SQLite value
class Point(apsw.ext.SQLiteTypeAdapter):
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__(self) -> str:
        return f"Point({self.x}, {self.y})"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, type(self))
            and self.x == other.x
            and self.y == other.y
        )

    def to_sqlite_value(self) -> str:
        # called to convert Point into something SQLite supports
        return f"{self.x};{self.y}"

    # This converter will be registered
    @classmethod
    def convert_from_sqlite(cls, value: str) -> Point:
        return cls(*(float(part) for part in value.split(";")))

# Existing types
def complex_to_sqlite_value(c: complex) -> str:
    return f"{c.real}+{c.imag}"

def datetime_to_sqlite_value(dt: datetime.datetime) -> float:
    # Represent as floating point UTC value no matter
    # what timezone is used.  Also consider other
    # formats like ISO8601.
    return dt.timestamp()

# ... require manual registration
registrar.register_adapter(complex, complex_to_sqlite_value)
registrar.register_adapter(
    datetime.datetime, datetime_to_sqlite_value
)

# conversion from a SQLite value requires registration
registrar.register_converter("POINT", Point.convert_from_sqlite)

# ... and for stdlib types
def sqlite_to_complex(v: str) -> complex:
    return complex(*(float(part) for part in v.split("+")))

def sqlite_to_datetime(v: float) -> datetime.datetime:
    # Keep the UTC values coming back from the database
    # as UTC
    return datetime.datetime.fromtimestamp(v, datetime.timezone.utc)

registrar.register_converter("COMPLEX", sqlite_to_complex)
registrar.register_converter("TIMESTAMP", sqlite_to_datetime)

# note that the type names are case sensitive and must match the
# registration
connection.execute(
    "create table conversion(p POINT, c COMPLEX, t TIMESTAMP)"
)

# convert going into database
test_data = (Point(5.2, 7.6), 3 + 4j, datetime.datetime.now())
connection.execute(
    "insert into conversion values(?, ?, ?)", test_data
)
print("inserted", test_data)

# and coming back out
print("querying data")
for row in connection.execute("select * from conversion"):
    for i, value in enumerate(row):
        print(f"column {i} = {value!r}")

# clear registrar
connection.cursor_factory = apsw.Cursor
inserted (Point(5.2, 7.6), (3+4j), datetime.datetime(2025, 11, 28, 10, 45, 51, 313381))
querying data
column 0 = Point(5.2, 7.6)
column 1 = (3+4j)
column 2 = datetime.datetime(2025, 11, 28, 18, 45, 51, 313381, tzinfo=datetime.timezone.utc)
Runtime Python objects
While only 5 types can be stored in the database, you can pass any Python object to and from your functions at runtime.
# Python set which isn't a supported SQLite type
# containing items like a complex number and stdout which
# definitely aren't SQLite compatible
py_value = {1, 2, "three", 4 + 5j, sys.stdout}

# Trying to pass it as a value gives TypeError
try:
    print(connection.execute("select ?", (py_value,)).get)
except TypeError as exc:
    print(exc)

# Now wrap it and it works
print(
    "select ?",
    connection.execute("select ?", (apsw.pyobject(py_value),)).get,
)

# It is still null at the SQL level
print(
    "select typeof(?)",
    connection.execute(
        "select typeof(?)", (apsw.pyobject(py_value),)
    ).get,
)

# Lets make a set which SQLite knows nothing about
def make_set(*args):
    print(f"make_set got {args!r}")
    # this will return a set, so we also need to mark it
    return apsw.pyobject(set(args))

connection.create_scalar_function("make_set", make_set)

print(
    "select make_set(?, ?, ?)",
    connection.execute(
        "select make_set(?, ?, ?)",
        (
            # these aren't SQLite types
            apsw.pyobject(3 + 4j),
            apsw.pyobject(sys.stdin),
            # but a string is
            "hello",
        ),
    ).get,
)
Bad binding argument type supplied - argument #1: type set
select ? {1, 2, <_io.StringIO object at 0x753729983c70>, 'three', (4+5j)}
select typeof(?) null
make_set got ((3+4j), <_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>, 'hello')
select make_set(?, ?, ?) {'hello', <_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>, (3+4j)}
Query limiting
apsw.ext.query_limit() limits how many rows are processed and how
much time is spent across all the queries within a block.
import apsw.ext

# Use this to make many (virtual) rows
apsw.ext.make_virtual_module(
    connection, "generate_series", apsw.ext.generate_series
)

rows = []
with apsw.ext.query_limit(connection, row_limit=20):
    # 11 rows will come from this
    for (number,) in connection.execute(
        "select * from generate_series(0, 10)"
    ):
        rows.append(number)
    # next query would be 1,000 but we will hit
    # the limit
    for (number,) in connection.execute(
        "select * from generate_series(0, 999)"
    ):
        rows.append(number)

# lets see what we got
print(f"{len(rows)=}")

# We can also time limit
start = time.monotonic()
with apsw.ext.query_limit(connection, timeout=0.2):
    for (number,) in connection.execute(
        "select * from generate_series(0, 1000000000)"
    ):
        pass
print(
    f"After {time.monotonic()-start:.3f} seconds, we hit {number=}"
)

# We used the default "no exception" exception.  Lets have an
# explicit exception, with both row and time limits ...
try:
    with apsw.ext.query_limit(
        connection,
        row_limit=1000,
        timeout=1000,
        row_exception=IndexError,
        timeout_exception=TimeoutError,
    ):
        for (number,) in connection.execute(
            "select * from generate_series(0, 1000000000)"
        ):
            pass
except Exception as exc:
    print(f"{exc=}")
len(rows)=20
After 0.200 seconds, we hit number=307329
exc=IndexError('query row limit hit')
Query details
apsw.ext.query_info() can provide a lot of information about a
query (without running it)
import apsw.ext

# test tables
connection.execute(
    """
    create table customers(
        id INTEGER PRIMARY KEY,
        name CHAR,
        address CHAR);
    create table orders(
        id INTEGER PRIMARY KEY,
        customer_id INTEGER,
        item MY_OWN_TYPE);
    create index cust_addr on customers(address);
    """
)

query = """
    SELECT * FROM orders
    JOIN customers ON orders.customer_id=customers.id
    WHERE address = ?;
    SELECT 7;"""

# ask for all information available
qd = apsw.ext.query_info(
    connection,
    query,
    actions=True,  # which tables/views etc and how they are accessed
    explain=True,  # shows low level VDBE
    explain_query_plan=True,  # how SQLite solves the query
)

print("query", qd.query)
print("\nbindings_count", qd.bindings_count)
print("\nbindings_names", qd.bindings_names)
print("\nexpanded_sql", qd.expanded_sql)
print("\nfirst_query", qd.first_query)
print("\nquery_remaining", qd.query_remaining)
print("\nis_explain", qd.is_explain)
print("\nis_readonly", qd.is_readonly)
print("\ndescription")
pprint(qd.description)
if hasattr(qd, "description_full"):
    print("\ndescription_full")
    pprint(qd.description_full)

print("\nquery_plan")
pprint(qd.query_plan)
print("\nFirst 5 actions")
pprint(qd.actions[:5])
print("\nFirst 5 explain")
pprint(qd.explain[:5])
query 
    SELECT * FROM orders
    JOIN customers ON orders.customer_id=customers.id
    WHERE address = ?;
    SELECT 7;

bindings_count 1

bindings_names (None,)

expanded_sql None

first_query 
    SELECT * FROM orders
    JOIN customers ON orders.customer_id=customers.id
    WHERE address = ?;

query_remaining 
    SELECT 7;

is_explain 0

is_readonly True

description
(('id', 'INTEGER'),
 ('customer_id', 'INTEGER'),
 ('item', 'MY_OWN_TYPE'),
 ('id', 'INTEGER'),
 ('name', 'CHAR'),
 ('address', 'CHAR'))

description_full
(('id', 'INTEGER', 'main', 'orders', 'id'),
 ('customer_id', 'INTEGER', 'main', 'orders', 'customer_id'),
 ('item', 'MY_OWN_TYPE', 'main', 'orders', 'item'),
 ('id', 'INTEGER', 'main', 'customers', 'id'),
 ('name', 'CHAR', 'main', 'customers', 'name'),
 ('address', 'CHAR', 'main', 'customers', 'address'))

query_plan
QueryPlan(detail='QUERY PLAN',
          sub=[QueryPlan(detail='SCAN orders', sub=None),
               QueryPlan(detail='SEARCH customers USING INTEGER PRIMARY KEY '
                                '(rowid=?)',
                         sub=None)])

First 5 actions
[QueryAction(action=21, action_name='SQLITE_SELECT',
             column_name=None, database_name=None, file_name=None,
             function_name=None, module_name=None, operation=None,
             pragma_name=None, pragma_value=None, table_name=None,
             trigger_name=None, trigger_or_view=None, view_name=None),
 QueryAction(action=20, action_name='SQLITE_READ',
             column_name='id', database_name='main', file_name=None,
             function_name=None, module_name=None, operation=None,
             pragma_name=None, pragma_value=None, table_name='orders',
             trigger_name=None, trigger_or_view=None, view_name=None),
 QueryAction(action=20, action_name='SQLITE_READ',
             column_name='customer_id', database_name='main', file_name=None,
             function_name=None, module_name=None, operation=None,
             pragma_name=None, pragma_value=None, table_name='orders',
             trigger_name=None, trigger_or_view=None, view_name=None),
 QueryAction(action=20, action_name='SQLITE_READ',
             column_name='item', database_name='main', file_name=None,
             function_name=None, module_name=None, operation=None,
             pragma_name=None, pragma_value=None, table_name='orders',
             trigger_name=None, trigger_or_view=None, view_name=None),
 QueryAction(action=20, action_name='SQLITE_READ',
             column_name='id', database_name='main', file_name=None,
             function_name=None, module_name=None, operation=None,
             pragma_name=None, pragma_value=None, table_name='customers',
             trigger_name=None, trigger_or_view=None, view_name=None)]

First 5 explain
[VDBEInstruction(addr=0, opcode='Init', comment=None, p1=0, p2=17, p3=0, p4=None, p5=0),
 VDBEInstruction(addr=1, opcode='OpenRead', comment=None, p1=0, p2=12, p3=0, p4='3', p5=0),
 VDBEInstruction(addr=2, opcode='OpenRead', comment=None, p1=1, p2=11, p3=0, p4='3', p5=0),
 VDBEInstruction(addr=3, opcode='Rewind', comment=None, p1=0, p2=16, p3=0, p4=None, p5=0),
 VDBEInstruction(addr=4, opcode='Column', comment=None, p1=0, p2=1, p3=1, p4=None, p5=0)]
Blob I/O
BLOBS (binary large objects) are supported by SQLite. Note that you
cannot change the size of one, but you can allocate one filled with
zeroes, and then later open it and read / write the contents similar
to a file, without having the entire blob in memory. Use
Connection.blob_open() to open a blob.
connection.execute("create table blobby(x,y)") # Add a blob we will fill in later connection.execute("insert into blobby values(1, zeroblob(10000))") # Or as a binding connection.execute( "insert into blobby values(2, ?)", (apsw.zeroblob(20000),) ) # Open a blob for writing. We need to know the rowid rowid = connection.execute("select ROWID from blobby where x=1").get blob = connection.blob_open("main", "blobby", "y", rowid, True) blob.write(b"hello world") blob.seek(2000) blob.read(24) # seek relative to the end blob.seek(-32, 2) blob.write(b"hello world, again") blob.close()
Backup an open database
You can backup a database that is open. The pages are copied in batches of your choosing, allowing continued use of the source database.
# We will copy a disk database into this memory database
destination = apsw.Connection(":memory:")

# Copy into destination
with destination.backup("main", connection, "main") as backup:
    # The source database can change while doing the backup
    # and the backup will still pick up those changes
    while not backup.done:
        backup.step(7)  # copy up to 7 pages each time
        # monitor progress
        print(backup.remaining, backup.page_count)
15 22
8 22
1 22
0 22
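If you don't need to monitor progress, here is a sketch of a one-shot copy, assuming a negative page count copies everything remaining as in SQLite's sqlite3_backup_step:

snapshot = apsw.Connection(":memory:")
with snapshot.backup("main", connection, "main") as backup:
    backup.step(-1)  # copy all remaining pages in one call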
Progress handler
Some operations (eg joins, sorting) can take many steps to
complete. Register a progress handler callback with
Connection.set_progress_handler() which lets you provide
feedback and allows cancelling.
# create a table with random numbers
with connection:
    connection.execute("create table numbers(x)")
    connection.executemany(
        "insert into numbers values(?)",
        ((random.randint(0, 9999999999),) for _ in range(100)),
    )

def progress_handler() -> bool:
    print("progress handler called")
    return False  # returning True aborts

# register handler every 50 vdbe instructions
connection.set_progress_handler(progress_handler, 50)

# Sorting the numbers to find the biggest
for max_num in connection.execute("select max(x) from numbers"):
    print(max_num)

# Clear handler
connection.set_progress_handler(None)
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
progress handler called
(9923950222,)
File Control
We can get/set low level information using the
Connection.file_control() interface. In this example we get
the data version.
There is a data_version pragma, but it doesn't change for commits made on the same connection.
# We use ctypes to provide the correct C level data types and
# pointers
import ctypes

def get_data_version(db):
    # unsigned 32 bit integer
    data_version = ctypes.c_uint32(0)
    ok = db.file_control(
        "main",  # or an attached database name
        apsw.SQLITE_FCNTL_DATA_VERSION,  # code
        ctypes.addressof(data_version),  # pass C level pointer
    )
    assert ok, "SQLITE_FCNTL_DATA_VERSION was not understood!"
    return data_version.value

# Show starting values
print(
    "fcntl",
    get_data_version(connection),
    "pragma",
    connection.pragma("data_version"),
)

# See the fcntl value versus pragma value
for sql in (
    "create table fcntl_example(x)",
    "begin ; insert into fcntl_example values(3)",
    # we can see the version doesn't change inside a transaction
    "insert into fcntl_example values(4)",
    "commit",
    "pragma user_version=1234",
):
    print(sql)
    connection.execute(sql)
    print(
        "fcntl",
        get_data_version(connection),
        "pragma",
        connection.pragma("data_version"),
    )
fcntl 40 pragma 2
create table fcntl_example(x)
fcntl 41 pragma 2
begin ; insert into fcntl_example values(3)
fcntl 41 pragma 2
insert into fcntl_example values(4)
fcntl 41 pragma 2
commit
fcntl 42 pragma 2
pragma user_version=1234
fcntl 43 pragma 2
Commit hook
A commit hook can allow or veto commits. Register a commit hook
with Connection.set_commit_hook().
def my_commit_hook() -> bool:
    print("in commit hook")
    hour = time.localtime()[3]
    if hour >= 8 and hour < 18:
        print("commits okay at this time")
        return False  # let commit go ahead
    print("no commits out of hours")
    return True  # abort commits outside of 8am through 6pm

connection.set_commit_hook(my_commit_hook)
try:
    with connection:
        connection.execute(
            """create table example(x,y,z);
               insert into example values (3,4,5)"""
        )
except apsw.ConstraintError as exc:
    print("commit was not allowed")

connection.set_commit_hook(None)
in commit hook
commits okay at this time
Update hook
Update hooks let you know that data has been added, changed, or
removed. For example you could use this to discard cached
information. Register a hook using
Connection.set_update_hook().
def my_update_hook(
    type: int, db_name: str, table_name: str, rowid: int
) -> None:
    op: str = apsw.mapping_authorizer_function[type]
    print(
        f"Updated: {op} db {db_name}, table {table_name}, rowid {rowid}"
    )

connection.set_update_hook(my_update_hook)

connection.execute("insert into names values(?)", ("file93",))
connection.execute(
    "update names set name=? where name=?", ("file94", "file93")
)
connection.execute("delete from names where name=?", ("file94",))

# Clear the hook
connection.set_update_hook(None)
Updated: SQLITE_INSERT db main, table names, rowid 7
Updated: SQLITE_UPDATE db main, table names, rowid 7
Updated: SQLITE_DELETE db main, table names, rowid 7
Virtual tables
Virtual tables let you provide data on demand as a SQLite table so you can use SQL queries against that data. Writing your own virtual table requires understanding how to return less than all the data via the BestIndex method.
You can export a Python function as a virtual table in 3 lines of
code using apsw.ext.make_virtual_module(), being able to
provide both positional and keyword arguments.
For the first example below, apsw.ext.generate_series()
provides the same functionality ready-made.
# Yield a row at a time
def table_range(start=1, stop=100, step=1):
    for i in range(start, stop + 1, step):
        yield (i,)

# set column names
table_range.columns = ("value",)
# set how to access what table_range returns
table_range.column_access = apsw.ext.VTColumnAccess.By_Index

# register it
apsw.ext.make_virtual_module(connection, "range", table_range)

# see it work.  we can provide both positional and keyword
# arguments
query = "SELECT * FROM range(90) WHERE step=2"
print(apsw.ext.format_query_table(connection, query))

# the parameters are hidden columns so '*' doesn't select them
# but you can ask
query = "SELECT *, start, stop, step FROM range(89) WHERE step=3"
print(apsw.ext.format_query_table(connection, query))

# Expose the unicode database.
import unicodedata

# A more complex example exporting unicodedata module

# The methods we will call on each codepoint
unicode_methods = (
    "name",
    "decimal",
    "digit",
    "numeric",
    "category",
    "combining",
    "bidirectional",
    "east_asian_width",
    "mirrored",
    "decomposition",
)

# the function we will turn into a virtual table returning
# each row as a dict
def unicode_data(start=0, stop=sys.maxunicode):
    # some methods raise ValueError on some codepoints
    def call(meth: str, c: str):
        try:
            return getattr(unicodedata, meth)(c)
        except ValueError:
            return None

    for c in range(start, stop + 1):
        yield {k: call(k, chr(c)) for k in unicode_methods}

# setup column names and access
unicode_data.columns = unicode_methods
unicode_data.column_access = apsw.ext.VTColumnAccess.By_Name

# register
apsw.ext.make_virtual_module(connection, "unicode_data", unicode_data)

# how many codepoints are in each category?
query = """
    SELECT count(*), category FROM unicode_data
    WHERE stop = 0xffff  -- BMP only
    GROUP BY category
    ORDER BY category
    LIMIT 10"""
print(apsw.ext.format_query_table(connection, query))

# A more complex example - given a list of directories return
# information about the files within them recursively
def get_files_info(
    directories: str,
    sep: str = os.pathsep,
    *,
    ignore_symlinks: bool = True,
) -> Iterator[dict[str, Any]]:
    for root in directories.split(sep):
        with os.scandir(root) as sd:
            for entry in sd:
                if entry.is_symlink() and ignore_symlinks:
                    continue
                if entry.is_dir():
                    yield from get_files_info(
                        os.path.join(root, entry.name),
                        ignore_symlinks=ignore_symlinks,
                    )
                elif entry.is_file():
                    s = entry.stat()
                    yield {
                        "directory": root,
                        "name": entry.name,
                        "extension": os.path.splitext(entry.name)[1],
                        **{
                            k: getattr(s, k)
                            for k in get_files_info.stat_columns
                        },
                    }

# which stat columns do we want?
get_files_info.stat_columns = tuple(
    n for n in dir(os.stat(".")) if n.startswith("st_")
)

# setup columns and access by providing an example of the first
# entry returned
(
    get_files_info.columns,
    get_files_info.column_access,
) = apsw.ext.get_column_names(next(get_files_info(".")))

apsw.ext.make_virtual_module(connection, "files_info", get_files_info)

# all the sys.path directories
bindings = (
    os.pathsep.join(
        p
        for p in sys.path
        if os.path.isdir(p)
        # except our current one
        and not os.path.samefile(p, ".")
    ),
)

# Find the 3 biggest files that aren't libraries
query = """SELECT st_size, directory, name FROM files_info(?)
           WHERE extension NOT IN ('.a', '.so')
           ORDER BY st_size DESC LIMIT 3"""
print(apsw.ext.format_query_table(connection, query, bindings))

# Find the 3 biggest Python files
query = """SELECT DATE(st_ctime, 'auto') AS date, directory, name
           FROM files_info(?)
           WHERE extension='.py'
           ORDER BY st_size DESC LIMIT 3"""
print(apsw.ext.format_query_table(connection, query, bindings))

# find space used by filename extension
query = """SELECT extension, SUM(st_size) as total_size
           FROM files_info(?)
           GROUP BY extension
           ORDER BY extension"""
print(apsw.ext.format_query_table(connection, query, bindings))

# unregister a virtual table by passing None
connection.create_module("files_info", None)
┌───────┐
│ value │
│  90   │
│  92   │
│  94   │
│  96   │
│  98   │
│  100  │
└───────┘

┌───────┬───────┬──────┬──────┐
│ value │ start │ stop │ step │
│  89   │  89   │ 100  │  3   │
│  92   │  89   │ 100  │  3   │
│  95   │  89   │ 100  │  3   │
│  98   │  89   │ 100  │  3   │
└───────┴───────┴──────┴──────┘

┌──────────┬──────────┐
│ count(*) │ category │
│    65    │    Cc    │
│    43    │    Cf    │
│   1432   │    Cn    │
│   6400   │    Co    │
│   2048   │    Cs    │
│   1448   │    Ll    │
│   236    │    Lm    │
│  46126   │    Lo    │
│    31    │    Lt    │
│   1132   │    Lu    │
└──────────┴──────────┘

┌─────────┬────────────────────────────────────────────┬───────────────────────┐
│ st_size │                 directory                  │         name          │
├─────────┼────────────────────────────────────────────┼───────────────────────┤
│ 558634  │ /usr/lib/python3.14/pydoc_data/__pycache__ │ topics.cpython-314.p- │
│         │                                            │ yc                    │
├─────────┼────────────────────────────────────────────┼───────────────────────┤
│ 558057  │ /usr/lib/python3.14/pydoc_data             │ topics.py             │
├─────────┼────────────────────────────────────────────┼───────────────────────┤
│ 229038  │ /usr/lib/python3.14                        │ _pydecimal.py         │
└─────────┴────────────────────────────────────────────┴───────────────────────┘

┌────────────┬────────────────────────────────┬───────────────┐
│    date    │           directory            │     name      │
│ 2025-10-31 │ /usr/lib/python3.14/pydoc_data │ topics.py     │
│ 2025-10-31 │ /usr/lib/python3.14            │ _pydecimal.py │
│ 2025-10-31 │ /usr/lib/python3.14            │ turtle.py     │
└────────────┴────────────────────────────────┴───────────────┘

┌────────────┬────────────┐
│ extension  │ total_size │
│            │   259613   │
│     .a     │  25842524  │
│ .allowlist │     56     │
│ .bootstrap │    1063    │
│     .c     │    5406    │
│    .cfg    │    341     │
│    .csh    │    937     │
│    .css    │    1325    │
│   .fish    │    2208    │
│    .in     │    1444    │
│    .ini    │    2037    │
│   .json    │   60181    │
│   .local   │    676     │
│     .o     │   11096    │
│   .patch   │    2682    │
│    .ps1    │    9031    │
│    .py     │  11249525  │
│    .pyc    │  12223522  │
│    .rst    │    9561    │
│    .sh     │    2752    │
│    .so     │  8745952   │
│    .sql    │    9749    │
│  .stdlib   │    7244    │
│   .supp    │     70     │
│    .txt    │   13804    │
└────────────┴────────────┘
VFS - Virtual File System
VFS lets you control how SQLite accesses storage. APSW makes it easy to "inherit" from an existing VFS and monitor or alter data as it flows through.
URIs are shown as a way to receive parameters
when opening/creating a database file, and pragmas
for receiving parameters once a database is open.
# This example VFS obfuscates the database file contents by xoring
# all bytes with 0xa5.
def obfuscate(data: bytes):
    return bytes([x ^ 0xA5 for x in data])

# Inheriting from a base of "" means the default vfs
class ObfuscatedVFS(apsw.VFS):
    def __init__(self, vfsname="obfuscated", basevfs=""):
        self.vfs_name = vfsname
        self.base_vfs = basevfs
        super().__init__(self.vfs_name, self.base_vfs)

    # We want to return our own file implementation, but also
    # want it to inherit
    def xOpen(self, name, flags):
        in_flags = []
        for k, v in apsw.mapping_open_flags.items():
            if isinstance(k, int) and flags[0] & k:
                in_flags.append(v)
        print("xOpen flags", " | ".join(in_flags))
        if isinstance(name, apsw.URIFilename):
            print("   uri filename", name.filename())
            # We can look at uri parameters
            print("   fast is", name.uri_parameter("fast"))
            print("   level is", name.uri_int("level", 3))
            print("   warp is", name.uri_boolean("warp", False))
            print("   notpresent is", name.uri_parameter("notpresent"))
            # all of them
            print("   all uris", name.parameters)
        else:
            print("   filename", name)
        return ObfuscatedVFSFile(self.base_vfs, name, flags)

# The file implementation where we override xRead and xWrite to call
# our encryption routine
class ObfuscatedVFSFile(apsw.VFSFile):
    def __init__(self, inheritfromvfsname, filename, flags):
        super().__init__(inheritfromvfsname, filename, flags)

    def xRead(self, amount, offset):
        return obfuscate(super().xRead(amount, offset))

    def xWrite(self, data, offset):
        super().xWrite(obfuscate(data), offset)

    def xFileControl(self, op: int, ptr: int) -> bool:
        if op != apsw.SQLITE_FCNTL_PRAGMA:
            return super().xFileControl(op, ptr)
        # implement our own pragma
        p = apsw.VFSFcntlPragma(ptr)
        print(f"pragma received {p.name} = {p.value}")
        # what do we understand?
        if p.name == "my_custom_pragma":
            p.result = "orange"
            return True
        # We did not understand
        return False

# To register the VFS we just instantiate it
obfuvfs = ObfuscatedVFS()

# Lets see what vfs are now available?
print("VFS available", apsw.vfs_names())

# Make an obfuscated db, passing in some URI parameters
# default open flags
open_flags = apsw.SQLITE_OPEN_READWRITE | apsw.SQLITE_OPEN_CREATE
# add in using URI parameters
open_flags |= apsw.SQLITE_OPEN_URI

# uri parameters are after the ? separated by &
obfudb = apsw.Connection(
    "file:myobfudb?fast=speed&level=7&warp=on&another=true",
    flags=open_flags,
    vfs=obfuvfs.vfs_name,
)

# Check it works
obfudb.execute("create table foo(x,y); insert into foo values(1,2)")

# Check it really is obfuscated on disk
print("What is on disk", repr(Path("myobfudb").read_bytes()[:20]))

# And unobfuscating it
print(
    "Unobfuscated disk",
    repr(obfuscate(Path("myobfudb").read_bytes()[:20])),
)

# Custom pragma
print(
    "pragma returned", obfudb.pragma("my_custom_pragma", "my value")
)

# Tidy up
obfudb.close()
os.remove("myobfudb")
VFS available ['unix', 'obfuscated', 'memdb', 'unix-excl', 'unix-dotfile', 'unix-none']
xOpen flags SQLITE_OPEN_CREATE | SQLITE_OPEN_MAIN_DB | SQLITE_OPEN_READWRITE | SQLITE_OPEN_URI
   uri filename /space/apsw/myobfudb
   fast is speed
   level is 7
   warp is True
   notpresent is None
   all uris ('fast', 'level', 'warp', 'another')
pragma received journal_mode = wal
xOpen flags SQLITE_OPEN_CREATE | SQLITE_OPEN_MAIN_JOURNAL | SQLITE_OPEN_READWRITE
   filename /space/apsw/myobfudb-journal
pragma received foreign_keys = ON
pragma received optimize = 65538
xOpen flags SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE | SQLITE_OPEN_WAL
   filename /space/apsw/myobfudb-wal
pragma received recursive_triggers = ON
What is on disk b'\xf6\xf4\xe9\xcc\xd1\xc0\x85\xc3\xca\xd7\xc8\xc4\xd1\x85\x96\xa5\xb5\xa5\xa7\xa7'
Unobfuscated disk b'SQLite format 3\x00\x10\x00\x02\x02'
pragma received my_custom_pragma = my value
pragma returned orange
Limits
SQLite lets you see and update various limits via
Connection.limit()
# Print some limits
for limit in ("LENGTH", "COLUMN", "ATTACHED"):
    name = "SQLITE_LIMIT_" + limit
    max_name = "SQLITE_MAX_" + limit  # compile time limit
    orig = connection.limit(getattr(apsw, name))
    print(name, orig)

    # To get the maximum, set to 0x7fffffff and then read value back
    connection.limit(getattr(apsw, name), 0x7FFFFFFF)
    max = connection.limit(getattr(apsw, name))
    print(max_name, " ", max)

# Set limit for size of a string
connection.execute("create table testlimit(s)")
connection.execute(
    "insert into testlimit values(?)", ("x" * 1024,)
)  # 1024 char string
connection.limit(apsw.SQLITE_LIMIT_LENGTH, 1023)  # limit is now 1023
try:
    connection.execute(
        "insert into testlimit values(?)", ("y" * 1024,)
    )
    print("string exceeding limit was inserted")
except apsw.TooBigError:
    print("Caught toobig exception")

# reset back to largest value
connection.limit(apsw.SQLITE_LIMIT_LENGTH, 0x7FFFFFFF)
SQLITE_LIMIT_LENGTH 1000000000
SQLITE_MAX_LENGTH   1000000000
SQLITE_LIMIT_COLUMN 2000
SQLITE_MAX_COLUMN   2000
SQLITE_LIMIT_ATTACHED 125
SQLITE_MAX_ATTACHED   125
Caught toobig exception
Shell
APSW includes a shell like the one that comes with SQLite, and it is also extensible from Python.
import apsw.shell

# Here we use the shell to do a csv export and then dump part of the
# database

# Export to a StringIO
import io

output = io.StringIO()
shell = apsw.shell.Shell(stdout=output, db=connection)

# How to execute a dot command
shell.process_command(".mode csv")
shell.process_command(".headers on")

# How to execute SQL
shell.process_sql(
    """
    create table csvtest(column1, column2 INTEGER);
    create index faster on csvtest(column1);
    insert into csvtest values(3, 4);
    insert into csvtest values('a b', NULL);
    """
)

# Or let the shell figure out SQL vs dot command
shell.process_complete_line("select * from csvtest")

# see the result
print(output.getvalue())

# reset output
output.seek(0)

# make a dump of the same table
shell.process_command(".dump csvtest%")

# see the result
print("\nDump output\n")
print(output.getvalue())
column1,column2
3,4
a b,


Dump output

-- SQLite dump (by APSW 3.51.1.0)
-- SQLite version 3.51.1
-- Date: Fri Nov 28 10:45:51 2025
-- Tables like: csvtest%
-- Database: /space/apsw/dbfile
-- User: rogerb @ clamps

-- The values of various per-database settings
PRAGMA page_size=4096;
-- PRAGMA encoding='UTF-8';
-- PRAGMA auto_vacuum=NONE;
-- PRAGMA max_page_count=4294967294;

BEGIN TRANSACTION;

-- Table csvtest
DROP TABLE IF EXISTS csvtest;
CREATE TABLE csvtest(column1, column2 INTEGER);
INSERT INTO csvtest VALUES(3,4);
INSERT INTO csvtest VALUES('a b',NULL);

-- Triggers and indices on csvtest
CREATE INDEX faster on csvtest(column1);

-- Database header
pragma user_version=1234;

COMMIT TRANSACTION;
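You can also run the shell interactively against a database from your terminal with python3 -m apsw dbfile, giving the familiar dot command interface without writing any code.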
Statistics
SQLite provides library-wide statistics via status(). Use
Connection.status() for per connection statistics.
current_usage, max_usage = apsw.status(apsw.SQLITE_STATUS_MEMORY_USED)
print(f"SQLite memory usage {current_usage} max {max_usage}")

schema_used, _ = connection.status(apsw.SQLITE_DBSTATUS_SCHEMA_USED)
print(f"{schema_used} bytes used to store schema for this connection")
SQLite memory usage 522192 max 2499320
5360 bytes used to store schema for this connection
Tracing
This shows using Connection.trace_v2()
# From https://www.sqlite.org/lang_with.html
# Outlandish Recursive Query Examples

query = """WITH RECURSIVE
    xaxis(x) AS (VALUES(-2.0) UNION ALL SELECT x+0.05 FROM xaxis WHERE x<1.2),
    yaxis(y) AS (VALUES(-1.0) UNION ALL SELECT y+0.1 FROM yaxis WHERE y<1.0),
    m(iter, cx, cy, x, y) AS (
        SELECT 0, x, y, 0.0, 0.0 FROM xaxis, yaxis
        UNION ALL
        SELECT iter+1, cx, cy, x*x-y*y + cx, 2.0*x*y + cy FROM m
            WHERE (x*x + y*y) < 4.0 AND iter<28
    ),
    m2(iter, cx, cy) AS (
        SELECT max(iter), cx, cy FROM m GROUP BY cx, cy
    ),
    a(t) AS (
        SELECT group_concat( substr(' .+*#', 1+min(iter/7,4), 1), '')
            FROM m2 GROUP BY cy
    )
SELECT group_concat(rtrim(t),x'0a') FROM a;"""

def trace_hook(trace: dict) -> None:
    # check the sql and connection are as expected and remove from
    # trace so we don't print them
    assert (
        trace.pop("sql", query) == query
        and trace.pop("connection") is connection
    )
    print("code is ", apsw.mapping_trace_codes[trace["code"]])
    pprint(trace)

connection.trace_v2(
    apsw.SQLITE_TRACE_STMT
    | apsw.SQLITE_TRACE_PROFILE
    | apsw.SQLITE_TRACE_ROW,
    trace_hook,
)

# We will get one each of the trace events
for _ in connection.execute(query):
    pass

# Turn off tracing
connection.trace_v2(0, None)
code is  SQLITE_TRACE_STMT
{'code': 1,
 'explain': 0,
 'id': 393889112,
 'readonly': True,
 'total_changes': 146,
 'trigger': False}
code is  SQLITE_TRACE_ROW
{'code': 4, 'id': 393889112}
code is  SQLITE_TRACE_PROFILE
{'code': 2,
 'id': 393889112,
 'nanoseconds': 17000000,
 'stmt_status': {'SQLITE_STMTSTATUS_AUTOINDEX': 0,
                 'SQLITE_STMTSTATUS_FILTER_HIT': 0,
                 'SQLITE_STMTSTATUS_FILTER_MISS': 0,
                 'SQLITE_STMTSTATUS_FULLSCAN_STEP': 1365,
                 'SQLITE_STMTSTATUS_MEMUSED': 15784,
                 'SQLITE_STMTSTATUS_REPREPARE': 0,
                 'SQLITE_STMTSTATUS_RUN': 1,
                 'SQLITE_STMTSTATUS_SORT': 2,
                 'SQLITE_STMTSTATUS_VM_STEP': 1015353},
 'total_changes': 146}
System and SQLite resource usage in a block
Use apsw.ext.ShowResourceUsage() to see what resources a
block of code uses. We use the same query from above.
Only statistics that have changed are shown in the summary. There are 21 SQLite values tracked including caching, and 20 system values.
with apsw.ext.ShowResourceUsage(
    sys.stdout, db=connection, scope="thread"
):
    # some SQLite work
    rows = connection.execute(query).get
    # and some non-SQLite work - the imports cause filesystem access
    import statistics, tokenize, uuid, fractions, pydoc, decimal

    # and take some wall clock time
    time.sleep(1.3)
Total CPU consumption               0.024
Wall clock                          1.326
Block input operations              8
Page faults with I/O                1
Maximum resident set size           1,380
Page faults - no I/O                299
Involuntary context switches        449
Voluntary context switches          2
Time in system mode                 0.002
Time in user mode                   0.022
SQLite full table scan              1,365
SQLite sort operations              2
SQLite vm operations                1,015,353
SQLite statements completed         1
SQLite allocations lookaside full   17,272
SQL statement tracing in a block
Use apsw.ext.Trace() to see SQL statements inside a block of
code. This also shows behind the scenes SQL.
# Use None instead of stdout and no information is printed or
# gathered
with apsw.ext.Trace(
    sys.stdout,
    db=connection,
    vtable=True,
    updates=True,
    transaction=True,
):
    # APSW does a savepoint behind the scenes to wrap the block
    with connection:
        # Some regular SQL
        connection.execute("create table multi(x)")
        # executemany runs the same statement repeatedly
        connection.executemany(
            "insert into multi values(?)", ((x,) for x in range(5))
        )
        # See how many rows were processed
        connection.execute("select * from multi limit 2").fetchall()
        # You can also see how many rows were changed
        connection.execute("delete from multi where x < 4")

    # pragma functions are virtual tables - see how many rows this
    # processes even though only one has 'pow'
    connection.execute(
        "SELECT narg FROM pragma_function_list WHERE name='pow'"
    ).get

    # trigger that causes rollback
    connection.execute("""
        create trigger error after insert on multi
        begin
            update multi set rowid=100+new.rowid
                where rowid=new.rowid;
            select raise(rollback, 'nope');
        end;
        """)
    with contextlib.suppress(apsw.ConstraintError):
        connection.execute("insert into multi values(54)")
> BEGIN DEFERRED
Time: 0.000
!BEGIN
> create table multi(x)
Time: 0.000
> insert into multi values(?)
INS 1 (0)
Time: 0.000 Changes: 1
> insert into multi values(?)
INS 2 (1)
Time: 0.000 Changes: 1
> insert into multi values(?)
INS 3 (2)
Time: 0.001 Changes: 1
> insert into multi values(?)
INS 4 (3)
Time: 0.000 Changes: 1
> insert into multi values(?)
INS 5 (4)
Time: 0.000 Changes: 1
> select * from multi limit 2
Time: 0.000 Rows: 2
> delete from multi where x < 4
DEL 1 (0)
DEL 2 (1)
DEL 3 (2)
DEL 4 (3)
Time: 0.000 Changes: 4
> COMMIT
!COMMIT
Time: 0.007
> SELECT narg FROM pragma_function_list WHERE name='pow'
V PRAGMA function_list
Time: 0.000 Rows: 215 VmStep: 1,509 Mem: 71.0KB
< SELECT narg FROM pragma_function_list WHERE name='pow'
Time: 0.000 Rows: 2 VmStep: 656
!BEGIN
> create trigger error after insert on multi begin update ...
!COMMIT
Time: 0.002
!BEGIN
> insert into multi values(54)
INS 6 (54)
T TRIGGER error
UPD 6>106 (...)
!ROLLBACK
< insert into multi values(54)
Time: 0.000 Rows: 1 Changes: 1
Formatting query results table
apsw.ext.format_query_table() makes it easy to format the
results of a query as a table, with automatic width adjustment,
colour, string sanitization, truncation, and more.
# Create a table with some dummy data
connection.execute(
    """CREATE TABLE dummy(quantity, [spaces in name], last);
    INSERT INTO dummy VALUES(3,
        'some regular text to make this row interesting', x'030709');
    INSERT INTO dummy VALUES(3.14, 'Tiếng Việt', null);
    INSERT INTO dummy VALUES('', ?, ' ');
    """,
    ("special \t\n\f0円 cha\\rs",),
)

query = "SELECT * FROM dummy"

# default
print(apsw.ext.format_query_table(connection, query))

# no unicode boxes and maximum sanitize the text
kwargs = {"use_unicode": False, "string_sanitize": 2}
print(apsw.ext.format_query_table(connection, query, **kwargs))

# lets have unicode boxes and make things narrow
kwargs = {
    "use_unicode": True,
    "string_sanitize": 0,
    "text_width": 30,
}
print(apsw.ext.format_query_table(connection, query, **kwargs))

# have the values in SQL syntax
kwargs = {"quote": True}
print(apsw.ext.format_query_table(connection, query, **kwargs))
┌──────────┬────────────────────────────────────────────────┬─────────────┐
│ quantity │                 spaces in name                 │    last     │
├──────────┼────────────────────────────────────────────────┼─────────────┤
│    3     │ some regular text to make this row interesting │ [ 3 bytes ] │
├──────────┼────────────────────────────────────────────────┼─────────────┤
│   3.14   │ Tiếng Việt                                     │   (null)    │
├──────────┼────────────────────────────────────────────────┼─────────────┤
│          │ special                                        │             │
│          │ 0円 cha\\rs                                    │             │
└──────────┴────────────────────────────────────────────────┴─────────────┘

+----------+------------------------------------------------+-------------+
| quantity |                 spaces in name                 |    last     |
+----------+------------------------------------------------+-------------+
|    3     | some.regular.text.to.make.this.row.interesting | [ 3 bytes ] |
+----------+------------------------------------------------+-------------+
|   3.14   | Ti.ng.Vi.t                                     |   (null)    |
+----------+------------------------------------------------+-------------+
|          | special..                                      |      .      |
|          | ...cha\rs                                      |             |
+----------+------------------------------------------------+-------------+

┌─────┬────────────────┬─────┐
│ qu- │ spaces in name │ la- │
│ an- │                │ st  │
│ ti- │                │     │
│ ty  │                │     │
├─────┼────────────────┼─────┤
│ 3   │ some regular   │ [ 3 │
│     │ text to make   │ by- │
│     │ this row       │ te- │
│     │ interesting    │ s ] │
├─────┼────────────────┼─────┤
│ 3.- │ Tiếng Việt     │ (n- │
│ 14  │                │ ul- │
│     │                │ l)  │
├─────┼────────────────┼─────┤
│     │ special        │     │
│     │ 0円 cha\\rs    │     │
└─────┴────────────────┴─────┘

┌──────────┬──────────────────────────────────────────────────┬───────────┐
│ quantity │                  spaces in name                  │   last    │
├──────────┼──────────────────────────────────────────────────┼───────────┤
│ 3        │ 'some regular text to make this row interesting' │ X'030709' │
├──────────┼──────────────────────────────────────────────────┼───────────┤
│ 3.14     │ 'Tiếng Việt'                                     │ NULL      │
├──────────┼──────────────────────────────────────────────────┼───────────┤
│ ''       │ 'special                                         │ ' '       │
│          │ 0円 cha\\rs'                                     │           │
└──────────┴──────────────────────────────────────────────────┴───────────┘
Caching
SQLite has a builtin cache. If you do your own caching then you can
find out if it is invalid via the schema_version pragma for schema
changes and Connection.data_version() for table content changes.
Any cache is invalid if the values differ from when it was
populated - there is no guarantee whether they go up or down.
print(
    "SQLite cache =",
    connection.pragma("cache_size"),
    " page_size = ",
    connection.pragma("page_size"),
)

# Make a second connection to change the same database as the main
# connection.  These also work if the changes were done in a
# different process.
con2 = apsw.Connection(connection.filename)

# See values before change
print("Before values")
print(f'{connection.pragma("schema_version")=}')
print(f"{connection.data_version()=}")

print("\nAfter values")

# add to table from previous section
con2.execute("insert into dummy values(1, 2, 3)")
print(f"{connection.data_version()=}")

# and add a table.  changing an existing table definition etc also
# bumps the schema version
con2.execute("create table more(x,y,z)")
print(f'{connection.pragma("schema_version")=}')
SQLite cache = -2000  page_size =  4096
Before values
connection.pragma("schema_version")=23
connection.data_version()=61

After values
connection.data_version()=61
connection.pragma("schema_version")=24
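A sketch of how you might wire this into your own cache invalidation (the names here are illustrative, not an APSW API):

# capture the versions when populating your cache
cached_schema = connection.pragma("schema_version")
cached_data = connection.data_version()

def cache_still_valid() -> bool:
    # any difference means tables or schema changed since caching
    return (
        connection.pragma("schema_version") == cached_schema
        and connection.data_version() == cached_data
    )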
The CARRAY extension
The extension makes it easy to provide an array of numbers, strings, or binary blobs during a query. The array will be used without calling back into Python code or acquiring the GIL.
Arrays of numbers can come from binary data, array.array,
numpy arrays
etc. Arrays of str and blobs are supplied as tuples.
All data in the array has to be the same type.
Use apsw.carray() to wrap your data, and provide it as a
binding. Note that it has start and stop parameters so you
can use a subset of the source data. The format of the data is
detected, or an explicit flags parameter can be used.
# We'll use the array module
import array

# A packed array of 32 bit integers
ids = array.array("i", [1, 73, 9457, 62])

# Simple usage.  You would normally use joins, IN etc
print(
    "ordered integers",
    connection.execute(
        "SELECT value FROM CARRAY(?) ORDER BY value",
        (apsw.carray(ids),),
    ).get,
)

# Using strings and blobs is just as easy
strings = ("zero", "one", "two", "three", "four")
blobs = (b"\xf3\x72\x94", b"\xf4\x8f\xbf", b"\xf7\xbf\xbf\xbf")

print(
    "ordered strings",
    connection.execute(
        "SELECT value FROM CARRAY(?) ORDER BY value",
        (apsw.carray(strings),),
    ).get,
)

# We'll use the start parameter to skip entries
print(
    "ordered strings start=2",
    connection.execute(
        "SELECT value FROM CARRAY(?) ORDER BY value",
        (apsw.carray(strings, start=2),),
    ).get,
)

# Find the longest blob
print(
    "longest blob",
    connection.execute(
        "SELECT value FROM CARRAY(?) ORDER BY LENGTH(value) DESC LIMIT 1",
        (apsw.carray(blobs),),
    ).get,
)
ordered integers [1, 62, 73, 9457]
ordered strings ['four', 'one', 'three', 'two', 'zero']
ordered strings start=2 ['four', 'three', 'two']
longest blob b'\xf7\xbf\xbf\xbf'
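Other array.array type codes work the same way since the format is detected - a small sketch with 64 bit floats, assuming doubles are among the detected formats:

measurements = array.array("d", [2.25, 0.5, 7.75])
print(
    connection.execute(
        "SELECT value FROM CARRAY(?) ORDER BY value",
        (apsw.carray(measurements),),
    ).get
)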
Cleanup
As a general rule you do not need to do any cleanup. Standard Python garbage collection will take care of everything. Even if the process crashes with a connection in the middle of a transaction, the next time SQLite opens that database it will automatically rollback the incomplete transaction.
# You can close connections manually
connection.close()
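If you do want deterministic cleanup, note that with connection means a transaction (as shown earlier), not closing, so use contextlib for that.  A sketch using the contextlib import from the top of the file:

# closes the connection at the end of the block
with contextlib.closing(apsw.Connection("dbfile")) as db:
    print(db.execute("select 3").get)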