1/*-------------------------------------------------------------------------
4 * example logical decoding output plugin
6 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
9 * contrib/test_decoding/test_decoding.c
11 *-------------------------------------------------------------------------
26 .
name =
"test_decoding",
40 * Maintain the per-transaction level variables to track whether the
41 * transaction and or streams have written any changes. In streaming mode the
42 * transaction can be decoded in streams so along with maintaining whether the
43 * transaction has written any changes, we also need to track whether the
44 * current stream has written any changes. This is required so that if user
45 * has requested to skip the empty transactions we can skip the empty streams
46 * even though the transaction has written some changes.
70 int nrelations,
Relation relations[],
76 bool transactional,
const char *prefix,
77 Size sz,
const char *message);
116 bool transactional,
const char *prefix,
117 Size sz,
const char *message);
120 int nrelations,
Relation relations[],
126 /* other plugins can perform things here */
129/* specify output plugin callbacks */
157/* initialize this plugin */
164 bool enable_streaming =
false;
168 "text conversion context",
170 data->include_xids =
true;
171 data->include_timestamp =
false;
172 data->skip_empty_xacts =
false;
173 data->only_local =
false;
186 if (strcmp(elem->
defname,
"include-xids") == 0)
188 /* if option does not provide a value, it means its value is true */
189 if (elem->
arg == NULL)
190 data->include_xids =
true;
193 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
194 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
197 else if (strcmp(elem->
defname,
"include-timestamp") == 0)
199 if (elem->
arg == NULL)
200 data->include_timestamp =
true;
203 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
204 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
207 else if (strcmp(elem->
defname,
"force-binary") == 0)
211 if (elem->
arg == NULL)
215 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
216 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
222 else if (strcmp(elem->
defname,
"skip-empty-xacts") == 0)
225 if (elem->
arg == NULL)
226 data->skip_empty_xacts =
true;
229 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
230 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
233 else if (strcmp(elem->
defname,
"only-local") == 0)
236 if (elem->
arg == NULL)
237 data->only_local =
true;
240 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
241 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
244 else if (strcmp(elem->
defname,
"include-rewrites") == 0)
247 if (elem->
arg == NULL)
251 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
252 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
255 else if (strcmp(elem->
defname,
"stream-changes") == 0)
257 if (elem->
arg == NULL)
261 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
262 errmsg(
"could not parse value \"%s\" for parameter \"%s\"",
268 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
269 errmsg(
"option \"%s\" = \"%s\" is unknown",
278/* cleanup this plugin's resources */
284 /* cleanup our own resources via memory context reset */
300 * If asked to skip empty transactions, we'll emit BEGIN at the point
301 * where the first operation is received for this transaction.
303 if (
data->skip_empty_xacts)
313 if (
data->include_xids)
332 if (
data->skip_empty_xacts && !xact_wrote_changes)
336 if (
data->include_xids)
341 if (
data->include_timestamp)
348/* BEGIN PREPARE callback */
360 * If asked to skip empty transactions, we'll emit BEGIN at the point
361 * where the first operation is received for this transaction.
363 if (
data->skip_empty_xacts)
369/* PREPARE callback */
378 * If asked to skip empty transactions, we'll emit PREPARE at the point
379 * where the first operation is received for this transaction.
389 if (
data->include_xids)
392 if (
data->include_timestamp)
399/* COMMIT PREPARED callback */
411 if (
data->include_xids)
414 if (
data->include_timestamp)
421/* ROLLBACK PREPARED callback */
435 if (
data->include_xids)
438 if (
data->include_timestamp)
446 * Filter out two-phase transactions.
448 * Each plugin can implement its own filtering logic. Here we demonstrate a
449 * simple logic by checking the GID. If the GID contains the "_nodecode"
450 * substring, then we filter it out.
456 if (strstr(gid,
"_nodecode") != NULL)
474 * Print literal `outputstr' already represented as string of type `typid'
475 * into stringbuf `s'.
477 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
478 * if standard_conforming_strings were enabled.
494 /* NB: We don't care about Inf, NaN et al. */
504 if (strcmp(outputstr,
"t") == 0)
512 for (valptr = outputstr; *valptr; valptr++)
525/* print the tuple 'tuple' into the StringInfo s */
531 /* print all columns individually */
532 for (natt = 0; natt < tupdesc->
natts; natt++)
535 Oid typid;
/* type of current attribute */
536 Oid typoutput;
/* output function */
538 Datum origval;
/* possibly toasted Datum */
539 bool isnull;
/* column is null? */
544 * don't print dropped columns, we can't be sure everything is
547 if (attr->attisdropped)
551 * Don't print system columns, oid will already have been printed if
554 if (attr->attnum < 0)
557 typid = attr->atttypid;
559 /* get Datum from tuple */
560 origval =
heap_getattr(tuple, natt + 1, tupdesc, &isnull);
562 if (isnull && skip_nulls)
565 /* print attribute name */
569 /* print attribute type */
574 /* query output function */
576 &typoutput, &typisvarlena);
578 /* print separator */
586 else if (!typisvarlena)
591 Datum val;
/* definitely detoasted Datum */
600 * callback for individual changed tuples
615 /* output BEGIN if we haven't yet */
625 /* Avoid leaking memory by using and resetting our own context */
633 class_form->relrewrite ?
635 NameStr(class_form->relname)));
670 /* if there was no PK, we only know that a delete happened */
673 /* In DELETE, only the replica identity is present; display that */
701 /* output BEGIN if we haven't yet */
708 /* Avoid leaking memory by using and resetting our own context */
715 for (
i = 0;
i < nrelations;
i++)
722 NameStr(relations[
i]->rd_rel->relname)));
747 const char *prefix,
Size sz,
const char *message)
754 /* output BEGIN if we haven't yet for transactional messages */
763 transactional, prefix, sz);
776 * Allocate the txn plugin data for the first stream in the transaction.
787 if (
data->skip_empty_xacts)
796 if (
data->include_xids)
814 if (
data->include_xids)
829 * stream abort can be sent for an individual subtransaction but we
830 * maintain the output_plugin_private only under the toptxn so if this is
831 * not the toptxn then fetch the toptxn.
844 if (
data->skip_empty_xacts && !xact_wrote_changes)
848 if (
data->include_xids)
868 if (
data->include_xids)
875 if (
data->include_timestamp)
894 if (
data->skip_empty_xacts && !xact_wrote_changes)
899 if (
data->include_xids)
904 if (
data->include_timestamp)
912 * In streaming mode, we don't display the changes as the transaction can abort
913 * at a later point in time. We don't want users to see the changes until the
914 * transaction is committed.
925 /* output stream start if we haven't yet */
933 if (
data->include_xids)
941 * In streaming mode, we don't display the contents for transactional messages
942 * as the transaction can abort at a later point in time. We don't want users to
943 * see the message contents until the transaction is committed.
948 const char *prefix,
Size sz,
const char *message)
950 /* Output stream start if we haven't yet for transactional messages. */
968 transactional, prefix, sz);
972 appendStringInfo(ctx->
out,
"streaming message: transactional: %d prefix: %s, sz: %zu content:",
973 transactional, prefix, sz);
981 * In streaming mode, we don't display the detailed information of Truncate.
982 * See pg_decode_stream_change.
986 int nrelations,
Relation relations[],
999 if (
data->include_xids)
const char * timestamptz_to_str(TimestampTz t)
bool parse_bool(const char *value, bool *result)
#define SQL_STR_DOUBLE(ch, escape_backslash)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
char * OidOutputFunctionCall(Oid functionId, Datum val)
#define PG_DETOAST_DATUM(datum)
Assert(PointerIsAligned(start, uint64))
static Datum heap_getattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
char * get_rel_name(Oid relid)
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Oid get_rel_namespace(Oid relid)
char * get_namespace_name(Oid nspid)
void MemoryContextReset(MemoryContext context)
void * MemoryContextAllocZero(MemoryContext context, Size size)
void pfree(void *pointer)
void * palloc0(Size size)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define IsA(nodeptr, _type_)
#define InvalidRepOriginId
@ OUTPUT_PLUGIN_BINARY_OUTPUT
@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
FormData_pg_attribute * Form_pg_attribute
FormData_pg_class * Form_pg_class
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetForm(relation)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define rbtxn_is_toptxn(txn)
#define rbtxn_get_toptxn(txn)
@ REORDER_BUFFER_CHANGE_INSERT
@ REORDER_BUFFER_CHANGE_DELETE
@ REORDER_BUFFER_CHANGE_UPDATE
char * quote_qualified_identifier(const char *qualifier, const char *ident)
const char * quote_identifier(const char *ident)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void * output_plugin_private
List * output_plugin_options
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
OutputPluginOutputType output_type
struct ReorderBufferChange::@114::@116 truncate
ReorderBufferChangeType action
struct ReorderBufferChange::@114::@115 tp
union ReorderBufferChange::@114 data
void * output_plugin_private
bool stream_wrote_changes
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void print_literal(StringInfo s, Oid typid, char *outputstr)
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
PG_MODULE_MAGIC_EXT(.name="test_decoding",.version=PG_VERSION)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
static bool VARATT_IS_EXTERNAL_ONDISK(const void *PTR)