git.postgresql.org Git - postgresql.git/commitdiff

git projects / postgresql.git / commitdiff
? search:
summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: b999c24)
Use latch instead of select() in walreceiver
2016年11月30日 17:00:00 +0000 (12:00 -0500)
Fri, 2 Dec 2016 01:23:28 +0000 (20:23 -0500)
Replace use of poll()/select() by WaitLatchOrSocket(), which is more
portable and flexible.

Also change walreceiver to use its procLatch instead of a custom latch.

From: Petr Jelinek <petr@2ndquadrant.com>


diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index a3921977c572d0d96aea464eed2188ccdc18426e..c7584cb1d3492e89684bf986c163e926a86d3bae 100644 (file)
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3338,6 +3338,9 @@ pgstat_get_wait_client(WaitEventClient w)
case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
event_name = "WalReceiverWaitStart";
break;
+ case WAIT_EVENT_LIBPQWALRECEIVER_READ:
+ event_name = "LibPQWalReceiverRead";
+ break;
case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
event_name = "WalSenderWaitForWAL";
break;
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f1c843e868c08bedf5a520df3052564f9c7fae15..6c01e7b991853a3388f11a36db605cb2abbb61eb 100644 (file)
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -23,19 +23,11 @@
#include "pqexpbuffer.h"
#include "access/xlog.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "replication/walreceiver.h"
+#include "storage/proc.h"
#include "utils/builtins.h"
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#endif
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
PG_MODULE_MAGIC;
void _PG_init(void);
@@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
-static bool libpq_select(int timeout_ms);
static PGresult *libpqrcv_PQexec(const char *query);
/*
@@ -366,67 +357,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
PQclear(res);
}
-/*
- * Wait until we can read WAL stream, or timeout.
- *
- * Returns true if data has become available for reading, false if timed out
- * or interrupted by signal.
- *
- * This is based on pqSocketCheck.
- */
-static bool
-libpq_select(int timeout_ms)
-{
- int ret;
-
- Assert(streamConn != NULL);
- if (PQsocket(streamConn) < 0)
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
-
- /* We use poll(2) if available, otherwise select(2) */
- {
-#ifdef HAVE_POLL
- struct pollfd input_fd;
-
- input_fd.fd = PQsocket(streamConn);
- input_fd.events = POLLIN | POLLERR;
- input_fd.revents = 0;
-
- ret = poll(&input_fd, 1, timeout_ms);
-#else /* !HAVE_POLL */
-
- fd_set input_mask;
- struct timeval timeout;
- struct timeval *ptr_timeout;
-
- FD_ZERO(&input_mask);
- FD_SET(PQsocket(streamConn), &input_mask);
-
- if (timeout_ms < 0)
- ptr_timeout = NULL;
- else
- {
- timeout.tv_sec = timeout_ms / 1000;
- timeout.tv_usec = (timeout_ms % 1000) * 1000;
- ptr_timeout = &timeout;
- }
-
- ret = select(PQsocket(streamConn) + 1, &input_mask,
- NULL, NULL, ptr_timeout);
-#endif /* HAVE_POLL */
- }
-
- if (ret == 0 || (ret < 0 && errno == EINTR))
- return false;
- if (ret < 0)
- ereport(ERROR,
- (errcode_for_socket_access(),
- errmsg("select() failed: %m")));
- return true;
-}
-
/*
* Send a query and wait for the results by using the asynchronous libpq
* functions and the backend version of select().
@@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
*/
while (PQisBusy(streamConn))
{
+ int rc;
+
/*
* We don't need to break down the sleep into smaller increments,
- * and check for interrupts after each nap, since we can just
- * elog(FATAL) within SIGTERM signal handler if the signal arrives
- * in the middle of establishment of replication connection.
+ * since we'll get interrupted by signals and can either handle
+ * interrupts here or elog(FATAL) within SIGTERM signal handler if
+ * the signal arrives in the middle of establishment of
+ * replication connection.
*/
- if (!libpq_select(-1))
- continue; /* interrupted */
+ ResetLatch(&MyProc->procLatch);
+ rc = WaitLatchOrSocket(&MyProc->procLatch,
+ WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+ WL_LATCH_SET,
+ PQsocket(streamConn),
+ 0,
+ WAIT_EVENT_LIBPQWALRECEIVER_READ);
+ if (rc & WL_POSTMASTER_DEATH)
+ exit(1);
+
+ /* interrupted */
+ if (rc & WL_LATCH_SET)
+ {
+ CHECK_FOR_INTERRUPTS();
+ continue;
+ }
if (PQconsumeInput(streamConn) == 0)
return NULL; /* trouble */
}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 2bb3dce1b1c5bab7ac7d1e1aeb59e6e50b7bc7a3..8bfb041560823f2c3ca11ba09b42b7ece190c09f 100644 (file)
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -261,7 +261,7 @@ WalReceiverMain(void)
/* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvDie, 0);
- OwnLatch(&walrcv->latch);
+ walrcv->latch = &MyProc->procLatch;
/* Properly accept or ignore signals the postmaster might send us */
pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config
@@ -483,7 +483,7 @@ WalReceiverMain(void)
* avoiding some system calls.
*/
Assert(wait_fd != PGINVALID_SOCKET);
- rc = WaitLatchOrSocket(&walrcv->latch,
+ rc = WaitLatchOrSocket(walrcv->latch,
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
WL_TIMEOUT | WL_LATCH_SET,
wait_fd,
@@ -491,7 +491,7 @@ WalReceiverMain(void)
WAIT_EVENT_WAL_RECEIVER_MAIN);
if (rc & WL_LATCH_SET)
{
- ResetLatch(&walrcv->latch);
+ ResetLatch(walrcv->latch);
if (walrcv->force_reply)
{
/*
@@ -652,7 +652,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
WakeupRecovery();
for (;;)
{
- ResetLatch(&walrcv->latch);
+ ResetLatch(walrcv->latch);
/*
* Emergency bailout if postmaster has died. This is to avoid the
@@ -687,7 +687,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
}
SpinLockRelease(&walrcv->mutex);
- WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
+ WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
WAIT_EVENT_WAL_RECEIVER_WAIT_START);
}
@@ -763,7 +763,7 @@ WalRcvDie(int code, Datum arg)
/* Ensure that all WAL records received are flushed to disk */
XLogWalRcvFlush(true);
- DisownLatch(&walrcv->latch);
+ walrcv->latch = NULL;
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
@@ -812,7 +812,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
got_SIGTERM = true;
- SetLatch(&WalRcv->latch);
+ if (WalRcv->latch)
+ SetLatch(WalRcv->latch);
/* Don't joggle the elbow of proc_exit */
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
@@ -1297,7 +1298,8 @@ void
WalRcvForceReply(void)
{
WalRcv->force_reply = true;
- SetLatch(&WalRcv->latch);
+ if (WalRcv->latch)
+ SetLatch(WalRcv->latch);
}
/*
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 5f6e423f1f63c74d53ed7607849a9bf81ac6eae9..01111a4c12bcd8f3ddde0ebe47edeab4ab3c304c 100644 (file)
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -64,7 +64,7 @@ WalRcvShmemInit(void)
MemSet(WalRcv, 0, WalRcvShmemSize());
WalRcv->walRcvState = WALRCV_STOPPED;
SpinLockInit(&WalRcv->mutex);
- InitSharedLatch(&WalRcv->latch);
+ WalRcv->latch = NULL;
}
}
@@ -279,8 +279,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
if (launch)
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
- else
- SetLatch(&walrcv->latch);
+ else if (walrcv->latch)
+ SetLatch(walrcv->latch);
}
/*
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0b85b7ad3ae9975eb20e6803c86c498e037ff0de..152ff0620852133323edda179e17539893fbe539 100644 (file)
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -763,6 +763,7 @@ typedef enum
WAIT_EVENT_CLIENT_WRITE,
WAIT_EVENT_SSL_OPEN_SERVER,
WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+ WAIT_EVENT_LIBPQWALRECEIVER_READ,
WAIT_EVENT_WAL_SENDER_WAIT_WAL,
WAIT_EVENT_WAL_SENDER_WRITE_DATA
} WaitEventClient;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index cd787c92b3fc9eb2fb8de62d1199ceb64cc6c1c1..afbb8d8b9541e450d08702834406cc7b835be80c 100644 (file)
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -127,8 +127,9 @@ typedef struct
* where to start streaming (after setting receiveStart and
* receiveStartTLI), and also to tell it to send apply feedback to the
* primary whenever specially marked commit records are applied.
+ * This is normally mapped to procLatch when walreceiver is running.
*/
- Latch latch;
+ Latch *latch;
} WalRcvData;
extern WalRcvData *WalRcv;
This is the main PostgreSQL git repository.
RSS Atom

AltStyle によって変換されたページ (->オリジナル) /