1/*-------------------------------------------------------------------------
2 * applyparallelworker.c
3 * Support routines for applying xact by parallel apply worker
5 * Copyright (c) 2023-2025, PostgreSQL Global Development Group
8 * src/backend/replication/logical/applyparallelworker.c
10 * This file contains the code to launch, set up, and teardown a parallel apply
11 * worker which receives the changes from the leader worker and invokes routines
12 * to apply those on the subscriber database. Additionally, this file contains
13 * routines that are intended to support setting up, using, and tearing down a
14 * ParallelApplyWorkerInfo which is required so the leader worker and parallel
15 * apply workers can communicate with each other.
17 * The parallel apply workers are assigned (if available) as soon as xact's
18 * first stream is received for subscriptions that have set their 'streaming'
19 * option as parallel. The leader apply worker will send changes to this new
20 * worker via shared memory. We keep this worker assigned till the transaction
21 * commit is received and also wait for the worker to finish at commit. This
22 * preserves commit ordering and avoids file I/O in most cases, although we
23 * still need to spill to a file if there is no worker available. See comments
24 * atop logical/worker to know more about streamed xacts whose changes are
25 * spilled to disk. It is important to maintain commit order to avoid failures
26 * due to: (a) transaction dependencies - say if we insert a row in the first
27 * transaction and update it in the second transaction on publisher then
28 * allowing the subscriber to apply both in parallel can lead to failure in the
29 * update; (b) deadlocks - allowing transactions that update the same set of
30 * rows/tables in the opposite order to be applied in parallel can lead to deadlocks.
33 * A worker pool is used to avoid restarting workers for each streaming
34 * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
35 * in the ParallelApplyWorkerPool. After successfully launching a new worker,
36 * its information is added to the ParallelApplyWorkerPool. Once the worker
37 * finishes applying the transaction, it is marked as available for re-use.
38 * Now, before starting a new worker to apply the streaming transaction, we
39 * check the list for any available worker. Note that we retain a maximum of
40 * half the max_parallel_apply_workers_per_subscription workers in the pool and
41 * after that, we simply exit the worker after applying the transaction.
43 * XXX This worker pool threshold is arbitrary and we can provide a GUC
44 * variable for this in the future if required.
46 * The leader apply worker will create a separate dynamic shared memory segment
47 * when each parallel apply worker starts. The reason for this design is that
48 * we cannot predict how many workers will be needed. It may be possible to
49 * allocate enough shared memory in one segment based on the maximum number of
50 * parallel apply workers (max_parallel_apply_workers_per_subscription), but
51 * this would waste memory if no process is actually started.
53 * The dynamic shared memory segment contains: (a) a shm_mq that is used to
54 * send changes in the transaction from leader apply worker to parallel apply
55 * worker; (b) another shm_mq that is used to send errors (and other messages
56 * reported via elog/ereport) from the parallel apply worker to leader apply
57 * worker; (c) necessary information to be shared among parallel apply workers
58 * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
60 * Locking Considerations
61 * ----------------------
62 * We have a risk of deadlock due to concurrently applying the transactions in
63 * parallel mode that were independent on the publisher side but became
64 * dependent on the subscriber side due to the different database structures
65 * (like schema of subscription tables, constraints, etc.) on each side. This
66 * can happen even without parallel mode when there are concurrent operations
67 * on the subscriber. In order to detect the deadlocks among leader (LA) and
68 * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
69 * next stream (set of changes) and LA waits for PA to finish the transaction.
70 * An alternative approach could be to not allow parallelism when the schema of
71 * tables is different between the publisher and subscriber but that would be
72 * too restrictive and would require the publisher to send much more
73 * information than it is currently sending.
75 * Consider a case where the subscribed table does not have a unique key on the
76 * publisher and has a unique key on the subscriber. The deadlock can happen in the following ways:
79 * 1) Deadlock between the leader apply worker and a parallel apply worker
81 * Consider that the parallel apply worker (PA) is executing TX-1 and the
82 * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
83 * Now, LA is waiting for PA because of the unique key constraint of the
84 * subscribed table while PA is waiting for LA to send the next stream of
85 * changes or transaction finish command message.
87 * In order for lmgr to detect this, we have LA acquire a session lock on the
88 * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
89 * trying to receive the next stream of changes. Specifically, LA will acquire
90 * the lock in AccessExclusive mode before sending the STREAM_STOP and will
91 * release it if already acquired after sending the STREAM_START, STREAM_ABORT
92 * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
93 * acquire the lock in AccessShare mode after processing STREAM_STOP and
94 * STREAM_ABORT (for subtransaction) and then release the lock immediately after acquiring it.
97 * The lock graph for the above example will look as follows:
98 * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
99 * acquire the stream lock) -> LA
101 * This way, when PA is waiting for LA for the next stream of changes, we can
102 * have a wait-edge from PA to LA in lmgr, which will make us detect the
103 * deadlock between LA and PA.
105 * 2) Deadlock between the leader apply worker and parallel apply workers
107 * This scenario is similar to the first case but TX-1 and TX-2 are executed by
108 * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
109 * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
110 * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
111 * transaction in order to preserve the commit order. There is a deadlock among
112 * the three processes.
114 * In order for lmgr to detect this, we have PA acquire a session lock (this is
115 * a different lock than referred in the previous case, see
116 * pa_lock_transaction()) on the transaction being applied and have LA wait on
117 * the lock before proceeding in the transaction finish commands. Specifically,
118 * PA will acquire this lock in AccessExclusive mode before executing the first
119 * message of the transaction and release it at the xact end. LA will acquire
120 * this lock in AccessShare mode at transaction finish commands and release it immediately.
123 * The lock graph for the above example will look as follows:
124 * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
125 * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream lock) -> LA
128 * This way when LA is waiting to finish the transaction end command to preserve
129 * the commit order, we will be able to detect deadlock, if any.
131 * One might think we can use XactLockTableWait(), but XactLockTableWait()
132 * considers PREPARED TRANSACTION as still in progress which means the lock
133 * won't be released even after the parallel apply worker has prepared the transaction.
136 * 3) Deadlock when the shm_mq buffer is full
138 * In the previous scenario (i.e. PA-1 and PA-2 are executing transactions
139 * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
140 * wait to send messages, and this wait doesn't appear in lmgr.
142 * To avoid this wait, we use a non-blocking write and wait with a timeout. If
143 * the timeout is exceeded, the LA will serialize all the pending messages to
144 * a file and indicate to PA-2 that it needs to read that file for the remaining
145 * messages. Then LA will start waiting for commit as in the previous case
146 * which will detect deadlock if any. See pa_send_data() and
147 * enum TransApplyAction.
151 * Both the stream lock and the transaction lock mentioned above are
152 * session-level locks because both locks could be acquired outside the
153 * transaction, and the stream lock in the leader needs to persist across
154 * transaction boundaries i.e. until the end of the streaming transaction.
155 *-------------------------------------------------------------------------
175 #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
178 * DSM keys for parallel apply worker. Unlike other parallel execution code,
179 * since we don't need to worry about DSM keys conflicting with plan_node_id we
180 * can use small integers.
182 #define PARALLEL_APPLY_KEY_SHARED 1
183 #define PARALLEL_APPLY_KEY_MQ 2
184 #define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
186/* Queue size of DSM, 16 MB for now. */
187 #define DSM_QUEUE_SIZE (16 * 1024 * 1024)
190 * Error queue size of DSM. It is desirable to make it large enough that a
191 * typical ErrorResponse can be sent without blocking. That way, a worker that
192 * errors out can write the whole message into the queue and terminate without
193 * waiting for the user backend.
195 #define DSM_ERROR_QUEUE_SIZE (16 * 1024)
198 * There are three fields in each message received by the parallel apply
199 * worker: start_lsn, end_lsn and send_time. Because we have updated these
200 * statistics in the leader apply worker, we can ignore these fields in the
201 * parallel apply worker (see function LogicalRepApplyLoop).
203 #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
206 * The type of session-level lock on a transaction being applied on a logical
207 * replication subscriber.
209 #define PARALLEL_APPLY_LOCK_STREAM 0
210 #define PARALLEL_APPLY_LOCK_XACT 1
213 * Hash table entry to map xid to the parallel apply worker state.
222 * A hash table used to cache the state of streaming transactions being applied
223 * by the parallel apply workers.
228* A list (pool) of active parallel apply workers. The information for
229* the new worker is added to the list after successfully launching it. The
230* list entry is removed if there are already enough workers in the worker
231* pool at the end of the transaction. For more information about the worker
232* pool, see comments atop this file.
237 * Information shared between leader apply worker and parallel apply worker.
242 * Is there a message sent by a parallel apply worker that the leader apply
243 * worker needs to receive?
248 * Cache the parallel apply worker information required for applying the
249 * current streaming transaction. It is used to save the cost of searching the
250 * hash table when applying the changes between STREAM_START and STREAM_STOP.
254/* A list to maintain subtransactions, if any. */
262 * Returns true if it is OK to start a parallel apply worker, false otherwise.
267 /* Only leader apply workers can start parallel apply workers. */
272 * It is good to check for any change in the subscription parameter to
273 * avoid the case where for a very long time the change doesn't get
274 * reflected. This can happen when there is a constant flow of streaming
275 * transactions that are handled by parallel apply workers.
277 * It is better to do it before the below checks so that the latest values
278 * of subscription can be used for the checks.
283 * Don't start a new parallel apply worker if the subscription is not
284 * using parallel streaming mode, or if the publisher does not support parallel apply.
291 * Don't start a new parallel worker if user has set skiplsn as it's
292 * possible that they want to skip the streaming transaction. For
293 * streaming transactions, we need to serialize the transaction to a file
294 * so that we can get the last LSN of the transaction to judge whether to
295 * skip before starting to apply the change.
297 * One might think that we could allow parallelism if the first lsn of the
298 * transaction is greater than skiplsn, but we don't send it with the
299 * STREAM START message, and it doesn't seem worth sending the extra eight
300 * bytes with the STREAM START to enable parallelism for this case.
306 * For streaming transactions that are being applied using a parallel
307 * apply worker, we cannot decide whether to apply the change for a
308 * relation that is not in the READY state (see
309 * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
310 * time. So, we don't start the new parallel apply worker in this case.
319 * Set up a dynamic shared memory segment.
321 * We set up a control region that contains a fixed-size worker info
322 * (ParallelApplyWorkerShared), a message queue, and an error queue.
324 * Returns true on success, false on failure.
339 * Estimate how much shared memory we need.
341 * Because the TOC machinery may choose to insert padding of oddly-sized
342 * requests, we must estimate each chunk separately.
344 * We need one key to register the location of the header, and two other
345 * keys to track the locations of the message queue and the error message queue.
356 /* Create the shared memory segment and establish a table of contents. */
364 /* Set up the header region. */
375 /* Set up message queue for the worker. */
380 /* Attach the queue. */
383 /* Set up error queue for the worker. */
389 /* Attach the queue. */
392 /* Return results to caller. */
400 * Try to get a parallel apply worker from the pool. If none is available then start a new one.
411 /* Try to get an available parallel apply worker from the worker pool. */
421 * Start a new parallel apply worker.
423 * The worker info can be used for the lifetime of the worker process, so
424 * create it in a permanent context.
430 /* Setup shared memory. */
463 * Allocate a parallel apply worker that will be used for the specified xid.
465 * We first try to get an available worker from the pool, if any and then try
466 * to launch a new worker. On successful allocation, remember the worker
467 * information in the hash table so that we can get it later for processing the streaming changes.
484 /* First time through, initialize parallel apply worker state hashtable. */
499 /* Create an entry for the requested transaction. */
504 /* Update the transaction information in shared memory. */
512 entry->
winfo = winfo;
516 * Find the assigned worker for the given transaction, if any.
530 /* Return the cached parallel apply worker if valid. */
534 /* Find an entry for the requested transaction. */
538 /* The worker must not have exited. */
547 * Makes the worker available for reuse.
549 * This removes the parallel apply worker entry from the hash table so that it
550 * can't be used. If there are enough workers in the pool, it stops the worker
551 * and frees the corresponding info. Otherwise it just marks the worker as
552 * available for reuse.
554 * For more information about the worker pool, see comments atop this file.
567 * Stop the worker if there are enough workers in the pool.
569 * XXX Additionally, we also stop the worker if the leader apply worker
570 * serializes part of the transaction data due to a send timeout. This is
571 * because the message could be partially written to the queue and there
572 * is no way to clean the queue other than resending the message until it
573 * succeeds. Instead of trying to send the data which anyway would have
574 * been serialized and then letting the parallel apply worker deal with
575 * the spurious message, we stop the worker.
592 * Free the parallel apply worker information and unlink the files with
593 * serialized changes if any.
606 /* Unlink the files with serialized changes. */
613 /* Remove from the worker pool. */
620 * Detach the error queue for all parallel apply workers.
640 * Check if there are any pending spooled messages.
653 * Replay the spooled messages once the leader apply worker has finished
654 * serializing changes to the file.
656 * Returns false if there aren't any pending spooled messages, true otherwise.
669 * If the leader apply worker is busy serializing the partial changes then
670 * acquire the stream lock now and wait for the leader worker to finish
671 * serializing the changes. Otherwise, the parallel apply worker won't get
672 * a chance to receive a STREAM_STOP (and acquire the stream lock) until
673 * the leader has serialized all changes which can lead to undetected deadlock.
676 * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
677 * worker has finished serializing the changes.
688 * We cannot read the file immediately after the leader has serialized all
689 * changes to the file because there may still be messages in the memory
690 * queue. We will apply all spooled messages the next time we call this
691 * function and that will ensure there are no messages left in the memory queue.
710 * Interrupt handler for main loop of parallel apply worker.
720 (
errmsg(
"logical replication parallel apply worker for subscription \"%s\" has finished",
733/* Parallel apply worker main loop. */
742 * Init the ApplyMessageContext which we clean up after each replication protocol message.
746 "ApplyMessageContext",
750 * Push apply error context callback. Fields will be filled while applying
764 /* Ensure we are reading the data into our memory context. */
780 * The first byte of messages sent from leader apply worker to
781 * parallel apply workers can only be PqReplMsg_WALData.
788 * Ignore statistics fields that have been updated by the leader apply worker.
791 * XXX We can avoid sending the statistics fields from the leader
792 * apply worker but for that, it needs to rebuild the entire
793 * message by removing these fields which could be more work than
794 * simply ignoring these fields in the parallel apply worker.
802 /* Replay the changes from the file, if any. */
807 /* Wait for more work. */
811 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
822 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
823 errmsg(
"lost connection to the logical replication apply worker")));
830 /* Pop the error context stack. */
837 * Make sure the leader apply worker tries to read from our error queue one more
838 * time. This guards against the case where we exit uncleanly without sending
839 * an ErrorResponse, for example because some code calls proc_exit directly.
841 * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
842 * if any. See ParallelWorkerShutdown for details.
855 * Parallel apply worker entry point.
874 * Setup signal handling.
876 * Note: We intentionally used SIGUSR2 to trigger a graceful shutdown
877 * initiated by the leader apply worker. This helps to differentiate it
878 * from the case where we abort the current transaction and exit on receiving SIGTERM.
887 * Attach to the dynamic shared memory segment for the parallel apply, and
888 * find its table of contents.
890 * Like parallel query, we don't need resource owner by this time. See
891 * ParallelWorkerMain.
897 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
898 errmsg(
"could not map dynamic shared memory segment")));
903 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
904 errmsg(
"invalid magic number in dynamic shared memory segment")));
906 /* Look up the shared information. */
911 * Attach to the message queue.
918 * Primary initialization is complete. Now, we can attach to our slot.
919 * This is to ensure that the leader apply worker does not write data to
920 * the uninitialized memory queue.
925 * Register the shutdown callback after we are attached to the worker
926 * slot. This is to ensure that MyLogicalRepWorker remains valid when this
927 * callback is invoked.
937 * Attach to the error queue.
954 /* Setup replication origin tracking. */
957 originname,
sizeof(originname));
961 * The parallel apply worker doesn't need to monopolize this replication
962 * origin which was already acquired by its leader process.
969 * Setup callback for syscache so that we know when something changes in
970 * the subscription relation state.
981 * The parallel apply worker must not get here because the parallel apply
982 * worker will only stop when it receives a SIGTERM or SIGUSR2 from the
983 * leader, or SIGINT from itself, or when there is an error. None of these
984 * cases will allow the code to reach here.
990 * Handle receipt of an interrupt indicating a parallel apply worker message.
992 * Note: this is called within a signal handler! All we can do is set a flag
993 * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
994 * ProcessParallelApplyMessages().
1005 * Process a single protocol message received from a single parallel apply worker.
1021 /* Parse ErrorResponse. */
1025 * If desired, add a context line to show that this is a
1026 * message propagated from a parallel apply worker. Otherwise,
1027 * it can sometimes be confusing to understand what actually happened.
1032 _(
"logical replication parallel apply worker"));
1034 edata.
context =
pstrdup(
_(
"logical replication parallel apply worker"));
1037 * Context beyond that should use the error context callbacks
1038 * that were in effect in LogicalRepApplyLoop().
1043 * The actual error must have been reported by the parallel apply worker.
1047 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1048 errmsg(
"logical replication parallel apply worker exited due to error"),
1053 * Don't need to do anything about NoticeResponse and
1054 * NotificationResponse as the logical replication worker doesn't
1055 * need to send messages to the client.
1062 elog(
ERROR,
"unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1068 * Handle any queued protocol messages received from parallel apply workers.
1079 * This is invoked from ProcessInterrupts(), and since some of the
1080 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1081 * for recursive calls if more signals are received while this runs. It's
1082 * unclear that recursive entry would be safe, and it doesn't seem useful
1083 * even if it is safe, so let's block interrupts until done.
1088 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1089 * don't want to risk leaking data into long-lived contexts, so let's do
1090 * our work here in a private context that we can reset on each use.
1092 if (!hpam_context)
/* first time through? */
1094 "ProcessParallelApplyMessages",
1111 * The leader will detach from the error queue and set it to NULL
1112 * before preparing to stop all parallel apply workers, so we don't
1113 * need to handle error messages anymore. See
1114 * logicalrep_worker_detach.
1134 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1135 errmsg(
"lost connection to the logical replication parallel apply worker")));
1140 /* Might as well clear the context on our way out */
1147 * Send the data to the specified parallel apply worker via shared-memory queue.
1150 * Returns false if the attempt to send data via shared memory times out, true otherwise.
1164 * We don't try to send data to parallel worker for 'immediate' mode. This
1165 * is primarily used for testing purposes.
1171 * This timeout is a bit arbitrary but testing revealed that it is sufficient
1172 * to send the message unless the parallel apply worker is waiting on some
1173 * lock or there is a serious resource crunch. See the comments atop this file
1174 * to know why we are using a non-blocking way to send the message.
1176#define SHM_SEND_RETRY_INTERVAL_MS 1000
1177#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1187 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1188 errmsg(
"could not send data to shared-memory queue")));
1192 /* Wait before retrying. */
1196 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1213 * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
1214 * that the current data and any subsequent data for this transaction will be
1215 * serialized to a file. This is done to prevent possible deadlocks with
1216 * another parallel apply worker (refer to the comments atop this file).
1223 (
errmsg(
"logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1227 * The parallel apply worker could be stuck for some reason (say waiting
1228 * on some lock by other backend), so stop trying to send data directly to
1229 * it and start serializing data to the file instead.
1233 /* Initialize the stream fileset. */
1237 * Acquires the stream lock if not already to make sure that the parallel
1238 * apply worker will wait for the leader to release the stream lock until
1239 * the end of the transaction.
1248 * Wait until the parallel apply worker's transaction state has reached or
1249 * exceeded the given xact_state.
1258 * Stop if the transaction state has reached or exceeded the given xact_state.
1264 /* Wait to be signalled. */
1268 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
1270 /* Reset the latch so we don't spin. */
1273 /* An interrupt may have occurred while we were waiting. */
1279 * Wait until the parallel apply worker's transaction finishes.
1285 * Wait until the parallel apply worker sets the state to
1286 * PARALLEL_TRANS_STARTED which means it has acquired the transaction
1287 * lock. This is to prevent leader apply worker from acquiring the
1288 * transaction lock earlier than the parallel apply worker.
1293 * Wait for the transaction lock to be released. This is required to
1294 * detect deadlock among leader and parallel apply workers. Refer to the
1295 * comments atop this file.
1301 * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
1302 * apply worker failed while applying changes causing the lock to be released.
1307 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1308 errmsg(
"lost connection to the logical replication parallel apply worker")));
1312 * Set the transaction state for a given parallel apply worker.
1324 * Get the transaction state for a given parallel apply worker.
1339 * Cache the parallel apply worker information.
1348 * Form a unique savepoint name for the streaming transaction.
1350 * Note that different subscriptions for publications on different nodes can
1351 * receive same remote xid, so we need to use subscription id along with it.
1353 * Returns the name in the supplied buffer.
1358 snprintf(spname, szsp,
"pg_sp_%u_%u", suboid, xid);
1362 * Define a savepoint for a subxact in parallel apply worker if needed.
1364 * The parallel apply worker can figure out if a new subtransaction was
1365 * started by checking if the new change arrived with a different xid. In that
1366 * case define a named savepoint, so that we are able to roll back to it if required.
1372 if (current_xid != top_xid &&
1379 spname,
sizeof(spname));
1381 elog(
DEBUG1,
"defining savepoint %s in logical replication parallel apply worker", spname);
1383 /* We must be in transaction block to define the SAVEPOINT. */
1396 * CommitTransactionCommand is needed to start a subtransaction after
1397 * issuing a SAVEPOINT inside a transaction block (see
1398 * StartSubTransaction()).
1408/* Reset the list that maintains subtransactions. */
1413 * We don't need to free this explicitly as the allocated memory will be
1414 * freed at the transaction end.
1420 * Handle STREAM ABORT message when the transaction was applied in a parallel apply worker.
1430 * Update origin state so we can restart streaming from correct position
1437 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1438 * just free the subxactlist.
1445 * Release the lock as we might be processing an empty streaming
1446 * transaction in which case the lock won't be released during
1447 * transaction rollback.
1449 * Note that it's ok to release the transaction lock before aborting
1450 * the transaction because even if the parallel apply worker dies due
1451 * to crash or some other reason, such a transaction would still be
1452 * considered aborted.
1470 /* OK, so it's a subxact. Rollback to the savepoint. */
1476 elog(
DEBUG1,
"rolling back to savepoint %s in logical replication parallel apply worker", spname);
1479 * Search the subxactlist, determine the offset tracked for the
1480 * subxact, and truncate the list.
1482 * Note that for an empty sub-transaction we won't find the subxid
1489 if (xid_tmp == subxid)
1501 * Set the fileset state for a particular parallel apply worker. The fileset
1502 * will be set once the leader worker has serialized all changes to the file
1503 * so that it can be used by parallel apply worker.
1523 * Get the fileset state for the current parallel apply worker.
1536 return fileset_state;
1540 * Helper functions to acquire and release a lock for each stream block.
1542 * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a stream lock.
1545 * Refer to the comments atop this file to see how the stream lock is used.
1562 * Helper functions to acquire and release a lock for each local transaction
1565 * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a transaction lock.
1568 * Note that all the callers must pass a remote transaction ID instead of a
1569 * local transaction ID as xid. This is because the local transaction ID will
1570 * only be assigned while applying the first change in the parallel apply but
1571 * it's possible that the first change in the parallel apply worker is blocked
1572 * by a concurrently executing transaction in another parallel apply worker. We
1573 * can only communicate the local transaction id to the leader after applying
1574 * the first change so it won't be able to wait after sending the xact finish
1575 * command using this lock.
1577 * Refer to the comments atop this file to see how the transaction lock is used.
1595 * Decrement the number of pending streaming blocks and wait on the stream lock
1596 * if there is no pending block available.
1604 * It is only possible to not have any pending stream chunks when we are
1605 * applying spooled messages.
1612 elog(
ERROR,
"invalid pending streaming chunk 0");
1623 * Finish processing the streaming transaction in the leader apply worker.
1631 * Unlock the shared object lock so that parallel apply worker can
1632 * continue to receive and apply changes.
1637 * Wait for that worker to finish. This is necessary to maintain commit
1638 * order which avoids failures due to transaction dependencies and deadlocks.
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static ParallelApplyWorkerInfo * stream_apply_worker
static List * ParallelApplyWorkerPool
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
#define DSM_ERROR_QUEUE_SIZE
volatile sig_atomic_t ParallelApplyMessagePending
static bool pa_can_start(void)
void HandleParallelApplyMessageInterrupt(void)
void ProcessParallelApplyMessages(void)
#define SHM_SEND_TIMEOUT_MS
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
static void ProcessParallelApplyInterrupts(void)
static void ProcessParallelApplyMessage(StringInfo msg)
static PartialFileSetState pa_get_fileset_state(void)
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
#define PARALLEL_APPLY_LOCK_XACT
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static List * subxactlist
static bool pa_has_spooled_message_pending()
static void pa_shutdown(int code, Datum arg)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)
#define PARALLEL_APPLY_KEY_SHARED
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
ParallelApplyWorkerShared * MyParallelShared
void pa_detach_all_error_mq(void)
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
#define PARALLEL_APPLY_KEY_MQ
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
#define SIZE_STATS_MESSAGE
#define SHM_SEND_RETRY_INTERVAL_MS
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
static bool pa_process_spooled_messages_if_required(void)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
static HTAB * ParallelApplyTxnHash
#define PARALLEL_APPLY_LOCK_STREAM
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
void ParallelApplyWorkerMain(Datum main_arg)
#define PG_LOGICAL_APPLY_SHM_MAGIC
void pa_decr_and_wait_stream_block(void)
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
void stream_cleanup_files(Oid subid, TransactionId xid)
MemoryContext ApplyMessageContext
bool InitializingApplyWorker
void apply_dispatch(StringInfo s)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
ErrorContextCallback * apply_error_context_stack
void stream_start_internal(TransactionId xid, bool first_segment)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void apply_error_callback(void *arg)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
void maybe_reread_subscription(void)
void InitializeLogRepWorker(void)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
void pgstat_report_activity(BackendState state, const char *cmd_str)
void BackgroundWorkerUnblockSignals(void)
#define MemSet(start, val, len)
dsm_handle dsm_segment_handle(dsm_segment *seg)
void dsm_detach(dsm_segment *seg)
void * dsm_segment_address(dsm_segment *seg)
dsm_segment * dsm_create(Size size, int flags)
dsm_segment * dsm_attach(dsm_handle h)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
ErrorContextCallback * error_context_stack
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
volatile sig_atomic_t InterruptPending
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
void logicalrep_worker_attach(int slot)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
LogicalRepWorker * MyLogicalRepWorker
int max_parallel_apply_workers_per_subscription
List * list_delete_ptr(List *list, void *datum)
List * lappend(List *list, void *datum)
List * lappend_xid(List *list, TransactionId datum)
bool list_member_xid(const List *list, TransactionId datum)
List * list_truncate(List *list, int new_size)
void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
#define AccessExclusiveLock
void MemoryContextReset(MemoryContext context)
MemoryContext TopTransactionContext
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext TopMemoryContext
MemoryContext CurrentMemoryContext
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_session_origin
void replorigin_session_setup(RepOriginId node, int acquired_by)
XLogRecPtr replorigin_session_origin_lsn
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int list_length(const List *l)
static ListCell * list_nth_cell(const List *list, int n)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static int32 DatumGetInt32(Datum X)
BackgroundWorker * MyBgworkerEntry
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_PARALLEL_APPLY_MESSAGE
#define PqReplMsg_WALData
#define PqMsg_NotificationResponse
#define PqMsg_ErrorResponse
#define PqMsg_NoticeResponse
char * psprintf(const char *fmt,...)
int debug_logical_replication_streaming
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
shm_mq * shm_mq_create(void *address, Size size)
void shm_mq_detach(shm_mq_handle *mqh)
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Size shm_toc_estimate(shm_toc_estimator *e)
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
shm_toc * shm_toc_attach(uint64 magic, void *address)
#define shm_toc_estimate_chunk(e, sz)
#define shm_toc_initialize_estimator(e)
#define shm_toc_estimate_keys(e, cnt)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
void initStringInfo(StringInfo str)
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
char bgw_extra[BGW_EXTRALEN]
struct ErrorContextCallback * previous
void(* callback)(void *arg)
TimestampTz last_recv_time
TimestampTz last_send_time
ParallelApplyWorkerInfo * winfo
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
int logicalrep_worker_slot_no
pg_atomic_uint32 pending_stream_count
PartialFileSetState fileset_state
uint16 logicalrep_worker_generation
ParallelTransState xact_state
XLogRecPtr last_commit_end
bool AllTablesyncsReady(void)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
#define TransactionIdIsValid(xid)
#define WL_EXIT_ON_PM_DEATH
@ PARALLEL_TRANS_FINISHED
static bool am_parallel_apply_worker(void)
@ WORKERTYPE_PARALLEL_APPLY
@ FS_SERIALIZE_IN_PROGRESS
static bool am_leader_apply_worker(void)
void DefineSavepoint(const char *name)
bool IsTransactionState(void)
void StartTransactionCommand(void)
bool IsTransactionBlock(void)
void BeginTransactionBlock(void)
void CommitTransactionCommand(void)
void RollbackToSavepoint(const char *name)
bool EndTransactionBlock(bool chain)
void AbortCurrentTransaction(void)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr