git.postgresql.org Git - postgresql.git/commitdiff

git projects / postgresql.git / commitdiff
? search:
summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 83fd453)
Rationalize GetWalRcv{Write,Flush}RecPtr().
Wed, 8 Apr 2020 11:45:09 +0000 (23:45 +1200)
Wed, 8 Apr 2020 11:45:09 +0000 (23:45 +1200)
GetWalRcvWriteRecPtr() previously reported the latest *flushed*
location. Adopt the conventional terminology used elsewhere in the tree
by renaming it to GetWalRcvFlushRecPtr(), and likewise for some related
variables that used the term "received".

Add a new definition of GetWalRcvWriteRecPtr(), which returns the latest
*written* value. This will allow later patches to use the value for
non-data-integrity purposes, without having to wait for the flush
pointer to advance.

Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com


diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 740d7044b1d50eab5aad478e48ab84247b0e3d31..c38bc1412d8861c78481e2ace67a964e2fcdc20a 100644 (file)
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -208,8 +208,8 @@ HotStandbyState standbyState = STANDBY_DISABLED;
static XLogRecPtr LastRec;
-/* Local copy of WalRcv->receivedUpto */
-static XLogRecPtr receivedUpto = 0;
+/* Local copy of WalRcv->flushedUpto */
+static XLogRecPtr flushedUpto = 0;
static TimeLineID receiveTLI = 0;
/*
@@ -9363,7 +9363,7 @@ CreateRestartPoint(int flags)
* Retreat _logSegNo using the current end of xlog replayed or received,
* whichever is later.
*/
- receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
+ receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
KeepLogSeg(endptr, &_logSegNo);
@@ -11856,7 +11856,7 @@ retry:
/* See if we need to retrieve more data */
if (readFile < 0 ||
(readSource == XLOG_FROM_STREAM &&
- receivedUpto < targetPagePtr + reqLen))
+ flushedUpto < targetPagePtr + reqLen))
{
if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
private->randAccess,
@@ -11887,10 +11887,10 @@ retry:
*/
if (readSource == XLOG_FROM_STREAM)
{
- if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
+ if (((targetPagePtr) / XLOG_BLCKSZ) != (flushedUpto / XLOG_BLCKSZ))
readLen = XLOG_BLCKSZ;
else
- readLen = XLogSegmentOffset(receivedUpto, wal_segment_size) -
+ readLen = XLogSegmentOffset(flushedUpto, wal_segment_size) -
targetPageOff;
}
else
@@ -12305,7 +12305,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
PrimarySlotName,
wal_receiver_create_temp_slot);
- receivedUpto = 0;
+ flushedUpto = 0;
}
/*
@@ -12329,14 +12329,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
* XLogReceiptTime will not advance, so the grace time
* allotted to conflicting queries will decrease.
*/
- if (RecPtr < receivedUpto)
+ if (RecPtr < flushedUpto)
havedata = true;
else
{
XLogRecPtr latestChunkStart;
- receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
- if (RecPtr < receivedUpto && receiveTLI == curFileTLI)
+ flushedUpto = GetWalRcvFlushRecPtr(&latestChunkStart, &receiveTLI);
+ if (RecPtr < flushedUpto && receiveTLI == curFileTLI)
{
havedata = true;
if (latestChunkStart <= RecPtr)
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index b84ba572596082af229d74eedd4cf50bc5ef8ccc..00e1b33ed5fd97ed256a592fb825d8d51bbe3a03 100644 (file)
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS)
{
XLogRecPtr recptr;
- recptr = GetWalRcvWriteRecPtr(NULL, NULL);
+ recptr = GetWalRcvFlushRecPtr(NULL, NULL);
if (recptr == 0)
PG_RETURN_NULL();
diff --git a/src/backend/replication/README b/src/backend/replication/README
index 0cbb9906135efb039a0432b3cf8c7a4d7c8ec915..8ccdd86e74b20d18346d8dccaebd720dc01ac577 100644 (file)
--- a/src/backend/replication/README
+++ b/src/backend/replication/README
@@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in
WalRcvData->receiveStart.
As walreceiver receives WAL from the master server, and writes and flushes
-it to disk (in pg_wal), it updates WalRcvData->receivedUpto and signals
+it to disk (in pg_wal), it updates WalRcvData->flushedUpto and signals
the startup process to know how far WAL replay can advance.
Walreceiver sends information about replication progress to the master server
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index aee67c61aa67917028777ab772c5bce31d892bc9..d69fb90132d1d39ef16bd410bc38ceca8cd7afa9 100644 (file)
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -12,7 +12,7 @@
* in the primary server), and then keeps receiving XLOG records and
* writing them to the disk as long as the connection is alive. As XLOG
* records are received and flushed to disk, it updates the
- * WalRcv->receivedUpto variable in shared memory, to inform the startup
+ * WalRcv->flushedUpto variable in shared memory, to inform the startup
* process of how far it can proceed with XLOG replay.
*
* A WAL receiver cannot directly load GUC parameters used when establishing
@@ -261,6 +261,8 @@ WalReceiverMain(void)
SpinLockRelease(&walrcv->mutex);
+ pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
+
/* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvDie, 0);
@@ -984,6 +986,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
LogstreamResult.Write = recptr;
}
+
+ /* Update shared-memory status */
+ pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
}
/*
@@ -1005,10 +1010,10 @@ XLogWalRcvFlush(bool dying)
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
- if (walrcv->receivedUpto < LogstreamResult.Flush)
+ if (walrcv->flushedUpto < LogstreamResult.Flush)
{
- walrcv->latestChunkStart = walrcv->receivedUpto;
- walrcv->receivedUpto = LogstreamResult.Flush;
+ walrcv->latestChunkStart = walrcv->flushedUpto;
+ walrcv->flushedUpto = LogstreamResult.Flush;
walrcv->receivedTLI = ThisTimeLineID;
}
SpinLockRelease(&walrcv->mutex);
@@ -1361,7 +1366,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
state = WalRcv->walRcvState;
receive_start_lsn = WalRcv->receiveStart;
receive_start_tli = WalRcv->receiveStartTLI;
- received_lsn = WalRcv->receivedUpto;
+ received_lsn = WalRcv->flushedUpto;
received_tli = WalRcv->receivedTLI;
last_send_time = WalRcv->lastMsgSendTime;
last_receipt_time = WalRcv->lastMsgReceiptTime;
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 21d182360766fa03136ddcde938c066b7d6c4a93..4afad83539ce98e84b332574530357571386fa44 100644 (file)
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -282,11 +282,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
/*
* If this is the first startup of walreceiver (on this timeline),
- * initialize receivedUpto and latestChunkStart to the starting point.
+ * initialize flushedUpto and latestChunkStart to the starting point.
*/
if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
{
- walrcv->receivedUpto = recptr;
+ walrcv->flushedUpto = recptr;
walrcv->receivedTLI = tli;
walrcv->latestChunkStart = recptr;
}
@@ -304,7 +304,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
}
/*
- * Returns the last+1 byte position that walreceiver has written.
+ * Returns the last+1 byte position that walreceiver has flushed.
*
* Optionally, returns the previous chunk start, that is the first byte
* written in the most recent walreceiver flush cycle. Callers not
@@ -312,13 +312,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
* receiveTLI.
*/
XLogRecPtr
-GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
+GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
{
WalRcvData *walrcv = WalRcv;
XLogRecPtr recptr;
SpinLockAcquire(&walrcv->mutex);
- recptr = walrcv->receivedUpto;
+ recptr = walrcv->flushedUpto;
if (latestChunkStart)
*latestChunkStart = walrcv->latestChunkStart;
if (receiveTLI)
@@ -328,6 +328,18 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
return recptr;
}
+/*
+ * Returns the last+1 byte position that walreceiver has written.
+ * This returns a recently written value without taking a lock.
+ */
+XLogRecPtr
+GetWalRcvWriteRecPtr(void)
+{
+ WalRcvData *walrcv = WalRcv;
+
+ return pg_atomic_read_u64(&walrcv->writtenUpto);
+}
+
/*
* Returns the replication apply delay in ms or -1
* if the apply delay info is not available
@@ -345,7 +357,7 @@ GetReplicationApplyDelay(void)
TimestampTz chunkReplayStartTime;
SpinLockAcquire(&walrcv->mutex);
- receivePtr = walrcv->receivedUpto;
+ receivePtr = walrcv->flushedUpto;
SpinLockRelease(&walrcv->mutex);
replayPtr = GetXLogReplayRecPtr(NULL);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 06e8b7903601041fabee3291556a5c063ce9c182..122d884f3e4c9cf9b307c2e77352006eed98851e 100644 (file)
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2949,7 +2949,7 @@ GetStandbyFlushRecPtr(void)
* has streamed, but hasn't been replayed yet.
*/
- receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
+ receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
ThisTimeLineID = replayTLI;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index cf3e43128c792ebd7bfb7170d8507b51b7b4bf30..f1aa6e9977cf7af82eed73297ba8f6955189d725 100644 (file)
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -16,6 +16,7 @@
#include "access/xlogdefs.h"
#include "getaddrinfo.h" /* for NI_MAXHOST */
#include "pgtime.h"
+#include "port/atomics.h"
#include "replication/logicalproto.h"
#include "replication/walsender.h"
#include "storage/latch.h"
@@ -73,19 +74,19 @@ typedef struct
TimeLineID receiveStartTLI;
/*
- * receivedUpto-1 is the last byte position that has already been
+ * flushedUpto-1 is the last byte position that has already been
* received, and receivedTLI is the timeline it came from. At the first
* startup of walreceiver, these are set to receiveStart and
* receiveStartTLI. After that, walreceiver updates these whenever it
* flushes the received WAL to disk.
*/
- XLogRecPtr receivedUpto;
+ XLogRecPtr flushedUpto;
TimeLineID receivedTLI;
/*
* latestChunkStart is the starting byte position of the current "batch"
* of received WAL. It's actually the same as the previous value of
- * receivedUpto before the last flush to disk. Startup process can use
+ * flushedUpto before the last flush to disk. Startup process can use
* this to detect whether it's keeping up or not.
*/
XLogRecPtr latestChunkStart;
@@ -141,6 +142,14 @@ typedef struct
slock_t mutex; /* locks shared variables shown above */
+ /*
+ * Like flushedUpto, but advanced after writing and before flushing,
+ * without the need to acquire the spin lock. Data can be read by another
+ * process up to this point, but shouldn't be used for data integrity
+ * purposes.
+ */
+ pg_atomic_uint64 writtenUpto;
+
/*
* force walreceiver reply? This doesn't need to be locked; memory
* barriers for ordering are sufficient. But we do need atomic fetch and
@@ -322,7 +331,8 @@ extern bool WalRcvRunning(void);
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
const char *conninfo, const char *slotname,
bool create_temp_slot);
-extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
+extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
+extern XLogRecPtr GetWalRcvWriteRecPtr(void);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
extern void WalRcvForceReply(void);
This is the main PostgreSQL git repository.
RSS Atom

AltStyle によって変換されたページ (->オリジナル) /