1/*-------------------------------------------------------------------------
4 * Scan a plan in multiple workers, and do order-preserving merge.
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/executor/nodeGatherMerge.c
12 *-------------------------------------------------------------------------
27 * When we read tuples from workers, it's a good idea to read several at once
28 * for efficiency when possible: this minimizes context-switching overhead.
29 * But reading too many at a time wastes memory without improving performance.
30 * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
32 #define MAX_TUPLE_STORE 10
35 * Pending-tuple array for each worker. This holds additional tuples that
36 * we were able to fetch from the worker, but can't process yet. In addition,
37 * this struct holds the "done" flag indicating the worker is known to have
38 * no more tuples. (We do not use this struct for the leader; we don't keep
39 * any pending tuples for the leader, and the need_to_scan_locally flag serves
40 * as its "done" indicator.)
45 int nTuples;
/* number of tuples currently stored */
47 bool done;
/* true if reader is known exhausted */
54 bool nowait,
bool *done);
63/* ----------------------------------------------------------------
65 * ----------------------------------------------------------------
74 /* Gather merge node doesn't have innerPlan node. */
78 * create state structure
90 * Miscellaneous initialization
92 * create expression context for node
97 * GatherMerge doesn't support checking a qual (it's always more efficient
98 * to do it in the child node).
103 * now initialize outer plan
109 * Leader may access ExecProcNode result directly (if
110 * need_to_scan_locally), or from workers via tuple queue. So we can't
111 * trivially rely on the slot type being fixed for expressions evaluated within this node.
118 * Store the tuple descriptor into gather merge state, so we can use it
119 * while initializing the gather merge slots.
125 * Initialize result type and projection.
131 * Without projections, the result slot type is not trivially known; see the comment above about leader vs. worker slots.
141 * initialize sort-key information
161 * We don't perform abbreviated key conversion here, for the same
162 * reasons that it isn't used in MergeAppend.
170 /* Now allocate the workspace for gather merge */
176/* ----------------------------------------------------------------
177 * ExecGatherMerge(node)
179 * Scans the relation via multiple workers and returns
180 * the next qualifying tuple.
181 * ----------------------------------------------------------------
193 * As with Gather, we don't launch workers until this node is actually executed.
202 * Sometimes we might have to run without parallelism; but if parallel
203 * mode is active then we can try to fire up some workers.
209 /* Initialize, or re-initialize, shared state needed by workers. */
221 /* Try to launch workers. */
224 /* We save # workers launched for the benefit of EXPLAIN */
228 * Count number of workers originally wanted and actually launched.
234 /* Set up tuple queue readers to read the results. */
238 /* Make a working array showing the active readers */
247 /* No workers? Then never mind. */
253 /* allow leader to participate if enabled, or if there are no workers (no choice) */
260 * Reset per-tuple memory context to free any expression evaluation
261 * storage allocated in the previous tuple cycle.
267 * Get next tuple, either from one of our workers, or by running the plan ourselves.
274 /* If no projection is required, we're done. */
279 * Form the result tuple using ExecProject(), and return it.
285/* ----------------------------------------------------------------
288 * ExecEndGatherMerge: frees any storage allocated through C routines.
289 * ----------------------------------------------------------------
298/* ----------------------------------------------------------------
299 * ExecShutdownGatherMerge
301 * Destroy the setup for parallel workers, including the parallel context.
302 * ----------------------------------------------------------------
309 /* Now destroy the parallel context. */
310 if (node->
pei != NULL)
317/* ----------------------------------------------------------------
318 * ExecShutdownGatherMergeWorkers
320 * Stop all the parallel workers.
321 * ----------------------------------------------------------------
326 if (node->
pei != NULL)
329 /* Flush local copy of reader array */
335/* ----------------------------------------------------------------
336 * ExecReScanGatherMerge
338 * Prepare to re-scan the result of a GatherMerge.
339 * ----------------------------------------------------------------
347 /* Make sure any existing workers are gracefully shut down */
350 /* Free any unused tuples, so we don't leak memory across rescans */
353 /* Mark node so that shared state will be rebuilt at next call */
358 * Set child node's chgParam to tell it that the next scan might deliver a
359 * different set of rows within the leader process. (The overall rowset
360 * shouldn't change, but the leader process's subset might; hence nodes
361 * between here and the parallel table scan node mustn't optimize on the
362 * assumption of an unchanging rowset.)
369 * If chgParam of subnode is not null then plan will be re-scanned by
370 * first ExecProcNode. Note: because this does nothing if we have a
371 * rescan_param, it's currently guaranteed that parallel-aware child nodes
372 * will not see a ReScan call until after they get a ReInitializeDSM call.
373 * That ordering might not be something to rely on, though. A good rule
374 * of thumb is that ReInitializeDSM should reset only shared state, ReScan
375 * should reset only local state, and anything that depends on both of
376 * those steps being finished must wait until the first ExecProcNode call.
383 * Set up the data structures that we'll need for Gather Merge.
385 * We allocate these once on the basis of gm->num_workers, which is an
386 * upper bound for the number of workers we'll actually have. During
387 * a rescan, we reset the structures to empty. This approach simplifies
388 * not leaking memory across rescans.
390 * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
391 * are for workers. The values placed into gm_heap correspond to indexes
392 * in gm_slots[]. The gm_tuple_buffers[] array, however, is indexed from
393 * 0 to n-1; it has no entry for the leader.
403 * Allocate gm_slots for the number of workers + one more slot for leader.
404 * Slot 0 is always for the leader. Leader always calls ExecProcNode() to
405 * read the tuple, and then stores it directly into its gm_slots entry.
406 * For other slots, code below will call ExecInitExtraTupleSlot() to
407 * create a slot for the worker's results. Note that during any single
408 * scan, we might have fewer than num_workers available workers, in which
409 * case the extra array entries go unused.
414 /* Allocate the tuple slot and tuple array for each worker */
418 for (
i = 0;
i < nreaders;
i++)
420 /* Allocate the tuple array with length MAX_TUPLE_STORE */
424 /* Initialize tuple slot for worker */
430 /* Allocate the resources for the merge */
437 * Initialize the Gather Merge.
439 * Reset data structures to ensure they're empty. Then pull at least one
440 * tuple from leader + each worker (or set its "done" indicator), and set up the heap.
450 /* Assert that gather_merge_setup made enough space */
453 /* Reset leader's tuple slot to empty */
456 /* Reset the tuple slot and tuple array for each worker */
457 for (
i = 0;
i < nreaders;
i++)
459 /* Reset tuple array to empty */
462 /* Reset done flag to not-done */
464 /* Ensure output slot is empty */
468 /* Reset binary heap to empty */
472 * First, try to read a tuple from each worker (including leader) in
473 * nowait mode. After this, if not all workers were able to produce a
474 * tuple (or a "done" indication), then re-read from remaining workers,
475 * this time using wait mode. Add all live readers (those producing at
476 * least one tuple) to the heap.
479 for (
i = 0;
i <= nreaders;
i++)
483 /* skip this source if already known done */
489 /* Don't have a tuple yet, try to get one */
497 * We already got at least one tuple from this worker, but
498 * might as well see if it has any more ready by now.
505 /* need not recheck leader, since nowait doesn't matter for it */
506 for (
i = 1;
i <= nreaders;
i++)
516 /* Now heapify the heap. */
523 * Clear out the tuple table slot, and any unused pending tuples,
524 * for each gather merge input.
543 * Read the next tuple for gather merge.
545 * Fetch the sorted tuple out of the heap.
555 * First time through: pull the first tuple from each participant, and set up the heap.
563 * Otherwise, pull the next tuple from whichever participant we
564 * returned from last time, and reinsert that participant's index into
565 * the heap, because it might now compare differently against the
566 * other elements of the heap.
574 /* reader exhausted, remove it from heap */
581 /* All the queues are exhausted, and so is the heap */
587 /* Return next tuple from whichever participant has the leading one */
594 * Read tuple(s) for given reader in nowait mode, and load into its tuple
595 * array, until we have MAX_TUPLE_STORE of them or would have to block.
603 /* Don't do anything if this is the leader. */
609 /* If there's nothing in the array, reset the counters to zero. */
613 /* Try to fill additional slots in the array. */
621 &tuple_buffer->
done);
624 tuple_buffer->
tuple[
i] = tuple;
630 * Store the next tuple for a given reader into the appropriate slot.
632 * Returns true if successful, false if not (either reader is exhausted,
633 * or we didn't want to wait for a tuple). Sets done flag if reader
634 * is found to be exhausted.
643 * If we're being asked to generate a tuple from the leader, then we just
644 * call ExecProcNode as normal to produce one.
654 /* Install our DSA area while executing the plan. */
661 gm_state->
gm_slots[0] = outerTupleSlot;
664 /* need_to_scan_locally serves as "done" flag for leader */
670 /* Otherwise, check the state of the relevant tuple buffer. */
675 /* Return any tuple previously read that is still buffered. */
678 else if (tuple_buffer->
done)
680 /* Reader is known to be exhausted. */
685 /* Read and buffer next tuple. */
689 &tuple_buffer->
done);
694 * Attempt to read more tuples in nowait mode and store them in the
695 * pending-tuple array for the reader.
702 /* Build the TupleTableSlot for the given tuple */
704 gm_state->
gm_slots[reader],
/* slot in which to
706 true);
/* pfree tuple when done with it */
712 * Attempt to read a tuple from given worker.
721 /* Check for async events, particularly messages from workers. */
725 * Attempt to read a tuple.
727 * Note that TupleQueueReaderNext will just return NULL for a worker which
728 * fails to initialize. We'll treat that worker as having produced no
729 * tuples; WaitForParallelWorkersToFinish will error out when we get there.
732 reader = gm_state->
reader[nreader - 1];
736 * Since we'll be buffering these across multiple calls, we need to make a
743 * We have one slot for each item in the heap array. We use SlotNumber
744 * to store slot indexes. This doesn't actually provide any formal
745 * type-safety, but it makes the code more self-documenting.
750 * Compare the tuples in the two given slots.
766 for (nkey = 0; nkey < node->
gm_nkeys; nkey++)
void LaunchParallelWorkers(ParallelContext *pcxt)
void binaryheap_build(binaryheap *heap)
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
void binaryheap_reset(binaryheap *heap)
bh_node_type binaryheap_first(binaryheap *heap)
bh_node_type binaryheap_remove_first(binaryheap *heap)
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
#define binaryheap_empty(h)
Bitmapset * bms_add_member(Bitmapset *a, int x)
#define INVERT_COMPARE_RESULT(var)
void ExecReScan(PlanState *node)
void ExecParallelCleanup(ParallelExecutorInfo *pei)
void ExecParallelReinitialize(PlanState *planstate, ParallelExecutorInfo *pei, Bitmapset *sendParams)
void ExecParallelCreateReaders(ParallelExecutorInfo *pei)
ParallelExecutorInfo * ExecInitParallelPlan(PlanState *planstate, EState *estate, Bitmapset *sendParams, int nworkers, int64 tuples_needed)
void ExecParallelFinish(ParallelExecutorInfo *pei)
void ExecEndNode(PlanState *node)
PlanState * ExecInitNode(Plan *node, EState *estate, int eflags)
void ExecInitResultTypeTL(PlanState *planstate)
TupleTableSlot * ExecStoreMinimalTuple(MinimalTuple mtup, TupleTableSlot *slot, bool shouldFree)
TupleTableSlot * ExecInitExtraTupleSlot(EState *estate, TupleDesc tupledesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleDesc ExecGetResultType(PlanState *planstate)
void ExecAssignExprContext(EState *estate, PlanState *planstate)
void ExecConditionalAssignProjectionInfo(PlanState *planstate, TupleDesc inputDesc, int varno)
#define outerPlanState(node)
static TupleTableSlot * ExecProject(ProjectionInfo *projInfo)
#define ResetExprContext(econtext)
static TupleTableSlot * ExecProcNode(PlanState *node)
static int compare(const void *arg1, const void *arg2)
Assert(PointerIsAligned(start, uint64))
MinimalTuple heap_copy_minimal_tuple(MinimalTuple mtup, Size extra)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
#define CHECK_FOR_INTERRUPTS()
struct GMReaderTupleBuffer GMReaderTupleBuffer
static void gather_merge_init(GatherMergeState *gm_state)
static void gather_merge_setup(GatherMergeState *gm_state)
static int32 heap_compare_slots(Datum a, Datum b, void *arg)
void ExecReScanGatherMerge(GatherMergeState *node)
static void gather_merge_clear_tuples(GatherMergeState *gm_state)
void ExecShutdownGatherMerge(GatherMergeState *node)
static void load_tuple_array(GatherMergeState *gm_state, int reader)
GatherMergeState * ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
static bool gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node)
static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, bool *done)
void ExecEndGatherMerge(GatherMergeState *node)
static TupleTableSlot * gather_merge_getnext(GatherMergeState *gm_state)
static TupleTableSlot * ExecGatherMerge(PlanState *pstate)
#define castNode(_type_, nodeptr)
bool parallel_leader_participation
static Datum Int32GetDatum(int32 X)
static int32 DatumGetInt32(Datum X)
void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
static int ApplySortComparator(Datum datum1, bool isNull1, Datum datum2, bool isNull2, SortSupport ssup)
struct dsa_area * es_query_dsa
int es_parallel_workers_to_launch
bool es_use_parallel_mode
int es_parallel_workers_launched
TupleTableSlot * ecxt_outertuple
struct ParallelExecutorInfo * pei
struct TupleQueueReader ** reader
struct GMReaderTupleBuffer * gm_tuple_buffers
TupleTableSlot ** gm_slots
bool need_to_scan_locally
struct binaryheap * gm_heap
struct TupleQueueReader ** reader
ExprContext * ps_ExprContext
ProjectionInfo * ps_ProjInfo
ExecProcNodeMtd ExecProcNode
MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)