1/*-------------------------------------------------------------------------
4 * Logical replication progress tracking support.
6 * Copyright (c) 2013-2025, PostgreSQL Global Development Group
9 * src/backend/replication/logical/origin.c
13 * This file provides the following:
14 * * An infrastructure to name nodes in a replication setup
15 * * A facility to store and persist replication progress in an
16 * efficient and durable manner.
18 * Replication origins consist of a descriptive, user-defined, external
19 * name and a short, thus space-efficient, internal 2-byte one. This split
20 * exists because replication origins have to be stored in WAL and shared
21 * memory and long descriptors would be inefficient. For now only use 2 bytes
22 * for the internal id of a replication origin as it seems unlikely that there
23 * soon will be more than 65k nodes in one replication setup; and using only
24 * two bytes allows us to be more space efficient.
26 * Replication progress is tracked in a shared memory table
27 * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 * ('slots') in this table are identified by the internal id. That's the case
29 * because it allows increasing replication progress during crash
30 * recovery. To allow doing so we store the original LSN (from the originating
31 * system) of a transaction in the commit record. That allows recovering the
32 * precise replayed state after crash recovery; without requiring synchronous
33 * commits. Allowing logical replication to use asynchronous commit is
34 * generally good for performance, but especially important as it allows a
35 * single threaded replay process to keep up with a source that has multiple
36 * backends generating changes concurrently. For efficiency and simplicity
37 * reasons a backend can set up one replication origin that's from then on used as
38 * the source of changes produced by the backend, until reset again.
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
45 * There are several levels of locking at work:
47 * * To create and drop replication origins an exclusive lock on
48 * pg_replication_origin is required for the duration. That allows us to
49 * safely and conflict free assign new origins using a dirty snapshot.
51 * * When creating an in-memory replication progress slot the ReplicationOrigin
52 * LWLock has to be held exclusively; when iterating over the replication
53 * progress a shared lock has to be held, the same when advancing the
54 * replication progress of an individual backend that has not been set up as the
55 * session's replication origin.
57 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 * replication progress slot that slot's lwlock has to be held. That's
59 * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60 * all our platforms, but it also simplifies memory ordering concerns
61 * between the remote and local lsn. We use a lwlock instead of a spinlock
62 * so it's less harmful to hold the lock over a WAL write
63 * (cf. replorigin_advance).
65 * ---------------------------------------------------------------------------
92#include "utils/fmgroids.h"
99/* paths for replication origin checkpoint files */
100 #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101 #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
107 * Replay progress of a single remote node.
112 * Local identifier for the remote node.
117 * Location of the latest commit from the remote side.
122 * Remember the local lsn of the commit record so we can XLogFlush() to it
123 * during a checkpoint so we know the commit record actually is safe on
129 * PID of backend that's acquired slot, or 0 if none.
134 * Condition variable that's signaled when acquired_by changes.
139 * Lock protecting remote_lsn and local_lsn.
145 * On disk version of ReplicationState.
156 /* Tranche to use for per-origin LWLocks */
158 /* Array of length max_active_replication_origins */
162/* external variables */
168 * Base address into a shared memory array of replication states of size
169 * max_active_replication_origins.
174 * Actual shared memory block (replication_states[] is now part of this).
179 * We keep a pointer to this backend's ReplicationState to avoid having to
180 * search the replication_states array in replorigin_session_advance for each
181 * remote commit. (Ownership of a backend's own entry can only be changed by
186/* Magic for on disk files. */
187 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
194 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 errmsg(
"cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
199 (
errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 errmsg(
"cannot manipulate replication origins during recovery")));
205 * IsReservedOriginName
206 * True iff name is either "none" or "any".
215/* ---------------------------------------------------------------------------
216 * Functions for working with replication origins themselves.
217 * ---------------------------------------------------------------------------
221 * Check for a persistent replication origin identified by name.
223 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
239 roident =
ident->roident;
242 else if (!missing_ok)
244 (
errcode(ERRCODE_UNDEFINED_OBJECT),
245 errmsg(
"replication origin \"%s\" does not exist",
252 * Create a replication origin.
254 * Needs to be called in a transaction.
268 * To avoid needing a TOAST table for pg_replication_origin, we limit
269 * replication origin names to 512 bytes. This should be more than enough
270 * for all practical use.
274 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
275 errmsg(
"replication origin name is too long"),
276 errdetail(
"Replication origin names must be no longer than %d bytes.",
284 * We need the numeric replication origin to be 16bit wide, so we cannot
285 * rely on the normal oid allocation. Instead we simply scan
286 * pg_replication_origin for the first unused id. That's not particularly
287 * efficient, but this should be a fairly infrequent operation - we can
288 * easily spend a bit more code on this when it turns out it needs to be
291 * We handle concurrency by taking an exclusive lock (allowing reads!)
292 * over the table for the duration of the search. Because we use a "dirty
293 * snapshot" we can read rows that other in-progress sessions have
294 * written, even though they would be invisible with normal snapshots. Due
295 * to the exclusive lock there's no danger that new rows can appear while
303 * We want to be able to access pg_replication_origin without setting up a
304 * snapshot. To make that safe, it needs to not have a TOAST table, since
305 * TOASTed data cannot be fetched without a snapshot. As of this writing,
306 * its only varlena column is roname, which we limit to 512 bytes to avoid
307 * needing out-of-line storage. If you add a TOAST table to this catalog,
308 * be sure to set up a snapshot everywhere it might be needed. For more
309 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
315 bool nulls[Natts_pg_replication_origin];
322 Anum_pg_replication_origin_roident,
338 * Ok, found an unused roident, insert the new row and do a CCI,
339 * so our callers can look it up if they want to.
341 memset(&nulls, 0,
sizeof(nulls));
344 values[Anum_pg_replication_origin_roname - 1] = roname_d;
353 /* now release lock again, */
358 (
errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 errmsg(
"could not find free replication origin ID")));
366 * Helper function to drop a replication origin.
374 * Clean up the slot state info, if there is any matching slot.
383 if (
state->roident == roident)
385 /* found our slot, is it busy? */
386 if (
state->acquired_by != 0)
392 (
errcode(ERRCODE_OBJECT_IN_USE),
393 errmsg(
"could not drop replication origin with ID %d, in use by PID %d",
395 state->acquired_by)));
398 * We must wait and then retry. Since we don't know which CV
399 * to wait on until here, we can't readily use
400 * ConditionVariablePrepareToSleep (calling it here would be
401 * wrong, since we could miss the signal if we did so); just
402 * use ConditionVariableSleep directly.
404 cv = &
state->origin_cv;
412 /* first make a WAL log entry */
422 /* then clear the in-memory slot */
434 * Drop replication origin (by name).
436 * Needs to be called in a transaction.
451 /* Lock the origin to prevent concurrent drops. */
459 elog(
ERROR,
"cache lookup failed for replication origin with ID %d",
463 * We don't need to retain the locks if the origin is already dropped.
474 * Now, we can delete the catalog entry.
481 /* We keep the lock on pg_replication_origin until commit */
486 * Lookup replication origin via its oid and return the name.
488 * The external name is palloc'd in the calling context.
490 * Returns true if the origin is known, false otherwise.
519 (
errcode(ERRCODE_UNDEFINED_OBJECT),
520 errmsg(
"replication origin with ID %d does not exist",
528/* ---------------------------------------------------------------------------
529 * Functions for handling replication progress.
530 * ---------------------------------------------------------------------------
579/* ---------------------------------------------------------------------------
580 * Perform a checkpoint of each replication origin's progress with respect to
581 * the replayed remote_lsn. Make sure that all transactions we refer to in the
582 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
583 * if the transactions were originally committed asynchronously.
585 * We store checkpoints in the following format:
586 * +-------+------------------------+------------------+-----+--------+
587 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
588 * +-------+------------------------+------------------+-----+--------+
590 * So it's just the magic, followed by the statically sized
591 * ReplicationStateOnDisk structs. Note that the maximum number of
592 * ReplicationState is determined by max_active_replication_origins.
593 * ---------------------------------------------------------------------------
610 /* make sure no old temp file is remaining */
611 if (unlink(tmppath) < 0 && errno != ENOENT)
614 errmsg(
"could not remove file \"%s\": %m",
618 * no other backend can perform this at the same time; only one checkpoint
619 * can happen at a time.
622 O_CREAT | O_EXCL | O_WRONLY |
PG_BINARY);
626 errmsg(
"could not create file \"%s\": %m",
631 if ((
write(tmpfd, &magic,
sizeof(magic))) !=
sizeof(magic))
633 /* if write didn't set errno, assume problem is no disk space */
638 errmsg(
"could not write to file \"%s\": %m",
643 /* prevent concurrent creations/drops */
646 /* write actual data */
656 /* zero, to avoid uninitialized padding bytes */
657 memset(&disk_state, 0,
sizeof(disk_state));
668 /* make sure we only write out a commit that's persistent */
672 if ((
write(tmpfd, &disk_state,
sizeof(disk_state))) !=
675 /* if write didn't set errno, assume problem is no disk space */
680 errmsg(
"could not write to file \"%s\": %m",
689 /* write out the CRC */
694 /* if write didn't set errno, assume problem is no disk space */
699 errmsg(
"could not write to file \"%s\": %m",
706 errmsg(
"could not close file \"%s\": %m",
709 /* fsync, rename to permanent file, fsync file and directory */
714 * Recover replication replay status from checkpoint data saved earlier by
715 * CheckPointReplicationOrigin.
717 * This only needs to be called at startup and *not* during every checkpoint
718 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
719 * state thereafter can be recovered by looking at commit records.
732 /* don't want to overwrite already existing state */
733#ifdef USE_ASSERT_CHECKING
734 static bool already_started =
false;
737 already_started =
true;
745 elog(
DEBUG2,
"starting up replication origin progress state");
750 * might have had max_active_replication_origins == 0 last run, or we just
751 * brought up a standby.
753 if (
fd < 0 && errno == ENOENT)
758 errmsg(
"could not open file \"%s\": %m",
761 /* verify magic, that is written even if nothing was active */
762 readBytes =
read(
fd, &magic,
sizeof(magic));
763 if (readBytes !=
sizeof(magic))
768 errmsg(
"could not read file \"%s\": %m",
773 errmsg(
"could not read file \"%s\": read %d of %zu",
774 path, readBytes,
sizeof(magic))));
780 (
errmsg(
"replication checkpoint has wrong magic %u instead of %u",
783 /* we can skip locking here, no other access is possible */
785 /* recover individual states, until there are no more to be found */
790 readBytes =
read(
fd, &disk_state,
sizeof(disk_state));
792 /* no further data */
793 if (readBytes ==
sizeof(
crc))
795 /* not pretty, but simple ... */
804 errmsg(
"could not read file \"%s\": %m",
808 if (readBytes !=
sizeof(disk_state))
812 errmsg(
"could not read file \"%s\": read %d of %zu",
813 path, readBytes,
sizeof(disk_state))));
820 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
821 errmsg(
"could not find free replication state, increase \"max_active_replication_origins\"")));
823 /* copy data to shared memory */
829 errmsg(
"recovered replication state of node %d to %X/%08X",
834 /* now check checksum */
839 errmsg(
"replication slot checkpoint has wrong checksum %u, expected %u",
845 errmsg(
"could not close file \"%s\": %m",
863 xlrec->
force /* backward */ ,
864 false /* WAL log */ );
891 elog(
PANIC,
"replorigin_redo: unknown op code %u", info);
897 * Tell the replication origin progress machinery that a commit from 'node'
898 * that originated at the LSN remote_commit on the remote node was replayed
899 * successfully and that we don't need to do so again. In combination with
900 * setting up replorigin_session_origin_lsn and replorigin_session_origin
901 * that ensures we won't lose knowledge about that after a crash if the
902 * transaction had a persistent effect (think of asynchronous commits).
904 * local_commit needs to be a local LSN of the commit so that we can make sure
905 * upon a checkpoint that enough WAL has been persisted to disk.
907 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
908 * unless running in recovery.
913 bool go_backward,
bool wal_log)
921 /* we don't track DoNotReplicateId */
926 * XXX: For the case where this is called by WAL replay, it'd be more
927 * efficient to restore into a backend local hashtable and only dump into
928 * shmem after recovery is finished. Let's wait with implementing that
929 * till it's shown to be a measurable expense
932 /* Lock exclusively, as we may have to create a new table entry. */
936 * Search for either an existing slot for the origin, or a free one we can
943 /* remember where to insert if necessary */
947 free_state = curstate;
958 replication_state = curstate;
962 /* Make sure it's not used by somebody else */
966 (
errcode(ERRCODE_OBJECT_IN_USE),
967 errmsg(
"replication origin with ID %d is already active for PID %d",
975 if (replication_state == NULL && free_state == NULL)
977 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
978 errmsg(
"could not find free replication state slot for replication origin with ID %d",
980 errhint(
"Increase \"max_active_replication_origins\" and try again.")));
982 if (replication_state == NULL)
984 /* initialize new slot */
986 replication_state = free_state;
989 replication_state->
roident = node;
995 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
996 * and the standby gets the message. Primarily this will be called during
997 * WAL replay (of commit records) where no WAL logging is necessary.
1005 xlrec.
force = go_backward;
1014 * Due to - harmless - race conditions during a checkpoint we could see
1015 * values here that are older than the ones we already have in memory. We
1016 * could also see older values for prepared transactions when the prepare
1017 * is sent at a later point of time along with commit prepared and there
1018 * are other transaction commits between prepare and commit prepared. See
1019 * ReorderBufferFinishPrepared. Don't overwrite those.
1021 if (go_backward || replication_state->
remote_lsn < remote_commit)
1022 replication_state->
remote_lsn = remote_commit;
1024 (go_backward || replication_state->
local_lsn < local_commit))
1025 replication_state->
local_lsn = local_commit;
1029 * Release *after* changing the LSNs, slot isn't acquired and thus could
1030 * otherwise be dropped anytime.
1043 /* prevent slots from being concurrently dropped */
1052 if (
state->roident == node)
1056 remote_lsn =
state->remote_lsn;
1057 local_lsn =
state->local_lsn;
1074 * Tear down a (possibly) configured session replication origin during process
1102 * Set up a replication origin in the shared memory struct if it doesn't
1103 * already exist and cache access to the specific ReplicationState so the
1104 * array doesn't have to be searched when calling
1105 * replorigin_session_advance().
1107 * Normally only one such cached origin can exist per process so the cached
1108 * value can only be set again after the previous value is torn down with
1109 * replorigin_session_reset(). For this normal case pass acquired_by = 0
1110 * (meaning the slot is not allowed to be already acquired by another process).
1112 * However, sometimes multiple processes can safely re-use the same origin slot
1113 * (for example, multiple parallel apply processes can safely use the same
1114 * origin, provided they maintain commit order by allowing only one process to
1115 * commit at a time). For this case the first process must pass acquired_by =
1116 * 0, and then the other processes sharing that same origin can pass
1117 * acquired_by = PID of the first process.
1122 static bool registered_cleanup;
1126 if (!registered_cleanup)
1129 registered_cleanup =
true;
1136 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1137 errmsg(
"cannot setup replication origin when one is already setup")));
1139 /* Lock exclusively, as we may have to create a new table entry. */
1143 * Search for either an existing slot for the origin, or a free one we can
1150 /* remember where to insert if necessary */
1159 if (curstate->
roident != node)
1162 else if (curstate->
acquired_by != 0 && acquired_by == 0)
1165 (
errcode(ERRCODE_OBJECT_IN_USE),
1166 errmsg(
"replication origin with ID %d is already active for PID %d",
1173 (
errcode(ERRCODE_OBJECT_IN_USE),
1174 errmsg(
"could not find replication state slot for replication origin with OID %u which was acquired by %d",
1175 node, acquired_by)));
1178 /* ok, found slot */
1186 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1187 errmsg(
"could not find free replication state slot for replication origin with ID %d",
1189 errhint(
"Increase \"max_active_replication_origins\" and try again.")));
1194 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1195 errmsg(
"cannot use PID %d for inactive replication origin with ID %d",
1196 acquired_by, node)));
1198 /* initialize new slot */
1208 if (acquired_by == 0)
1215 /* probably this one is pointless */
1220 * Reset replay state previously set up in this session.
1222 * This function may only be called if an origin was set up with
1223 * replorigin_session_setup().
1234 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1235 errmsg(
"no replication origin is configured")));
1249 * Do the same work replorigin_advance() does, just on the session's
1250 * configured origin.
1252 * This is noticeably cheaper than using replorigin_advance().
1269 * Ask the machinery about the point up to which we successfully replayed
1270 * changes from an already setup replication origin.
1293/* ---------------------------------------------------------------------------
1294 * SQL functions for working with replication origin.
1296 * These mostly should be fairly short wrappers around more generic functions.
1297 * ---------------------------------------------------------------------------
1301 * Create replication origin for the passed in name, and return the assigned
1315 * Replication origins "any" and "none" are reserved for system options.
1316 * The origins "pg_xxx" are reserved for internal use.
1320 (
errcode(ERRCODE_RESERVED_NAME),
1321 errmsg(
"replication origin name \"%s\" is reserved",
1323 errdetail(
"Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1324 LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1327 * If built with appropriate switch, whine when regression-testing
1328 * conventions for replication origin names are violated.
1330#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1331 if (strncmp(
name,
"regress_", 8) != 0)
1332 elog(
WARNING,
"replication origins created by regression test cases should have names starting with \"regress_\"");
1343 * Drop replication origin.
1362 * Return oid of a replication origin.
1383 * Setup a replication origin for this session.
1407 * Reset previously setup origin in this session
1424 * Has a replication origin been setup for this session.
1436 * Return the replication progress for origin setup in the current session.
1438 * If 'flush' is set to true it is ensured that the returned value corresponds
1439 * to a local transaction that has been flushed. This is useful if asynchronous
1440 * commits are used when replaying replicated transactions.
1452 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1453 errmsg(
"no replication origin is configured")));
1472 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1473 errmsg(
"no replication origin is configured")));
1502 /* lock to prevent the replication origin from vanishing */
1508 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1509 * xact hasn't committed yet. This is why this function should be used to
1510 * set up the initial replication state, but not for replay.
1513 true /* go backward */ ,
true /* WAL log */ );
1522 * Return the replication progress for an individual replication origin.
1524 * If 'flush' is set to true it is ensured that the returned value corresponds
1525 * to a local transaction that has been flushed. This is useful if asynchronous
1526 * commits are used when replaying replicated transactions.
1560 /* we want to return 0 rows if slot is set to zero */
1565 /* prevent slots from being concurrently dropped */
1569 * Iterate through all possible replication_states, display if they are
1570 * filled. Note that we do not take any locks, so slightly corrupted/out
1571 * of date values are a possibility.
1582 /* unused slot, nothing to display */
1587 memset(nulls, 1,
sizeof(nulls));
1593 * We're not preventing the origin from being dropped concurrently, so
1594 * silently accept that it might be gone.
1619#undef REPLICATION_ORIGIN_PROGRESS_COLS
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define FLEXIBLE_ARRAY_MEMBER
#define MemSet(start, val, len)
#define OidIsValid(objectId)
bool IsReservedName(const char *name)
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
int durable_rename(const char *oldfile, const char *newfile, int elevel)
int CloseTransientFile(int fd)
int OpenTransientFile(const char *fileName, int fileFlags)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_DATUM(n)
#define PG_GETARG_INT32(n)
#define PG_GETARG_BOOL(n)
#define PG_RETURN_BOOL(x)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
void systable_endscan(SysScanDesc sysscan)
HeapTuple systable_getnext(SysScanDesc sysscan)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockInitialize(LWLock *lock, int tranche_id)
void pfree(void *pointer)
#define CHECK_FOR_INTERRUPTS()
TimestampTz replorigin_session_origin_timestamp
static ReplicationStateCtl * replication_states_ctl
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Size ReplicationOriginShmemSize(void)
RepOriginId replorigin_create(const char *roname)
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
void replorigin_session_reset(void)
struct ReplicationState ReplicationState
static bool IsReservedOriginName(const char *name)
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
int max_active_replication_origins
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
static ReplicationState * replication_states
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
static void ReplicationOriginExitCleanup(int code, Datum arg)
void StartupReplicationOrigin(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
static void replorigin_state_clear(RepOriginId roident, bool nowait)
void replorigin_session_setup(RepOriginId node, int acquired_by)
void CheckPointReplicationOrigin(void)
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
static ReplicationState * session_replication_state
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
void ReplicationOriginShmemInit(void)
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
#define REPLICATION_STATE_MAGIC
XLogRecPtr replorigin_session_origin_lsn
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
void replorigin_redo(XLogReaderState *record)
struct ReplicationStateCtl ReplicationStateCtl
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define InvalidRepOriginId
#define XLOG_REPLORIGIN_DROP
#define XLOG_REPLORIGIN_SET
#define ERRCODE_DATA_CORRUPTED
#define COMP_CRC32C(crc, data, len)
static Datum LSNGetDatum(XLogRecPtr X)
FormData_pg_replication_origin * Form_pg_replication_origin
int pg_strcasecmp(const char *s1, const char *s2)
static Datum ObjectIdGetDatum(Oid X)
static Pointer DatumGetPointer(Datum X)
static int fd(const char *x, int i)
#define RelationGetDescr(relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
#define InitDirtySnapshot(snapshotdata)
#define BTEqualStrategyNumber
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable origin_cv
Tuplestorestate * setResult
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
#define PG_GETARG_TIMESTAMPTZ(n)
char * text_to_cstring(const text *t)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
bool RecoveryInProgress(void)
void XLogFlush(XLogRecPtr record)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
void XLogRegisterData(const void *data, uint32 len)
void XLogBeginInsert(void)
#define XLogRecGetInfo(decoder)
#define XLogRecGetData(decoder)