1/*--------------------------------------------------------------------------
4 * Test harness code for shared memory message queues.
6 * Copyright (c) 2013-2025, PostgreSQL Global Development Group
9 * src/test/modules/test_shm_mq/test.c
11 * -------------------------------------------------------------------------
31/* value cached, fetched from shared memory */
35 * Simple test of the shared memory message queue infrastructure.
37 * We set up a ring of message queues passing through 1 or more background
38 * processes and eventually looping back to ourselves. We then send a message
39 * through the ring a number of times indicated by the loop count. At the end,
40 * we check whether the final message matches the one we started with.
58 /* A negative loopcount is nonsensical. */
61 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
62 errmsg(
"repeat count size must be an integer value greater than or equal to zero")));
65 * Since this test sends data using the blocking interfaces, it cannot
66 * send data to itself. Therefore, a minimum of 1 worker is required. Of
67 * course, a negative worker count is nonsensical.
71 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
72 errmsg(
"number of workers must be an integer value greater than zero")));
74 /* Set up dynamic shared memory segment and background workers. */
77 /* Send the initial message. */
78 res =
shm_mq_send(outqh, message_size, message_contents,
false,
true);
81 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
82 errmsg(
"could not send message")));
85 * Receive a message and send it back out again. Do this a number of
86 * times equal to the loop count.
90 /* Receive a message. */
94 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
95 errmsg(
"could not receive message")));
97 /* If this is supposed to be the last iteration, stop here. */
98 if (--loop_count <= 0)
101 /* Send it back out. */
105 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
106 errmsg(
"could not send message")));
110 * Finally, check that we got back the same message from the last
111 * iteration that we originally sent.
122 * Pipelined test of the shared memory message queue infrastructure.
124 * As in the basic test, we set up a ring of message queues passing through
125 * 1 or more background processes and eventually looping back to ourselves.
126 * Then, we send N copies of the user-specified message through the ring and
127 * receive them all back. Since this might fill up all message queues in the
128 * ring and then stall, we must be prepared to begin receiving the messages
129 * back before we've finished sending them.
141 int32 send_count = 0;
142 int32 receive_count = 0;
150 /* A negative loopcount is nonsensical. */
153 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
154 errmsg(
"repeat count size must be an integer value greater than or equal to zero")));
157 * Using the nonblocking interfaces, we can even send data to ourselves,
158 * so the minimum number of workers for this test is zero.
162 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
163 errmsg(
"number of workers must be an integer value greater than or equal to zero")));
165 /* Set up dynamic shared memory segment and background workers. */
174 * If we haven't yet sent the message the requisite number of times,
175 * try again to send it now. Note that when shm_mq_send() returns
176 * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
177 * same message size and contents; that's not an issue here because
178 * we're sending the same message every time.
180 if (send_count < loop_count)
182 res =
shm_mq_send(outqh, message_size, message_contents,
true,
191 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
192 errmsg(
"could not send message")));
196 * If we haven't yet received the message the requisite number of
197 * times, try to receive it again now.
199 if (receive_count < loop_count)
205 /* Verifying every time is slow, so it's optional. */
212 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
213 errmsg(
"could not receive message")));
218 * Otherwise, we've received the message enough times. This
219 * shouldn't happen unless we've also sent it enough times.
221 if (send_count != receive_count)
223 (
errcode(ERRCODE_INTERNAL_ERROR),
224 errmsg(
"message sent %d times, but received %d times",
225 send_count, receive_count)));
231 /* first time, allocate or get the custom wait event */
236 * If we made no progress, wait for one of the other processes to
237 * which we are connected to set our latch, indicating that they
238 * have read or written data and therefore there may now be work
255 * Verify that two messages are the same.
262 if (origlen != newlen)
264 (
errmsg(
"message corrupted"),
265 errdetail(
"The original message was %zu bytes but the final message is %zu bytes.",
268 for (
i = 0;
i < origlen; ++
i)
269 if (origdata[
i] != newdata[
i])
271 (
errmsg(
"message corrupted"),
272 errdetail(
"The new and original messages differ at byte %zu of %zu.",
i, origlen)));
void dsm_detach(dsm_segment *seg)
int errdetail(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define PG_GETARG_TEXT_PP(n)
#define PG_GETARG_INT64(n)
#define PG_GETARG_INT32(n)
#define PG_GETARG_BOOL(n)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define CHECK_FOR_INTERRUPTS()
void test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, shm_mq_handle **output, shm_mq_handle **input)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Datum test_shm_mq_pipelined(PG_FUNCTION_ARGS)
static void verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
PG_FUNCTION_INFO_V1(test_shm_mq)
Datum test_shm_mq(PG_FUNCTION_ARGS)
static uint32 we_message_queue
static Size VARSIZE_ANY_EXHDR(const void *PTR)
static char * VARDATA_ANY(const void *PTR)
uint32 WaitEventExtensionNew(const char *wait_event_name)
#define WL_EXIT_ON_PM_DEATH