1/*-------------------------------------------------------------------------
 3 * Functionality for synchronizing slots to a standby server from the primary server.
6 * Copyright (c) 2024-2025, PostgreSQL Global Development Group
9 * src/backend/replication/logical/slotsync.c
11 * This file contains the code for slot synchronization on a physical standby
12 * to fetch logical failover slots information from the primary server, create
13 * the slots on the standby and synchronize them periodically.
15 * Slot synchronization can be performed either automatically by enabling slot
16 * sync worker or manually by calling SQL function pg_sync_replication_slots().
18 * If the WAL corresponding to the remote's restart_lsn is not available on the
19 * physical standby or the remote's catalog_xmin precedes the oldest xid for
20 * which it is guaranteed that rows wouldn't have been removed then we cannot
21 * create the local standby slot because that would mean moving the local slot
22 * backward and decoding won't be possible via such a slot. In this case, the
23 * slot will be marked as RS_TEMPORARY. Once the primary server catches up,
24 * the slot will be marked as RS_PERSISTENT (which means sync-ready) after
25 * which slot sync worker can perform the sync periodically or user can call
26 * pg_sync_replication_slots() periodically to perform the syncs.
28 * If synchronized slots fail to build a consistent snapshot from the
29 * restart_lsn before reaching confirmed_flush_lsn, they would become
30 * unreliable after promotion due to potential data loss from changes
31 * before reaching a consistent point. This can happen because the slots can
32 * be synced at some random time and we may not reach the consistent point
33 * at the same WAL location as the primary. So, we mark such slots as
34 * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
35 * consistent point, they will be marked as RS_PERSISTENT.
37 * The slot sync worker waits for some time before the next synchronization,
38 * with the duration varying based on whether any slots were updated during
 39 * the last cycle. Refer to the comments above wait_for_slot_activity() for more details.
42 * Any standby synchronized slots will be dropped if they no longer need
 43 * to be synchronized. See comment atop drop_local_obsolete_slots() for more details.
45 *---------------------------------------------------------------------------
72 * Struct for sharing information to control slot synchronization.
74 * The slot sync worker's pid is needed by the startup process to shut it
75 * down during promotion. The startup process shuts down the slot sync worker
76 * and also sets stopSignaled=true to handle the race condition when the
77 * postmaster has not noticed the promotion yet and thus may end up restarting
78 * the slot sync worker. If stopSignaled is set, the worker will exit in such a
79 * case. The SQL function pg_sync_replication_slots() will also error out if
80 * this flag is set. Note that we don't need to reset this variable as after
81 * promotion the slot sync worker won't be restarted because the pmState
82 * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
83 * primary without restarting the server. See LaunchMissingBackgroundProcesses.
 85 * The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot overwrites.
88 * The 'last_start_time' is needed by postmaster to start the slot sync worker
89 * once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an immediate restart
90 * is expected (e.g., slot sync GUCs change), slot sync worker will reset
91 * last_start_time before exiting, so that postmaster can start the worker
92 * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
109 * The sleep time (ms) between slot-sync cycles varies dynamically
110 * (within a MIN/MAX range) according to slot activity. See
111 * wait_for_slot_activity() for details.
113 #define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200
114 #define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000 /* 30s */
118/* The restart interval for slot sync work used by postmaster */
119 #define SLOTSYNC_RESTART_INTERVAL_SEC 10
122 * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
123 * in SlotSyncCtxStruct, this flag is true only if the current process is
124 * performing slot synchronization.
 129 * Structure to hold information fetched from the primary server about a logical failover slot.
144 /* RS_INVAL_NONE if valid, or the reason of invalidation */
152 * If necessary, update the local synced slot's metadata based on the data
153 * from the remote slot.
155 * If no update was needed (the data of the remote slot is the same as the
156 * local slot) return false, otherwise true.
158 * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
159 * modified, and decoding from the corresponding LSN's can reach a
160 * consistent snapshot.
162 * *remote_slot_precedes will be true if the remote slot's LSN or xmin
163 * precedes locally reserved position.
167 bool *found_consistent_snapshot,
168 bool *remote_slot_precedes)
171 bool updated_xmin_or_lsn =
false;
172 bool updated_config =
false;
176 if (found_consistent_snapshot)
177 *found_consistent_snapshot =
false;
179 if (remote_slot_precedes)
180 *remote_slot_precedes =
false;
 183 * Don't overwrite if we already have a newer catalog_xmin and restart_lsn.
191 * This can happen in following situations:
193 * If the slot is temporary, it means either the initial WAL location
194 * reserved for the local slot is ahead of the remote slot's
195 * restart_lsn or the initial xmin_horizon computed for the local slot
196 * is ahead of the remote slot.
198 * If the slot is persistent, both restart_lsn and catalog_xmin of the
199 * synced slot could still be ahead of the remote slot. Since we use
200 * slot advance functionality to keep snapbuild/slot updated, it is
201 * possible that the restart_lsn and catalog_xmin are advanced to a
202 * later position than it has on the primary. This can happen when
203 * slot advancing machinery finds running xacts record after reaching
204 * the consistent state at a later point than the primary where it
205 * serializes the snapshot and updates the restart_lsn.
207 * We LOG the message if the slot is temporary as it can help the user
208 * to understand why the slot is not sync-ready. In the case of a
209 * persistent slot, it would be a more common case and won't directly
210 * impact the users, so we used DEBUG1 level to log the message.
213 errmsg(
"could not synchronize replication slot \"%s\"",
215 errdetail(
"Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
221 if (remote_slot_precedes)
222 *remote_slot_precedes =
true;
225 * Skip updating the configuration. This is required to avoid syncing
226 * two_phase_at without syncing confirmed_lsn. Otherwise, the prepared
227 * transaction between old confirmed_lsn and two_phase_at will
228 * unexpectedly get decoded and sent to the downstream after
229 * promotion. See comments in ReorderBufferFinishPrepared.
235 * Attempt to sync LSNs and xmins only if remote slot is ahead of local
244 * We can't directly copy the remote slot's LSN or xmin unless there
245 * exists a consistent snapshot at that point. Otherwise, after
246 * promotion, the slots may not reach a consistent point before the
247 * confirmed_flush_lsn which can lead to a data loss. To avoid data
248 * loss, we let slot machinery advance the slot which ensures that
249 * snapbuilder/slot statuses are updated properly.
254 * Update the slot info directly if there is a serialized snapshot
255 * at the restart_lsn, as the slot can quickly reach consistency
256 * at restart_lsn by restoring the snapshot.
264 if (found_consistent_snapshot)
265 *found_consistent_snapshot =
true;
270 found_consistent_snapshot);
275 errmsg_internal(
"synchronized confirmed_flush for slot \"%s\" differs from remote slot",
282 updated_xmin_or_lsn =
true;
293 /* Avoid expensive operations while holding a spinlock. */
304 updated_config =
true;
307 * Ensure that there is no risk of sending prepared transactions
308 * unexpectedly after the promotion.
314 * We have to write the changed xmin to disk *before* we change the
315 * in-memory value, otherwise after a crash we wouldn't know that some
316 * catalog tuples might have been removed already.
318 if (updated_config || updated_xmin_or_lsn)
325 * Now the new xmin is safely on disk, we can let the global value
326 * advance. We do not take ProcArrayLock or similar since we only advance
327 * xmin here and there's not much harm done by a concurrent computation
330 if (updated_xmin_or_lsn)
340 return updated_config || updated_xmin_or_lsn;
 344 * Get the list of local logical slots that are synchronized from the primary server.
358 /* Check if it is a synchronized slot */
362 local_slots =
lappend(local_slots, s);
372 * Helper function to check if local_slot is required to be retained.
374 * Return false either if local_slot does not exist in the remote_slots list
 375 * or is invalidated while the corresponding remote slot is still valid, otherwise true.
381 bool remote_exists =
false;
382 bool locally_invalidated =
false;
388 remote_exists =
true;
391 * If remote slot is not invalidated but local slot is marked as
392 * invalidated, then set locally_invalidated flag.
395 locally_invalidated =
404 return (remote_exists && !locally_invalidated);
408 * Drop local obsolete slots.
410 * Drop the local slots that no longer need to be synced i.e. these either do
411 * not exist on the primary or are no longer enabled for failover.
413 * Additionally, drop any slots that are valid on the primary but got
 414 * invalidated on the standby. This situation may occur due to the following reasons:
416 * - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL
417 * records from the restart_lsn of the slot.
 418 * - 'primary_slot_name' is temporarily reset to null and the physical slot is removed.
420 * These dropped slots will get recreated in next sync-cycle and it is okay to
421 * drop and recreate such slots as long as these are not consumable on the
422 * standby (which is the case currently).
424 * Note: Change of 'wal_level' on the primary server to a level lower than
425 * logical may also result in slot invalidation and removal on the standby.
426 * This is because such 'wal_level' change is only possible if the logical
427 * slots are removed on the primary server, so it's expected to see the
428 * slots being invalidated and removed on the standby too (and re-created
429 * if they are re-created on the primary server).
438 /* Drop the local slot if it is not required to be retained. */
444 * Use shared lock to prevent a conflict with
445 * ReplicationSlotsDropDBSlots(), trying to drop the same slot
446 * during a drop-database operation.
452 * In the small window between getting the slot to drop and
453 * locking the database, there is a possibility of a parallel
454 * database drop by the startup process and the creation of a new
455 * slot by the user. This new user-created slot may end up using
456 * the same shared memory as that of 'local_slot'. Thus check if
457 * local_slot is still the synced one before performing actual
461 synced_slot = local_slot->in_use && local_slot->data.synced;
474 errmsg(
"dropped replication slot \"%s\" of database with OID %u",
475 NameStr(local_slot->data.name),
476 local_slot->data.database));
482 * Reserve WAL for the currently active local slot using the specified WAL
483 * location (restart_lsn).
485 * If the given WAL location has been removed, reserve WAL using the oldest
486 * existing WAL segment.
504 /* Prevent WAL removal as fast as possible */
510 * Find the oldest existing WAL segment file.
512 * Normally, we can determine it by using the last removed segment
513 * number. However, if no WAL segment files have been removed by a
514 * checkpoint since startup, we need to search for the oldest segment
515 * file from the current timeline existing in XLOGDIR.
517 * XXX: Currently, we are searching for the oldest segment in the
518 * current timeline as there is less chance of the slot's restart_lsn
519 * from being some prior timeline, and even if it happens, in the
520 * worst case, we will wait to sync till the slot's restart_lsn moved
521 * to the current timeline.
525 if (oldest_segno == 1)
534 segno, oldest_segno);
537 * If all required WAL is still there, great, otherwise retry. The
538 * slot should prevent further removal of WAL, unless there's a
539 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
540 * the new restart_lsn above, so normally we should never need to loop
543 if (segno >= oldest_segno)
546 /* Retry using the location of the oldest wal segment */
552 * If the remote restart_lsn and catalog_xmin have caught up with the
553 * local ones, then update the LSNs and persist the local synced slot for
554 * future synchronization; otherwise, do nothing.
556 * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
563 bool found_consistent_snapshot =
false;
564 bool remote_slot_precedes =
false;
567 &found_consistent_snapshot,
568 &remote_slot_precedes);
571 * Check if the primary server has caught up. Refer to the comment atop
572 * the file for details on this check.
574 if (remote_slot_precedes)
577 * The remote slot didn't catch up to locally reserved position.
579 * We do not drop the slot because the restart_lsn can be ahead of the
580 * current location when recreating the slot in the next cycle. It may
581 * take more time to create such a slot. Therefore, we keep this slot
582 * and attempt the synchronization in the next cycle.
588 * Don't persist the slot if it cannot reach the consistent point from the
589 * restart_lsn. See comments atop this file.
591 if (!found_consistent_snapshot)
594 errmsg(
"could not synchronize replication slot \"%s\"", remote_slot->
name),
595 errdetail(
"Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
604 errmsg(
"newly created replication slot \"%s\" is sync-ready now",
611 * Synchronize a single slot to the given position.
613 * This creates a new slot if there is no existing one and updates the
614 * metadata of the slot as per the data received from the primary server.
616 * The slot is created as a temporary slot and stays in the same state until the
617 * remote_slot catches up with locally reserved position and local slot is
618 * updated. The slot is then persisted and is considered as sync-ready for
621 * Returns TRUE if the local slot is updated.
628 bool slot_updated =
false;
631 * Make sure that concerned WAL is received and flushed before syncing
632 * slot to target lsn received from the primary server.
638 * Can get here only if GUC 'synchronized_standby_slots' on the
639 * primary server was not configured correctly.
642 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
643 errmsg(
"skipping slot synchronization because the received slot sync"
644 " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
652 /* Search for the named slot */
661 /* User-created slot with the same name exists, raise ERROR. */
664 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
665 errmsg(
"exiting from slot synchronization because same"
666 " name slot \"%s\" already exists on the standby",
670 * The slot has been synchronized before.
672 * It is important to acquire the slot here before checking
673 * invalidation. If we don't acquire the slot first, there could be a
674 * race condition that the local slot could be invalidated just after
675 * checking the 'invalidated' flag here and we could end up
676 * overwriting 'invalidated' flag to remote_slot's value. See
677 * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
678 * if the slot is not acquired by other processes.
680 * XXX: If it ever turns out that slot acquire/release is costly for
681 * cases when none of the slot properties is changed then we can do a
682 * pre-check to ensure that at least one of the slot properties is
683 * changed before acquiring the slot.
690 * Copy the invalidation cause from remote only if local slot is not
691 * invalidated locally, we don't want to overwrite existing one.
700 /* Make sure the invalidated state persists across server restart */
707 /* Skip the sync of an invalidated slot */
714 /* Slot not ready yet, let's attempt to make it sync-ready now. */
721 /* Slot ready for sync, so sync it. */
725 * Sanity check: As long as the invalidations are handled
726 * appropriately as above, this should never happen.
728 * We don't need to check restart_lsn here. See the comments in
729 * update_local_synced_slot() for details.
735 errdetail_internal(
"Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
743 /* Otherwise create the slot first. */
749 /* Skip creating the local slot if remote_slot is invalidated already */
754 * We create temporary slots instead of ephemeral slots here because
755 * we want the slots to survive after releasing them. This is done to
756 * avoid dropping and re-creating the slots in each synchronization
757 * cycle if the restart_lsn or catalog_xmin of the remote slot has not
765 /* For shorter lines. */
768 /* Avoid expensive operations while holding a spinlock. */
800 * Gets the failover logical slots info from the primary server and updates
801 * the slots locally. Creates the slots if not present on the standby.
803 * Returns TRUE if any of the slots gets updated in this sync-cycle.
808#define SLOTSYNC_COLUMN_COUNT 10
810 LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
815 bool some_slot_updated =
false;
816 bool started_tx =
false;
817 const char *query =
"SELECT slot_name, plugin, confirmed_flush_lsn,"
818 " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
819 " database, invalidation_reason"
820 " FROM pg_catalog.pg_replication_slots"
821 " WHERE failover and NOT temporary";
823 /* The syscache access in walrcv_exec() needs a transaction env. */
830 /* Execute the query */
834 errmsg(
"could not fetch failover logical slots info from the primary server: %s",
837 /* Construct the remote_slot tuple and synchronize each slot locally */
855 * It is possible to get null values for LSN and Xmin if slot is
856 * invalidated on the primary server, so handle accordingly.
892 * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
893 * slot is valid, that means we have fetched the remote_slot in its
894 * RS_EPHEMERAL state. In such a case, don't sync it; we can always
895 * sync it in the next sync cycle when the remote_slot is persisted
896 * and has valid lsn(s) and xmin values.
898 * XXX: In future, if we plan to expose 'slot->data.persistency' in
899 * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
900 * slots in the first place.
908 /* Create list of remote slots */
909 remote_slot_list =
lappend(remote_slot_list, remote_slot);
914 /* Drop local slots that no longer need to be synced. */
917 /* Now sync the slots locally */
923 * Use shared lock to prevent a conflict with
924 * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
925 * a drop-database operation.
934 /* We are done, free remote_slot_list elements */
942 return some_slot_updated;
946 * Checks the remote server info.
948 * We ensure that the 'primary_slot_name' exists on the remote server and the
949 * remote server is not a standby node.
954#define PRIMARY_INFO_OUTPUT_COL_COUNT 2
960 bool remote_in_recovery;
961 bool primary_slot_valid;
962 bool started_tx =
false;
966 "SELECT pg_is_in_recovery(), count(*) = 1"
967 " FROM pg_catalog.pg_replication_slots"
968 " WHERE slot_type='physical' AND slot_name=%s",
971 /* The syscache access in walrcv_exec() needs a transaction env. */
983 errmsg(
"could not fetch primary slot name \"%s\" info from the primary server: %s",
985 errhint(
"Check if \"primary_slot_name\" is configured correctly."));
990 "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
996 * Slot sync is currently not supported on a cascading standby. This is
997 * because if we allow it, the primary server needs to wait for all the
998 * cascading standbys, otherwise, logical subscribers can still be ahead
999 * of one of the cascading standbys which we plan to promote. Thus, to
1000 * avoid this additional complexity, we restrict it for the time being.
1002 if (remote_in_recovery)
1004 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1005 errmsg(
"cannot synchronize replication slots from a standby server"));
1010 if (!primary_slot_valid)
1012 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1013 /* translator: second %s is a GUC variable name */
1014 errmsg(
"replication slot \"%s\" specified by \"%s\" does not exist on primary server",
1025 * Checks if dbname is specified in 'primary_conninfo'.
1027 * Error out if not specified otherwise return it.
1035 * The slot synchronization needs a database connection for walrcv_exec to
1041 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1044 * translator: first %s is a connection option; second %s is a GUC
1047 errmsg(
"replication slot synchronization requires \"%s\" to be specified in \"%s\"",
1048 "dbname",
"primary_conninfo"));
1053 * Return true if all necessary GUCs for slot synchronization are set
1054 * appropriately, otherwise, return false.
1060 * Logical slot sync/creation requires wal_level >= logical.
1065 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1066 errmsg(
"replication slot synchronization requires \"wal_level\" >= \"logical\""));
1071 * A physical replication slot(primary_slot_name) is required on the
1072 * primary to ensure that the rows needed by the standby are not removed
1073 * after restarting, so that the synchronized slot on the standby will not
1079 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1080 /* translator: %s is a GUC variable name */
1081 errmsg(
"replication slot synchronization requires \"%s\" to be set",
"primary_slot_name"));
1086 * hot_standby_feedback must be enabled to cooperate with the physical
1087 * replication slot, which allows informing the primary about the xmin and
1088 * catalog_xmin values on the standby.
1093 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1094 /* translator: %s is a GUC variable name */
1095 errmsg(
"replication slot synchronization requires \"%s\" to be enabled",
1096 "hot_standby_feedback"));
1101 * The primary_conninfo is required to make connection to primary for
1102 * getting slots information.
1107 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1108 /* translator: %s is a GUC variable name */
1109 errmsg(
"replication slot synchronization requires \"%s\" to be set",
1110 "primary_conninfo"));
1118 * Re-read the config file.
 1120 * Exit if any of the slot sync GUCs have changed. The postmaster will restart it.
1130 bool conninfo_changed;
1131 bool primary_slotname_changed;
1138 conninfo_changed = strcmp(old_primary_conninfo,
PrimaryConnInfo) != 0;
1139 primary_slotname_changed = strcmp(old_primary_slotname,
PrimarySlotName) != 0;
1140 pfree(old_primary_conninfo);
1141 pfree(old_primary_slotname);
1146 /* translator: %s is a GUC variable name */
1147 errmsg(
"replication slot synchronization worker will shut down because \"%s\" is disabled",
"sync_replication_slots"));
1151 if (conninfo_changed ||
1152 primary_slotname_changed ||
1156 errmsg(
"replication slot synchronization worker will restart because of a parameter change"));
1159 * Reset the last-start time for this worker so that the postmaster
1160 * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
1170 * Interrupt handler for main loop of slot sync worker.
1180 errmsg(
"replication slot synchronization worker is shutting down on receiving SIGINT"));
1190 * Connection cleanup function for slotsync worker.
1192 * Called on slotsync worker exit.
1203 * Cleanup function for slotsync worker.
1205 * Called on slotsync worker exit.
1211 * We need to do slots cleanup here just like WalSndErrorCleanup() does.
1213 * The startup process during promotion invokes ShutDownSlotSync() which
1214 * waits for slot sync to finish and it does that by checking the
1215 * 'syncing' flag. Thus the slot sync worker must be done with slots'
1216 * release and cleanup to avoid any dangling temporary slots or active
1217 * slots before it marks itself as finished syncing.
1220 /* Make sure active replication slots are released */
1224 /* Also cleanup the temporary slots. */
1232 * If syncing_slots is true, it indicates that the process errored out
1233 * without resetting the flag. So, we need to clean up shared memory and
1234 * reset the flag here.
 1246 * Sleep for long enough that we believe it's likely that the slots on primary get updated.
1249 * If there is no slot activity the wait time between sync-cycles will double
1250 * (to a maximum of 30s). If there is some slot activity the wait time between
1251 * sync-cycles is reset to the minimum (200ms).
1258 if (!some_slot_updated)
1261 * No slots were updated, so double the sleep time, but not beyond the
1262 * maximum allowable value.
1269 * Some slots were updated since the last sleep, so reset the sleep
1278 WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
1285 * Emit an error if a promotion or a concurrent sync call is in progress.
1286 * Otherwise, advertise that a sync is in progress.
1293 /* The worker pid must not be already assigned in SlotSyncCtx */
1297 * Emit an error if startup process signaled the slot sync machinery to
1298 * stop. See comments atop SlotSyncCtxStruct.
1304 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1305 errmsg(
"cannot synchronize replication slots when standby promotion is ongoing"));
1312 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1313 errmsg(
"cannot synchronize replication slots concurrently"));
1319 * Advertise the required PID so that the startup process can kill the
1320 * slot sync worker on promotion.
1330 * Reset syncing flag.
1343 * The main loop of our worker process.
1345 * It connects to the primary server, fetches logical failover slots
1346 * information periodically in order to create and sync the slots.
1354 sigjmp_buf local_sigjmp_buf;
1357 Assert(startup_data_len == 0);
1366 * Create a per-backend PGPROC struct in shared memory. We must do this
1367 * before we access any shared memory.
1372 * Early initialization.
1379 * If an exception is encountered, processing resumes here.
1381 * We just need to clean up, report the error, and go away.
1383 * If we do not have this handling here, then since this worker process
1384 * operates at the bottom of the exception stack, ERRORs turn into FATALs.
1385 * Therefore, we create our own exception handler to catch ERRORs.
1387 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
1389 /* since not using PG_TRY, must reset error stack by hand */
1392 /* Prevents interrupts while cleaning up */
1395 /* Report the error to the server log */
1399 * We can now go away. Note that because we called InitProcess, a
1400 * callback was registered to do ProcKill, which will clean up
1406 /* We can now handle ereport(ERROR) */
1409 /* Setup signal handling */
1423 /* Register it as soon as SlotSyncCtx->pid is initialized. */
1427 * Establishes SIGALRM handler and initialize timeout module. It is needed
1428 * by InitPostgres to register different timeouts.
1432 /* Load the libpq-specific functions */
1436 * Unblock signals (they were blocked when the postmaster forked us)
1441 * Set always-secure search path, so malicious users can't redirect user
1442 * code (e.g. operators).
1444 * It's not strictly necessary since we won't be scanning or writing to
1445 * any user table locally, but it's good to retain it here for added
1453 * Connect to the database specified by the user in primary_conninfo. We
1454 * need a database connection for walrcv_exec to work which we use to
1455 * fetch slot information from the remote node. See comments atop
1458 * We do not specify a specific user here since the slot sync worker will
1459 * operate as a superuser. This is safe because the slot sync worker does
1460 * not interact with user tables, eliminating the risk of executing
1461 * arbitrary code within triggers.
1474 * Establish the connection to the primary server for slot
1482 errcode(ERRCODE_CONNECTION_FAILURE),
1483 errmsg(
"synchronization worker \"%s\" could not connect to the primary server: %s",
1489 * Register the disconnection callback.
1491 * XXX: This can be combined with previous cleanup registration of
1492 * slotsync_worker_onexit() but that will need the connection to be made
1493 * global and we want to avoid introducing global for this purpose.
1498 * Using the specified primary server connection, check that we are not a
1499 * cascading standby and slot configured in 'primary_slot_name' exists on
1500 * the primary server.
1504 /* Main loop to synchronize slots */
1507 bool some_slot_updated =
false;
1517 * The slot sync worker can't get here because it will only stop when it
1518 * receives a SIGINT from the startup process, or when there is an error.
1524 * Update the inactive_since property for synced slots.
 1526 * Note that this function is currently called when we shutdown the slot sync machinery.
1535 * We need to update inactive_since only when we are promoting standby to
1536 * correctly interpret the inactive_since if the standby gets promoted
1537 * without a restart. We don't want the slots to appear inactive for a
1538 * long time after promotion if they haven't been synchronized recently.
1539 * Whoever acquires the slot, i.e., makes the slot active, will reset it.
1544 /* The slot sync worker or SQL function mustn't be running by now */
1553 /* Check if it is a synchronized slot */
1558 /* The slot must not be acquired by any process */
1561 /* Use the same inactive_since time for all the slots. */
1573 * Shut down the slot sync worker.
1575 * This function sends signal to shutdown slot sync worker, if required. It
1576 * also waits till the slot sync worker has exited or
1577 * pg_sync_replication_slots() has finished.
1589 * Return if neither the slot sync worker is running nor the function
1590 * pg_sync_replication_slots() is executing.
1604 kill(worker_pid, SIGINT);
1606 /* Wait for slot sync to end */
1611 /* Wait a bit, we don't expect to have to wait long */
1614 10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
1624 /* Ensure that no process is syncing the slots. */
1637 * SlotSyncWorkerCanRestart
1639 * Returns true if enough time (SLOTSYNC_RESTART_INTERVAL_SEC) has passed
1640 * since it was launched last. Otherwise returns false.
1642 * This is a safety valve to protect against continuous respawn attempts if the
1643 * worker is dying immediately at launch. Note that since we will retry to
 1644 * launch the worker from the postmaster main loop, we will get another chance later.
1650 time_t curtime = time(NULL);
1652 /* Return false if too soon since last start. */
1663 * Is current process syncing replication slots?
1665 * Could be either backend executing SQL function or slot sync worker.
1674 * Amount of shared memory required for slot synchronization.
1683 * Allocate and initialize the shared memory of slot synchronization.
1703 * Error cleanup callback for slot sync SQL function.
1711 * We need to do slots cleanup here just like WalSndErrorCleanup() does.
1713 * The startup process during promotion invokes ShutDownSlotSync() which
1714 * waits for slot sync to finish and it does that by checking the
1715 * 'syncing' flag. Thus the SQL function must be done with slots' release
1716 * and cleanup to avoid any dangling temporary slots or active slots
1717 * before it marks itself as finished syncing.
1720 /* Make sure active replication slots are released */
1724 /* Also cleanup the synced temporary slots. */
1728 * The set syncing_slots indicates that the process errored out without
1729 * resetting the flag. So, we need to clean up shared memory and reset the
1739 * Synchronize the failover enabled replication slots using the specified
1740 * primary server connection.
1753 /* Cleanup the synced temporary slots */
1756 /* We are done with sync, so reset sync flag */
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define TextDatumGetCString(d)
Oid get_database_oid(const char *dbname, bool missing_ok)
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
sigjmp_buf * PG_exception_stack
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
void ProcessConfigFile(GucContext context)
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
#define GetProcessingMode()
#define CHECK_FOR_INTERRUPTS()
#define AmLogicalSlotSyncWorkerProcess()
#define HOLD_INTERRUPTS()
#define SetProcessingMode(mode)
BackendType MyBackendType
void namestrcpy(Name name, const char *str)
#define foreach_ptr(type, var, lst)
static XLogRecPtr DatumGetLSN(Datum X)
void FloatExceptionHandler(SIGNAL_ARGS)
static bool DatumGetBool(Datum X)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static TransactionId DatumGetTransactionId(Datum X)
void InitPostgres(const char *in_dbname, Oid dboid, const char *username, Oid useroid, bits32 flags, char *out_dbname)
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
void init_ps_display(const char *fixed_part)
char * quote_literal_cstr(const char *rawstr)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
void ReplicationSlotDropAcquired(void)
void ReplicationSlotMarkDirty(void)
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotSave(void)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
void ReplicationSlotRelease(void)
int max_replication_slots
ReplicationSlotCtlData * ReplicationSlotCtl
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotCleanup(bool synced_only)
ReplicationSlotInvalidationCause
#define SlotIsLogical(slot)
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
static List * get_local_synced_slots(void)
#define MIN_SLOTSYNC_WORKER_NAPTIME_MS
#define PRIMARY_INFO_OUTPUT_COL_COUNT
static void slotsync_worker_disconnect(int code, Datum arg)
void SyncReplicationSlots(WalReceiverConn *wrconn)
static bool local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
static void drop_local_obsolete_slots(List *remote_slot_list)
static void reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
void ShutDownSlotSync(void)
static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
bool sync_replication_slots
static SlotSyncCtxStruct * SlotSyncCtx
static void slotsync_failure_callback(int code, Datum arg)
#define SLOTSYNC_COLUMN_COUNT
#define SLOTSYNC_RESTART_INTERVAL_SEC
static void reset_syncing_flag()
char * CheckAndGetDbnameFromConninfo(void)
static bool syncing_slots
struct RemoteSlot RemoteSlot
static void ProcessSlotSyncInterrupts(void)
struct SlotSyncCtxStruct SlotSyncCtxStruct
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS
static bool synchronize_slots(WalReceiverConn *wrconn)
bool SlotSyncWorkerCanRestart(void)
static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
static void wait_for_slot_activity(bool some_slot_updated)
static void slotsync_reread_config(void)
void SlotSyncShmemInit(void)
static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
static void slotsync_worker_onexit(int code, Datum arg)
static void check_and_set_sync_info(pid_t worker_pid)
static void update_synced_slots_inactive_since(void)
bool ValidateSlotSyncParams(int elevel)
static void validate_remote_info(WalReceiverConn *wrconn)
bool IsSyncingReplicationSlots(void)
void ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
Size SlotSyncShmemSize(void)
bool SnapBuildSnapshotExists(XLogRecPtr lsn)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
ReplicationSlotInvalidationCause invalidated
TransactionId catalog_xmin
ReplicationSlot replication_slots[1]
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
ReplicationSlotPersistency persistency
ReplicationSlotInvalidationCause invalidated
TransactionId effective_catalog_xmin
ReplicationSlotPersistentData data
Tuplestorestate * tuplestore
void InitializeTimeouts(void)
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool TransactionIdFollows(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
#define WL_EXIT_ON_PM_DEATH
static WalReceiverConn * wrconn
bool hot_standby_feedback
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_get_dbname_from_conninfo(conninfo)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
bool IsTransactionState(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
XLogSegNo XLogGetLastRemovedSegno(void)
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr