1/*-------------------------------------------------------------------------
5 * Support functions for using logical decoding and management of
6 * logical replication slots via SQL.
9 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
12 * src/backend/replication/logical/logicalfuncs.c
13 *-------------------------------------------------------------------------
39/* Private data for writing out data */
49 * Prepare for an output plugin write.
59 * Perform output plugin write into tuplestore.
69 /* SQL Datums can only be of a limited length... */
71 elog(
ERROR,
"too much output for sql interface");
75 memset(nulls, 0,
sizeof(nulls));
80 * Assert ctx->out is in database encoding when we're writing textual
88 /* ick, but cstring_to_text_with_len works for bytea perfectly fine */
96 * Helper function for the various SQL callable logical decoding functions.
122 (
errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
123 errmsg(
"slot name must not be null")));
138 (
errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
139 errmsg(
"options array must not be null")));
142 /* state to write output to */
145 p->binary_output = binary;
150 /* Deconstruct options array */
155 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
156 errmsg(
"array must be one-dimensional")));
161 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
162 errmsg(
"array must not contain nulls")));
176 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
177 errmsg(
"array must have even number of elements")));
179 for (
i = 0;
i < nelems;
i += 2)
193 * Compute the current end-of-wal.
204 /* restart at slot's confirmed_flush */
217 * Check whether the output plugin writes textual output if that's
223 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
224 errmsg(
"logical decoding output plugin \"%s\" produces binary output, but function \"%s\" expects textual data",
229 * Wait for specified streaming replication standby servers (if any)
230 * to confirm receipt of WAL up to wait_for_wal_lsn.
233 wait_for_wal_lsn = end_of_wal;
235 wait_for_wal_lsn =
Min(upto_lsn, end_of_wal);
239 ctx->output_writer_private = p;
242 * Decoding of WAL must start at restart_lsn so that the entirety of
243 * xacts that committed after the slot's confirmed_flush can be
244 * accumulated into reorder buffers.
248 /* invalidate non-timetravel entries */
251 /* Decode until we run out of records */
252 while (ctx->reader->EndRecPtr < end_of_wal)
259 elog(
ERROR,
"could not find record for logical decoding: %s", errm);
262 * The {begin_txn,change,commit_txn}_wrapper callbacks above will
263 * store the description into our tuplestore.
270 * We used to have bugs where logical decoding would fail to
271 * preserve the resource owner. Verify that that doesn't
272 * happen anymore. XXX this could be removed once it's been
280 upto_lsn <= ctx->reader->EndRecPtr)
282 if (upto_nchanges != 0 &&
283 upto_nchanges <= p->returned_rows)
289 * Next time, start where we left off. (Hunting things, the family
297 * If only the confirmed_flush_lsn has changed the slot won't get
298 * marked as dirty by the above. Callers on the walsender
299 * interface are expected to keep track of their own progress and
300 * don't need it written out. But SQL-interface users cannot
301 * specify their own start positions and it's harder for them to
302 * keep track of their progress, so we should make more of an
303 * effort to save it for them.
305 * Dirty the slot so it's written out at the next checkpoint.
306 * We'll still lose its position on crash, as documented, but it's
307 * better than always losing the position even on clean restart.
312 /* free context, call shutdown callback */
320 /* clear all timetravel entries */
331 * SQL function returning the changestream as text, consuming the data.
340 * SQL function returning the changestream as text, only peeking ahead.
349 * SQL function returning the changestream in binary, consuming the data.
358 * SQL function returning the changestream in binary, only peeking ahead.
368 * SQL function for writing logical decoding message into WAL.
380 transactional, flush);
387 /* bytea and text are compatible */
#define PG_GETARG_ARRAYTYPE_P(n)
bool array_contains_nulls(ArrayType *array)
void deconstruct_array_builtin(ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define PG_USED_FOR_ASSERTS_ONLY
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_BYTEA_PP(n)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_NAME(n)
#define PG_GETARG_INT32(n)
#define PG_GETARG_BOOL(n)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Assert(PointerIsAligned(start, uint64))
void InvalidateSystemCaches(void)
List * lappend(List *list, void *datum)
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
void FreeDecodingContext(LogicalDecodingContext *ctx)
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
void CheckLogicalDecodingRequirements(void)
static void LogicalOutputPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
static void LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Datum pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
Datum pg_logical_emit_message_text(PG_FUNCTION_ARGS)
Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
struct DecodingOutputState DecodingOutputState
static Datum pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
DefElem * makeDefElem(char *name, Node *arg, int location)
int GetDatabaseEncoding(void)
bool pg_verify_mbstr(int encoding, const char *mbstr, int len, bool noError)
void * palloc0(Size size)
XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, size_t size, bool transactional, bool flush)
#define CHECK_FOR_INTERRUPTS()
@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static Datum LSNGetDatum(XLogRecPtr X)
static Datum PointerGetDatum(const void *X)
static Datum TransactionIdGetDatum(TransactionId X)
char * format_procedure(Oid procedure_oid)
ResourceOwner CurrentResourceOwner
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
void ReplicationSlotMarkDirty(void)
ReplicationSlot * MyReplicationSlot
void CheckSlotPermissions(void)
void ReplicationSlotRelease(void)
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
void resetStringInfo(StringInfo str)
Tuplestorestate * tupstore
MemoryContext ecxt_per_query_memory
void * output_writer_private
ReplicationSlotPersistentData data
Tuplestorestate * setResult
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
String * makeString(char *str)
static Size VARSIZE_ANY_EXHDR(const void *PTR)
static char * VARDATA_ANY(const void *PTR)
text * cstring_to_text_with_len(const char *s, int len)
char * text_to_cstring(const text *t)
bool RecoveryInProgress(void)
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void wal_segment_close(XLogReaderState *state)
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)