index 63e60478ea6e8f4b7041ac28808c039958dfe3a4..fff6c54c45d3f1507757f8408f33a5d310bd2772 100644 (file)
#include <signal.h>
#include "access/xlog_internal.h"
+#include "pgstat.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
/* First time through, so initialize */
MemSet(WalRcv, 0, WalRcvShmemSize());
WalRcv->walRcvState = WALRCV_STOPPED;
+ ConditionVariableInit(&WalRcv->walRcvStoppedCV);
SpinLockInit(&WalRcv->mutex);
pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
WalRcv->latch = NULL;
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
{
- SpinLockAcquire(&walrcv->mutex);
+ bool stopped = false;
+ SpinLockAcquire(&walrcv->mutex);
if (walrcv->walRcvState == WALRCV_STARTING)
+ {
state = walrcv->walRcvState = WALRCV_STOPPED;
-
+ stopped = true;
+ }
SpinLockRelease(&walrcv->mutex);
+
+ if (stopped)
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
}
}
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
{
- SpinLockAcquire(&walrcv->mutex);
+ bool stopped = false;
+ SpinLockAcquire(&walrcv->mutex);
if (walrcv->walRcvState == WALRCV_STARTING)
+ {
state = walrcv->walRcvState = WALRCV_STOPPED;
-
+ stopped = true;
+ }
SpinLockRelease(&walrcv->mutex);
+
+ if (stopped)
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
}
}
{
WalRcvData *walrcv = WalRcv;
pid_t walrcvpid = 0;
+ bool stopped = false;
/*
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
break;
case WALRCV_STARTING:
walrcv->walRcvState = WALRCV_STOPPED;
+ stopped = true;
break;
case WALRCV_STREAMING:
}
SpinLockRelease(&walrcv->mutex);
+ /* Unnecessary but consistent. */
+ if (stopped)
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
+
/*
* Signal walreceiver process if it was still running.
*/
* Wait for walreceiver to acknowledge its death by setting state to
* WALRCV_STOPPED.
*/
+ ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
while (WalRcvRunning())
- {
- /*
- * This possibly-long loop needs to handle interrupts of startup
- * process.
- */
- HandleStartupProcInterrupts();
-
- pg_usleep(100000); /* 100ms */
- }
+ ConditionVariableSleep(&walrcv->walRcvStoppedCV,
+ WAIT_EVENT_WALRCV_EXIT);
+ ConditionVariableCancelSleep();
}
/*
index a97a59a6a30aaedf74b356f01094cdef4ee3af46..4fd7c25ea74e88f01779f5c5798476e780f43e90 100644 (file)
#include "port/atomics.h"
#include "replication/logicalproto.h"
#include "replication/walsender.h"
+#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "utils/tuplestore.h"
*/
pid_t pid;
WalRcvState walRcvState;
+ ConditionVariable walRcvStoppedCV;
pg_time_t startTime;
/*