PostgreSQL Source Code git master
Data Structures | Macros | Typedefs | Enumerations | Functions | Variables
reorderbuffer.h File Reference
#include "access/htup_details.h"
#include "lib/ilist.h"
#include "lib/pairingheap.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
Include dependency graph for reorderbuffer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

 
struct   ReorderBufferTXN
 
struct   ReorderBuffer
 

Macros

#define  PG_LOGICAL_DIR   "pg_logical"
 
#define  PG_LOGICAL_MAPPINGS_DIR   PG_LOGICAL_DIR "/mappings"
 
#define  PG_LOGICAL_SNAPSHOTS_DIR   PG_LOGICAL_DIR "/snapshots"
 
#define  RBTXN_HAS_CATALOG_CHANGES   0x0001
 
#define  RBTXN_IS_SUBXACT   0x0002
 
#define  RBTXN_IS_SERIALIZED   0x0004
 
#define  RBTXN_IS_SERIALIZED_CLEAR   0x0008
 
#define  RBTXN_IS_STREAMED   0x0010
 
#define  RBTXN_HAS_PARTIAL_CHANGE   0x0020
 
#define  RBTXN_IS_PREPARED   0x0040
 
#define  RBTXN_SKIPPED_PREPARE   0x0080
 
#define  RBTXN_HAS_STREAMABLE_CHANGE   0x0100
 
#define  RBTXN_SENT_PREPARE   0x0200
 
#define  RBTXN_IS_COMMITTED   0x0400
 
#define  RBTXN_IS_ABORTED   0x0800
 
#define  RBTXN_DISTR_INVAL_OVERFLOWED   0x1000
 
 
 
#define  rbtxn_is_known_subxact(txn)
 
#define  rbtxn_is_serialized(txn)
 
 
#define  rbtxn_has_partial_change(txn)
 
 
#define  rbtxn_is_streamed(txn)
 
#define  rbtxn_is_prepared(txn)
 
#define  rbtxn_sent_prepare(txn)
 
#define  rbtxn_is_committed(txn)
 
#define  rbtxn_is_aborted(txn)
 
#define  rbtxn_skip_prepared(txn)
 
 
#define  rbtxn_is_toptxn(txn)
 
#define  rbtxn_is_subtxn(txn)
 
#define  rbtxn_get_toptxn(txn)
 

Typedefs

 
 
 
typedef struct ReorderBuffer  ReorderBuffer
 
 
typedef void(*  ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
typedef void(*  ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
typedef void(*  ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(*  ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
 
typedef void(*  ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(*  ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
typedef void(*  ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
typedef void(*  ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
 
typedef void(*  ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
 
typedef void(*  ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
typedef void(*  ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
typedef void(*  ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
 
typedef void(*  ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
typedef void(*  ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
 

Enumerations

 
 

Functions

 
 
 
 
 
void  ReorderBufferFreeChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
 
 
void  ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void  ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
void  ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void  ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
 
 
 
void  ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bool  ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
 
void  ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
 
 
 
 
 
void  StartupReorderBuffer (void)
 

Variables

 
 

Macro Definition Documentation

PG_LOGICAL_DIR

#define PG_LOGICAL_DIR   "pg_logical"

Definition at line 22 of file reorderbuffer.h.

PG_LOGICAL_MAPPINGS_DIR

#define PG_LOGICAL_MAPPINGS_DIR   PG_LOGICAL_DIR "/mappings"

Definition at line 23 of file reorderbuffer.h.

PG_LOGICAL_SNAPSHOTS_DIR

#define PG_LOGICAL_SNAPSHOTS_DIR   PG_LOGICAL_DIR "/snapshots"

Definition at line 24 of file reorderbuffer.h.

RBTXN_DISTR_INVAL_OVERFLOWED

#define RBTXN_DISTR_INVAL_OVERFLOWED   0x1000

Definition at line 179 of file reorderbuffer.h.

rbtxn_distr_inval_overflowed

#define rbtxn_distr_inval_overflowed (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \
)
#define RBTXN_DISTR_INVAL_OVERFLOWED
Definition: reorderbuffer.h:179

Definition at line 270 of file reorderbuffer.h.

rbtxn_get_toptxn

#define rbtxn_get_toptxn (   txn )
Value:
( \
rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \
)

Definition at line 288 of file reorderbuffer.h.

RBTXN_HAS_CATALOG_CHANGES

#define RBTXN_HAS_CATALOG_CHANGES   0x0001

Definition at line 167 of file reorderbuffer.h.

rbtxn_has_catalog_changes

#define rbtxn_has_catalog_changes (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
)
#define RBTXN_HAS_CATALOG_CHANGES
Definition: reorderbuffer.h:167

Definition at line 184 of file reorderbuffer.h.

RBTXN_HAS_PARTIAL_CHANGE

#define RBTXN_HAS_PARTIAL_CHANGE   0x0020

Definition at line 172 of file reorderbuffer.h.

rbtxn_has_partial_change

#define rbtxn_has_partial_change (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
)
#define RBTXN_HAS_PARTIAL_CHANGE
Definition: reorderbuffer.h:172

Definition at line 208 of file reorderbuffer.h.

RBTXN_HAS_STREAMABLE_CHANGE

#define RBTXN_HAS_STREAMABLE_CHANGE   0x0100

Definition at line 175 of file reorderbuffer.h.

rbtxn_has_streamable_change

#define rbtxn_has_streamable_change (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
)
#define RBTXN_HAS_STREAMABLE_CHANGE
Definition: reorderbuffer.h:175

Definition at line 214 of file reorderbuffer.h.

RBTXN_IS_ABORTED

#define RBTXN_IS_ABORTED   0x0800

Definition at line 178 of file reorderbuffer.h.

rbtxn_is_aborted

#define rbtxn_is_aborted (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_IS_ABORTED) != 0 \
)
#define RBTXN_IS_ABORTED
Definition: reorderbuffer.h:178

Definition at line 258 of file reorderbuffer.h.

RBTXN_IS_COMMITTED

#define RBTXN_IS_COMMITTED   0x0400

Definition at line 177 of file reorderbuffer.h.

rbtxn_is_committed

#define rbtxn_is_committed (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_IS_COMMITTED) != 0 \
)
#define RBTXN_IS_COMMITTED
Definition: reorderbuffer.h:177

Definition at line 252 of file reorderbuffer.h.

rbtxn_is_known_subxact

#define rbtxn_is_known_subxact (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
)
#define RBTXN_IS_SUBXACT
Definition: reorderbuffer.h:168

Definition at line 190 of file reorderbuffer.h.

RBTXN_IS_PREPARED

#define RBTXN_IS_PREPARED   0x0040

Definition at line 173 of file reorderbuffer.h.

rbtxn_is_prepared

#define rbtxn_is_prepared (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_IS_PREPARED) != 0 \
)
#define RBTXN_IS_PREPARED
Definition: reorderbuffer.h:173

Definition at line 240 of file reorderbuffer.h.

RBTXN_IS_SERIALIZED

#define RBTXN_IS_SERIALIZED   0x0004

Definition at line 169 of file reorderbuffer.h.

rbtxn_is_serialized

#define rbtxn_is_serialized (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
#define RBTXN_IS_SERIALIZED
Definition: reorderbuffer.h:169

Definition at line 196 of file reorderbuffer.h.

RBTXN_IS_SERIALIZED_CLEAR

#define RBTXN_IS_SERIALIZED_CLEAR   0x0008

Definition at line 170 of file reorderbuffer.h.

rbtxn_is_serialized_clear

#define rbtxn_is_serialized_clear (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
)
#define RBTXN_IS_SERIALIZED_CLEAR
Definition: reorderbuffer.h:170

Definition at line 202 of file reorderbuffer.h.

RBTXN_IS_STREAMED

#define RBTXN_IS_STREAMED   0x0010

Definition at line 171 of file reorderbuffer.h.

rbtxn_is_streamed

#define rbtxn_is_streamed (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
)
#define RBTXN_IS_STREAMED
Definition: reorderbuffer.h:171

Definition at line 228 of file reorderbuffer.h.

rbtxn_is_subtxn

#define rbtxn_is_subtxn (   txn )
Value:
( \
(txn)->toptxn != NULL \
)

Definition at line 282 of file reorderbuffer.h.

RBTXN_IS_SUBXACT

#define RBTXN_IS_SUBXACT   0x0002

Definition at line 168 of file reorderbuffer.h.

rbtxn_is_toptxn

#define rbtxn_is_toptxn (   txn )
Value:
( \
(txn)->toptxn == NULL \
)

Definition at line 276 of file reorderbuffer.h.

RBTXN_PREPARE_STATUS_MASK

#define RBTXN_PREPARE_STATUS_MASK   (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)

Definition at line 181 of file reorderbuffer.h.

RBTXN_SENT_PREPARE

#define RBTXN_SENT_PREPARE   0x0200

Definition at line 176 of file reorderbuffer.h.

rbtxn_sent_prepare

#define rbtxn_sent_prepare (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_SENT_PREPARE) != 0 \
)
#define RBTXN_SENT_PREPARE
Definition: reorderbuffer.h:176

Definition at line 246 of file reorderbuffer.h.

rbtxn_skip_prepared

#define rbtxn_skip_prepared (   txn )
Value:
( \
((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
)
#define RBTXN_SKIPPED_PREPARE
Definition: reorderbuffer.h:174

Definition at line 264 of file reorderbuffer.h.

RBTXN_SKIPPED_PREPARE

#define RBTXN_SKIPPED_PREPARE   0x0080

Definition at line 174 of file reorderbuffer.h.

Typedef Documentation

ReorderBuffer

typedef struct ReorderBuffer ReorderBuffer

Definition at line 471 of file reorderbuffer.h.

ReorderBufferApplyChangeCB

typedef void(* ReorderBufferApplyChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 474 of file reorderbuffer.h.

ReorderBufferApplyTruncateCB

typedef void(* ReorderBufferApplyTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 480 of file reorderbuffer.h.

ReorderBufferBeginCB

typedef void(* ReorderBufferBeginCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 487 of file reorderbuffer.h.

ReorderBufferBeginPrepareCB

typedef void(* ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn)

Definition at line 504 of file reorderbuffer.h.

ReorderBufferChange

ReorderBufferChangeType

ReorderBufferCommitCB

typedef void(* ReorderBufferCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 491 of file reorderbuffer.h.

ReorderBufferCommitPreparedCB

typedef void(* ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 513 of file reorderbuffer.h.

ReorderBufferMessageCB

typedef void(* ReorderBufferMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 496 of file reorderbuffer.h.

ReorderBufferPrepareCB

typedef void(* ReorderBufferPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 508 of file reorderbuffer.h.

ReorderBufferRollbackPreparedCB

typedef void(* ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)

Definition at line 518 of file reorderbuffer.h.

ReorderBufferStreamAbortCB

typedef void(* ReorderBufferStreamAbortCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)

Definition at line 534 of file reorderbuffer.h.

ReorderBufferStreamChangeCB

typedef void(* ReorderBufferStreamChangeCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)

Definition at line 549 of file reorderbuffer.h.

ReorderBufferStreamCommitCB

typedef void(* ReorderBufferStreamCommitCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)

Definition at line 544 of file reorderbuffer.h.

ReorderBufferStreamMessageCB

typedef void(* ReorderBufferStreamMessageCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)

Definition at line 555 of file reorderbuffer.h.

ReorderBufferStreamPrepareCB

typedef void(* ReorderBufferStreamPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)

Definition at line 539 of file reorderbuffer.h.

ReorderBufferStreamStartCB

typedef void(* ReorderBufferStreamStartCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)

Definition at line 524 of file reorderbuffer.h.

ReorderBufferStreamStopCB

typedef void(* ReorderBufferStreamStopCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)

Definition at line 529 of file reorderbuffer.h.

ReorderBufferStreamTruncateCB

typedef void(* ReorderBufferStreamTruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)

Definition at line 563 of file reorderbuffer.h.

ReorderBufferTXN

ReorderBufferUpdateProgressTxnCB

typedef void(* ReorderBufferUpdateProgressTxnCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr lsn)

Definition at line 570 of file reorderbuffer.h.

Enumeration Type Documentation

DebugLogicalRepStreamingMode

Enumerator
DEBUG_LOGICAL_REP_STREAMING_BUFFERED 
DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE 

Definition at line 31 of file reorderbuffer.h.

ReorderBufferChangeType

Enumerator
REORDER_BUFFER_CHANGE_INSERT 
REORDER_BUFFER_CHANGE_UPDATE 
REORDER_BUFFER_CHANGE_DELETE 
REORDER_BUFFER_CHANGE_MESSAGE 
REORDER_BUFFER_CHANGE_INVALIDATION 
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT 
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID 
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM 
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT 
REORDER_BUFFER_CHANGE_TRUNCATE 

Definition at line 50 of file reorderbuffer.h.

51{
ReorderBufferChangeType
Definition: reorderbuffer.h:51
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:61
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:62
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
Definition: reorderbuffer.h:58
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:59
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
Definition: reorderbuffer.h:60
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:63
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT
Definition: reorderbuffer.h:57
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53

Function Documentation

ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 3086 of file reorderbuffer.c.

3088{
3089 ReorderBufferTXN *txn;
3090
3091 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3092 false);
3093
3094 /* unknown, nothing to remove */
3095 if (txn == NULL)
3096 return;
3097
3098 txn->abort_time = abort_time;
3099
3100 /* For streamed transactions notify the remote node about the abort. */
3101 if (rbtxn_is_streamed(txn))
3102 {
3103 rb->stream_abort(rb, txn, lsn);
3104
3105 /*
3106 * We might have decoded changes for this transaction that could load
3107 * the cache as per the current transaction's view (consider DDL's
3108 * happened in this transaction). We don't want the decoding of future
3109 * transactions to use those cache entries so execute only the inval
3110 * messages in this transaction.
3111 */
3112 if (txn->ninvalidations > 0)
3113 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3114 txn->invalidations);
3115 }
3116
3117 /* cosmetic... */
3118 txn->final_lsn = lsn;
3119
3120 /* remove potential on-disk data, and deallocate */
3121 ReorderBufferCleanupTXN(rb, txn);
3122}
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
Definition: reorderbuffer.c:652
#define rbtxn_is_streamed(txn)
Definition: reorderbuffer.h:228
uint32 ninvalidations
Definition: reorderbuffer.h:429
SharedInvalidationMessage * invalidations
Definition: reorderbuffer.h:430
TimestampTz abort_time
Definition: reorderbuffer.h:361
XLogRecPtr final_lsn
Definition: reorderbuffer.h:332
ReorderBufferStreamAbortCB stream_abort
Definition: reorderbuffer.h:630
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBuffer::stream_abort.

Referenced by DecodeAbort().

ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * rb,
TransactionId  oldestRunningXid 
)

Definition at line 3132 of file reorderbuffer.c.

3133{
3134 dlist_mutable_iter it;
3135
3136 /*
3137 * Iterate through all (potential) toplevel TXNs and abort all that are
3138 * older than what possibly can be running. Once we've found the first
3139 * that is alive we stop, there might be some that acquired an xid earlier
3140 * but started writing later, but it's unlikely and they will be cleaned
3141 * up in a later call to this function.
3142 */
3143 dlist_foreach_modify(it, &rb->toplevel_by_lsn)
3144 {
3145 ReorderBufferTXN *txn;
3146
3147 txn = dlist_container(ReorderBufferTXN, node, it.cur);
3148
3149 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
3150 {
3151 elog(DEBUG2, "aborting old transaction %u", txn->xid);
3152
3153 /* Notify the remote node about the crash/immediate restart. */
3154 if (rbtxn_is_streamed(txn))
3155 rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3156
3157 /* remove potential on-disk data, and deallocate this tx */
3158 ReorderBufferCleanupTXN(rb, txn);
3159 }
3160 else
3161 return;
3162 }
3163}
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:226
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
TransactionId xid
Definition: reorderbuffer.h:299
dlist_head toplevel_by_lsn
Definition: reorderbuffer.h:585
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

ReorderBufferAddDistributedInvalidations()

void ReorderBufferAddDistributedInvalidations ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 3584 of file reorderbuffer.c.

3587{
3588 ReorderBufferTXN *txn;
3589 MemoryContext oldcontext;
3590
3591 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3592
3593 oldcontext = MemoryContextSwitchTo(rb->context);
3594
3595 /*
3596 * Collect all the invalidations under the top transaction, if available,
3597 * so that we can execute them all together. See comments
3598 * ReorderBufferAddInvalidations.
3599 */
3600 txn = rbtxn_get_toptxn(txn);
3601
3602 Assert(nmsgs > 0);
3603
3605 {
3606 /*
3607 * Check the transaction has enough space for storing distributed
3608 * invalidation messages.
3609 */
3611 {
3612 /*
3613 * Mark the invalidation message as overflowed and free up the
3614 * messages accumulated so far.
3615 */
3617
3619 {
3621 txn->invalidations_distributed = NULL;
3623 }
3624 }
3625 else
3628 msgs, nmsgs);
3629 }
3630
3631 /* Queue the invalidation messages into the transaction */
3632 ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
3633
3634 MemoryContextSwitchTo(oldcontext);
3635}
Assert(PointerIsAligned(start, uint64))
void pfree(void *pointer)
Definition: mcxt.c:1594
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
static void ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out, uint32 *ninvals_out, SharedInvalidationMessage *msgs_new, Size nmsgs_new)
#define MAX_DISTR_INVAL_MSG_PER_TXN
Definition: reorderbuffer.c:125
static void ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
#define rbtxn_get_toptxn(txn)
Definition: reorderbuffer.h:288
#define rbtxn_distr_inval_overflowed(txn)
Definition: reorderbuffer.h:270
uint32 ninvalidations_distributed
Definition: reorderbuffer.h:435
SharedInvalidationMessage * invalidations_distributed
Definition: reorderbuffer.h:436
MemoryContext context
Definition: reorderbuffer.h:656

References Assert(), ReorderBuffer::context, ReorderBufferTXN::invalidations_distributed, MAX_DISTR_INVAL_MSG_PER_TXN, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations_distributed, pfree(), RBTXN_DISTR_INVAL_OVERFLOWED, rbtxn_distr_inval_overflowed, rbtxn_get_toptxn, ReorderBufferAccumulateInvalidations(), ReorderBufferQueueInvalidations(), ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildDistributeSnapshotAndInval().

ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 3543 of file reorderbuffer.c.

3546{
3547 ReorderBufferTXN *txn;
3548 MemoryContext oldcontext;
3549
3550 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3551
3552 oldcontext = MemoryContextSwitchTo(rb->context);
3553
3554 /*
3555 * Collect all the invalidations under the top transaction, if available,
3556 * so that we can execute them all together. See comments atop this
3557 * function.
3558 */
3559 txn = rbtxn_get_toptxn(txn);
3560
3561 Assert(nmsgs > 0);
3562
3563 ReorderBufferAccumulateInvalidations(&txn->invalidations,
3564 &txn->ninvalidations,
3565 msgs, nmsgs);
3566
3567 ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
3568
3569 MemoryContextSwitchTo(oldcontext);
3570}

References Assert(), ReorderBuffer::context, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, rbtxn_get_toptxn, ReorderBufferAccumulateInvalidations(), ReorderBufferQueueInvalidations(), and ReorderBufferTXNByXid().

Referenced by xact_decode().

ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 3356 of file reorderbuffer.c.

3358{
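3359 ReorderBufferChange *change = ReorderBufferAllocChange(rb);
3360
3361 change->data.command_id = cid;
3362 change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
3363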
3364 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3365}
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
Definition: reorderbuffer.c:809
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
Definition: reorderbuffer.c:506
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
CommandId command_id
Definition: reorderbuffer.h:136
union ReorderBufferChange::@114 data

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferAllocChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3455 of file reorderbuffer.c.

3459{
3460 ReorderBufferChange *change = ReorderBufferAllocChange(rb);
3461 ReorderBufferTXN *txn;
3462
3463 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3464
3465 change->data.tuplecid.locator = locator;
3466 change->data.tuplecid.tid = tid;
3467 change->data.tuplecid.cmin = cmin;
3468 change->data.tuplecid.cmax = cmax;
3469 change->data.tuplecid.combocid = combocid;
3470 change->lsn = lsn;
3471 change->txn = txn;
3472 change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3473
3474 dlist_push_tail(&txn->tuplecids, &change->node);
3475 txn->ntuplecids++;
3476}
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
struct ReorderBufferChange::@114::@118 tuplecid
ItemPointerData tid
Definition: reorderbuffer.h:145
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
RelFileLocator locator
Definition: reorderbuffer.h:144
CommandId combocid
Definition: reorderbuffer.h:148
XLogRecPtr lsn
Definition: reorderbuffer.h:78
dlist_head tuplecids
Definition: reorderbuffer.h:404

References ReorderBufferChange::action, ReorderBufferChange::cmax, ReorderBufferChange::cmin, ReorderBufferChange::combocid, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::locator, ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferAllocChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3307 of file reorderbuffer.c.

3309{
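3310 ReorderBufferChange *change = ReorderBufferAllocChange(rb);
3311
3312 change->data.snapshot = snap;
3313 change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
3314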
3315 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3316}

References ReorderBufferChange::action, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, ReorderBufferAllocChange(), ReorderBufferQueueChange(), and ReorderBufferChange::snapshot.

Referenced by SnapBuildDistributeSnapshotAndInval().

ReorderBufferAllocate()

ReorderBuffer * ReorderBufferAllocate ( void  )

Definition at line 324 of file reorderbuffer.c.

325{
326 ReorderBuffer *buffer;
327 HASHCTL hash_ctl;
328 MemoryContext new_ctx;
329
330 Assert(MyReplicationSlot != NULL);
331
332 /* allocate memory in own context, to have better accountability */
334 "ReorderBuffer",
336
337 buffer =
338 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
339
340 memset(&hash_ctl, 0, sizeof(hash_ctl));
341
342 buffer->context = new_ctx;
343
344 buffer->change_context = SlabContextCreate(new_ctx,
345 "Change",
347 sizeof(ReorderBufferChange));
348
349 buffer->txn_context = SlabContextCreate(new_ctx,
350 "TXN",
352 sizeof(ReorderBufferTXN));
353
354 /*
355 * To minimize memory fragmentation caused by long-running transactions
356 * with changes spanning multiple memory blocks, we use a single
357 * fixed-size memory block for decoded tuple storage. The performance
358 * testing showed that the default memory block size maintains logical
359 * decoding performance without causing fragmentation due to concurrent
360 * transactions. One might think that we can use the max size as
361 * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
362 * the memory fragmentation.
363 */
364 buffer->tup_context = GenerationContextCreate(new_ctx,
365 "Tuples",
369
370 hash_ctl.keysize = sizeof(TransactionId);
371 hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
372 hash_ctl.hcxt = buffer->context;
373
374 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
376
378 buffer->by_txn_last_txn = NULL;
379
380 buffer->outbuf = NULL;
381 buffer->outbufsize = 0;
382 buffer->size = 0;
383
384 /* txn_heap is ordered by transaction size */
386
387 buffer->spillTxns = 0;
388 buffer->spillCount = 0;
389 buffer->spillBytes = 0;
390 buffer->streamTxns = 0;
391 buffer->streamCount = 0;
392 buffer->streamBytes = 0;
393 buffer->totalTxns = 0;
394 buffer->totalBytes = 0;
395
397
398 dlist_init(&buffer->toplevel_by_lsn);
400 dclist_init(&buffer->catchange_txns);
401
402 /*
403 * Ensure there's no stale data from prior uses of this slot, in case some
404 * prior exit avoided calling ReorderBufferFree. Failure to do this can
405 * produce duplicated txns, and it's very cheap if there's nothing there.
406 */
408
409 return buffer;
410}
#define NameStr(name)
Definition: c.h:751
uint32 TransactionId
Definition: c.h:657
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:358
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:162
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1229
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:189
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
Definition: hsearch.h:66
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
HTAB * by_txn
Definition: reorderbuffer.h:579
int64 streamBytes
Definition: reorderbuffer.h:691
int64 streamCount
Definition: reorderbuffer.h:690
dlist_head txns_by_base_snapshot_lsn
Definition: reorderbuffer.h:594
int64 totalBytes
Definition: reorderbuffer.h:698
int64 streamTxns
Definition: reorderbuffer.h:689
char * outbuf
Definition: reorderbuffer.h:668
dclist_head catchange_txns
Definition: reorderbuffer.h:599
int64 spillCount
Definition: reorderbuffer.h:685
int64 spillBytes
Definition: reorderbuffer.h:686
MemoryContext change_context
Definition: reorderbuffer.h:661
ReorderBufferTXN * by_txn_last_txn
Definition: reorderbuffer.h:606
TransactionId by_txn_last_xid
Definition: reorderbuffer.h:605
MemoryContext tup_context
Definition: reorderbuffer.h:663
int64 totalTxns
Definition: reorderbuffer.h:697
pairingheap * txn_heap
Definition: reorderbuffer.h:675
MemoryContext txn_context
Definition: reorderbuffer.h:662
XLogRecPtr current_restart_decoding_lsn
Definition: reorderbuffer.h:665
int64 spillTxns
Definition: reorderbuffer.h:684
ReplicationSlotPersistentData data
Definition: slot.h:192
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

ReorderBufferAllocChange()

ReorderBufferChange * ReorderBufferAllocChange ( ReorderBuffer * rb )

Definition at line 506 of file reorderbuffer.c.

507{
508 ReorderBufferChange *change;
509
510 change = (ReorderBufferChange *)
511 MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
512
513 memset(change, 0, sizeof(ReorderBufferChange));
514 return change;
515}

References ReorderBuffer::change_context, and MemoryContextAlloc().

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddNewTupleCids(), ReorderBufferAddSnapshot(), ReorderBufferQueueInvalidations(), ReorderBufferQueueMessage(), and ReorderBufferRestoreChange().

ReorderBufferAllocRelids()

Oid * ReorderBufferAllocRelids ( ReorderBuffer * rb,
int  nrelids 
)

Definition at line 624 of file reorderbuffer.c.

625{
626 Oid *relids;
627 Size alloc_len;
628
629 alloc_len = sizeof(Oid) * nrelids;
630
631 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
632
633 return relids;
634}
size_t Size
Definition: c.h:610
unsigned int Oid
Definition: postgres_ext.h:32

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

ReorderBufferAllocTupleBuf()

HeapTuple ReorderBufferAllocTupleBuf ( ReorderBuffer * rb,
Size  tuple_len 
)

Definition at line 591 of file reorderbuffer.c.

592{
593 HeapTuple tuple;
594 Size alloc_len;
595
596 alloc_len = tuple_len + SizeofHeapTupleHeader;
597
598 tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
599 HEAPTUPLESIZE + alloc_len);
600 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
601
602 return tuple;
603}
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer * rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1098 of file reorderbuffer.c.

1100{
1101 ReorderBufferTXN *txn;
1102 ReorderBufferTXN *subtxn;
1103 bool new_top;
1104 bool new_sub;
1105
1106 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1107 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1108
1109 if (!new_sub)
1110 {
1111 if (rbtxn_is_known_subxact(subtxn))
1112 {
1113 /* already associated, nothing to do */
1114 return;
1115 }
1116 else
1117 {
1118 /*
1119 * We already saw this transaction, but initially added it to the
1120 * list of top-level txns. Now that we know it's not top-level,
1121 * remove it from there.
1122 */
1123 dlist_delete(&subtxn->node);
1124 }
1125 }
1126
1127 subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1128 subtxn->toplevel_xid = xid;
1129 Assert(subtxn->nsubtxns == 0);
1130
1131 /* set the reference to top-level transaction */
1132 subtxn->toptxn = txn;
1133
1134 /* add to subtransaction list */
1135 dlist_push_tail(&txn->subtxns, &subtxn->node);
1136 txn->nsubtxns++;
1137
1138 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1139 ReorderBufferTransferSnapToParent(txn, subtxn);
1140
1141 /* Verify LSN-ordering invariant */
1142 AssertTXNLsnOrder(rb);
1143}
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
Definition: reorderbuffer.c:941
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define rbtxn_is_known_subxact(txn)
Definition: reorderbuffer.h:190
TransactionId toplevel_xid
Definition: reorderbuffer.h:302
struct ReorderBufferTXN * toptxn
Definition: reorderbuffer.h:340
dlist_node node
Definition: reorderbuffer.h:444
dlist_head subtxns
Definition: reorderbuffer.h:422

References Assert(), AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2883 of file reorderbuffer.c.

2887{
2888 ReorderBufferTXN *txn;
2889
2890 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2891 false);
2892
2893 /* unknown transaction, nothing to replay */
2894 if (txn == NULL)
2895 return;
2896
2897 ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2898 origin_id, origin_lsn);
2899}
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer * rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1218 of file reorderbuffer.c.

1221{
1222 ReorderBufferTXN *subtxn;
1223
1224 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1225 InvalidXLogRecPtr, false);
1226
1227 /*
1228 * No need to do anything if that subtxn didn't contain any changes
1229 */
1230 if (!subtxn)
1231 return;
1232
1233 subtxn->final_lsn = commit_lsn;
1234 subtxn->end_lsn = end_lsn;
1235
1236 /*
1237 * Assign this subxact as a child of the toplevel xact (no-op if already
1238 * done.)
1239 */
1240 ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1241}
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
XLogRecPtr end_lsn
Definition: reorderbuffer.h:337

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 3000 of file reorderbuffer.c.

3005{
3006 ReorderBufferTXN *txn;
3007 XLogRecPtr prepare_end_lsn;
3008 TimestampTz prepare_time;
3009
3010 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
3011
3012 /* unknown transaction, nothing to do */
3013 if (txn == NULL)
3014 return;
3015
3016 /*
3017 * By this time the txn has the prepare record information, remember it to
3018 * be later used for rollback.
3019 */
3020 prepare_end_lsn = txn->end_lsn;
3021 prepare_time = txn->prepare_time;
3022
3023 /* add the gid in the txn */
3024 txn->gid = pstrdup(gid);
3025
3026 /*
3027 * It is possible that this transaction is not decoded at prepare time
3028 * either because by that time we didn't have a consistent snapshot, or
3029 * two_phase was not enabled, or it was decoded earlier but we have
3030 * restarted. We only need to send the prepare if it was not decoded
3031 * earlier. We don't need to decode the xact for aborts if it is not done
3032 * already.
3033 */
3034 if ((txn->final_lsn < two_phase_at) && is_commit)
3035 {
3036 /*
3037 * txn must have been marked as a prepared transaction and skipped but
3038 * not sent a prepare. Also, the prepare info must have been updated
3039 * in txn even if we skip prepare.
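3040 */
3041 Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) ==
3042 (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE));
3043 Assert(txn->final_lsn != InvalidXLogRecPtr);
3044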
3045 /*
3046 * By this time the txn has the prepare record information and it is
3047 * important to use that so that downstream gets the accurate
3048 * information. If instead, we have passed commit information here
3049 * then downstream can behave as it has already replayed commit
3050 * prepared after the restart.
3051 */
3052 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
3053 txn->prepare_time, txn->origin_id, txn->origin_lsn);
3054 }
3055
3056 txn->final_lsn = commit_lsn;
3057 txn->end_lsn = end_lsn;
3058 txn->commit_time = commit_time;
3059 txn->origin_id = origin_id;
3060 txn->origin_lsn = origin_lsn;
3061
3062 if (is_commit)
3063 rb->commit_prepared(rb, txn, commit_lsn);
3064 else
3065 rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
3066
3067 /* cleanup: make sure there's no cache pollution */
3068 ReorderBufferExecuteInvalidations(txn->ninvalidations,
3069 txn->invalidations);
3070 ReorderBufferCleanupTXN(rb, txn);
3071}
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1759
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE_STATUS_MASK
Definition: reorderbuffer.h:181
TimestampTz commit_time
Definition: reorderbuffer.h:359
RepOriginId origin_id
Definition: reorderbuffer.h:350
XLogRecPtr origin_lsn
Definition: reorderbuffer.h:351
TimestampTz prepare_time
Definition: reorderbuffer.h:360
ReorderBufferCommitPreparedCB commit_prepared
Definition: reorderbuffer.h:622
ReorderBufferRollbackPreparedCB rollback_prepared
Definition: reorderbuffer.h:623
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert(), ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, and ReorderBufferTXN::txn_flags.

Referenced by DecodeAbort(), and DecodeCommit().

ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3179 of file reorderbuffer.c.

3180{
3181 ReorderBufferTXN *txn;
3182
3183 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3184 false);
3185
3186 /* unknown, nothing to forget */
3187 if (txn == NULL)
3188 return;
3189
3190 /* this transaction mustn't be streamed */
3191 Assert(!rbtxn_is_streamed(txn));
3192
3193 /* cosmetic... */
3194 txn->final_lsn = lsn;
3195
3196 /*
3197 * Process only cache invalidation messages in this transaction if there
3198 * are any. Even if we're not interested in the transaction's contents, it
3199 * could have manipulated the catalog and we need to update the caches
3200 * according to that.
3201 */
3202 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3203 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3204 txn->invalidations);
3205 else
3206 Assert(txn->ninvalidations == 0);
3207
3208 /* remove potential on-disk data, and deallocate */
3209 ReorderBufferCleanupTXN(rb, txn);
3210}
Snapshot base_snapshot
Definition: reorderbuffer.h:369

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer * rb )

Definition at line 416 of file reorderbuffer.c.

417{
418 MemoryContext context = rb->context;
419
420 /*
421 * We free separately allocated data by entirely scrapping reorderbuffer's
422 * memory context.
423 */
424 MemoryContextDelete(context);
425
426 /* Free disk space used by unconsumed reorder buffers */
427 ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
428}
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

ReorderBufferFreeChange()

void ReorderBufferFreeChange ( ReorderBuffer * rb,
ReorderBufferChange * change,
bool  upd_mem 
)

Definition at line 521 of file reorderbuffer.c.

523{
524 /* update memory accounting info */
525 if (upd_mem)
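526 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
527 ReorderBufferChangeSize(change));
528
529 /* free contained data */
530 switch (change->action)
531 {
532 case REORDER_BUFFER_CHANGE_INSERT:
533 case REORDER_BUFFER_CHANGE_UPDATE:
534 case REORDER_BUFFER_CHANGE_DELETE:
535 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
536 if (change->data.tp.newtuple)
537 {
538 ReorderBufferFreeTupleBuf(change->data.tp.newtuple);
539 change->data.tp.newtuple = NULL;
540 }
541
542 if (change->data.tp.oldtuple)
543 {
544 ReorderBufferFreeTupleBuf(change->data.tp.oldtuple);
545 change->data.tp.oldtuple = NULL;
546 }
547 break;
548 case REORDER_BUFFER_CHANGE_MESSAGE:
549 if (change->data.msg.prefix != NULL)
550 pfree(change->data.msg.prefix);
551 change->data.msg.prefix = NULL;
552 if (change->data.msg.message != NULL)
553 pfree(change->data.msg.message);
554 change->data.msg.message = NULL;
555 break;
556 case REORDER_BUFFER_CHANGE_INVALIDATION:
557 if (change->data.inval.invalidations)
558 pfree(change->data.inval.invalidations);
559 change->data.inval.invalidations = NULL;
560 break;
561 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
562 if (change->data.snapshot)
563 {
564 ReorderBufferFreeSnap(rb, change->data.snapshot);
565 change->data.snapshot = NULL;
566 }
567 break;
568 /* no data in addition to the struct itself */
569 case REORDER_BUFFER_CHANGE_TRUNCATE:
570 if (change->data.truncate.relids != NULL)
571 {
572 ReorderBufferFreeRelids(rb, change->data.truncate.relids);
573 change->data.truncate.relids = NULL;
574 }
575 break;
576 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
577 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
578 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
579 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
580 break;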
581 }
582
583 pfree(change);
584}
void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
Definition: reorderbuffer.c:640
void ReorderBufferFreeTupleBuf(HeapTuple tuple)
Definition: reorderbuffer.c:609
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
struct ReorderBufferChange::@114::@116 truncate
HeapTuple newtuple
Definition: reorderbuffer.h:106
struct ReorderBufferChange::@114::@117 msg
struct ReorderBufferChange::@114::@115 tp
HeapTuple oldtuple
Definition: reorderbuffer.h:104
SharedInvalidationMessage * invalidations
Definition: reorderbuffer.h:155
struct ReorderBufferChange::@114::@119 inval

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::message, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::oldtuple, pfree(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeRelids(), ReorderBufferFreeSnap(), ReorderBufferFreeTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

ReorderBufferFreeRelids()

void ReorderBufferFreeRelids ( ReorderBuffer * rb,
Oid * relids
)

Definition at line 640 of file reorderbuffer.c.

641{
642 pfree(relids);
643}

References pfree().

Referenced by ReorderBufferFreeChange().

ReorderBufferFreeTupleBuf()

void ReorderBufferFreeTupleBuf ( HeapTuple  tuple )

Definition at line 609 of file reorderbuffer.c.

610{
611 pfree(tuple);
612}

References pfree().

Referenced by ReorderBufferFreeChange().

ReorderBufferGetCatalogChangesXacts()

TransactionId * ReorderBufferGetCatalogChangesXacts ( ReorderBuffer * rb )

Definition at line 3692 of file reorderbuffer.c.

3693{
3694 dlist_iter iter;
3695 TransactionId *xids = NULL;
3696 size_t xcnt = 0;
3697
3698 /* Quick return if the list is empty */
3699 if (dclist_count(&rb->catchange_txns) == 0)
3700 return NULL;
3701
3702 /* Initialize XID array */
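3703 xids = (TransactionId *) palloc(sizeof(TransactionId) *
3704 dclist_count(&rb->catchange_txns));
3705 dclist_foreach(iter, &rb->catchange_txns)
3706 {
3707 ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3708 catchange_node,
3709 iter.cur);
3710
3711 Assert(rbtxn_has_catalog_changes(txn));
3712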
3713 xids[xcnt++] = txn->xid;
3714 }
3715
3716 qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3717
3718 Assert(xcnt == dclist_count(&rb->catchange_txns));
3719 return xids;
3720}
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970
void * palloc(Size size)
Definition: mcxt.c:1365
#define qsort(a, b, c, d)
Definition: port.h:479
#define rbtxn_has_catalog_changes(txn)
Definition: reorderbuffer.h:184
Definition: ilist.h:178
dlist_node * cur
Definition: ilist.h:179
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152

References Assert(), ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

ReorderBufferGetInvalidations()

uint32 ReorderBufferGetInvalidations ( ReorderBuffer * rb,
TransactionId  xid,
SharedInvalidationMessage ** msgs 
)

Definition at line 5616 of file reorderbuffer.c.

5618{
5619 ReorderBufferTXN *txn;
5620
5621 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
5622 false);
5623
5624 if (txn == NULL)
5625 return 0;
5626
5627 *msgs = txn->invalidations;
5628
5629 return txn->ninvalidations;
5630}

References ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, and ReorderBufferTXNByXid().

Referenced by SnapBuildDistributeSnapshotAndInval().

ReorderBufferGetOldestTXN()

ReorderBufferTXN * ReorderBufferGetOldestTXN ( ReorderBuffer * rb )

Definition at line 1043 of file reorderbuffer.c.

1044{
1045 ReorderBufferTXN *txn;
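1046
1047 AssertTXNLsnOrder(rb);
1048
1049 if (dlist_is_empty(&rb->toplevel_by_lsn))
1050 return NULL;
1051
1052 txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1053
1054 Assert(!rbtxn_is_known_subxact(txn));
1055 Assert(txn->first_lsn != InvalidXLogRecPtr);
1056 return txn;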
1057}
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
XLogRecPtr first_lsn
Definition: reorderbuffer.h:315

References Assert(), AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer * rb )

Definition at line 1071 of file reorderbuffer.c.

1072{
1073 ReorderBufferTXN *txn;
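1074
1075 AssertTXNLsnOrder(rb);
1076
1077 if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1078 return InvalidTransactionId;
1079
1080 txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1081 &rb->txns_by_base_snapshot_lsn);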
1082 return txn->base_snapshot->xmin;
1083}
TransactionId xmin
Definition: snapshot.h:153

References AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, dlist_head_element, dlist_is_empty(), InvalidTransactionId, ReorderBuffer::txns_by_base_snapshot_lsn, and SnapshotData::xmin.

Referenced by SnapBuildProcessRunningXacts().

ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer * rb,
uint32  ninvalidations,
SharedInvalidationMessage * invalidations 
)

Definition at line 3252 of file reorderbuffer.c.

3254{
3255 bool use_subtxn = IsTransactionOrTransactionBlock();
3256 MemoryContext ccxt = CurrentMemoryContext;
3257 ResourceOwner cowner = CurrentResourceOwner;
3258 int i;
3259
3260 if (use_subtxn)
3261 BeginInternalSubTransaction("replay");
3262
3263 /*
3264 * Force invalidations to happen outside of a valid transaction - that way
3265 * entries will just be marked as invalid without accessing the catalog.
3266 * That's advantageous because we don't need to setup the full state
3267 * necessary for catalog access.
3268 */
3269 if (use_subtxn)
3270 AbortCurrentTransaction();
3271
3272 for (i = 0; i < ninvalidations; i++)
3273 LocalExecuteInvalidationMessage(&invalidations[i]);
3274
3275 if (use_subtxn)
3276 {
3277 RollbackAndReleaseCurrentSubTransaction();
3278 MemoryContextSwitchTo(ccxt);
3279 CurrentResourceOwner = cowner;
3280 }
3281}
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:823
i
int i
Definition: isn.c:77
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5001
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4706
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4808
void AbortCurrentTransaction(void)
Definition: xact.c:3463

References AbortCurrentTransaction(), BeginInternalSubTransaction(), CurrentMemoryContext, CurrentResourceOwner, i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), MemoryContextSwitchTo(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3221 of file reorderbuffer.c.

3222{
3223 ReorderBufferTXN *txn;
3224
3225 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3226 false);
3227
3228 /* unknown, nothing to do */
3229 if (txn == NULL)
3230 return;
3231
3232 /*
3233 * Process cache invalidation messages if there are any. Even if we're not
3234 * interested in the transaction's contents, it could have manipulated the
3235 * catalog and we need to update the caches according to that.
3236 */
3237 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3238 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3239 txn->invalidations);
3240 else
3241 Assert(txn->ninvalidations == 0);
3242}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer * rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2959 of file reorderbuffer.c.

2961{
2962 ReorderBufferTXN *txn;
2963
2964 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2965 false);
2966
2967 /* unknown transaction, nothing to replay */
2968 if (txn == NULL)
2969 return;
2970
2971 /*
2972 * txn must have been marked as a prepared transaction and must have
2973 * neither been skipped nor sent a prepare. Also, the prepare info must
2974 * have been updated in it by now.
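2975 */
2976 Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
2977 Assert(txn->final_lsn != InvalidXLogRecPtr);
2978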
2979 txn->gid = pstrdup(gid);
2980
2981 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2982 txn->prepare_time, txn->origin_id, txn->origin_lsn);
2983
2984 /*
2985 * Send a prepare if not already done so. This might occur if we have
2986 * detected a concurrent abort while replaying the non-streaming
2987 * transaction.
2988 */
2989 if (!rbtxn_sent_prepare(txn))
2990 {
2991 rb->prepare(rb, txn, txn->final_lsn);
2992 txn->txn_flags |= RBTXN_SENT_PREPARE;
2993 }
2994}
#define rbtxn_sent_prepare(txn)
Definition: reorderbuffer.h:246
ReorderBufferPrepareCB prepare
Definition: reorderbuffer.h:621

References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, ReorderBufferReplay(), ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3294 of file reorderbuffer.c.

3295{
3296 /* many records won't have an xid assigned, centralize check here */
3297 if (xid != InvalidTransactionId)
3298 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3299}

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange * change,
bool  toast_insert 
)

Definition at line 809 of file reorderbuffer.c.

811{
812 ReorderBufferTXN *txn;
813
814 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
815
816 /*
817 * If we have detected that the transaction is aborted while streaming the
818 * previous changes or by checking its CLOG, there is no point in
819 * collecting further changes for it.
820 */
821 if (rbtxn_is_aborted(txn))
822 {
823 /*
824 * We don't need to update memory accounting for this change as we
825 * have not added it to the queue yet.
826 */
827 ReorderBufferFreeChange(rb, change, false);
828 return;
829 }
830
831 /*
832 * The changes that are sent downstream are considered streamable. We
833 * remember such transactions so that only those will later be considered
834 * for streaming.
835 */
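836 if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
837 change->action == REORDER_BUFFER_CHANGE_UPDATE ||
838 change->action == REORDER_BUFFER_CHANGE_DELETE ||
839 change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
840 change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
841 change->action == REORDER_BUFFER_CHANGE_MESSAGE)
842 {
843 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
844
845 toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
846 }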
847
848 change->lsn = lsn;
849 change->txn = txn;
850
851 Assert(InvalidXLogRecPtr != lsn);
852 dlist_push_tail(&txn->changes, &change->node);
853 txn->nentries++;
854 txn->nentries_mem++;
855
856 /* update memory accounting information */
857 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
858 ReorderBufferChangeSize(change));
859
860 /* process partial change */
861 ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
862
863 /* check the memory limits and evict something if needed */
864 ReorderBufferCheckMemoryLimit(rb);
865}
void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
Definition: reorderbuffer.c:521
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
Definition: reorderbuffer.c:740
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define rbtxn_is_aborted(txn)
Definition: reorderbuffer.h:258
uint64 nentries_mem
Definition: reorderbuffer.h:391
dlist_head changes
Definition: reorderbuffer.h:397

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, rbtxn_is_aborted, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferFreeChange(), ReorderBufferProcessPartialChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), ReorderBufferQueueInvalidations(), and ReorderBufferQueueMessage().

ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer * rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 872 of file reorderbuffer.c.

876{
877 if (transactional)
878 {
879 MemoryContext oldcontext;
880 ReorderBufferChange *change;
881
882 Assert(xid != InvalidTransactionId);
883
884 /*
885 * We don't expect snapshots for transactional changes - we'll use the
886 * snapshot derived later during apply (unless the change gets
887 * skipped).
888 */
889 Assert(!snap);
890
891 oldcontext = MemoryContextSwitchTo(rb->context);
892
893 change = ReorderBufferAllocChange(rb);
894 change->action = REORDER_BUFFER_CHANGE_MESSAGE;
895 change->data.msg.prefix = pstrdup(prefix);
896 change->data.msg.message_size = message_size;
897 change->data.msg.message = palloc(message_size);
898 memcpy(change->data.msg.message, message, message_size);
899
900 ReorderBufferQueueChange(rb, xid, lsn, change, false);
901
902 MemoryContextSwitchTo(oldcontext);
903 }
904 else
905 {
906 ReorderBufferTXN *txn = NULL;
907 volatile Snapshot snapshot_now = snap;
908
909 /* Non-transactional changes require a valid snapshot. */
910 Assert(snapshot_now);
911
912 if (xid != InvalidTransactionId)
913 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
914
915 /* setup snapshot to allow catalog access */
916 SetupHistoricSnapshot(snapshot_now, NULL);
917 PG_TRY();
918 {
919 rb->message(rb, txn, lsn, false, prefix, message_size, message);
920
921 TeardownHistoricSnapshot(false);
922 }
923 PG_CATCH();
924 {
925 TeardownHistoricSnapshot(true);
926 PG_RE_THROW();
927 }
928 PG_END_TRY();
929 }
930}
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define PG_CATCH(...)
Definition: elog.h:382
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1683
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1667
ReorderBufferMessageCB message
Definition: reorderbuffer.h:615

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBufferChange::message, ReorderBuffer::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBufferChange::prefix, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferAllocChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2906 of file reorderbuffer.c.

2910{
2911 ReorderBufferTXN *txn;
2912
2913 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2914
2915 /* unknown transaction, nothing to do */
2916 if (txn == NULL)
2917 return false;
2918
2919 /*
2920 * Remember the prepare information to be later used by commit prepared in
2921 * case we skip doing prepare.
2922 */
2923 txn->final_lsn = prepare_lsn;
2924 txn->end_lsn = end_lsn;
2925 txn->prepare_time = prepare_time;
2926 txn->origin_id = origin_id;
2927 txn->origin_lsn = origin_lsn;
2928
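2929 /* Mark this transaction as a prepared transaction */
2930 Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == 0);
2931 txn->txn_flags |= RBTXN_IS_PREPARED;
2932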
2933 return true;
2934}

References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3325 of file reorderbuffer.c.

3327{
3328 ReorderBufferTXN *txn;
3329 bool is_new;
3330
3331 Assert(snap != NULL);
3332
3333 /*
3334 * Fetch the transaction to operate on. If we know it's a subtransaction,
3335 * operate on its top-level transaction instead.
3336 */
3337 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3338 if (rbtxn_is_known_subxact(txn))
3339 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3340 NULL, InvalidXLogRecPtr, false);
3341 Assert(txn->base_snapshot == NULL);
3342
3343 txn->base_snapshot = snap;
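3344 txn->base_snapshot_lsn = lsn;
3345 dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3346
3347 AssertTXNLsnOrder(rb);
3348}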
XLogRecPtr base_snapshot_lsn
Definition: reorderbuffer.h:370
dlist_node base_snapshot_node
Definition: reorderbuffer.h:371

References Assert(), AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer *rb,
XLogRecPtr  ptr 
)

Definition at line 1086 of file reorderbuffer.c.

1087{
1088 rb->current_restart_decoding_lsn = ptr;
1089}

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer *rb,
TransactionId  xid 
)

Definition at line 2938 of file reorderbuffer.c.

2939{
2940 ReorderBufferTXN *txn;
2941
2942 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2943
2944 /* unknown transaction, nothing to do */
2945 if (txn == NULL)
2946 return;
2947
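2948 /* txn must have been marked as a prepared transaction */
2949 Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
2950 txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2951}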

References Assert(), InvalidXLogRecPtr, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer *rb,
TransactionId  xid 
)

Definition at line 3744 of file reorderbuffer.c.

3745{
3746 ReorderBufferTXN *txn;
3747
3748 txn = ReorderBufferTXNByXid(rb, xid, false,
3749 NULL, InvalidXLogRecPtr, false);
3750
3751 /* transaction isn't known yet, ergo no snapshot */
3752 if (txn == NULL)
3753 return false;
3754
3755 /* a known subtxn? operate on top-level txn instead */
3756 if (rbtxn_is_known_subxact(txn))
3757 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3758 NULL, InvalidXLogRecPtr, false);
3759
3760 return txn->base_snapshot != NULL;
3761}

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeSnapshotAndInval(), and SnapBuildProcessChange().

ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer *rb,
TransactionId  xid 
)

Definition at line 3727 of file reorderbuffer.c.

3728{
3729 ReorderBufferTXN *txn;
3730
3731 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3732 false);
3733 if (txn == NULL)
3734 return false;
3735
3736 return rbtxn_has_catalog_changes(txn);
3737}

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer *rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3654 of file reorderbuffer.c.

3656{
3657 ReorderBufferTXN *txn;
3658
3659 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3660
3661 if (!rbtxn_has_catalog_changes(txn))
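3662 {
3663 txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3664 dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3665 }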
3666
3667 /*
3668 * Mark top-level transaction as having catalog changes too if one of its
3669 * children has so that the ReorderBufferBuildTupleCidHash can
3670 * conveniently check just top-level transaction and decide whether to
3671 * build the hash table or not.
3672 */
3673 if (rbtxn_is_subtxn(txn))
3674 {
3675 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3676
3677 if (!rbtxn_has_catalog_changes(toptxn))
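3678 {
3679 toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3680 dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3681 }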
3682 }
3683}
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
#define rbtxn_is_subtxn(txn)
Definition: reorderbuffer.h:282
dlist_node catchange_node
Definition: reorderbuffer.h:449

References ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, dclist_push_tail(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildProcessNewCid(), and xact_decode().

StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4922 of file reorderbuffer.c.

4923{
4924 DIR *logical_dir;
4925 struct dirent *logical_de;
4926
4927 logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4928 while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4929 {
4930 if (strcmp(logical_de->d_name, ".") == 0 ||
4931 strcmp(logical_de->d_name, "..") == 0)
4932 continue;
4933
4934 /* if it cannot be a slot, skip the directory */
4935 if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2))
4936 continue;
4937
4938 /*
4939 * ok, has to be a surviving logical slot, iterate and delete
4940 * everything starting with xid-*
4941 */
4942 ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4943 }
4944 FreeDir(logical_dir);
4945}
int FreeDir(DIR *dir)
Definition: fd.c:3022
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2904
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2970
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition: slot.c:272
#define PG_REPLSLOT_DIR
Definition: slot.h:21
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), PG_REPLSLOT_DIR, ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

Variable Documentation

debug_logical_replication_streaming

PGDLLIMPORT int debug_logical_replication_streaming
extern

logical_decoding_work_mem

PGDLLIMPORT int logical_decoding_work_mem
extern

Definition at line 225 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().

AltStyle によって変換されたページ (->オリジナル) /