index ada374c0c4402da9e0cccaaf4baf2c7ccf8dd063..2fb9a8bf580639d50ca1ea224cae434d9678bfd1 100644 (file)
case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
event_name = "WalReceiverWaitStart";
break;
- case WAIT_EVENT_LIBPQWALRECEIVER_READ:
- event_name = "LibPQWalReceiverRead";
+ case WAIT_EVENT_LIBPQWALRECEIVER:
+ event_name = "LibPQWalReceiver";
break;
case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
event_name = "WalSenderWaitForWAL";
index daae3f70e73baa6ddad65c8fcd9698b87d4292e5..048d2aaa76b8cca5bfd02d2487f5827c2879301a 100644 (file)
@@ -113,6 +113,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
char **err)
{
WalReceiverConn *conn;
+ PostgresPollingStatusType status;
const char *keys[5];
const char *vals[5];
int i = 0;
@@ -146,7 +147,51 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
Assert(i < sizeof(keys));
conn = palloc0(sizeof(WalReceiverConn));
- conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
+ conn->streamConn = PQconnectStartParams(keys, vals,
+ /* expand_dbname = */ true);
+ if (PQstatus(conn->streamConn) == CONNECTION_BAD)
+ {
+ *err = pchomp(PQerrorMessage(conn->streamConn));
+ return NULL;
+ }
+
+ /* Poll connection. */
+ do
+ {
+ /* Determine current state of the connection. */
+ status = PQconnectPoll(conn->streamConn);
+
+ /* Sleep a bit if waiting for socket. */
+ if (status == PGRES_POLLING_READING ||
+ status == PGRES_POLLING_WRITING)
+ {
+ int extra_flag;
+ int rc;
+
+ extra_flag = (status == PGRES_POLLING_READING
+ ? WL_SOCKET_READABLE
+ : WL_SOCKET_WRITEABLE);
+
+ ResetLatch(&MyProc->procLatch);
+ rc = WaitLatchOrSocket(&MyProc->procLatch,
+ WL_POSTMASTER_DEATH |
+ WL_LATCH_SET | extra_flag,
+ PQsocket(conn->streamConn),
+ 0,
+ WAIT_EVENT_LIBPQWALRECEIVER);
+
+ /* Emergency bailout. */
+ if (rc & WL_POSTMASTER_DEATH)
+ exit(1);
+
+ /* Interrupted. */
+ if (rc & WL_LATCH_SET)
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /* Otherwise loop until we have OK or FAILED status. */
+ } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+
if (PQstatus(conn->streamConn) != CONNECTION_OK)
{
*err = pchomp(PQerrorMessage(conn->streamConn));
@@ -529,7 +574,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
WL_LATCH_SET,
PQsocket(streamConn),
0,
- WAIT_EVENT_LIBPQWALRECEIVER_READ);
+ WAIT_EVENT_LIBPQWALRECEIVER);
if (rc & WL_POSTMASTER_DEATH)
exit(1);
index 8b710ecb24e2011e2b9861fc7d73c7e2466467b3..0062fb8af24d0af80e4acd089f83b32013b81168 100644 (file)
WAIT_EVENT_CLIENT_WRITE,
WAIT_EVENT_SSL_OPEN_SERVER,
WAIT_EVENT_WAL_RECEIVER_WAIT_START,
- WAIT_EVENT_LIBPQWALRECEIVER_READ,
+ WAIT_EVENT_LIBPQWALRECEIVER,
WAIT_EVENT_WAL_SENDER_WAIT_WAL,
WAIT_EVENT_WAL_SENDER_WRITE_DATA
} WaitEventClient;