1/*-------------------------------------------------------------------------
4 * Internal headers shared by logical replication workers.
6 * Portions Copyright (c) 2016-2025, PostgreSQL Global Development Group
8 * src/include/replication/worker_internal.h
10 *-------------------------------------------------------------------------
12#ifndef WORKER_INTERNAL_H
13#define WORKER_INTERNAL_H
28/* Different types of worker */
39 /* What type of worker is this? */
42 /* Time at which this worker was launched. */
45 /* Indicates if this slot is used or free. */
48 /* Increased every time the slot is taken by new worker. */
51 /* Pointer to proc array. NULL if not running. */
54 /* Database id to connect to. */
57 /* User to use for connection (will be same as owner of subscription). */
60 /* Subscription id for the worker. */
63 /* Used for initial table synchronization. */
70 * Used to create the changes and subxact files for the streaming
71 * transactions. The fileset is initialized upon the arrival of the first
72 * streaming transaction, or the first time the leader apply worker times
73 * out while sending changes to the parallel apply worker, and it is
74 * deleted when the worker exits. Within this fileset, separate BufFiles
75 * are created for each transaction, and they are deleted after the
76 * transaction is finished.
81 * PID of leader apply worker if this slot is used for a parallel apply
82 * worker, InvalidPid otherwise.
86 /* Indicates whether apply can be performed in parallel. */
90 * Changes made by this transaction and subsequent ones must be preserved.
91 * This ensures that update_deleted conflicts can be accurately detected
92 * during the apply phase of logical replication by this worker.
94 * The logical replication launcher manages an internal replication slot
95 * named "pg_conflict_detection". It asynchronously collects this ID to
96 * decide when to advance the xmin value of the slot.
98 * This ID is set to InvalidTransactionId when the apply worker stops
99 * retaining information needed for conflict detection.
112 * State of the transaction in parallel apply worker.
114 * The enum values must have the same order as the transaction state
125 * State of fileset used to communicate changes from leader to parallel
128 * FS_EMPTY indicates an initial state where the leader doesn't need to use
129 * the file to communicate with the parallel apply worker.
131 * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
134 * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
137 * FS_READY indicates that it is now ok for a parallel apply worker to
149 * Struct for sharing information between leader apply worker and parallel
159 * State used to ensure commit ordering.
161 * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
162 * handling the transaction finish commands while the apply leader will
163 * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
164 * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
169 /* Information from the corresponding LogicalRepWorker slot. */
174 * Indicates whether there are pending streaming blocks in the queue. The
175 * parallel apply worker will check it before starting to wait.
180 * XactLastCommitEnd from the parallel apply worker. This is required by
181 * the leader worker so it can update the lsn_mappings.
186 * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
187 * serialize changes to the file, and share the fileset with the parallel
188 * apply worker when processing the transaction finish command. Then the
189 * parallel apply worker will apply all the spooled messages.
191 * FileSet is used here instead of SharedFileSet because we need it to
192 * survive after releasing the shared memory so that the leader apply
193 * worker can re-use the same fileset for the next streaming transaction.
200 * Information which is used to manage the parallel apply worker.
205 * This queue is used to send changes from the leader apply worker to the
206 * parallel apply worker.
211 * This queue is used to transfer error messages from the parallel apply
212 * worker to the leader apply worker.
219 * Indicates whether the leader apply worker needs to serialize the
220 * remaining changes to a file due to timeout when attempting to send data
221 * to the parallel apply worker via shared memory.
226 * True if the worker is being used to process a parallel apply
227 * transaction. False indicates this worker is available for re-use.
234/* Main memory context for apply worker. Permanent during worker lifetime. */
243/* libpqreceiver connection */
246/* Worker and subscription objects. */
263 bool retain_dead_tuples);
272 char *originname,
Size szoriginname);
285/* Common streaming function to apply all the spooled messages */
309/* Function for apply error callback */
313/* Parallel apply worker setup and interactions */
/*
 * isParallelApplyWorker
 *		Evaluates to true when the given worker slot is in use and its type
 *		is WORKERTYPE_PARALLEL_APPLY.
 *
 * "worker" must be a pointer to a structure with "in_use" and "type"
 * members.  The argument is expanded twice, so it must not have side
 * effects.
 */
#define isParallelApplyWorker(worker) \
	((worker)->in_use && (worker)->type == WORKERTYPE_PARALLEL_APPLY)
/*
 * isTablesyncWorker
 *		Evaluates to true when the given worker slot is in use and its type
 *		is WORKERTYPE_TABLESYNC.
 *
 * "worker" must be a pointer to a structure with "in_use" and "type"
 * members.  The argument is expanded twice, so it must not have side
 * effects.
 */
#define isTablesyncWorker(worker) \
	((worker)->in_use && (worker)->type == WORKERTYPE_TABLESYNC)
370#endif /* WORKER_INTERNAL_H */
Assert(PointerIsAligned(start, uint64))
TimestampTz last_recv_time
LogicalRepWorkerType type
TransactionId oldest_nonremovable_xid
TimestampTz last_send_time
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
int logicalrep_worker_slot_no
pg_atomic_uint32 pending_stream_count
PartialFileSetState fileset_state
uint16 logicalrep_worker_generation
ParallelTransState xact_state
XLogRecPtr last_commit_end
bool AllTablesyncsReady(void)
@ PARALLEL_TRANS_FINISHED
#define isParallelApplyWorker(worker)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void stream_cleanup_files(Oid subid, TransactionId xid)
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
PGDLLIMPORT ErrorContextCallback * apply_error_context_stack
PGDLLIMPORT bool in_remote_transaction
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
PGDLLIMPORT MemoryContext ApplyMessageContext
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
static bool am_parallel_apply_worker(void)
PGDLLIMPORT LogicalRepWorker * MyLogicalRepWorker
struct ParallelApplyWorkerShared ParallelApplyWorkerShared
void logicalrep_worker_attach(int slot)
void stream_stop_internal(TransactionId xid)
void start_apply(XLogRecPtr origin_startpos)
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
struct ParallelApplyWorkerInfo ParallelApplyWorkerInfo
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void apply_dispatch(StringInfo s)
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
void DisableSubscriptionAndExit(void)
void process_syncing_tables(XLogRecPtr current_lsn)
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
void pa_detach_all_error_mq(void)
PGDLLIMPORT struct WalReceiverConn * LogRepWorkerWalRcvConn
PGDLLIMPORT bool InitializingApplyWorker
void stream_start_internal(TransactionId xid, bool first_segment)
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void logicalrep_worker_wakeup(Oid subid, Oid relid)
void logicalrep_worker_stop(Oid subid, Oid relid)
void set_apply_error_context_origin(char *originname)
@ WORKERTYPE_PARALLEL_APPLY
struct LogicalRepWorker LogicalRepWorker
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
@ FS_SERIALIZE_IN_PROGRESS
#define isTablesyncWorker(worker)
void SetupApplyOrSyncWorker(int worker_slot)
bool HasSubscriptionRelationsCached(void)
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
PGDLLIMPORT Subscription * MySubscription
void apply_error_callback(void *arg)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
int logicalrep_sync_worker_count(Oid subid)
void maybe_reread_subscription(void)
void InitializeLogRepWorker(void)
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
PGDLLIMPORT ParallelApplyWorkerShared * MyParallelShared
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
void UpdateTwoPhaseState(Oid suboid, char new_state)
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_decr_and_wait_stream_block(void)
PGDLLIMPORT MemoryContext ApplyContext