1/*-------------------------------------------------------------------------
5 * This file contains functions used by the startup process to communicate
6 * with the walreceiver process. Functions implementing walreceiver itself
7 * are in walreceiver.c.
9 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
13 * src/backend/replication/walreceiverfuncs.c
15 *-------------------------------------------------------------------------
37 * How long to wait for walreceiver to start up after requesting
38 * postmaster to launch it. In seconds.
40 #define WALRCV_STARTUP_TIMEOUT 10
42/* Report shared memory space needed by WalRcvShmemInit */
53/* Allocate and initialize walreceiver-related shared memory */
64 /* First time through, so initialize */
74/* Is walreceiver running (or starting up)? */
90 * If it has taken too long for walreceiver to start up, give up. Setting
91 * the state to STOPPED ensures that if walreceiver later does start up
92 * after all, it will see that it's not supposed to be running and die
93 * without doing anything.
101 bool stopped =
false;
123 * Is walreceiver running and streaming (or at least attempting to connect,
141 * If it has taken too long for walreceiver to start up, give up. Setting
142 * the state to STOPPED ensures that if walreceiver later does start up
143 * after all, it will see that it's not supposed to be running and die
144 * without doing anything.
152 bool stopped =
false;
175 * Stop walreceiver (if running) and wait for it to die.
176 * Executed by the Startup process.
183 bool stopped =
false;
186 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
187 * mode once it's finished, and will also request postmaster to not
206 walrcvpid = walrcv->
pid;
211 /* Unnecessary but consistent. */
216 * Signal walreceiver process if it was still running.
219 kill(walrcvpid, SIGTERM);
222 * Wait for walreceiver to acknowledge its death by setting state to
228 WAIT_EVENT_WAL_RECEIVER_EXIT);
233 * Request postmaster to start walreceiver.
235 * "recptr" indicates the position where streaming should begin. "conninfo"
236 * is a libpq connection string to use. "slotname" is, optionally, the name
237 * of a replication slot to acquire. "create_temp_slot" indicates to create
238 * a temporary slot when no "slotname" is given.
240 * WAL receivers do not directly load GUC parameters used for the connection
241 * to the primary, and rely on the values passed down by the caller of this
242 * routine instead. Hence, the addition of any new parameters should happen
243 * through this code path.
247 const char *slotname,
bool create_temp_slot)
255 * We always start at the beginning of the segment. That prevents a broken
256 * segment (i.e., with no records in the first half of a segment) from
257 * being created by XLOG streaming, which might cause trouble later on if
258 * the segment is e.g archived.
265 /* It better be stopped if we try to restart it */
269 if (conninfo != NULL)
275 * Use configured replication slot if present, and ignore the value of
276 * create_temp_slot as the slot name should be persistent. Otherwise, use
277 * create_temp_slot to determine whether this WAL receiver should create a
278 * temporary slot by itself and use it, or not.
280 if (slotname != NULL && slotname[0] !=
'0円')
301 * If this is the first startup of walreceiver (on this timeline),
302 * initialize flushedUpto and latestChunkStart to the starting point.
313 walrcv_proc = walrcv->
procno;
324 * Returns the last+1 byte position that walreceiver has flushed.
326 * Optionally, returns the previous chunk start, that is the first byte
327 * written in the most recent walreceiver flush cycle. Callers not
328 * interested in that value may pass NULL for latestChunkStart. Same for
339 if (latestChunkStart)
349 * Returns the last+1 byte position that walreceiver has written.
350 * This returns a recently written value without taking a lock.
361 * Returns the replication apply delay in ms or -1
362 * if the apply delay info is not available
378 if (receivePtr == replayPtr)
383 if (chunkReplayStartTime == 0)
391 * Returns the network latency in ms, note that this includes any
392 * difference in clock settings between the servers, as well as timezone.
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define MemSet(start, val, len)
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
Assert(PointerIsAligned(start, uint64))
void SetLatch(Latch *latch)
void SendPostmasterSignal(PMSignalReason reason)
@ PMSIGNAL_START_WALRECEIVER
size_t strlcpy(char *dst, const char *src, size_t siz)
#define GetPGProcByNumber(n)
#define INVALID_PROC_NUMBER
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
TimestampTz lastMsgReceiptTime
TimeLineID receiveStartTLI
char slotname[NAMEDATALEN]
XLogRecPtr latestChunkStart
ConditionVariable walRcvStoppedCV
pg_atomic_uint64 writtenUpto
TimestampTz lastMsgSendTime
char conninfo[MAXCONNINFO]
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool WalRcvStreaming(void)
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot)
XLogRecPtr GetWalRcvWriteRecPtr(void)
void ShutdownWalRcv(void)
#define WALRCV_STARTUP_TIMEOUT
int GetReplicationApplyDelay(void)
void WalRcvShmemInit(void)
Size WalRcvShmemSize(void)
int GetReplicationTransferLatency(void)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
static TimeLineID receiveTLI
TimestampTz GetCurrentChunkReplayStartTime(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)