3 * PostgreSQL logical replay/reorder buffer management.
5 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 * src/include/replication/reorderbuffer.h
10#define REORDERBUFFER_H
21/*
 * Paths for logical decoding data (relative to installation's $PGDATA).
 *
 * The subdirectory macros expand to adjacent string literals, which the
 * compiler concatenates (e.g. "pg_logical" "/mappings"), so each is a single
 * compile-time string constant. They are unparenthesized; a parenthesized
 * literal could not take part in further string-literal concatenation.
 */
22 #define PG_LOGICAL_DIR "pg_logical"
23 #define PG_LOGICAL_MAPPINGS_DIR PG_LOGICAL_DIR "/mappings"
24 #define PG_LOGICAL_SNAPSHOTS_DIR PG_LOGICAL_DIR "/snapshots"
30/* possible values for debug_logical_replication_streaming */
38 * Types of the change passed to a 'change' callback.
40 * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
41 * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
42 * changes. Users of the decoding facilities will never see changes with
43 * *_INTERNAL_* actions.
45 * The INTERNAL_SPEC_INSERT, INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT
46 * changes concern "speculative insertions", their confirmation, and abort
47 * respectively. They're used by INSERT .. ON CONFLICT .. UPDATE. Users of
48 * logical decoding don't have to care about these.
66/* forward declaration */
70 * a single 'change', can be an insert (with one tuple), an update (old, new),
73 * The same struct is also used internally for other purposes but that should
74 * never be visible outside reorderbuffer.c.
80 /* The type of change. */
83 /* Transaction this change belongs to. */
89 * Context data for the change. Which part of the union is valid depends
94 /* Old, new tuples when action == *_INSERT|UPDATE|DELETE */
97 /* relation that has been changed */
100 /* no previously reassembled toast chunks are necessary anymore */
103 /* valid for DELETE || UPDATE */
105 /* valid for INSERT || UPDATE */
110 * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
111 * set of relations to be truncated.
121 /* Message with arbitrary data. */
129 /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
133 * New command id for existing snapshot in a catalog changing tx. Set
134 * when action == *_INTERNAL_COMMAND_ID.
139 * New cid mapping for catalog changing transaction, set when action
140 * == *_INTERNAL_TUPLECID.
160 * While in use, this is how a change is linked into its transaction's list
161 * of changes; otherwise it's linked into the preallocated list.
166/*
 * ReorderBufferTXN txn_flags: bits stored in txn->txn_flags, each tested by a
 * matching rbtxn_*() macro below. RBTXN_PREPARE_STATUS_MASK covers the three
 * prepare-status bits (IS_PREPARED, SKIPPED_PREPARE, SENT_PREPARE).
 */
167 #define RBTXN_HAS_CATALOG_CHANGES 0x0001 /* transaction has catalog changes */
168 #define RBTXN_IS_SUBXACT 0x0002 /* known to be a subtransaction */
169 #define RBTXN_IS_SERIALIZED 0x0004 /* spilled to disk */
170 #define RBTXN_IS_SERIALIZED_CLEAR 0x0008 /* has ever been spilled to disk */
171 #define RBTXN_IS_STREAMED 0x0010 /* streamed downstream; toplevel xacts only */
172 #define RBTXN_HAS_PARTIAL_CHANGE 0x0020 /* contains partial changes */
173 #define RBTXN_IS_PREPARED 0x0040 /* to be prepared instead of committed */
174 #define RBTXN_SKIPPED_PREPARE 0x0080 /* prepare was skipped */
175 #define RBTXN_HAS_STREAMABLE_CHANGE 0x0100 /* contains streamable changes */
176 #define RBTXN_SENT_PREPARE 0x0200 /* prepare or stream_prepare already sent */
177 #define RBTXN_IS_COMMITTED 0x0400 /* transaction is committed */
178 #define RBTXN_IS_ABORTED 0x0800 /* transaction is aborted */
179 #define RBTXN_DISTR_INVAL_OVERFLOWED 0x1000 /* distributed-inval array overflowed */
181 #define RBTXN_PREPARE_STATUS_MASK (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
183/* Does the transaction have catalog changes? */
184 #define rbtxn_has_catalog_changes(txn) \
186 ((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
189/* Is the transaction known as a subxact? */
190 #define rbtxn_is_known_subxact(txn) \
192 ((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
195/* Has this transaction been spilled to disk? */
196 #define rbtxn_is_serialized(txn) \
198 ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
201/* Has this transaction ever been spilled to disk? */
202 #define rbtxn_is_serialized_clear(txn) \
204 ((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
207/* Does this transaction contain partial changes? */
208 #define rbtxn_has_partial_change(txn) \
210 ((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
213/* Does this transaction contain streamable changes? */
214 #define rbtxn_has_streamable_change(txn) \
216 ((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
220 * Has this transaction been streamed to downstream?
222 * (It's not possible to deduce this from nentries and nentries_mem for
223 * various reasons. For example, all changes may be in subtransactions in
224 * which case we'd have nentries==0 for the toplevel one, which would say
225 * nothing about the streaming. So we maintain this flag, but only for the
226 * toplevel transaction.)
228 #define rbtxn_is_streamed(txn) \
230 ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
234 * Is this a prepared transaction?
236 * Being true means that this transaction should be prepared instead of
237 * committed. To check whether a prepare or a stream_prepare has already
238 * been sent for this transaction, we need to use rbtxn_sent_prepare().
240 #define rbtxn_is_prepared(txn) \
242 ((txn)->txn_flags & RBTXN_IS_PREPARED) != 0 \
245/* Has a prepare or stream_prepare already been sent? */
246 #define rbtxn_sent_prepare(txn) \
248 ((txn)->txn_flags & RBTXN_SENT_PREPARE) != 0 \
251/* Is this transaction committed? */
252 #define rbtxn_is_committed(txn) \
254 ((txn)->txn_flags & RBTXN_IS_COMMITTED) != 0 \
257/* Is this transaction aborted? */
258 #define rbtxn_is_aborted(txn) \
260 ((txn)->txn_flags & RBTXN_IS_ABORTED) != 0 \
263/* Has the prepare for this transaction been skipped? */
264 #define rbtxn_skip_prepared(txn) \
266 ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
269/* Has the array of distributed inval messages overflowed? */
270 #define rbtxn_distr_inval_overflowed(txn) \
272 ((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \
275/* Is this a top-level transaction? */
276 #define rbtxn_is_toptxn(txn) \
278 (txn)->toptxn == NULL \
281/* Is this a subtransaction? */
282 #define rbtxn_is_subtxn(txn) \
284 (txn)->toptxn != NULL \
287/* Get the top-level transaction of this (sub)transaction. */
288 #define rbtxn_get_toptxn(txn) \
290 rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \
298 /* The transaction's transaction id, can be a toplevel or sub xid. */
301 /* Xid of top-level transaction, if known */
305 * Global transaction id required for identification of prepared
311 * LSN of the first data-carrying WAL record with knowledge about this
312 * xid. This is allowed to *not* be the first record adorned with this xid, if
313 * the previous records aren't relevant for logical decoding.
318 * LSN of the record that led to this xact being prepared, committed, or
319 * aborted. This can be a
320 * * plain commit record
321 * * plain commit record, of a parent transaction
322 * * prepared transaction
323 * * prepared transaction commit
324 * * plain abort record
325 * * prepared transaction abort
327 * This can also become set to earlier values than transaction end when
328 * a transaction is spilled to disk; specifically it's set to the LSN of
329 * the latest change written to disk so far.
335 * LSN pointing to the end of the commit record + 1.
339 /* Toplevel transaction for this subxact (NULL for top-level). */
343 * The last LSN at which snapshot information resides, so we can
344 * restart decoding from there and fully recover this transaction from
349 /* origin of the change that caused this transaction */
354 * Commit or Prepare time, only known when we read the actual commit or
365 * The base snapshot is used to decode all changes until either this
366 * transaction modifies the catalog, or another catalog-modifying
367 * transaction commits.
374 * Snapshot/CID from the previous streaming run. Only valid for already
375 * streamed transactions (NULL/InvalidCommandId otherwise).
381 * How many ReorderBufferChanges we have in this txn.
383 * Changes in subtransactions are *not* included but tracked separately.
388 * How many of the above entries are stored in memory in contrast to being
394 * List of ReorderBufferChange structs, including new Snapshots, new
395 * CommandIds and command invalidation messages.
400 * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
401 * Those are always assigned to the toplevel transaction. (Keep track of
402 * #entries to create a hash of the right size)
408 * On-demand built hash for looking up the above values.
413 * Hash containing (potentially partial) toast entries. NULL if no toast
414 * tuples have been found for the current change.
419 * Non-hierarchical list of subtransactions that are *not* aborted. Only
420 * used in toplevel transactions.
426 * Stored cache invalidations. This is not a linked list because we get
427 * all the invalidations at once.
433 * Stores cache invalidation messages distributed by other transactions.
439 * Position in one of two lists:
440 * * list of subtransactions if we are *known* to be subxact
441 * * list of toplevel xacts (can be an as-yet unknown subxact)
447 * A node in the list of catalog modifying transactions
457 * Size of this transaction (changes currently in memory, in bytes).
461 /* Size of top-transaction including sub-transactions. */
465 * Private data pointer of the output plugin.
470/* so we can define the callbacks used inside struct ReorderBuffer itself */
473/* change callback signature */
479/* truncate callback signature */
486/* begin callback signature */
490/* commit callback signature */
495/* message callback signature */
500 const char *prefix,
Size sz,
503/* begin prepare callback signature */
507/* prepare callback signature */
512/* commit prepared callback signature */
517/* rollback prepared callback signature */
523/* start streaming transaction callback signature */
528/* stop streaming transaction callback signature */
533/* discard streamed transaction callback signature */
538/* prepare streamed transaction callback signature */
543/* commit streamed transaction callback signature */
548/* stream change callback signature */
554/* stream message callback signature */
559 const char *prefix,
Size sz,
562/* stream truncate callback signature */
569/* update progress txn callback signature */
577 * xid => ReorderBufferTXN lookup table
582 * Transactions that could be a toplevel xact, ordered by LSN of the first
583 * record bearing that xid.
588 * Transactions and subtransactions that have a base snapshot, ordered by
589 * LSN of the record which caused us to first obtain the base snapshot.
590 * This is not the same as toplevel_by_lsn, because we only set the base
591 * snapshot on the first logical-decoding-relevant record (eg. heap
592 * writes), whereas the initial LSN could be set by other operations.
597 * Transactions and subtransactions that have modified system catalogs.
602 * one-entry sized cache for by_txn. Very frequently the same txn gets
603 * looked up over and over again.
609 * Callbacks to be called when a transaction commits.
618 * Callbacks to be called when streaming a transaction at prepare time.
626 * Callbacks to be called when streaming a transaction.
638 * Callback to be called when updating progress while sending data of a
639 * transaction (and its subtransactions) to the output plugin.
644 * Pointer that will be passed untouched to the callbacks.
649 * Saved output plugin option
654 * Private memory context.
659 * Memory contexts for specific types of objects
667 /* buffer for disk<->memory conversions */
671 /* memory accounting */
674 /* Max-heap for sizes of all top-level and sub transactions */
678 * Statistics about transactions spilled to disk.
680 * A single transaction may be spilled repeatedly, which is why we keep
681 * two different counters. For spilling, the transaction counter includes
682 * both toplevel transactions and subtransactions.
688 /* Statistics about transactions streamed to the decoding output plugin */
694 * Statistics about all the transactions sent to the decoding output
720 bool transactional,
const char *prefix,
721 Size message_size,
const char *message);
730 char *gid,
bool is_commit);
void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
void(* ReorderBufferCommitCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
PGDLLIMPORT int logical_decoding_work_mem
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
PGDLLIMPORT int debug_logical_replication_streaming
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
void(* ReorderBufferUpdateProgressTxnCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)
void(* ReorderBufferStreamCommitCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
void(* ReorderBufferStreamStartCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
void(* ReorderBufferApplyChangeCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
struct ReorderBufferTXN ReorderBufferTXN
void(* ReorderBufferStreamPrepareCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void(* ReorderBufferStreamChangeCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferFreeTupleBuf(HeapTuple tuple)
DebugLogicalRepStreamingMode
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid)
uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
void(* ReorderBufferCommitPreparedCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
void(* ReorderBufferStreamMessageCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
TransactionId * ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
void(* ReorderBufferBeginCB)(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBuffer * ReorderBufferAllocate(void)
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
void(* ReorderBufferMessageCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
struct ReorderBufferChange ReorderBufferChange
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
void(* ReorderBufferApplyTruncateCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
void(* ReorderBufferBeginPrepareCB)(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
void(* ReorderBufferStreamAbortCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
void(* ReorderBufferPrepareCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)
void(* ReorderBufferRollbackPreparedCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
void ReorderBufferFree(ReorderBuffer *rb)
@ REORDER_BUFFER_CHANGE_INVALIDATION
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
@ REORDER_BUFFER_CHANGE_INSERT
@ REORDER_BUFFER_CHANGE_MESSAGE
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
@ REORDER_BUFFER_CHANGE_TRUNCATE
@ REORDER_BUFFER_CHANGE_DELETE
@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT
@ REORDER_BUFFER_CHANGE_UPDATE
void(* ReorderBufferStreamStopCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
void StartupReorderBuffer(void)
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
void(* ReorderBufferStreamTruncateCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
struct ReorderBufferChange::@114::@118 tuplecid
struct ReorderBufferChange::@114::@116 truncate
ReorderBufferChangeType action
bool clear_toast_afterwards
struct ReorderBufferChange::@114::@117 msg
struct ReorderBufferTXN * txn
struct ReorderBufferChange::@114::@115 tp
union ReorderBufferChange::@114 data
SharedInvalidationMessage * invalidations
struct ReorderBufferChange::@114::@119 inval
XLogRecPtr restart_decoding_lsn
pairingheap_node txn_node
XLogRecPtr base_snapshot_lsn
TransactionId toplevel_xid
dlist_node catchange_node
SharedInvalidationMessage * invalidations
struct ReorderBufferTXN * toptxn
void * output_plugin_private
uint32 ninvalidations_distributed
dlist_node base_snapshot_node
SharedInvalidationMessage * invalidations_distributed
ReorderBufferStreamMessageCB stream_message
ReorderBufferStreamChangeCB stream_change
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferMessageCB message
dlist_head txns_by_base_snapshot_lsn
dclist_head catchange_txns
ReorderBufferRollbackPreparedCB rollback_prepared
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferApplyChangeCB apply_change
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamAbortCB stream_abort
MemoryContext tup_context
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferStreamCommitCB stream_commit
ReorderBufferApplyTruncateCB apply_truncate
dlist_head toplevel_by_lsn
ReorderBufferBeginCB begin
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn