1/*-------------------------------------------------------------------------
4 * logical replication protocol functions
6 * Copyright (c) 2015-2025, PostgreSQL Global Development Group
9 * src/backend/replication/logical/proto.c
11 *-------------------------------------------------------------------------
24 * Protocol message flags.
26 #define LOGICALREP_IS_REPLICA_IDENTITY 1
28 #define MESSAGE_TRANSACTIONAL (1<<0)
29 #define TRUNCATE_CASCADE (1<<0)
30 #define TRUNCATE_RESTART_SEQS (1<<1)
34 PublishGencolsType include_gencols_type);
38 PublishGencolsType include_gencols_type);
46 * Write BEGIN to the output stream.
60 * Read transaction BEGIN from the stream.
68 elog(
ERROR,
"final_lsn not set in begin message");
75 * Write COMMIT to the output stream.
85 /* send the flags field (unused for now) */
95 * Read transaction COMMIT from the stream.
100 /* read flags (unused for now) */
104 elog(
ERROR,
"unrecognized flags %u in commit message", flags);
113 * Write BEGIN PREPARE to the output stream.
131 * Read transaction BEGIN PREPARE from the stream.
139 elog(
ERROR,
"prepare_lsn not set in begin prepare message");
142 elog(
ERROR,
"end_lsn not set in begin prepare message");
146 /* read gid (copy it into a pre-allocated buffer) */
151 * The core functionality for logicalrep_write_prepare and
152 * logicalrep_write_stream_prepare.
163 * This should only ever happen for two-phase commit transactions, in
164 * which case we expect to have a valid GID.
170 /* send the flags field */
184 * Write PREPARE to the output stream.
195 * The core functionality for logicalrep_read_prepare and
196 * logicalrep_read_stream_prepare.
206 elog(
ERROR,
"unrecognized flags %u in %s message", flags, msgtype);
211 elog(
ERROR,
"prepare_lsn is not set in %s message", msgtype);
214 elog(
ERROR,
"end_lsn is not set in %s message", msgtype);
218 elog(
ERROR,
"invalid two-phase transaction ID in %s message", msgtype);
220 /* read gid (copy it into a pre-allocated buffer) */
225 * Read transaction PREPARE from the stream.
234 * Write COMMIT PREPARED to the output stream.
245 * This should only ever happen for two-phase commit transactions, in
246 * which case we expect to have a valid GID.
250 /* send the flags field */
264 * Read transaction COMMIT PREPARED from the stream.
273 elog(
ERROR,
"unrecognized flags %u in commit prepared message", flags);
278 elog(
ERROR,
"commit_lsn is not set in commit prepared message");
281 elog(
ERROR,
"end_lsn is not set in commit prepared message");
285 /* read gid (copy it into a pre-allocated buffer) */
290 * Write ROLLBACK PREPARED to the output stream.
302 * This should only ever happen for two-phase commit transactions, in
303 * which case we expect to have a valid GID.
307 /* send the flags field */
322 * Read transaction ROLLBACK PREPARED from the stream.
332 elog(
ERROR,
"unrecognized flags %u in rollback prepared message", flags);
337 elog(
ERROR,
"prepare_end_lsn is not set in rollback prepared message");
340 elog(
ERROR,
"rollback_end_lsn is not set in rollback prepared message");
345 /* read gid (copy it into a pre-allocated buffer) */
350 * Write STREAM PREPARE to the output stream.
362 * Read STREAM PREPARE from the stream.
371 * Write ORIGIN to the output stream.
387 * Read ORIGIN from the output stream.
400 * Write INSERT to the output stream.
406 PublishGencolsType include_gencols_type)
410 /* transaction ID (if not valid, we're not streaming) */
414 /* use Oid as relation identifier */
419 include_gencols_type);
423 * Read INSERT from stream.
425 * Fills the new tuple.
433 /* read the relation id */
438 elog(
ERROR,
"expected new tuple but got %d",
447 * Write UPDATE to the output stream.
453 PublishGencolsType include_gencols_type)
457 Assert(rel->
rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
458 rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
459 rel->
rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
461 /* transaction ID (if not valid, we're not streaming) */
465 /* use Oid as relation identifier */
470 if (rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
475 include_gencols_type);
480 include_gencols_type);
484 * Read UPDATE from stream.
494 /* read the relation id */
497 /* read and verify action */
500 elog(
ERROR,
"expected action 'N', 'O' or 'K', got %c",
503 /* check for old tuple */
507 *has_oldtuple =
true;
512 *has_oldtuple =
false;
514 /* check for new tuple */
516 elog(
ERROR,
"expected action 'N', got %c",
525 * Write DELETE to the output stream.
531 PublishGencolsType include_gencols_type)
533 Assert(rel->
rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
534 rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
535 rel->
rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
539 /* transaction ID (if not valid, we're not streaming) */
543 /* use Oid as relation identifier */
546 if (rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
552 include_gencols_type);
556 * Read DELETE from stream.
558 * Fills the old tuple.
566 /* read the relation id */
569 /* read and verify action */
580 * Write TRUNCATE to the output stream.
587 bool cascade,
bool restart_seqs)
594 /* transaction ID (if not valid, we're not streaming) */
600 /* encode and send truncate flags */
607 for (
i = 0;
i < nrelids;
i++)
612 * Read TRUNCATE from stream.
616 bool *cascade,
bool *restart_seqs)
625 /* read and decode truncate flags */
630 for (
i = 0;
i < nrelids;
i++)
637 * Write MESSAGE to stream
641 bool transactional,
const char *prefix,
Size sz,
648 /* encode and send message flags */
652 /* transaction ID (if not valid, we're not streaming) */
664 * Write relation description to the output stream.
669 PublishGencolsType include_gencols_type)
675 /* transaction ID (if not valid, we're not streaming) */
679 /* use Oid as relation identifier */
682 /* send qualified relation name */
687 /* send replica identity */
690 /* send the attribute info */
695 * Read the relation info from stream and return as LogicalRepRelation.
704 /* Read relation name from stream */
708 /* Read the replica identity. */
711 /* Get attribute description */
718 * Write type info to the output stream.
720 * This function will always write base type info.
731 /* transaction ID (if not valid, we're not streaming) */
737 elog(
ERROR,
"cache lookup failed for type %u", basetypoid);
740 /* use Oid as type identifier */
743 /* send qualified type name */
751 * Read type info from the output stream.
758 /* Read type name from stream */
764 * Write a tuple to the outputstream, in the most efficient format possible.
769 PublishGencolsType include_gencols_type)
784 include_gencols_type))
795 /* Write the values */
803 include_gencols_type))
815 * Unchanged toasted datum. (Note that we don't promise to detect
816 * unchanged data in general; this is just a cheap check to avoid
817 * sending large values unnecessarily.)
825 elog(
ERROR,
"cache lookup failed for type %u", att->atttypid);
829 * Send in binary if requested and type has suitable send function.
858 * Read tuple in logical replication format from stream.
866 /* Get number of attributes */
869 /* Allocate space for per-column values; zero out unused StringInfoDatas */
872 tuple->
ncols = natts;
875 for (
i = 0;
i < natts;
i++)
888 /* nothing more to do */
891 /* we don't receive the value of an unchanged column */
902 * NUL termination is required for LOGICALREP_COLUMN_TEXT mode
903 * as input functions require that. For
904 * LOGICALREP_COLUMN_BINARY it's not technically required, but
912 elog(
ERROR,
"unrecognized data representation type '%c'", kind);
918 * Write relation attribute metadata to the stream.
922 PublishGencolsType include_gencols_type)
932 /* send number of live attributes */
938 include_gencols_type))
945 /* fetch bitmap of REPLICATION IDENTITY attributes */
946 replidentfull = (rel->
rd_rel->relreplident == REPLICA_IDENTITY_FULL);
950 /* send the attributes */
957 include_gencols_type))
960 /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
971 /* attribute type id */
982 * Read relation attribute metadata from the stream.
994 attnames =
palloc(natts *
sizeof(
char *));
997 /* read the attributes */
998 for (
i = 0;
i < natts;
i++)
1002 /* Check for replica identity column */
1007 /* attribute name */
1010 /* attribute type id */
1013 /* we ignore attribute mode for now */
1024 * Write the namespace name or empty string for pg_catalog (to save space).
1029 if (
nspid == PG_CATALOG_NAMESPACE)
1035 if (nspname == NULL)
1036 elog(
ERROR,
"cache lookup failed for namespace %u",
1044 * Read the namespace name while treating empty string as pg_catalog.
1051 if (nspname[0] ==
'0円')
1052 nspname =
"pg_catalog";
1058 * Write the information for the start stream message to the output stream.
1068 /* transaction ID (we're starting to stream, so must be valid) */
1071 /* 1 if this is the first streaming segment for this xid */
1076 * Read the information about the start stream message from output stream.
1092 * Write the stop stream message to the output stream.
1101 * Write STREAM COMMIT to the output stream.
1113 /* transaction ID */
1116 /* send the flags field (unused for now) */
1126 * Read STREAM COMMIT from the output stream.
1136 /* read flags (unused for now) */
1140 elog(
ERROR,
"unrecognized flags %u in commit message", flags);
1151 * Write STREAM ABORT to the output stream. Note that xid and subxid will be
1152 * same for the top-level transaction abort.
1154 * If write_abort_info is true, send the abort_lsn and abort_time fields,
1166 /* transaction ID */
1170 if (write_abort_info)
1178 * Read STREAM ABORT from the output stream.
1180 * If read_abort_info is true, read the abort_lsn and abort_time fields,
1186 bool read_abort_info)
1193 if (read_abort_info)
1206 * Get string representing LogicalRepMsgType.
1211 static char err_unknown[20];
1236 return "BEGIN PREPARE";
1240 return "COMMIT PREPARED";
1242 return "ROLLBACK PREPARED";
1244 return "STREAM START";
1246 return "STREAM STOP";
1248 return "STREAM COMMIT";
1250 return "STREAM ABORT";
1252 return "STREAM PREPARE";
1256 * This message provides context in the error raised when applying a
1257 * logical message. So we can't throw an error here. Return an unknown
1258 * indicator value so that the original error is still reported.
1266 * Check if the column 'att' of a table should be published.
1268 * 'columns' represents the publication column list (if any) for that table.
1270 * 'include_gencols_type' value indicates whether generated columns should be
1271 * published when there is no column list. Typically, this will have the same
1272 * value as the 'publish_generated_columns' publication parameter.
1274 * Note that generated columns can be published only when present in a
1275 * publication column list, or when include_gencols_type is
1276 * PUBLISH_GENCOLS_STORED.
1280 PublishGencolsType include_gencols_type)
1282 if (att->attisdropped)
1285 /* If a column list is provided, publish only the cols in that list. */
1289 /* All non-generated columns are always published. */
1290 if (!att->attgenerated)
1294 * Stored generated columns are only published when the user sets
1295 * publish_generated_columns as stored.
1297 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED)
1298 return include_gencols_type == PUBLISH_GENCOLS_STORED;
void bms_free(Bitmapset *a)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define OidIsValid(objectId)
char * OidOutputFunctionCall(Oid functionId, Datum val)
bytea * OidSendFunctionCall(Oid functionId, Datum val)
Assert(PointerIsAligned(start, uint64))
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
List * lappend_oid(List *list, Oid datum)
#define LOGICALREP_COLUMN_UNCHANGED
@ LOGICAL_REP_MSG_TRUNCATE
@ LOGICAL_REP_MSG_STREAM_STOP
@ LOGICAL_REP_MSG_STREAM_PREPARE
@ LOGICAL_REP_MSG_STREAM_ABORT
@ LOGICAL_REP_MSG_BEGIN_PREPARE
@ LOGICAL_REP_MSG_STREAM_START
@ LOGICAL_REP_MSG_PREPARE
@ LOGICAL_REP_MSG_RELATION
@ LOGICAL_REP_MSG_MESSAGE
@ LOGICAL_REP_MSG_ROLLBACK_PREPARED
@ LOGICAL_REP_MSG_COMMIT_PREPARED
@ LOGICAL_REP_MSG_STREAM_COMMIT
#define LOGICALREP_COLUMN_NULL
#define LOGICALREP_COLUMN_BINARY
#define LOGICALREP_COLUMN_TEXT
Oid getBaseType(Oid typid)
char * get_namespace_name(Oid nspid)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
FormData_pg_attribute * Form_pg_attribute
FormData_pg_type * Form_pg_type
size_t strlcpy(char *dst, const char *src, size_t siz)
static Datum ObjectIdGetDatum(Oid X)
static Pointer DatumGetPointer(Datum X)
void logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
static void logicalrep_write_namespace(StringInfo out, Oid nspid)
#define TRUNCATE_RESTART_SEQS
LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data)
static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_data)
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup)
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
List * logicalrep_read_truncate(StringInfo in, bool *cascade, bool *restart_seqs)
void logicalrep_read_stream_abort(StringInfo in, LogicalRepStreamAbortData *abort_data, bool read_abort_info)
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
#define MESSAGE_TRANSACTIONAL
char * logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
#define LOGICALREP_IS_REPLICA_IDENTITY
static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data)
LogicalRepRelation * logicalrep_read_rel(StringInfo in)
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
const char * logicalrep_message_type(LogicalRepMsgType action)
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
void logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
TransactionId logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
static const char * logicalrep_read_namespace(StringInfo in)
void logicalrep_write_stream_stop(StringInfo out)
TransactionId logicalrep_read_stream_start(StringInfo in, bool *first_segment)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
Bitmapset * RelationGetIdentityKeyBitmap(Relation relation)
#define rbtxn_is_prepared(txn)
static void initStringInfoFromString(StringInfo str, char *data, int len)
XLogRecPtr prepare_end_lsn
XLogRecPtr rollback_end_lsn
TimestampTz rollback_time
StringInfoData * colvalues
#define FirstLowInvalidHeapAttributeNumber
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
static void slot_getallattrs(TupleTableSlot *slot)
static bool VARATT_IS_EXTERNAL_ONDISK(const void *PTR)
static Size VARSIZE(const void *PTR)
static char * VARDATA(const void *PTR)
#define InvalidXLogRecPtr