133{
141 int32 send_count = 0;
142 int32 receive_count = 0;
149
150 /* A negative loopcount is nonsensical. */
151 if (loop_count < 0)
153 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
154 errmsg(
"repeat count size must be an integer value greater than or equal to zero")));
155
156 /*
157 * Using the nonblocking interfaces, we can even send data to ourselves,
158 * so the minimum number of workers for this test is zero.
159 */
160 if (nworkers < 0)
162 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
163 errmsg(
"number of workers must be an integer value greater than or equal to zero")));
164
165 /* Set up dynamic shared memory segment and background workers. */
167
168 /* Main loop. */
169 for (;;)
170 {
171 bool wait = true;
172
173 /*
174 * If we haven't yet sent the message the requisite number of times,
175 * try again to send it now. Note that when shm_mq_send() returns
176 * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
177 * same message size and contents; that's not an issue here because
178 * we're sending the same message every time.
179 */
180 if (send_count < loop_count)
181 {
182 res =
shm_mq_send(outqh, message_size, message_contents,
true,
183 true);
185 {
186 ++send_count;
187 wait = false;
188 }
191 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
192 errmsg(
"could not send message")));
193 }
194
195 /*
196 * If we haven't yet received the message the requisite number of
197 * times, try to receive it again now.
198 */
199 if (receive_count < loop_count)
200 {
203 {
204 ++receive_count;
205 /* Verifying every time is slow, so it's optional. */
206 if (verify)
208 wait = false;
209 }
212 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
213 errmsg(
"could not receive message")));
214 }
215 else
216 {
217 /*
218 * Otherwise, we've received the message enough times. This
219 * shouldn't happen unless we've also sent it enough times.
220 */
221 if (send_count != receive_count)
223 (
errcode(ERRCODE_INTERNAL_ERROR),
224 errmsg(
"message sent %d times, but received %d times",
225 send_count, receive_count)));
226 break;
227 }
228
229 if (wait)
230 {
231 /* first time, allocate or get the custom wait event */
234
235 /*
236 * If we made no progress, wait for one of the other processes to
237 * which we are connected to set our latch, indicating that they
238 * have read or written data and therefore there may now be work
239 * for us to do.
240 */
245 }
246 }
247
248 /* Clean up. */
250
252}
#define PG_GETARG_BOOL(n)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define CHECK_FOR_INTERRUPTS()
static uint32 we_message_queue
uint32 WaitEventExtensionNew(const char *wait_event_name)
#define WL_EXIT_ON_PM_DEATH