PostgreSQL Source Code: src/include/replication/worker_internal.h Source File

PostgreSQL Source Code git master
worker_internal.h
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * worker_internal.h
 *	  Internal headers shared by logical replication workers.
 *
 * Portions Copyright (c) 2016-2025, PostgreSQL Global Development Group
 *
 * src/include/replication/worker_internal.h
 *
 *-------------------------------------------------------------------------
 */
12#ifndef WORKER_INTERNAL_H
13#define WORKER_INTERNAL_H
14
15#include "access/xlogdefs.h"
16#include "catalog/pg_subscription.h"
17#include "datatype/timestamp.h"
18#include "miscadmin.h"
19#include "replication/logicalrelation.h"
20#include "replication/walreceiver.h"
21#include "storage/buffile.h"
22#include "storage/fileset.h"
23#include "storage/lock.h"
24#include "storage/shm_mq.h"
25#include "storage/shm_toc.h"
26#include "storage/spin.h"
27
/*
 * Different types of logical replication worker that can occupy a
 * LogicalRepWorker slot.  WORKERTYPE_UNKNOWN is explicitly zero, so a
 * zero-filled slot reads as "unknown".
 */
typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,		/* initial table synchronization worker */
	WORKERTYPE_APPLY,			/* leader apply worker */
	WORKERTYPE_PARALLEL_APPLY,	/* parallel apply worker */
} LogicalRepWorkerType;
36
37 typedef struct LogicalRepWorker
38{
39 /* What type of worker is this? */
40 LogicalRepWorkerType type;
41
42 /* Time at which this worker was launched. */
43 TimestampTz launch_time;
44
45 /* Indicates if this slot is used or free. */
46 bool in_use;
47
48 /* Increased every time the slot is taken by new worker. */
49 uint16 generation;
50
51 /* Pointer to proc array. NULL if not running. */
52 PGPROC *proc;
53
54 /* Database id to connect to. */
55 Oid dbid;
56
57 /* User to use for connection (will be same as owner of subscription). */
58 Oid userid;
59
60 /* Subscription id for the worker. */
61 Oid subid;
62
63 /* Used for initial table synchronization. */
64 Oid relid;
65 char relstate;
66 XLogRecPtr relstate_lsn;
67 slock_t relmutex;
68
69 /*
70 * Used to create the changes and subxact files for the streaming
71 * transactions. Upon the arrival of the first streaming transaction or
72 * when the first-time leader apply worker times out while sending changes
73 * to the parallel apply worker, the fileset will be initialized, and it
74 * will be deleted when the worker exits. Under this, separate buffiles
75 * would be created for each transaction which will be deleted after the
76 * transaction is finished.
77 */
78 FileSet *stream_fileset;
79
80 /*
81 * PID of leader apply worker if this slot is used for a parallel apply
82 * worker, InvalidPid otherwise.
83 */
84 pid_t leader_pid;
85
86 /* Indicates whether apply can be performed in parallel. */
87 bool parallel_apply;
88
89 /*
90 * Changes made by this transaction and subsequent ones must be preserved.
91 * This ensures that update_deleted conflicts can be accurately detected
92 * during the apply phase of logical replication by this worker.
93 *
94 * The logical replication launcher manages an internal replication slot
95 * named "pg_conflict_detection". It asynchronously collects this ID to
96 * decide when to advance the xmin value of the slot.
97 *
98 * This ID is set to InvalidTransactionId when the apply worker stops
99 * retaining information needed for conflict detection.
100 */
101 TransactionId oldest_nonremovable_xid;
102
103 /* Stats. */
104 XLogRecPtr last_lsn;
105 TimestampTz last_send_time;
106 TimestampTz last_recv_time;
107 XLogRecPtr reply_lsn;
108 TimestampTz reply_time;
109 } LogicalRepWorker;
110
/*
 * State of the transaction in a parallel apply worker.
 *
 * The enum values must have the same order as the transaction state
 * transitions (UNKNOWN -> STARTED -> FINISHED).  The values are spelled out
 * explicitly so that an accidental reordering shows up in review.
 */
typedef enum ParallelTransState
{
	PARALLEL_TRANS_UNKNOWN = 0,
	PARALLEL_TRANS_STARTED = 1,
	PARALLEL_TRANS_FINISHED = 2,
} ParallelTransState;
123
/*
 * State of the fileset used to communicate changes from the leader apply
 * worker to a parallel apply worker.
 *
 * FS_EMPTY: initial state; the leader does not need to use the file to
 * communicate with the parallel apply worker.
 *
 * FS_SERIALIZE_IN_PROGRESS: the leader is serializing changes to the file.
 *
 * FS_SERIALIZE_DONE: the leader has serialized all changes to the file.
 *
 * FS_READY: it is now OK for the parallel apply worker to read the file.
 */
typedef enum PartialFileSetState
{
	FS_EMPTY,
	FS_SERIALIZE_IN_PROGRESS,
	FS_SERIALIZE_DONE,
	FS_READY,
} PartialFileSetState;
147
148/*
149 * Struct for sharing information between leader apply worker and parallel
150 * apply workers.
151 */
152 typedef struct ParallelApplyWorkerShared
153{
154 slock_t mutex;
155
156 TransactionId xid;
157
158 /*
159 * State used to ensure commit ordering.
160 *
161 * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
162 * handling the transaction finish commands while the apply leader will
163 * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
164 * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
165 * STREAM_ABORT).
166 */
167 ParallelTransState xact_state;
168
169 /* Information from the corresponding LogicalRepWorker slot. */
170 uint16 logicalrep_worker_generation;
171 int logicalrep_worker_slot_no;
172
173 /*
174 * Indicates whether there are pending streaming blocks in the queue. The
175 * parallel apply worker will check it before starting to wait.
176 */
177 pg_atomic_uint32 pending_stream_count;
178
179 /*
180 * XactLastCommitEnd from the parallel apply worker. This is required by
181 * the leader worker so it can update the lsn_mappings.
182 */
183 XLogRecPtr last_commit_end;
184
185 /*
186 * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
187 * serialize changes to the file, and share the fileset with the parallel
188 * apply worker when processing the transaction finish command. Then the
189 * parallel apply worker will apply all the spooled messages.
190 *
191 * FileSet is used here instead of SharedFileSet because we need it to
192 * survive after releasing the shared memory so that the leader apply
193 * worker can re-use the same fileset for the next streaming transaction.
194 */
195 PartialFileSetState fileset_state;
196 FileSet fileset;
197 } ParallelApplyWorkerShared;
198
199/*
200 * Information which is used to manage the parallel apply worker.
201 */
202 typedef struct ParallelApplyWorkerInfo
203{
204 /*
205 * This queue is used to send changes from the leader apply worker to the
206 * parallel apply worker.
207 */
208 shm_mq_handle *mq_handle;
209
210 /*
211 * This queue is used to transfer error messages from the parallel apply
212 * worker to the leader apply worker.
213 */
214 shm_mq_handle *error_mq_handle;
215
216 dsm_segment *dsm_seg;
217
218 /*
219 * Indicates whether the leader apply worker needs to serialize the
220 * remaining changes to a file due to timeout when attempting to send data
221 * to the parallel apply worker via shared memory.
222 */
223 bool serialize_changes;
224
225 /*
226 * True if the worker is being used to process a parallel apply
227 * transaction. False indicates this worker is available for re-use.
228 */
229 bool in_use;
230
231 ParallelApplyWorkerShared *shared;
232 } ParallelApplyWorkerInfo;
233
234/* Main memory context for apply worker. Permanent during worker lifetime. */
235extern PGDLLIMPORT MemoryContext ApplyContext;
236
237extern PGDLLIMPORT MemoryContext ApplyMessageContext;
238
239extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
240
241extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
242
243/* libpqreceiver connection */
244extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
245
246/* Worker and subscription objects. */
247extern PGDLLIMPORT Subscription *MySubscription;
248extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
249
250extern PGDLLIMPORT bool in_remote_transaction;
251
252extern PGDLLIMPORT bool InitializingApplyWorker;
253
254extern void logicalrep_worker_attach(int slot);
255extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
256 bool only_running);
257extern List *logicalrep_workers_find(Oid subid, bool only_running,
258 bool acquire_lock);
259extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
260 Oid dbid, Oid subid, const char *subname,
261 Oid userid, Oid relid,
262 dsm_handle subworker_dsm,
263 bool retain_dead_tuples);
264extern void logicalrep_worker_stop(Oid subid, Oid relid);
265extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
266extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
267extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
268
269extern int logicalrep_sync_worker_count(Oid subid);
270
271extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
272 char *originname, Size szoriginname);
273
274extern bool AllTablesyncsReady(void);
275extern bool HasSubscriptionRelationsCached(void);
276extern void UpdateTwoPhaseState(Oid suboid, char new_state);
277
278extern void process_syncing_tables(XLogRecPtr current_lsn);
279extern void invalidate_syncing_table_states(Datum arg, int cacheid,
280 uint32 hashvalue);
281
282extern void stream_start_internal(TransactionId xid, bool first_segment);
283extern void stream_stop_internal(TransactionId xid);
284
285/* Common streaming function to apply all the spooled messages */
286extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
287 XLogRecPtr lsn);
288
289extern void apply_dispatch(StringInfo s);
290
291extern void maybe_reread_subscription(void);
292
293extern void stream_cleanup_files(Oid subid, TransactionId xid);
294
295extern void set_stream_options(WalRcvStreamOptions *options,
296 char *slotname,
297 XLogRecPtr *origin_startpos);
298
299extern void start_apply(XLogRecPtr origin_startpos);
300
301extern void InitializeLogRepWorker(void);
302
303extern void SetupApplyOrSyncWorker(int worker_slot);
304
305extern void DisableSubscriptionAndExit(void);
306
307extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
308
309/* Function for apply error callback */
310extern void apply_error_callback(void *arg);
311extern void set_apply_error_context_origin(char *originname);
312
313/* Parallel apply worker setup and interactions */
314extern void pa_allocate_worker(TransactionId xid);
315extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
316extern void pa_detach_all_error_mq(void);
317
318extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
319 const void *data);
320extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
321 bool stream_locked);
322
323extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
324 ParallelTransState xact_state);
325extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
326
327extern void pa_start_subtrans(TransactionId current_xid,
328 TransactionId top_xid);
329extern void pa_reset_subtrans(void);
330extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
331extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
332 PartialFileSetState fileset_state);
333
334extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
335extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
336
337extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
338extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
339
340extern void pa_decr_and_wait_stream_block(void);
341
342extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
343 XLogRecPtr remote_lsn);
344
/*
 * Test whether a LogicalRepWorker pointer refers to an in-use slot of the
 * given worker type.
 *
 * NB: these macros evaluate their argument twice, so pass a side-effect-free
 * expression (e.g. a plain pointer variable).
 */
#define isParallelApplyWorker(worker) ((worker)->in_use && \
									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
#define isTablesyncWorker(worker) ((worker)->in_use && \
								   (worker)->type == WORKERTYPE_TABLESYNC)
349
350static inline bool
351 am_tablesync_worker(void)
352{
353 return isTablesyncWorker(MyLogicalRepWorker);
354}
355
356static inline bool
357 am_leader_apply_worker(void)
358{
359 Assert(MyLogicalRepWorker->in_use);
360 return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
361}
362
363static inline bool
364 am_parallel_apply_worker(void)
365{
366 Assert(MyLogicalRepWorker->in_use);
367 return isParallelApplyWorker(MyLogicalRepWorker);
368}
369
370#endif /* WORKER_INTERNAL_H */
#define PGDLLIMPORT
Definition: c.h:1319
uint16_t uint16
Definition: c.h:537
uint32_t uint32
Definition: c.h:538
uint32 TransactionId
Definition: c.h:657
size_t Size
Definition: c.h:610
int64 TimestampTz
Definition: timestamp.h:39
uint32 dsm_handle
Definition: dsm_impl.h:55
Assert(PointerIsAligned(start, uint64))
int LOCKMODE
Definition: lockdefs.h:26
void * arg
const void * data
NameData subname
uint64_t Datum
Definition: postgres.h:70
unsigned int Oid
Definition: postgres_ext.h:32
Definition: fileset.h:23
Definition: pg_list.h:54
XLogRecPtr relstate_lsn
TimestampTz last_recv_time
LogicalRepWorkerType type
TimestampTz launch_time
TimestampTz reply_time
FileSet * stream_fileset
TransactionId oldest_nonremovable_xid
XLogRecPtr reply_lsn
XLogRecPtr last_lsn
TimestampTz last_send_time
Definition: proc.h:179
shm_mq_handle * error_mq_handle
shm_mq_handle * mq_handle
ParallelApplyWorkerShared * shared
pg_atomic_uint32 pending_stream_count
PartialFileSetState fileset_state
ParallelTransState xact_state
Definition: dsm.c:67
Definition: oid2name.c:30
bool AllTablesyncsReady(void)
Definition: tablesync.c:1770
ParallelTransState
@ PARALLEL_TRANS_UNKNOWN
@ PARALLEL_TRANS_STARTED
@ PARALLEL_TRANS_FINISHED
#define isParallelApplyWorker(worker)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:5483
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:286
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:5350
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
PGDLLIMPORT ErrorContextCallback * apply_error_context_stack
Definition: worker.c:469
PGDLLIMPORT bool in_remote_transaction
Definition: worker.c:484
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:720
PGDLLIMPORT MemoryContext ApplyMessageContext
Definition: worker.c:471
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition: launcher.c:317
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:280
static bool am_parallel_apply_worker(void)
PGDLLIMPORT LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
struct ParallelApplyWorkerShared ParallelApplyWorkerShared
void logicalrep_worker_attach(int slot)
Definition: launcher.c:731
void stream_stop_internal(TransactionId xid)
Definition: worker.c:1840
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:5552
void pa_stream_abort(LogicalRepStreamAbortData *abort_data)
struct ParallelApplyWorkerInfo ParallelApplyWorkerInfo
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
void apply_dispatch(StringInfo s)
Definition: worker.c:3747
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void pa_reset_subtrans(void)
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
Definition: launcher.c:657
void DisableSubscriptionAndExit(void)
Definition: worker.c:5905
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:696
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:641
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:254
void pa_detach_all_error_mq(void)
PGDLLIMPORT struct WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
PGDLLIMPORT bool InitializingApplyWorker
Definition: worker.c:499
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1666
void pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, bool stream_locked)
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:700
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:633
void set_apply_error_context_origin(char *originname)
Definition: worker.c:6260
LogicalRepWorkerType
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
struct LogicalRepWorker LogicalRepWorker
void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
PartialFileSetState
@ FS_EMPTY
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS
#define isTablesyncWorker(worker)
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:5831
bool HasSubscriptionRelationsCached(void)
Definition: tablesync.c:1800
static bool am_tablesync_worker(void)
static bool am_leader_apply_worker(void)
PGDLLIMPORT Subscription * MySubscription
Definition: worker.c:479
void apply_error_callback(void *arg)
Definition: worker.c:6118
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3911
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:874
void maybe_reread_subscription(void)
Definition: worker.c:5007
void InitializeLogRepWorker(void)
Definition: worker.c:5705
bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
void pa_allocate_worker(TransactionId xid)
PGDLLIMPORT ParallelApplyWorkerShared * MyParallelShared
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2238
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1821
void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
ParallelApplyWorkerInfo * pa_find_worker(TransactionId xid)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pa_decr_and_wait_stream_block(void)
PGDLLIMPORT MemoryContext ApplyContext
Definition: worker.c:472
uint64 XLogRecPtr
Definition: xlogdefs.h:21

AltStyle によって変換されたページ (->オリジナル) /