PostgreSQL Source Code git master
Data Structures | Macros | Typedefs | Functions | Variables
origin.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
Include dependency graph for origin.c:

Go to the source code of this file.

Data Structures

struct   ReplicationState
 
 
 

Macros

#define  PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"
 
 
#define  REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)
 
 

Typedefs

 
 
 

Functions

static void  replorigin_check_prerequisites (bool check_origins, bool recoveryOK)
 
static bool  IsReservedOriginName (const char *name)
 
RepOriginId  replorigin_by_name (const char *roname, bool missing_ok)
 
RepOriginId  replorigin_create (const char *roname)
 
static void  replorigin_state_clear (RepOriginId roident, bool nowait)
 
void  replorigin_drop_by_name (const char *name, bool missing_ok, bool nowait)
 
bool  replorigin_by_oid (RepOriginId roident, bool missing_ok, char **roname)
 
 
 
 
 
 
void  replorigin_advance (RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
 
 
static void  ReplicationOriginExitCleanup (int code, Datum arg)
 
void  replorigin_session_setup (RepOriginId node, int acquired_by)
 
 
void  replorigin_session_advance (XLogRecPtr remote_commit, XLogRecPtr local_commit)
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Variables

 
 
 
 
 
 
 

Macro Definition Documentation

PG_REPLORIGIN_CHECKPOINT_FILENAME

#define PG_REPLORIGIN_CHECKPOINT_FILENAME   PG_LOGICAL_DIR "/replorigin_checkpoint"

Definition at line 100 of file origin.c.

PG_REPLORIGIN_CHECKPOINT_TMPFILE

#define PG_REPLORIGIN_CHECKPOINT_TMPFILE   PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

Definition at line 101 of file origin.c.

REPLICATION_ORIGIN_PROGRESS_COLS

#define REPLICATION_ORIGIN_PROGRESS_COLS   4

REPLICATION_STATE_MAGIC

#define REPLICATION_STATE_MAGIC   ((uint32) 0x1257DADE)

Definition at line 187 of file origin.c.

Typedef Documentation

ReplicationState

ReplicationStateCtl

ReplicationStateOnDisk

Function Documentation

CheckPointReplicationOrigin()

void CheckPointReplicationOrigin ( void  )

Definition at line 596 of file origin.c.

597{
598 const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
599 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
600 int tmpfd;
601 int i;
604
606 return;
607
609
610 /* make sure no old temp file remains */
611 if (unlink(tmppath) < 0 && errno != ENOENT)
614 errmsg("could not remove file \"%s\": %m",
615 tmppath)));
616
617 /*
618 * no other backend can perform this at the same time; only one checkpoint
619 * can happen at a time.
620 */
621 tmpfd = OpenTransientFile(tmppath,
622 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
623 if (tmpfd < 0)
626 errmsg("could not create file \"%s\": %m",
627 tmppath)));
628
629 /* write magic */
630 errno = 0;
631 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
632 {
633 /* if write didn't set errno, assume problem is no disk space */
634 if (errno == 0)
635 errno = ENOSPC;
638 errmsg("could not write to file \"%s\": %m",
639 tmppath)));
640 }
641 COMP_CRC32C(crc, &magic, sizeof(magic));
642
643 /* prevent concurrent creations/drops */
644 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
645
646 /* write actual data */
647 for (i = 0; i < max_active_replication_origins; i++)
648 {
649 ReplicationStateOnDisk disk_state;
651 XLogRecPtr local_lsn;
652
653 if (curstate->roident == InvalidRepOriginId)
654 continue;
655
656 /* zero, to avoid uninitialized padding bytes */
657 memset(&disk_state, 0, sizeof(disk_state));
658
659 LWLockAcquire(&curstate->lock, LW_SHARED);
660
661 disk_state.roident = curstate->roident;
662
663 disk_state.remote_lsn = curstate->remote_lsn;
664 local_lsn = curstate->local_lsn;
665
666 LWLockRelease(&curstate->lock);
667
668 /* make sure we only write out a commit that's persistent */
669 XLogFlush(local_lsn);
670
671 errno = 0;
672 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
673 sizeof(disk_state))
674 {
675 /* if write didn't set errno, assume problem is no disk space */
676 if (errno == 0)
677 errno = ENOSPC;
680 errmsg("could not write to file \"%s\": %m",
681 tmppath)));
682 }
683
684 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
685 }
686
687 LWLockRelease(ReplicationOriginLock);
688
689 /* write out the CRC */
691 errno = 0;
692 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
693 {
694 /* if write didn't set errno, assume problem is no disk space */
695 if (errno == 0)
696 errno = ENOSPC;
699 errmsg("could not write to file \"%s\": %m",
700 tmppath)));
701 }
702
703 if (CloseTransientFile(tmpfd) != 0)
706 errmsg("could not close file \"%s\": %m",
707 tmppath)));
708
709 /* fsync, rename to permanent file, fsync file and directory */
710 durable_rename(tmppath, path, PANIC);
711}
#define PG_BINARY
Definition: c.h:1272
uint32_t uint32
Definition: c.h:538
int errcode_for_file_access(void)
Definition: elog.c:877
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define PANIC
Definition: elog.h:42
#define ereport(elevel,...)
Definition: elog.h:150
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:779
int CloseTransientFile(int fd)
Definition: fd.c:2868
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2691
#define write(a, b, c)
Definition: win32.h:14
i
int i
Definition: isn.c:77
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
int max_active_replication_origins
Definition: origin.c:104
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition: origin.c:101
static ReplicationState * replication_states
Definition: origin.c:171
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition: origin.c:100
#define REPLICATION_STATE_MAGIC
Definition: origin.c:187
#define InvalidRepOriginId
Definition: origin.h:33
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:158
return crc
XLogRecPtr remote_lsn
Definition: origin.c:150
RepOriginId roident
Definition: origin.c:149
XLogRecPtr remote_lsn
Definition: origin.c:119
XLogRecPtr local_lsn
Definition: origin.c:126
RepOriginId roident
Definition: origin.c:114
LWLock lock
Definition: origin.c:141
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2780
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References CloseTransientFile(), COMP_CRC32C, crc, durable_rename(), ereport, errcode_for_file_access(), errmsg(), FIN_CRC32C, i, INIT_CRC32C, InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, PG_REPLORIGIN_CHECKPOINT_TMPFILE, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, ReplicationStateOnDisk::roident, write, and XLogFlush().

Referenced by CheckPointGuts().

IsReservedOriginName()

static bool IsReservedOriginName ( const char *  name )
static

Definition at line 209 of file origin.c.

210{
211 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
212 (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
213}
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
const char * name

References name, and pg_strcasecmp().

Referenced by pg_replication_origin_create().

pg_replication_origin_advance()

Datum pg_replication_origin_advance ( PG_FUNCTION_ARGS  )

Definition at line 1494 of file origin.c.

1495{
1497 XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1498 RepOriginId node;
1499
1500 replorigin_check_prerequisites(true, false);
1501
1502 /* lock to prevent the replication origin from vanishing */
1503 LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1504
1505 node = replorigin_by_name(text_to_cstring(name), false);
1506
1507 /*
1508 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1509 * xact hasn't committed yet. This is why this function should be used to
1510 * set up the initial replication state, but not for replay.
1511 */
1512 replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1513 true /* go backward */ , true /* WAL log */ );
1514
1515 UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1516
1518}
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:107
#define RowExclusiveLock
Definition: lockdefs.h:38
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:911
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
Definition: origin.c:190
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:36
Definition: c.h:692
char * text_to_cstring(const text *t)
Definition: varlena.c:214
uint16 RepOriginId
Definition: xlogdefs.h:68
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References InvalidXLogRecPtr, LockRelationOid(), name, PG_GETARG_LSN, PG_GETARG_TEXT_PP, PG_RETURN_VOID, replorigin_advance(), replorigin_by_name(), replorigin_check_prerequisites(), RowExclusiveLock, text_to_cstring(), and UnlockRelationOid().

pg_replication_origin_create()

Datum pg_replication_origin_create ( PG_FUNCTION_ARGS  )

Definition at line 1305 of file origin.c.

1306{
1307 char *name;
1308 RepOriginId roident;
1309
1310 replorigin_check_prerequisites(false, false);
1311
1313
1314 /*
1315 * Replication origins "any" and "none" are reserved for system options.
1316 * The origins "pg_xxx" are reserved for internal use.
1317 */
1319 ereport(ERROR,
1320 (errcode(ERRCODE_RESERVED_NAME),
1321 errmsg("replication origin name \"%s\" is reserved",
1322 name),
1323 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1324 LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1325
1326 /*
1327 * If built with appropriate switch, whine when regression-testing
1328 * conventions for replication origin names are violated.
1329 */
1330#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1331 if (strncmp(name, "regress_", 8) != 0)
1332 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1333#endif
1334
1335 roident = replorigin_create(name);
1336
1337 pfree(name);
1338
1339 PG_RETURN_OID(roident);
1340}
bool IsReservedName(const char *name)
Definition: catalog.c:278
int errdetail(const char *fmt,...)
Definition: elog.c:1207
int errcode(int sqlerrcode)
Definition: elog.c:854
#define WARNING
Definition: elog.h:36
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
#define PG_RETURN_OID(x)
Definition: fmgr.h:360
void pfree(void *pointer)
Definition: mcxt.c:1594
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:257
static bool IsReservedOriginName(const char *name)
Definition: origin.c:209
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:322

References DatumGetPointer(), elog, ereport, errcode(), errdetail(), errmsg(), ERROR, IsReservedName(), IsReservedOriginName(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_OID, replorigin_check_prerequisites(), replorigin_create(), text_to_cstring(), and WARNING.

pg_replication_origin_drop()

Datum pg_replication_origin_drop ( PG_FUNCTION_ARGS  )

Definition at line 1346 of file origin.c.

1347{
1348 char *name;
1349
1350 replorigin_check_prerequisites(false, false);
1351
1353
1354 replorigin_drop_by_name(name, false, true);
1355
1356 pfree(name);
1357
1359}
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:439

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_drop_by_name(), and text_to_cstring().

pg_replication_origin_oid()

Datum pg_replication_origin_oid ( PG_FUNCTION_ARGS  )

Definition at line 1365 of file origin.c.

1366{
1367 char *name;
1368 RepOriginId roident;
1369
1370 replorigin_check_prerequisites(false, false);
1371
1373 roident = replorigin_by_name(name, true);
1374
1375 pfree(name);
1376
1377 if (OidIsValid(roident))
1378 PG_RETURN_OID(roident);
1380}
#define OidIsValid(objectId)
Definition: c.h:774
#define PG_RETURN_NULL()
Definition: fmgr.h:345

References DatumGetPointer(), name, OidIsValid, pfree(), PG_GETARG_DATUM, PG_RETURN_NULL, PG_RETURN_OID, replorigin_by_name(), replorigin_check_prerequisites(), and text_to_cstring().

pg_replication_origin_progress()

Datum pg_replication_origin_progress ( PG_FUNCTION_ARGS  )

Definition at line 1529 of file origin.c.

1530{
1531 char *name;
1532 bool flush;
1533 RepOriginId roident;
1534 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1535
1537
1539 flush = PG_GETARG_BOOL(1);
1540
1541 roident = replorigin_by_name(name, false);
1542 Assert(OidIsValid(roident));
1543
1544 remote_lsn = replorigin_get_progress(roident, flush);
1545
1546 if (remote_lsn == InvalidXLogRecPtr)
1548
1549 PG_RETURN_LSN(remote_lsn);
1550}
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
Assert(PointerIsAligned(start, uint64))
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:1037
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:37

References Assert(), DatumGetPointer(), InvalidXLogRecPtr, name, OidIsValid, PG_GETARG_BOOL, PG_GETARG_DATUM, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_get_progress(), and text_to_cstring().

pg_replication_origin_session_is_setup()

Datum pg_replication_origin_session_is_setup ( PG_FUNCTION_ARGS  )

Definition at line 1427 of file origin.c.

1428{
1429 replorigin_check_prerequisites(false, false);
1430
1432}
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
RepOriginId replorigin_session_origin
Definition: origin.c:163

References InvalidRepOriginId, PG_RETURN_BOOL, replorigin_check_prerequisites(), and replorigin_session_origin.

pg_replication_origin_session_progress()

Datum pg_replication_origin_session_progress ( PG_FUNCTION_ARGS  )

Definition at line 1443 of file origin.c.

1444{
1445 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1446 bool flush = PG_GETARG_BOOL(0);
1447
1448 replorigin_check_prerequisites(true, false);
1449
1450 if (session_replication_state == NULL)
1451 ereport(ERROR,
1452 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1453 errmsg("no replication origin is configured")));
1454
1455 remote_lsn = replorigin_session_get_progress(flush);
1456
1457 if (remote_lsn == InvalidXLogRecPtr)
1459
1460 PG_RETURN_LSN(remote_lsn);
1461}
static ReplicationState * session_replication_state
Definition: origin.c:184
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1273

References ereport, errcode(), errmsg(), ERROR, InvalidXLogRecPtr, PG_GETARG_BOOL, PG_RETURN_LSN, PG_RETURN_NULL, replorigin_check_prerequisites(), replorigin_session_get_progress(), and session_replication_state.

pg_replication_origin_session_reset()

Datum pg_replication_origin_session_reset ( PG_FUNCTION_ARGS  )

Definition at line 1410 of file origin.c.

1411{
1412 replorigin_check_prerequisites(true, false);
1413
1415
1419
1421}
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:165
void replorigin_session_reset(void)
Definition: origin.c:1226
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:164

References InvalidRepOriginId, InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and replorigin_session_reset().

pg_replication_origin_session_setup()

Datum pg_replication_origin_session_setup ( PG_FUNCTION_ARGS  )

Definition at line 1386 of file origin.c.

1387{
1388 char *name;
1389 RepOriginId origin;
1390 int pid;
1391
1392 replorigin_check_prerequisites(true, false);
1393
1395 origin = replorigin_by_name(name, false);
1396 pid = PG_GETARG_INT32(1);
1397 replorigin_session_setup(origin, pid);
1398
1400
1401 pfree(name);
1402
1404}
#define PG_GETARG_INT32(n)
Definition: fmgr.h:269
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1120

References DatumGetPointer(), name, pfree(), PG_GETARG_DATUM, PG_GETARG_INT32, PG_RETURN_VOID, replorigin_by_name(), replorigin_check_prerequisites(), replorigin_session_origin, replorigin_session_setup(), and text_to_cstring().

pg_replication_origin_xact_reset()

Datum pg_replication_origin_xact_reset ( PG_FUNCTION_ARGS  )

Definition at line 1482 of file origin.c.

References InvalidXLogRecPtr, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, and replorigin_session_origin_timestamp.

pg_replication_origin_xact_setup()

Datum pg_replication_origin_xact_setup ( PG_FUNCTION_ARGS  )

Definition at line 1464 of file origin.c.

1465{
1466 XLogRecPtr location = PG_GETARG_LSN(0);
1467
1468 replorigin_check_prerequisites(true, false);
1469
1470 if (session_replication_state == NULL)
1471 ereport(ERROR,
1472 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1473 errmsg("no replication origin is configured")));
1474
1477
1479}
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:64

References ereport, errcode(), errmsg(), ERROR, PG_GETARG_LSN, PG_GETARG_TIMESTAMPTZ, PG_RETURN_VOID, replorigin_check_prerequisites(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, and session_replication_state.

pg_show_replication_origin_status()

Datum pg_show_replication_origin_status ( PG_FUNCTION_ARGS  )

Definition at line 1554 of file origin.c.

1555{
1556 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1557 int i;
1559
1560 /* we want to return 0 rows if max_active_replication_origins is set to zero */
1561 replorigin_check_prerequisites(false, true);
1562
1563 InitMaterializedSRF(fcinfo, 0);
1564
1565 /* prevent slots from being concurrently dropped */
1566 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1567
1568 /*
1569 * Iterate through all possible replication_states, display if they are
1570 * filled. Note that we do not take any locks, so slightly corrupted/out
1571 * of date values are a possibility.
1572 */
1573 for (i = 0; i < max_active_replication_origins; i++)
1574 {
1578 char *roname;
1579
1581
1582 /* unused slot, nothing to display */
1583 if (state->roident == InvalidRepOriginId)
1584 continue;
1585
1586 memset(values, 0, sizeof(values));
1587 memset(nulls, 1, sizeof(nulls));
1588
1589 values[0] = ObjectIdGetDatum(state->roident);
1590 nulls[0] = false;
1591
1592 /*
1593 * We're not preventing the origin from being dropped concurrently, so
1594 * silently accept that it might be gone.
1595 */
1596 if (replorigin_by_oid(state->roident, true,
1597 &roname))
1598 {
1599 values[1] = CStringGetTextDatum(roname);
1600 nulls[1] = false;
1601 }
1602
1603 LWLockAcquire(&state->lock, LW_SHARED);
1604
1605 values[2] = LSNGetDatum(state->remote_lsn);
1606 nulls[2] = false;
1607
1608 values[3] = LSNGetDatum(state->local_lsn);
1609 nulls[3] = false;
1610
1611 LWLockRelease(&state->lock);
1612
1613 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1614 values, nulls);
1615 }
1616
1617 LWLockRelease(ReplicationOriginLock);
1618
1619#undef REPLICATION_ORIGIN_PROGRESS_COLS
1620
1621 return (Datum) 0;
1622}
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define CStringGetTextDatum(s)
Definition: builtins.h:97
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:493
#define REPLICATION_ORIGIN_PROGRESS_COLS
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
uint64_t Datum
Definition: postgres.h:70
TupleDesc setDesc
Definition: execnodes.h:364
Tuplestorestate * setResult
Definition: execnodes.h:363
Definition: regguts.h:323
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784

References CStringGetTextDatum, i, InitMaterializedSRF(), InvalidRepOriginId, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, ObjectIdGetDatum(), REPLICATION_ORIGIN_PROGRESS_COLS, replication_states, replorigin_by_oid(), replorigin_check_prerequisites(), ReturnSetInfo::setDesc, ReturnSetInfo::setResult, tuplestore_putvalues(), and values.

ReplicationOriginExitCleanup()

static void ReplicationOriginExitCleanup ( int  code,
Datum  arg 
)
static

Definition at line 1078 of file origin.c.

1079{
1080 ConditionVariable *cv = NULL;
1081
1082 if (session_replication_state == NULL)
1083 return;
1084
1085 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1086
1088 {
1090
1093 }
1094
1095 LWLockRelease(ReplicationOriginLock);
1096
1097 if (cv)
1099}
void ConditionVariableBroadcast(ConditionVariable *cv)
int MyProcPid
Definition: globals.c:47
@ LW_EXCLUSIVE
Definition: lwlock.h:112
ConditionVariable origin_cv
Definition: origin.c:136
int acquired_by
Definition: origin.c:131

References ReplicationState::acquired_by, ConditionVariableBroadcast(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyProcPid, ReplicationState::origin_cv, and session_replication_state.

Referenced by replorigin_session_setup().

ReplicationOriginShmemInit()

void ReplicationOriginShmemInit ( void  )

Definition at line 549 of file origin.c.

550{
551 bool found;
552
554 return;
555
557 ShmemInitStruct("ReplicationOriginState",
559 &found);
561
562 if (!found)
563 {
564 int i;
565
567
568 replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
569
570 for (i = 0; i < max_active_replication_origins; i++)
571 {
575 }
576 }
577}
#define MemSet(start, val, len)
Definition: c.h:1019
void ConditionVariableInit(ConditionVariable *cv)
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:698
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:176
Size ReplicationOriginShmemSize(void)
Definition: origin.c:534
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:159

References ConditionVariableInit(), i, LWLockInitialize(), max_active_replication_origins, MemSet, replication_states, replication_states_ctl, ReplicationOriginShmemSize(), ShmemInitStruct(), ReplicationStateCtl::states, and ReplicationStateCtl::tranche_id.

Referenced by CreateOrAttachShmemStructs().

ReplicationOriginShmemSize()

Size ReplicationOriginShmemSize ( void  )

Definition at line 534 of file origin.c.

535{
536 Size size = 0;
537
539 return size;
540
541 size = add_size(size, offsetof(ReplicationStateCtl, states));
542
543 size = add_size(size,
545 return size;
546}
size_t Size
Definition: c.h:610
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_active_replication_origins, and mul_size().

Referenced by CalculateShmemSize(), and ReplicationOriginShmemInit().

replorigin_advance()

void replorigin_advance ( RepOriginId  node,
XLogRecPtr  remote_commit,
XLogRecPtr  local_commit,
bool  go_backward,
bool  wal_log 
)

Definition at line 911 of file origin.c.

914{
915 int i;
916 ReplicationState *replication_state = NULL;
917 ReplicationState *free_state = NULL;
918
919 Assert(node != InvalidRepOriginId);
920
921 /* we don't track DoNotReplicateId */
922 if (node == DoNotReplicateId)
923 return;
924
925 /*
926 * XXX: For the case where this is called by WAL replay, it'd be more
927 * efficient to restore into a backend local hashtable and only dump into
928 * shmem after recovery is finished. Let's wait with implementing that
929 * till it's shown to be a measurable expense.
930 */
931
932 /* Lock exclusively, as we may have to create a new table entry. */
933 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
934
935 /*
936 * Search for either an existing slot for the origin, or a free one we can
937 * use.
938 */
939 for (i = 0; i < max_active_replication_origins; i++)
940 {
942
943 /* remember where to insert if necessary */
944 if (curstate->roident == InvalidRepOriginId &&
945 free_state == NULL)
946 {
947 free_state = curstate;
948 continue;
949 }
950
951 /* not our slot */
952 if (curstate->roident != node)
953 {
954 continue;
955 }
956
957 /* ok, found slot */
958 replication_state = curstate;
959
960 LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
961
962 /* Make sure it's not used by somebody else */
963 if (replication_state->acquired_by != 0)
964 {
966 (errcode(ERRCODE_OBJECT_IN_USE),
967 errmsg("replication origin with ID %d is already active for PID %d",
968 replication_state->roident,
969 replication_state->acquired_by)));
970 }
971
972 break;
973 }
974
975 if (replication_state == NULL && free_state == NULL)
977 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
978 errmsg("could not find free replication state slot for replication origin with ID %d",
979 node),
980 errhint("Increase \"max_active_replication_origins\" and try again.")));
981
982 if (replication_state == NULL)
983 {
984 /* initialize new slot */
985 LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
986 replication_state = free_state;
987 Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
988 Assert(replication_state->local_lsn == InvalidXLogRecPtr);
989 replication_state->roident = node;
990 }
991
992 Assert(replication_state->roident != InvalidRepOriginId);
993
994 /*
995 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
996 * and the standby gets the message. Primarily this will be called during
997 * WAL replay (of commit records) where no WAL logging is necessary.
998 */
999 if (wal_log)
1000 {
1001 xl_replorigin_set xlrec;
1002
1003 xlrec.remote_lsn = remote_commit;
1004 xlrec.node_id = node;
1005 xlrec.force = go_backward;
1006
1008 XLogRegisterData(&xlrec, sizeof(xlrec));
1009
1010 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1011 }
1012
1013 /*
1014 * Due to - harmless - race conditions during a checkpoint we could see
1015 * values here that are older than the ones we already have in memory. We
1016 * could also see older values for prepared transactions when the prepare
1017 * is sent at a later point in time along with commit prepared and there
1018 * are other transaction commits between prepare and commit prepared. See
1019 * ReorderBufferFinishPrepared. Don't overwrite those.
1020 */
1021 if (go_backward || replication_state->remote_lsn < remote_commit)
1022 replication_state->remote_lsn = remote_commit;
1023 if (local_commit != InvalidXLogRecPtr &&
1024 (go_backward || replication_state->local_lsn < local_commit))
1025 replication_state->local_lsn = local_commit;
1026 LWLockRelease(&replication_state->lock);
1027
1028 /*
1029 * Release *after* changing the LSNs, slot isn't acquired and thus could
1030 * otherwise be dropped anytime.
1031 */
1032 LWLockRelease(ReplicationOriginLock);
1033}
int errhint(const char *fmt,...)
Definition: elog.c:1321
#define DoNotReplicateId
Definition: origin.h:34
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
bool force
Definition: origin.h:22
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterData(const void *data, uint32 len)
Definition: xloginsert.c:364
void XLogBeginInsert(void)
Definition: xloginsert.c:149

References ReplicationState::acquired_by, Assert(), DoNotReplicateId, ereport, errcode(), errhint(), errmsg(), ERROR, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_set::node_id, ReplicationState::remote_lsn, xl_replorigin_set::remote_lsn, replication_states, ReplicationState::roident, XLOG_REPLORIGIN_SET, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), pg_replication_origin_advance(), PrepareRedoAdd(), replorigin_redo(), xact_redo_abort(), and xact_redo_commit().

replorigin_by_name()

RepOriginId replorigin_by_name ( const char *  roname,
bool  missing_ok 
)

Definition at line 226 of file origin.c.

227{
229 Oid roident = InvalidOid;
230 HeapTuple tuple;
231 Datum roname_d;
232
233 roname_d = CStringGetTextDatum(roname);
234
235 tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236 if (HeapTupleIsValid(tuple))
237 {
239 roident = ident->roident;
240 ReleaseSysCache(tuple);
241 }
242 else if (!missing_ok)
244 (errcode(ERRCODE_UNDEFINED_OBJECT),
245 errmsg("replication origin \"%s\" does not exist",
246 roname)));
247
248 return roident;
249}
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define ident
Definition: indent_codes.h:47
FormData_pg_replication_origin * Form_pg_replication_origin
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:220

References CStringGetTextDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT(), HeapTupleIsValid, ident, InvalidOid, ReleaseSysCache(), and SearchSysCache1().

Referenced by AlterSubscription(), binary_upgrade_replorigin_advance(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_advance(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_setup(), replorigin_drop_by_name(), and run_apply_worker().

replorigin_by_oid()

bool replorigin_by_oid ( RepOriginId  roident,
bool  missing_ok,
char **  roname 
)

Definition at line 493 of file origin.c.

494{
495 HeapTuple tuple;
497
498 Assert(OidIsValid((Oid) roident));
499 Assert(roident != InvalidRepOriginId);
500 Assert(roident != DoNotReplicateId);
501
502 tuple = SearchSysCache1(REPLORIGIDENT,
503 ObjectIdGetDatum((Oid) roident));
504
505 if (HeapTupleIsValid(tuple))
506 {
508 *roname = text_to_cstring(&ric->roname);
509 ReleaseSysCache(tuple);
510
511 return true;
512 }
513 else
514 {
515 *roname = NULL;
516
517 if (!missing_ok)
519 (errcode(ERRCODE_UNDEFINED_OBJECT),
520 errmsg("replication origin with ID %d does not exist",
521 roident)));
522
523 return false;
524 }
525}

References Assert(), DoNotReplicateId, ereport, errcode(), errmsg(), ERROR, GETSTRUCT(), HeapTupleIsValid, InvalidRepOriginId, ObjectIdGetDatum(), OidIsValid, ReleaseSysCache(), SearchSysCache1(), and text_to_cstring().

Referenced by errdetail_apply_conflict(), pg_show_replication_origin_status(), and send_repl_origin().

replorigin_check_prerequisites()

static void replorigin_check_prerequisites ( bool  check_origins,
bool  recoveryOK 
)
static

Definition at line 190 of file origin.c.

191{
192 if (check_origins && max_active_replication_origins == 0)
194 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196
197 if (!recoveryOK && RecoveryInProgress())
199 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 errmsg("cannot manipulate replication origins during recovery")));
201}
bool RecoveryInProgress(void)
Definition: xlog.c:6386

References ereport, errcode(), errmsg(), ERROR, max_active_replication_origins, and RecoveryInProgress().

Referenced by pg_replication_origin_advance(), pg_replication_origin_create(), pg_replication_origin_drop(), pg_replication_origin_oid(), pg_replication_origin_progress(), pg_replication_origin_session_is_setup(), pg_replication_origin_session_progress(), pg_replication_origin_session_reset(), pg_replication_origin_session_setup(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), and pg_show_replication_origin_status().

replorigin_create()

RepOriginId replorigin_create ( const char *  roname )

Definition at line 257 of file origin.c.

258{
259 Oid roident;
260 HeapTuple tuple = NULL;
261 Relation rel;
262 Datum roname_d;
263 SnapshotData SnapshotDirty;
264 SysScanDesc scan;
266
267 /*
268 * To avoid needing a TOAST table for pg_replication_origin, we limit
269 * replication origin names to 512 bytes. This should be more than enough
270 * for all practical use.
271 */
272 if (strlen(roname) > MAX_RONAME_LEN)
274 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
275 errmsg("replication origin name is too long"),
276 errdetail("Replication origin names must be no longer than %d bytes.",
278
279 roname_d = CStringGetTextDatum(roname);
280
282
283 /*
284 * We need the numeric replication origin to be 16bit wide, so we cannot
285 * rely on the normal oid allocation. Instead we simply scan
286 * pg_replication_origin for the first unused id. That's not particularly
287 * efficient, but this should be a fairly infrequent operation - we can
288 * easily spend a bit more code on this when it turns out it needs to be
289 * faster.
290 *
291 * We handle concurrency by taking an exclusive lock (allowing reads!)
292 * over the table for the duration of the search. Because we use a "dirty
293 * snapshot" we can read rows that other in-progress sessions have
294 * written, even though they would be invisible with normal snapshots. Due
295 * to the exclusive lock there's no danger that new rows can appear while
296 * we're checking.
297 */
298 InitDirtySnapshot(SnapshotDirty);
299
300 rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
301
302 /*
303 * We want to be able to access pg_replication_origin without setting up a
304 * snapshot. To make that safe, it needs to not have a TOAST table, since
305 * TOASTed data cannot be fetched without a snapshot. As of this writing,
306 * its only varlena column is roname, which we limit to 512 bytes to avoid
307 * needing out-of-line storage. If you add a TOAST table to this catalog,
308 * be sure to set up a snapshot everywhere it might be needed. For more
309 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
310 */
311 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
312
313 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
314 {
315 bool nulls[Natts_pg_replication_origin];
316 Datum values[Natts_pg_replication_origin];
317 bool collides;
318
320
322 Anum_pg_replication_origin_roident,
323 BTEqualStrategyNumber, F_OIDEQ,
324 ObjectIdGetDatum(roident));
325
326 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
327 true /* indexOK */ ,
328 &SnapshotDirty,
329 1, &key);
330
331 collides = HeapTupleIsValid(systable_getnext(scan));
332
333 systable_endscan(scan);
334
335 if (!collides)
336 {
337 /*
338 * Ok, found an unused roident, insert the new row and do a CCI,
339 * so our callers can look it up if they want to.
340 */
341 memset(&nulls, 0, sizeof(nulls));
342
343 values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
344 values[Anum_pg_replication_origin_roname - 1] = roname_d;
345
346 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
347 CatalogTupleInsert(rel, tuple);
349 break;
350 }
351 }
352
353 /* now release lock again, */
355
356 if (tuple == NULL)
358 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 errmsg("could not find free replication origin ID")));
360
361 heap_freetuple(tuple);
362 return roident;
363}
#define PG_UINT16_MAX
Definition: c.h:592
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:388
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
#define ExclusiveLock
Definition: lockdefs.h:42
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define MAX_RONAME_LEN
Definition: origin.h:41
#define RelationGetDescr(relation)
Definition: rel.h:540
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:42
#define BTEqualStrategyNumber
Definition: stratnum.h:31
RelationData
Definition: rel.h:56
Form_pg_class rd_rel
Definition: rel.h:111
ScanKeyData
Definition: skey.h:65
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
bool IsTransactionState(void)
Definition: xact.c:387
void CommandCounterIncrement(void)
Definition: xact.c:1100

References Assert(), BTEqualStrategyNumber, CatalogTupleInsert(), CHECK_FOR_INTERRUPTS, CommandCounterIncrement(), CStringGetTextDatum, ereport, errcode(), errdetail(), errmsg(), ERROR, ExclusiveLock, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InitDirtySnapshot, InvalidOid, IsTransactionState(), sort-test::key, MAX_RONAME_LEN, ObjectIdGetDatum(), OidIsValid, PG_UINT16_MAX, RelationData::rd_rel, RelationGetDescr, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), and values.

Referenced by CreateSubscription(), LogicalRepSyncTableStart(), pg_replication_origin_create(), and run_apply_worker().

replorigin_drop_by_name()

void replorigin_drop_by_name ( const char *  name,
bool  missing_ok,
bool  nowait 
)

Definition at line 439 of file origin.c.

440{
441 RepOriginId roident;
442 Relation rel;
443 HeapTuple tuple;
444
446
447 rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
448
449 roident = replorigin_by_name(name, missing_ok);
450
451 /* Lock the origin to prevent concurrent drops. */
452 LockSharedObject(ReplicationOriginRelationId, roident, 0,
454
455 tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
456 if (!HeapTupleIsValid(tuple))
457 {
458 if (!missing_ok)
459 elog(ERROR, "cache lookup failed for replication origin with ID %d",
460 roident);
461
462 /*
463 * We don't need to retain the locks if the origin is already dropped.
464 */
465 UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
468 return;
469 }
470
471 replorigin_state_clear(roident, nowait);
472
473 /*
474 * Now, we can delete the catalog entry.
475 */
476 CatalogTupleDelete(rel, &tuple->t_self);
477 ReleaseSysCache(tuple);
478
480
481 /* We keep the lock on pg_replication_origin until commit */
482 table_close(rel, NoLock);
483}
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1148
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition: origin.c:369
ItemPointerData t_self
Definition: htup.h:65

References AccessExclusiveLock, Assert(), CatalogTupleDelete(), CommandCounterIncrement(), elog, ERROR, HeapTupleIsValid, IsTransactionState(), LockSharedObject(), name, NoLock, ObjectIdGetDatum(), ReleaseSysCache(), replorigin_by_name(), replorigin_state_clear(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), table_open(), and UnlockSharedObject().

Referenced by AlterSubscription_refresh(), DropSubscription(), pg_replication_origin_drop(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

replorigin_get_progress()

XLogRecPtr replorigin_get_progress ( RepOriginId  node,
bool  flush 
)

Definition at line 1037 of file origin.c.

1038{
1039 int i;
1040 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1041 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1042
1043 /* prevent slots from being concurrently dropped */
1044 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1045
1046 for (i = 0; i < max_active_replication_origins; i++)
1047 {
1049
1051
1052 if (state->roident == node)
1053 {
1054 LWLockAcquire(&state->lock, LW_SHARED);
1055
1056 remote_lsn = state->remote_lsn;
1057 local_lsn = state->local_lsn;
1058
1059 LWLockRelease(&state->lock);
1060
1061 break;
1062 }
1063 }
1064
1065 LWLockRelease(ReplicationOriginLock);
1066
1067 if (flush && local_lsn != InvalidXLogRecPtr)
1068 XLogFlush(local_lsn);
1069
1070 return remote_lsn;
1071}

References i, InvalidXLogRecPtr, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, replication_states, and XLogFlush().

Referenced by AlterSubscription(), and pg_replication_origin_progress().

replorigin_redo()

void replorigin_redo ( XLogReaderState *  record )

Definition at line 850 of file origin.c.

851{
852 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
853
854 switch (info)
855 {
857 {
858 xl_replorigin_set *xlrec =
860
862 xlrec->remote_lsn, record->EndRecPtr,
863 xlrec->force /* backward */ ,
864 false /* WAL log */ );
865 break;
866 }
868 {
869 xl_replorigin_drop *xlrec;
870 int i;
871
872 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
873
874 for (i = 0; i < max_active_replication_origins; i++)
875 {
877
878 /* found our slot */
879 if (state->roident == xlrec->node_id)
880 {
881 /* reset entry */
882 state->roident = InvalidRepOriginId;
883 state->remote_lsn = InvalidXLogRecPtr;
884 state->local_lsn = InvalidXLogRecPtr;
885 break;
886 }
887 }
888 break;
889 }
890 default:
891 elog(PANIC, "replorigin_redo: unknown op code %u", info);
892 }
893}
uint8_t uint8
Definition: c.h:536
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
RepOriginId node_id
Definition: origin.h:27
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415

References elog, XLogReaderState::EndRecPtr, xl_replorigin_set::force, i, InvalidRepOriginId, InvalidXLogRecPtr, max_active_replication_origins, xl_replorigin_set::node_id, xl_replorigin_drop::node_id, PANIC, xl_replorigin_set::remote_lsn, replication_states, replorigin_advance(), XLOG_REPLORIGIN_DROP, XLOG_REPLORIGIN_SET, XLogRecGetData, and XLogRecGetInfo.

replorigin_session_advance()

void replorigin_session_advance ( XLogRecPtr  remote_commit,
XLogRecPtr  local_commit 
)

Definition at line 1255 of file origin.c.

References Assert(), InvalidRepOriginId, ReplicationState::local_lsn, ReplicationState::lock, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, ReplicationState::roident, and session_replication_state.

Referenced by EndPrepare(), RecordTransactionAbort(), RecordTransactionAbortPrepared(), RecordTransactionCommit(), and RecordTransactionCommitPrepared().

replorigin_session_get_progress()

XLogRecPtr replorigin_session_get_progress ( bool  flush )

Definition at line 1273 of file origin.c.

1274{
1275 XLogRecPtr remote_lsn;
1276 XLogRecPtr local_lsn;
1277
1279
1284
1285 if (flush && local_lsn != InvalidXLogRecPtr)
1286 XLogFlush(local_lsn);
1287
1288 return remote_lsn;
1289}

References Assert(), InvalidXLogRecPtr, ReplicationState::local_lsn, ReplicationState::lock, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationState::remote_lsn, session_replication_state, and XLogFlush().

Referenced by LogicalRepSyncTableStart(), pg_replication_origin_session_progress(), and run_apply_worker().

replorigin_session_reset()

void replorigin_session_reset ( void  )

Definition at line 1226 of file origin.c.

1227{
1229
1231
1232 if (session_replication_state == NULL)
1233 ereport(ERROR,
1234 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1235 errmsg("no replication origin is configured")));
1236
1237 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1238
1242
1243 LWLockRelease(ReplicationOriginLock);
1244
1246}

References ReplicationState::acquired_by, Assert(), ConditionVariableBroadcast(), ereport, errcode(), errmsg(), ERROR, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, ReplicationState::origin_cv, and session_replication_state.

Referenced by pg_replication_origin_session_reset(), and process_syncing_tables_for_sync().

replorigin_session_setup()

void replorigin_session_setup ( RepOriginId  node,
int  acquired_by 
)

Definition at line 1120 of file origin.c.

1121{
1122 static bool registered_cleanup;
1123 int i;
1124 int free_slot = -1;
1125
1126 if (!registered_cleanup)
1127 {
1129 registered_cleanup = true;
1130 }
1131
1133
1134 if (session_replication_state != NULL)
1135 ereport(ERROR,
1136 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1137 errmsg("cannot setup replication origin when one is already setup")));
1138
1139 /* Lock exclusively, as we may have to create a new table entry. */
1140 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1141
1142 /*
1143 * Search for either an existing slot for the origin, or a free one we can
1144 * use.
1145 */
1146 for (i = 0; i < max_active_replication_origins; i++)
1147 {
1149
1150 /* remember where to insert if necessary */
1151 if (curstate->roident == InvalidRepOriginId &&
1152 free_slot == -1)
1153 {
1154 free_slot = i;
1155 continue;
1156 }
1157
1158 /* not our slot */
1159 if (curstate->roident != node)
1160 continue;
1161
1162 else if (curstate->acquired_by != 0 && acquired_by == 0)
1163 {
1164 ereport(ERROR,
1165 (errcode(ERRCODE_OBJECT_IN_USE),
1166 errmsg("replication origin with ID %d is already active for PID %d",
1167 curstate->roident, curstate->acquired_by)));
1168 }
1169
1170 else if (curstate->acquired_by != acquired_by)
1171 {
1172 ereport(ERROR,
1173 (errcode(ERRCODE_OBJECT_IN_USE),
1174 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1175 node, acquired_by)));
1176 }
1177
1178 /* ok, found slot */
1179 session_replication_state = curstate;
1180 break;
1181 }
1182
1183
1184 if (session_replication_state == NULL && free_slot == -1)
1185 ereport(ERROR,
1186 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1187 errmsg("could not find free replication state slot for replication origin with ID %d",
1188 node),
1189 errhint("Increase \"max_active_replication_origins\" and try again.")));
1190 else if (session_replication_state == NULL)
1191 {
1192 if (acquired_by)
1193 ereport(ERROR,
1194 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1195 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1196 acquired_by, node)));
1197
1198 /* initialize new slot */
1203 }
1204
1205
1207
1208 if (acquired_by == 0)
1210 else
1212
1213 LWLockRelease(ReplicationOriginLock);
1214
1215 /* probably this one is pointless */
1217}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1078

References ReplicationState::acquired_by, Assert(), ConditionVariableBroadcast(), ereport, errcode(), errhint(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, ReplicationState::local_lsn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, MyProcPid, on_shmem_exit(), ReplicationState::origin_cv, ReplicationState::remote_lsn, replication_states, ReplicationOriginExitCleanup(), ReplicationState::roident, and session_replication_state.

Referenced by LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_setup(), and run_apply_worker().

replorigin_state_clear()

static void replorigin_state_clear ( RepOriginId  roident,
bool  nowait 
)
static

Definition at line 369 of file origin.c.

370{
371 int i;
372
373 /*
374 * Clean up the slot state info, if there is any matching slot.
375 */
376restart:
377 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
378
379 for (i = 0; i < max_active_replication_origins; i++)
380 {
382
383 if (state->roident == roident)
384 {
385 /* found our slot, is it busy? */
386 if (state->acquired_by != 0)
387 {
389
390 if (nowait)
392 (errcode(ERRCODE_OBJECT_IN_USE),
393 errmsg("could not drop replication origin with ID %d, in use by PID %d",
394 state->roident,
395 state->acquired_by)));
396
397 /*
398 * We must wait and then retry. Since we don't know which CV
399 * to wait on until here, we can't readily use
400 * ConditionVariablePrepareToSleep (calling it here would be
401 * wrong, since we could miss the signal if we did so); just
402 * use ConditionVariableSleep directly.
403 */
404 cv = &state->origin_cv;
405
406 LWLockRelease(ReplicationOriginLock);
407
408 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
409 goto restart;
410 }
411
412 /* first make a WAL log entry */
413 {
414 xl_replorigin_drop xlrec;
415
416 xlrec.node_id = roident;
418 XLogRegisterData(&xlrec, sizeof(xlrec));
419 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
420 }
421
422 /* then clear the in-memory slot */
423 state->roident = InvalidRepOriginId;
424 state->remote_lsn = InvalidXLogRecPtr;
425 state->local_lsn = InvalidXLogRecPtr;
426 break;
427 }
428 }
429 LWLockRelease(ReplicationOriginLock);
431}
bool ConditionVariableCancelSleep(void)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)

References ConditionVariableCancelSleep(), ConditionVariableSleep(), ereport, errcode(), errmsg(), ERROR, i, InvalidRepOriginId, InvalidXLogRecPtr, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, xl_replorigin_drop::node_id, replication_states, XLOG_REPLORIGIN_DROP, XLogBeginInsert(), XLogInsert(), and XLogRegisterData().

Referenced by replorigin_drop_by_name().

StartupReplicationOrigin()

void StartupReplicationOrigin ( void  )

Definition at line 722 of file origin.c.

723{
724 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
725 int fd;
726 int readBytes;
728 int last_state = 0;
729 pg_crc32c file_crc;
731
732 /* don't want to overwrite already existing state */
733#ifdef USE_ASSERT_CHECKING
734 static bool already_started = false;
735
736 Assert(!already_started);
737 already_started = true;
738#endif
739
741 return;
742
744
745 elog(DEBUG2, "starting up replication origin progress state");
746
747 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
748
749 /*
750 * might have had max_active_replication_origins == 0 last run, or we just
751 * brought up a standby.
752 */
753 if (fd < 0 && errno == ENOENT)
754 return;
755 else if (fd < 0)
758 errmsg("could not open file \"%s\": %m",
759 path)));
760
761 /* verify magic, that is written even if nothing was active */
762 readBytes = read(fd, &magic, sizeof(magic));
763 if (readBytes != sizeof(magic))
764 {
765 if (readBytes < 0)
768 errmsg("could not read file \"%s\": %m",
769 path)));
770 else
773 errmsg("could not read file \"%s\": read %d of %zu",
774 path, readBytes, sizeof(magic))));
775 }
776 COMP_CRC32C(crc, &magic, sizeof(magic));
777
778 if (magic != REPLICATION_STATE_MAGIC)
780 (errmsg("replication checkpoint has wrong magic %u instead of %u",
781 magic, REPLICATION_STATE_MAGIC)));
782
783 /* we can skip locking here, no other access is possible */
784
785 /* recover individual states, until there are no more to be found */
786 while (true)
787 {
788 ReplicationStateOnDisk disk_state;
789
790 readBytes = read(fd, &disk_state, sizeof(disk_state));
791
792 /* no further data */
793 if (readBytes == sizeof(crc))
794 {
795 /* not pretty, but simple ... */
796 file_crc = *(pg_crc32c *) &disk_state;
797 break;
798 }
799
800 if (readBytes < 0)
801 {
804 errmsg("could not read file \"%s\": %m",
805 path)));
806 }
807
808 if (readBytes != sizeof(disk_state))
809 {
812 errmsg("could not read file \"%s\": read %d of %zu",
813 path, readBytes, sizeof(disk_state))));
814 }
815
816 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
817
818 if (last_state == max_active_replication_origins)
820 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
821 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
822
823 /* copy data to shared memory */
824 replication_states[last_state].roident = disk_state.roident;
825 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
826 last_state++;
827
828 ereport(LOG,
829 errmsg("recovered replication state of node %d to %X/%08X",
830 disk_state.roident,
831 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
832 }
833
834 /* now check checksum */
836 if (file_crc != crc)
839 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
840 crc, file_crc)));
841
842 if (CloseTransientFile(fd) != 0)
845 errmsg("could not close file \"%s\": %m",
846 path)));
847}
#define LOG
Definition: elog.h:31
#define DEBUG2
Definition: elog.h:29
#define read(a, b, c)
Definition: win32.h:13
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:42
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:46

References Assert(), CloseTransientFile(), COMP_CRC32C, crc, DEBUG2, elog, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), fd(), FIN_CRC32C, INIT_CRC32C, LOG, LSN_FORMAT_ARGS, max_active_replication_origins, OpenTransientFile(), PANIC, PG_BINARY, PG_REPLORIGIN_CHECKPOINT_FILENAME, read, ReplicationState::remote_lsn, ReplicationStateOnDisk::remote_lsn, REPLICATION_STATE_MAGIC, replication_states, ReplicationState::roident, and ReplicationStateOnDisk::roident.

Referenced by StartupXLOG().

Variable Documentation

max_active_replication_origins

int max_active_replication_origins = 10

Definition at line 104 of file origin.c.

Referenced by check_new_cluster_subscription_configuration(), CheckPointReplicationOrigin(), logicalrep_worker_launch(), pg_show_replication_origin_status(), ReplicationOriginShmemInit(), ReplicationOriginShmemSize(), replorigin_advance(), replorigin_check_prerequisites(), replorigin_get_progress(), replorigin_redo(), replorigin_session_reset(), replorigin_session_setup(), replorigin_state_clear(), and StartupReplicationOrigin().

replication_states

ReplicationState* replication_states
static

Definition at line 171 of file origin.c.

Referenced by CheckPointReplicationOrigin(), pg_show_replication_origin_status(), ReplicationOriginShmemInit(), replorigin_advance(), replorigin_get_progress(), replorigin_redo(), replorigin_session_setup(), replorigin_state_clear(), and StartupReplicationOrigin().

replication_states_ctl

ReplicationStateCtl* replication_states_ctl
static

Definition at line 176 of file origin.c.

Referenced by ReplicationOriginShmemInit().

replorigin_session_origin

RepOriginId replorigin_session_origin = InvalidRepOriginId

Definition at line 163 of file origin.c.

Referenced by apply_handle_delete_internal(), apply_handle_tuple_routing(), apply_handle_update_internal(), EndPrepare(), LogicalRepSyncTableStart(), ParallelApplyWorkerMain(), pg_replication_origin_session_is_setup(), pg_replication_origin_session_reset(), pg_replication_origin_session_setup(), process_syncing_tables_for_sync(), RecordTransactionAbort(), RecordTransactionAbortPrepared(), RecordTransactionCommit(), RecordTransactionCommitPrepared(), replorigin_reset(), run_apply_worker(), XactLogAbortRecord(), XactLogCommitRecord(), and XLogRecordAssemble().

replorigin_session_origin_lsn

XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr

Definition at line 164 of file origin.c.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare_internal(), apply_handle_rollback_prepared(), EndPrepare(), pa_stream_abort(), pg_replication_origin_session_reset(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), process_syncing_tables_for_sync(), RecordTransactionAbort(), RecordTransactionAbortPrepared(), RecordTransactionCommit(), RecordTransactionCommitPrepared(), replorigin_reset(), XactLogAbortRecord(), and XactLogCommitRecord().

replorigin_session_origin_timestamp

TimestampTz replorigin_session_origin_timestamp = 0

Definition at line 165 of file origin.c.

Referenced by apply_handle_commit_internal(), apply_handle_commit_prepared(), apply_handle_prepare_internal(), apply_handle_rollback_prepared(), EndPrepare(), pa_stream_abort(), pg_replication_origin_session_reset(), pg_replication_origin_xact_reset(), pg_replication_origin_xact_setup(), process_syncing_tables_for_sync(), RecordTransactionCommit(), RecordTransactionCommitPrepared(), replorigin_reset(), XactLogAbortRecord(), and XactLogCommitRecord().

session_replication_state

ReplicationState* session_replication_state = NULL
static

Definition at line 184 of file origin.c.

Referenced by pg_replication_origin_session_progress(), pg_replication_origin_xact_setup(), ReplicationOriginExitCleanup(), replorigin_session_advance(), replorigin_session_get_progress(), replorigin_session_reset(), and replorigin_session_setup().

AltStyle によって変換されたページ (->オリジナル) /