PostgreSQL Source Code git master
Macros | Enumerations | Functions | Variables
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define  WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

 

Functions

void  InitWalSender (void)
 
bool  exec_replication_command (const char *cmd_string)
 
void  WalSndErrorCleanup (void)
 
 
 
void  WalSndSignals (void)
 
 
void  WalSndShmemInit (void)
 
void  WalSndWakeup (bool physical, bool logical)
 
void  WalSndInitStopping (void)
 
void  WalSndWaitStopping (void)
 
 
void  WalSndRqstFileReload (void)
 
static void  WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

 
 
 
 
 
 
 

Macro Definition Documentation

WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 58 of file walsender.h.

Enumeration Type Documentation

CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21{
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

exec_replication_command()

bool exec_replication_command ( const char *  cmd_string )

Definition at line 1974 of file walsender.c.

1975{
1976 yyscan_t scanner;
1977 int parse_rc;
1978 Node *cmd_node;
1979 const char *cmdtag;
1980 MemoryContext old_context = CurrentMemoryContext;
1981
1982 /* We save and re-use the cmd_context across calls */
1983 static MemoryContext cmd_context = NULL;
1984
1985 /*
1986 * If WAL sender has been told that shutdown is getting close, switch its
1987 * status accordingly to handle the next replication commands correctly.
1988 */
1989 if (got_STOPPING)
1991
1992 /*
1993 * Throw error if in stopping mode. We need to prevent commands that could
1994 * generate WAL while the shutdown checkpoint is being written. To be
1995 * safe, we just prohibit all new commands.
1996 */
1998 ereport(ERROR,
1999 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2000 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2001
2002 /*
2003 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2004 * command arrives. Clean up the old stuff if there's anything.
2005 */
2007
2009
2010 /*
2011 * Prepare to parse and execute the command.
2012 *
2013 * Because replication command execution can involve beginning or ending
2014 * transactions, we need a working context that will survive that, so we
2015 * make it a child of TopMemoryContext. That in turn creates a hazard of
2016 * long-lived memory leaks if we lose track of the working context. We
2017 * deal with that by creating it only once per walsender, and resetting it
2018 * for each new command. (Normally this reset is a no-op, but if the
2019 * prior exec_replication_command call failed with an error, it won't be.)
2020 *
2021 * This is subtler than it looks. The transactions we manage can extend
2022 * across replication commands, indeed SnapBuildClearExportedSnapshot
2023 * might have just ended one. Because transaction exit will revert to the
2024 * memory context that was current at transaction start, we need to be
2025 * sure that that context is still valid. That motivates re-using the
2026 * same cmd_context rather than making a new one each time.
2027 */
2028 if (cmd_context == NULL)
2030 "Replication command context",
2032 else
2033 MemoryContextReset(cmd_context);
2034
2035 MemoryContextSwitchTo(cmd_context);
2036
2037 replication_scanner_init(cmd_string, &scanner);
2038
2039 /*
2040 * Is it a WalSender command?
2041 */
2043 {
2044 /* Nope; clean up and get out. */
2046
2047 MemoryContextSwitchTo(old_context);
2048 MemoryContextReset(cmd_context);
2049
2050 /* XXX this is a pretty random place to make this check */
2051 if (MyDatabaseId == InvalidOid)
2052 ereport(ERROR,
2053 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2054 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2055
2056 /* Tell the caller that this wasn't a WalSender command. */
2057 return false;
2058 }
2059
2060 /*
2061 * Looks like a WalSender command, so parse it.
2062 */
2063 parse_rc = replication_yyparse(&cmd_node, scanner);
2064 if (parse_rc != 0)
2065 ereport(ERROR,
2066 (errcode(ERRCODE_SYNTAX_ERROR),
2067 errmsg_internal("replication command parser returned %d",
2068 parse_rc)));
2070
2071 /*
2072 * Report query to various monitoring facilities. For this purpose, we
2073 * report replication commands just like SQL commands.
2074 */
2075 debug_query_string = cmd_string;
2076
2078
2079 /*
2080 * Log replication command if log_replication_commands is enabled. Even
2081 * when it's disabled, log the command with DEBUG1 level for backward
2082 * compatibility.
2083 */
2085 (errmsg("received replication command: %s", cmd_string)));
2086
2087 /*
2088 * Disallow replication commands in aborted transaction blocks.
2089 */
2091 ereport(ERROR,
2092 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2093 errmsg("current transaction is aborted, "
2094 "commands ignored until end of transaction block")));
2095
2097
2098 /*
2099 * Allocate buffers that will be used for each outgoing and incoming
2100 * message. We do this just once per command to reduce palloc overhead.
2101 */
2105
2106 switch (cmd_node->type)
2107 {
2108 case T_IdentifySystemCmd:
2109 cmdtag = "IDENTIFY_SYSTEM";
2110 set_ps_display(cmdtag);
2112 EndReplicationCommand(cmdtag);
2113 break;
2114
2115 case T_ReadReplicationSlotCmd:
2116 cmdtag = "READ_REPLICATION_SLOT";
2117 set_ps_display(cmdtag);
2119 EndReplicationCommand(cmdtag);
2120 break;
2121
2122 case T_BaseBackupCmd:
2123 cmdtag = "BASE_BACKUP";
2124 set_ps_display(cmdtag);
2125 PreventInTransactionBlock(true, cmdtag);
2127 EndReplicationCommand(cmdtag);
2128 break;
2129
2130 case T_CreateReplicationSlotCmd:
2131 cmdtag = "CREATE_REPLICATION_SLOT";
2132 set_ps_display(cmdtag);
2134 EndReplicationCommand(cmdtag);
2135 break;
2136
2137 case T_DropReplicationSlotCmd:
2138 cmdtag = "DROP_REPLICATION_SLOT";
2139 set_ps_display(cmdtag);
2141 EndReplicationCommand(cmdtag);
2142 break;
2143
2144 case T_AlterReplicationSlotCmd:
2145 cmdtag = "ALTER_REPLICATION_SLOT";
2146 set_ps_display(cmdtag);
2148 EndReplicationCommand(cmdtag);
2149 break;
2150
2151 case T_StartReplicationCmd:
2152 {
2153 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2154
2155 cmdtag = "START_REPLICATION";
2156 set_ps_display(cmdtag);
2157 PreventInTransactionBlock(true, cmdtag);
2158
2159 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2160 StartReplication(cmd);
2161 else
2163
2164 /* dupe, but necessary per libpqrcv_endstreaming */
2165 EndReplicationCommand(cmdtag);
2166
2167 Assert(xlogreader != NULL);
2168 break;
2169 }
2170
2171 case T_TimeLineHistoryCmd:
2172 cmdtag = "TIMELINE_HISTORY";
2173 set_ps_display(cmdtag);
2174 PreventInTransactionBlock(true, cmdtag);
2176 EndReplicationCommand(cmdtag);
2177 break;
2178
2179 case T_VariableShowStmt:
2180 {
2182 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2183
2184 cmdtag = "SHOW";
2185 set_ps_display(cmdtag);
2186
2187 /* syscache access needs a transaction environment */
2189 GetPGVariable(n->name, dest);
2191 EndReplicationCommand(cmdtag);
2192 }
2193 break;
2194
2195 case T_UploadManifestCmd:
2196 cmdtag = "UPLOAD_MANIFEST";
2197 set_ps_display(cmdtag);
2198 PreventInTransactionBlock(true, cmdtag);
2200 EndReplicationCommand(cmdtag);
2201 break;
2202
2203 default:
2204 elog(ERROR, "unrecognized replication command node tag: %u",
2205 cmd_node->type);
2206 }
2207
2208 /*
2209 * Done. Revert to caller's memory context, and clean out the cmd_context
2210 * to recover memory right away.
2211 */
2212 MemoryContextSwitchTo(old_context);
2213 MemoryContextReset(cmd_context);
2214
2215 /*
2216 * We need not update ps display or pg_stat_activity, because PostgresMain
2217 * will reset those to "idle". But we must reset debug_query_string to
2218 * ensure it doesn't become a dangling pointer.
2219 */
2220 debug_query_string = NULL;
2221
2222 return true;
2223}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
Definition: backend_status.h:29
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
void * yyscan_t
Definition: cubedata.h:65
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
@ DestRemoteSimple
Definition: dest.h:91
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1161
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
Oid MyDatabaseId
Definition: globals.c:94
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
Assert(PointerIsAligned(start, uint64))
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
MemoryContext TopMemoryContext
Definition: mcxt.c:166
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:88
#define InvalidOid
Definition: postgres_ext.h:37
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:299
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:284
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:268
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Node
Definition: nodes.h:135
NodeTag type
Definition: nodes.h:136
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1395
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:567
WalSnd * MyWalSnd
Definition: walsender.c:120
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:468
static StringInfoData tmpbuf
Definition: walsender.c:178
static void IdentifySystem(void)
Definition: walsender.c:387
static StringInfoData reply_message
Definition: walsender.c:177
void WalSndSetState(WalSndState state)
Definition: walsender.c:3922
static StringInfoData output_message
Definition: walsender.c:176
static void UploadManifest(void)
Definition: walsender.c:657
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:206
bool log_replication_commands
Definition: walsender.c:133
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1181
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1437
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:155
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1386
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:799
static XLogReaderState * xlogreader
Definition: walsender.c:145
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3660
void StartTransactionCommand(void)
Definition: xact.c:3071
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:407
void CommitTransactionCommand(void)
Definition: xact.c:3169

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID *  tli )

Definition at line 3617 of file walsender.c.

3618{
3619 XLogRecPtr replayPtr;
3620 TimeLineID replayTLI;
3621 XLogRecPtr receivePtr;
3623 XLogRecPtr result;
3624
3626
3627 /*
3628 * We can safely send what's already been replayed. Also, if walreceiver
3629 * is streaming WAL from the same timeline, we can send anything that it
3630 * has streamed, but hasn't been replayed yet.
3631 */
3632
3633 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3634 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3635
3636 if (tli)
3637 *tli = replayTLI;
3638
3639 result = replayPtr;
3640 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3641 result = receivePtr;
3642
3643 return result;
3644}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1668
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:124
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:62
static TimeLineID receiveTLI
Definition: xlogrecovery.c:265
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlogrecovery.c:4561

References am_cascading_walsender, Assert(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), and XLogSendPhysical().

HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3673 of file walsender.c.

3674{
3676
3677 /*
3678 * If replication has not yet started, die like with SIGTERM. If
3679 * replication is active, only set a flag and wake up the main loop. It
3680 * will send any outstanding WAL, wait for it to be replicated to the
3681 * standby, and then exit gracefully.
3682 */
3683 if (!replication_active)
3684 kill(MyProcPid, SIGTERM);
3685 else
3686 got_STOPPING = true;
3687}
int MyProcPid
Definition: globals.c:47
bool am_walsender
Definition: walsender.c:123
static volatile sig_atomic_t replication_active
Definition: walsender.c:214
#define kill(pid, sig)
Definition: win32_port.h:493

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

InitWalSender()

void InitWalSender ( void  )

Definition at line 287 of file walsender.c.

288{
290
291 /* Create a per-walsender data structure in shared memory */
293
294 /* need resource owner for e.g. basebackups */
296
297 /*
298 * Let postmaster know that we're a WAL sender. Once we've declared us as
299 * a WAL sender process, postmaster will let us outlive the bgwriter and
300 * kill us last in the shutdown sequence, so we get a chance to stream all
301 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
302 * there's no going back, and we mustn't write any WAL records after this.
303 */
306
307 /*
308 * If the client didn't specify a database to connect to, show in PGPROC
309 * that our advertised xmin should affect vacuum horizons in all
310 * databases. This allows physical replication clients to send hot
311 * standby feedback that will delay vacuum cleanup in all databases.
312 */
314 {
316 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
319 LWLockRelease(ProcArrayLock);
320 }
321
322 /* Initialize empty timestamp buffer for lag tracking. */
324}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_EXCLUSIVE
Definition: lwlock.h:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1263
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition: resowner.c:996
PGPROC * MyProc
Definition: proc.c:66
PROC_HDR * ProcGlobal
Definition: proc.c:78
TransactionId xmin
Definition: proc.h:194
uint8 statusFlags
Definition: proc.h:259
int pgxactoff
Definition: proc.h:201
uint8 * statusFlags
Definition: proc.h:403
#define InvalidTransactionId
Definition: transam.h:31
static void InitWalSenderSlot(void)
Definition: walsender.c:3001
static LagTracker * lag_tracker
Definition: walsender.c:238
bool RecoveryInProgress(void)
Definition: xlog.c:6386

References am_cascading_walsender, Assert(), CreateAuxProcessResourceOwner(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1718 of file walsender.c.

1719{
1721
1722 /*
1723 * If we are running in a standby, there is no need to wake up walsenders.
1724 * This is because we do not support syncing slots to cascading standbys,
1725 * so, there are no walsenders waiting for standbys to catch up.
1726 */
1727 if (RecoveryInProgress())
1728 return;
1729
1732}
#define NameStr(name)
Definition: c.h:751
void ConditionVariableBroadcast(ConditionVariable *cv)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2872
#define SlotIsPhysical(slot)
Definition: slot.h:254
ReplicationSlotPersistentData data
Definition: slot.h:192
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:117

References Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 334 of file walsender.c.

335{
340
341 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
343
344 if (MyReplicationSlot != NULL)
346
348
349 replication_active = false;
350
351 /*
352 * If there is a transaction in progress, it will clean up our
353 * ResourceOwner, but if a replication command set up a resource owner
354 * without a transaction, we've got to clean that up now.
355 */
358
360 proc_exit(0);
361
362 /* Revert back to startup state */
364}
void pgaio_error_cleanup(void)
Definition: aio.c:1162
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1945
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1016
void ReplicationSlotRelease(void)
Definition: slot.c:731
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:820
WALOpenSegment seg
Definition: xlogreader.h:272
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:205
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5001
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:831

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3858 of file walsender.c.

3859{
3860 int i;
3861
3862 for (i = 0; i < max_wal_senders; i++)
3863 {
3864 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3865 pid_t pid;
3866
3867 SpinLockAcquire(&walsnd->mutex);
3868 pid = walsnd->pid;
3869 SpinLockRelease(&walsnd->mutex);
3870
3871 if (pid == 0)
3872 continue;
3873
3875 }
3876}
i
int i
Definition: isn.c:77
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
pid_t pid
int max_wal_senders
Definition: walsender.c:129

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3650 of file walsender.c.

3651{
3652 int i;
3653
3654 for (i = 0; i < max_wal_senders; i++)
3655 {
3656 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3657
3658 SpinLockAcquire(&walsnd->mutex);
3659 if (walsnd->pid == 0)
3660 {
3661 SpinLockRelease(&walsnd->mutex);
3662 continue;
3663 }
3664 walsnd->needreload = true;
3665 SpinLockRelease(&walsnd->mutex);
3666 }
3667}
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3734 of file walsender.c.

3735{
3736 bool found;
3737 int i;
3738
3740 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3741
3742 if (!found)
3743 {
3744 /* First time through, so initialize */
3746
3747 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3749
3750 for (i = 0; i < max_wal_senders; i++)
3751 {
3752 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3753
3754 SpinLockInit(&walsnd->mutex);
3755 }
3756
3760 }
3761}
#define MemSet(start, val, len)
Definition: c.h:1019
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3722

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3722 of file walsender.c.

3723{
3724 Size size = 0;
3725
3726 size = offsetof(WalSndCtlData, walsnds);
3727 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3728
3729 return size;
3730}
size_t Size
Definition: c.h:610
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3703 of file walsender.c.

3704{
3705 /* Set up signal handlers */
3707 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3708 pqsignal(SIGTERM, die); /* request shutdown */
3709 /* SIGQUIT handler was already set up by InitPostmasterChild */
3710 InitializeTimeouts(); /* establishes SIGALRM handler */
3711 pqsignal(SIGPIPE, SIG_IGN);
3713 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3714 * shutdown */
3715
3716 /* Reset some signals that are accepted by postmaster but not here */
3717 pqsignal(SIGCHLD, SIG_DFL);
3718}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
Definition: pg_test_fsync.c:100
#define pqsignal
Definition: port.h:531
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3061
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:674
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3695
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References die, InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3884 of file walsender.c.

3885{
3886 for (;;)
3887 {
3888 int i;
3889 bool all_stopped = true;
3890
3891 for (i = 0; i < max_wal_senders; i++)
3892 {
3893 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3894
3895 SpinLockAcquire(&walsnd->mutex);
3896
3897 if (walsnd->pid == 0)
3898 {
3899 SpinLockRelease(&walsnd->mutex);
3900 continue;
3901 }
3902
3903 if (walsnd->state != WALSNDSTATE_STOPPING)
3904 {
3905 all_stopped = false;
3906 SpinLockRelease(&walsnd->mutex);
3907 break;
3908 }
3909 SpinLockRelease(&walsnd->mutex);
3910 }
3911
3912 /* safe to leave if confirmation is done for all WAL senders */
3913 if (all_stopped)
3914 return;
3915
3916 pg_usleep(10000L); /* wait for 10 msec */
3917 }
3918}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3779 of file walsender.c.

3780{
3781 /*
3782 * Wake up all the walsenders waiting on WAL being flushed or replayed
3783 * respectively. Note that waiting walsender would have prepared to sleep
3784 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3785 * before actually waiting.
3786 */
3787 if (physical)
3789
3790 if (logical)
3792}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inline static

Definition at line 65 of file walsender.h.

66{
68 {
69 wake_wal_senders = false;
70 if (max_wal_senders > 0)
71 WalSndWakeup(physical, logical);
72 }
73}
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:138
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3779
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:129

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

am_cascading_walsender

am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 126 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

am_walsender

log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

max_wal_senders

wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 138 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

wal_sender_timeout

AltStyle によって変換されたページ (->オリジナル) /