1/* -------------------------------------------------------------------------
4 * Sample background worker code that demonstrates various coding
5 * patterns: establishing a database connection; starting and committing
6 * transactions; using GUC variables, and heeding SIGHUP to reread
7 * the configuration file; reporting to pg_stat_activity; using the
8 * process latch to sleep and exit in case of postmaster death.
10 * This code connects to a database, creates a schema and table, and summarizes
11 * the numbers contained therein. To see it working, insert an initial value
12 * with "total" type and some initial value; then insert some other rows with
13 * "delta" type. Delta rows will be deleted by this worker and their values
14 * aggregated into the total.
16 * Copyright (c) 2013-2025, PostgreSQL Global Development Group
19 * src/test/modules/worker_spi/worker_spi.c
21 * -------------------------------------------------------------------------
25/* These are always necessary for a bgworker */
31/* these headers are used by this particular worker's code */
55/* value cached, fetched from shared memory */
65 * Initialize workspace for a worker process: create the schema if it doesn't
82 /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
90 elog(
FATAL,
"SPI_execute failed: error code %d", ret);
106 "CREATE SCHEMA \"%s\" "
107 "CREATE TABLE \"%s\" ("
108 " type text CHECK (type IN ('total', 'delta')), "
110 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
111 "WHERE type = 'total'",
114 /* set statement start time */
121 elog(
FATAL,
"failed to create my schema");
150 /* fetch database and role OIDs, these are set for a dynamic worker */
152 memcpy(&dboid, p,
sizeof(
Oid));
154 memcpy(&roleoid, p,
sizeof(
Oid));
156 memcpy(&flags, p,
sizeof(
bits32));
158 /* Establish signal handlers before unblocking signals. */
162 /* We're now ready to receive signals */
165 /* Connect to our database */
172 elog(
LOG,
"%s initialized with %s.%s",
177 * Quote identifiers passed to us. Note that this must be done after
178 * initialize_worker_spi, because that routine assumes the names are not
181 * Note some memory might be leaked here.
188 "WITH deleted AS (DELETE "
190 "WHERE type = 'delta' RETURNING value), "
191 "total AS (SELECT coalesce(sum(value), 0) as sum "
194 "SET value = %s.value + total.sum "
195 "FROM total WHERE type = 'total' "
196 "RETURNING %s.value",
203 * Main loop: do this until SIGTERM is received and processed by
210 /* First time, allocate or get the custom wait event */
215 * Background workers mustn't call usleep() or any direct equivalent:
216 * instead, they may wait on their process latch, which sleeps as
217 * necessary, but is awakened if postmaster dies. That way the
218 * background process goes away immediately in an emergency.
229 * In case of a SIGHUP, just reload the configuration.
238 * Start a transaction on which we can run queries. Note that each
239 * StartTransactionCommand() call should be preceded by a
240 * SetCurrentStatementStartTimestamp() call, which sets both the time
241 * for the statement we're about the run, and also the transaction
242 * start time. Also, each other query sent to SPI should probably be
243 * preceded by SetCurrentStatementStartTimestamp(), so that statement
244 * start time is always up to date.
246 * The SPI_connect() call lets us run queries through the SPI manager,
247 * and the PushActiveSnapshot() call creates an "active" snapshot
248 * which is necessary for queries to have MVCC data to work on.
250 * The pgstat_report_activity() call makes our activity visible
251 * through the pgstat views.
260 /* We can now execute queries via SPI */
264 elog(
FATAL,
"cannot select from table %s.%s: error code %d",
276 elog(
LOG,
"%s: count in %s.%s is now %d",
282 * And finish our transaction.
296 * Entrypoint of this module.
298 * We register more than one worker process here, to demonstrate how that can
306 /* get the configuration */
309 * These GUCs are defined even if this library is not loaded with
310 * shared_preload_libraries, for worker_spi_launch().
313 "Duration between each check (in seconds).",
326 "Database to connect to.",
335 "Role to connect with.",
347 "Number of workers.",
361 /* set up common data for all our workers */
362 memset(&worker, 0,
sizeof(worker));
372 * Now fill in worker-specific data, and do the actual registrations.
374 * bgw_extra can optionally include a database OID, a role OID and a set
375 * of flags. This is left empty here to fallback to the related GUCs at
376 * startup (0 for the bgworker flags).
389 * Dynamically launch an SPI worker.
408 memset(&worker, 0,
sizeof(worker));
418 /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
421 /* extract flags, if any */
425 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
426 errmsg(
"flags array must be one-dimensional")));
430 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
431 errmsg(
"flags array must not contain nulls")));
436 for (
i = 0;
i < nelems;
i++)
440 if (strcmp(optname,
"ALLOWCONN") == 0)
442 else if (strcmp(optname,
"ROLELOGINCHECK") == 0)
446 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
447 errmsg(
"incorrect flag value found in array")));
451 * Register database and role to use for the worker started in bgw_extra.
452 * If none have been provided, this will fall back to the GUCs at startup.
458 * worker_spi_role is NULL by default, so this gives to worker_spi_main()
459 * an invalid OID in this case.
465 memcpy(p, &dboid,
sizeof(
Oid));
467 memcpy(p, &roleoid,
sizeof(
Oid));
469 memcpy(p, &flags,
sizeof(
bits32));
478 (
errcode(ERRCODE_INSUFFICIENT_RESOURCES),
479 errmsg(
"could not start background process"),
480 errhint(
"More details may be available in the server log.")));
483 (
errcode(ERRCODE_INSUFFICIENT_RESOURCES),
484 errmsg(
"cannot start background processes without postmaster"),
485 errhint(
"Kill all remaining database processes and restart the database.")));
Oid get_role_oid(const char *rolname, bool missing_ok)
#define PG_GETARG_ARRAYTYPE_P(n)
bool array_contains_nulls(ArrayType *array)
void deconstruct_array_builtin(ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
void pgstat_report_activity(BackendState state, const char *cmd_str)
void RegisterBackgroundWorker(BackgroundWorker *worker)
BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
void BackgroundWorkerUnblockSignals(void)
void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 flags)
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
#define BGW_NEVER_RESTART
#define BGWORKER_BYPASS_ROLELOGINCHECK
@ BgWorkerStart_RecoveryFinished
#define BGWORKER_BACKEND_DATABASE_CONNECTION
#define BGWORKER_BYPASS_ALLOWCONN
#define BGWORKER_SHMEM_ACCESS
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
Oid get_database_oid(const char *dbname, bool missing_ok)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_RETURN_INT32(x)
#define PG_GETARG_INT32(n)
void ProcessConfigFile(GucContext context)
void DefineCustomStringVariable(const char *name, const char *short_desc, const char *long_desc, char **valueAddr, const char *bootValue, GucContext context, int flags, GucStringCheckHook check_hook, GucStringAssignHook assign_hook, GucShowHook show_hook)
void MarkGUCPrefixReserved(const char *className)
void DefineCustomIntVariable(const char *name, const char *short_desc, const char *long_desc, int *valueAddr, int bootValue, int minValue, int maxValue, GucContext context, int flags, GucIntCheckHook check_hook, GucIntAssignHook assign_hook, GucShowHook show_hook)
Assert(PointerIsAligned(start, uint64))
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
char * pstrdup(const char *in)
#define CHECK_FOR_INTERRUPTS()
bool process_shared_preload_libraries_in_progress
static const struct lconv_member_info table[]
long pgstat_report_stat(bool force)
const char * debug_query_string
static int64 DatumGetInt64(Datum X)
static Datum Int32GetDatum(int32 X)
static int32 DatumGetInt32(Datum X)
BackgroundWorker * MyBgworkerEntry
const char * quote_identifier(const char *ident)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
SPITupleTable * SPI_tuptable
int SPI_execute(const char *src, bool read_only, long tcount)
Datum SPI_getbinval(HeapTuple tuple, TupleDesc tupdesc, int fnumber, bool *isnull)
#define SPI_OK_UPDATE_RETURNING
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void initStringInfo(StringInfo str)
char bgw_function_name[BGW_MAXLEN]
char bgw_name[BGW_MAXLEN]
char bgw_type[BGW_MAXLEN]
BgWorkerStartTime bgw_start_time
char bgw_extra[BGW_EXTRALEN]
char bgw_library_name[MAXPGPATH]
uint32 WaitEventExtensionNew(const char *wait_event_name)
#define WL_EXIT_ON_PM_DEATH
static int worker_spi_naptime
static void initialize_worker_spi(worktable *table)
Datum worker_spi_launch(PG_FUNCTION_ARGS)
static uint32 worker_spi_wait_event_main
static char * worker_spi_database
static int worker_spi_total_workers
struct worktable worktable
PGDLLEXPORT pg_noreturn void worker_spi_main(Datum main_arg)
static char * worker_spi_role
PG_FUNCTION_INFO_V1(worker_spi_launch)
void StartTransactionCommand(void)
void SetCurrentStatementStartTimestamp(void)
void CommitTransactionCommand(void)