1/*-------------------------------------------------------------------------
4 * Use shm_mq to send & receive tuples between parallel backends
6 * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
7 * under the hood, writes tuples from the executor to a shm_mq.
9 * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
11 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
15 * src/backend/executor/tqueue.c
17 *-------------------------------------------------------------------------
26 * DestReceiver object's private contents
28 * queue is a pointer to data supplied by DestReceiver's caller.
37 * TupleQueueReader object's private contents
39 * queue is a pointer to data supplied by reader's caller.
41 * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
49 * Receive a tuple from a query, and send it to the designated shm_mq.
51 * Returns true if successful, false if shm_mq has been detached.
61 /* Send the tuple itself. */
68 /* Check for failure. */
73 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
74 errmsg(
"could not send tuple to shared-memory queue")));
80 * Prepare to receive tuples from executor.
89 * Clean up at end of an executor run
96 if (tqueue->
queue != NULL)
102 * Destroy receiver when done with it
109 /* We probably already detached from queue, but let's be sure */
110 if (tqueue->
queue != NULL)
116 * Create a DestReceiver that writes tuples to a tuple queue.
130 self->
queue = handle;
136 * Create a tuple queue reader.
143 reader->
queue = handle;
149 * Destroy a tuple queue reader.
151 * Note: cleaning up the underlying shm_mq is the caller's responsibility.
152 * We won't access it here, as it may be detached already.
161 * Fetch a tuple from a tuple queue reader.
163 * The return value is NULL if there are no remaining tuples or if
164 * nowait = true and no tuple is ready to return. *done, if not NULL,
165 * is set to true when there are no remaining tuples and otherwise to false.
167 * The returned tuple, if any, is either in shared memory or a private buffer
168 * and should not be freed. The pointer is invalid after the next call to
169 * TupleQueueReaderNext().
171 * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
172 * accumulate bytes from a partially-read message, so it's useful to call
173 * this with nowait = true even if nothing is returned.
186 /* Attempt to read a message. */
189 /* If queue is detached, set *done and return NULL. */
197 /* In non-blocking mode, bail out if no message ready yet. */
203 * Return a pointer to the queue memory directly (which had better be
204 * sufficiently aligned).
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
MinimalTuple ExecFetchSlotMinimalTuple(TupleTableSlot *slot, bool *shouldFree)
Assert(PointerIsAligned(start, uint64))
MinimalTupleData * MinimalTuple
void pfree(void *pointer)
void * palloc0(Size size)
void shm_mq_detach(shm_mq_handle *mqh)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
void(* rStartup)(DestReceiver *self, int operation, TupleDesc typeinfo)
void(* rShutdown)(DestReceiver *self)
bool(* receiveSlot)(TupleTableSlot *slot, DestReceiver *self)
void(* rDestroy)(DestReceiver *self)
struct TQueueDestReceiver TQueueDestReceiver
static void tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
static void tqueueShutdownReceiver(DestReceiver *self)
MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
DestReceiver * CreateTupleQueueDestReceiver(shm_mq_handle *handle)
static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
TupleQueueReader * CreateTupleQueueReader(shm_mq_handle *handle)
void DestroyTupleQueueReader(TupleQueueReader *reader)
static void tqueueDestroyReceiver(DestReceiver *self)