PostgreSQL Source Code: src/backend/replication/logical/tablesync.c Source File

PostgreSQL Source Code git master
tablesync.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * tablesync.c
3 * PostgreSQL logical replication: initial table data synchronization
4 *
5 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/tablesync.c
9 *
10 * NOTES
11 * This file contains code for initial table data synchronization for
12 * logical replication.
13 *
14 * The initial data synchronization is done separately for each table,
15 * in a separate apply worker that only fetches the initial snapshot data
16 * from the publisher and then synchronizes the position in the stream with
17 * the leader apply worker.
18 *
19 * There are several reasons for doing the synchronization this way:
20 * - It allows us to parallelize the initial data synchronization
21 * which lowers the time needed for it to happen.
22 * - The initial synchronization does not have to hold the xid and LSN
23 * for the time it takes to copy data of all tables, causing less
24 * bloat and lower disk consumption compared to doing the
25 * synchronization in a single process for the whole database.
26 * - It allows us to synchronize any tables added after the initial
27 * synchronization has finished.
28 *
29 * The stream position synchronization works in multiple steps:
30 * - Apply worker requests a tablesync worker to start, setting the new
31 * table state to INIT.
32 * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 * copying.
34 * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 * worker specific) state to indicate when the copy phase has completed, so
36 * if the worker crashes with this (non-memory) state then the copy will not
37 * be re-attempted.
38 * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 * until either the table state is set to SYNCDONE or the sync worker
42 * exits.
43 * - After the sync worker has seen the state change to CATCHUP, it will
44 * read the stream and apply changes (acting like an apply worker) until
45 * it catches up to the specified stream position. Then it sets the
46 * state to SYNCDONE. There might be zero changes applied between
47 * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 * apply worker.
49 * - Once the state is set to SYNCDONE, the apply will continue tracking
50 * the table until it reaches the SYNCDONE stream position, at which
51 * point it sets state to READY and stops tracking. Again, there might
52 * be zero changes in between.
53 *
54 * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 *
57 * The catalog pg_subscription_rel is used to keep information about
58 * subscribed tables and their state. The catalog holds all states
59 * except SYNCWAIT and CATCHUP which are only in shared memory.
60 *
61 * Example flows look like this:
62 * - Apply is in front:
63 * sync:8
64 * -> set in catalog FINISHEDCOPY
65 * -> set in memory SYNCWAIT
66 * apply:10
67 * -> set in memory CATCHUP
68 * -> enter wait-loop
69 * sync:10
70 * -> set in catalog SYNCDONE
71 * -> exit
72 * apply:10
73 * -> exit wait-loop
74 * -> continue rep
75 * apply:11
76 * -> set in catalog READY
77 *
78 * - Sync is in front:
79 * sync:10
80 * -> set in catalog FINISHEDCOPY
81 * -> set in memory SYNCWAIT
82 * apply:8
83 * -> set in memory CATCHUP
84 * -> continue per-table filtering
85 * sync:10
86 * -> set in catalog SYNCDONE
87 * -> exit
88 * apply:10
89 * -> set in catalog READY
90 * -> stop per-table filtering
91 * -> continue rep
92 *-------------------------------------------------------------------------
93 */
94
95#include "postgres.h"
96
97#include "access/table.h"
98#include "access/xact.h"
99#include "catalog/indexing.h"
100#include "catalog/pg_subscription_rel.h"
101#include "catalog/pg_type.h"
102#include "commands/copy.h"
103#include "miscadmin.h"
104#include "nodes/makefuncs.h"
105#include "parser/parse_relation.h"
106#include "pgstat.h"
107#include "replication/logicallauncher.h"
108#include "replication/logicalrelation.h"
109#include "replication/logicalworker.h"
110#include "replication/origin.h"
111#include "replication/slot.h"
112#include "replication/walreceiver.h"
113#include "replication/worker_internal.h"
114#include "storage/ipc.h"
115#include "storage/lmgr.h"
116#include "utils/acl.h"
117#include "utils/array.h"
118#include "utils/builtins.h"
119#include "utils/lsyscache.h"
120#include "utils/memutils.h"
121#include "utils/rls.h"
122#include "utils/snapmgr.h"
123#include "utils/syscache.h"
124#include "utils/usercontext.h"
125
/*
 * Validity of the backend-local cache of not-yet-READY subscription tables
 * (table_states_not_ready).
 */
typedef enum
{
	/* Cache must be rebuilt; set initially and by the invalidation callback. */
	SYNC_TABLE_STATE_NEEDS_REBUILD,

	/* Presumably: a rebuild is in progress (set by FetchTableStates; confirm). */
	SYNC_TABLE_STATE_REBUILD_STARTED,

	/* Presumably: the cache reflects the catalog contents (confirm). */
	SYNC_TABLE_STATE_VALID,
} SyncingTablesState;
132
133 static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
134 static List *table_states_not_ready = NIL;
135static bool FetchTableStates(bool *started_tx);
136
137 static StringInfo copybuf = NULL;
138
139/*
140 * Exit routine for synchronization worker.
141 */
142pg_noreturn static void
143 finish_sync_worker(void)
144{
145 /*
146 * Commit any outstanding transaction. This is the usual case, unless
147 * there was nothing to do for the table.
148 */
149 if (IsTransactionState())
150 {
151 CommitTransactionCommand();
152 pgstat_report_stat(true);
153 }
154
155 /* And flush all writes. */
156 XLogFlush(GetXLogWriteRecPtr());
157
158 StartTransactionCommand();
159 ereport(LOG,
160 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
161 MySubscription->name,
162 get_rel_name(MyLogicalRepWorker->relid))));
163 CommitTransactionCommand();
164
165 /* Find the leader apply worker and signal it. */
166 logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
167
168 /* Stop gracefully */
169 proc_exit(0);
170}
171
172/*
173 * Wait until the relation sync state is set in the catalog to the expected
174 * one; return true when it happens.
175 *
176 * Returns false if the table sync worker or the table itself have
177 * disappeared, or the table state has been reset.
178 *
179 * Currently, this is used in the apply worker when transitioning from
180 * CATCHUP state to SYNCDONE.
181 */
182static bool
183 wait_for_relation_state_change(Oid relid, char expected_state)
184{
185 char state;
186
187 for (;;)
188 {
189 LogicalRepWorker *worker;
190 XLogRecPtr statelsn;
191
192 CHECK_FOR_INTERRUPTS();
193
194 InvalidateCatalogSnapshot();
195 state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
196 relid, &statelsn);
197
198 if (state == SUBREL_STATE_UNKNOWN)
199 break;
200
201 if (state == expected_state)
202 return true;
203
204 /* Check if the sync worker is still running and bail if not. */
205 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
206 worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
207 false);
208 LWLockRelease(LogicalRepWorkerLock);
209 if (!worker)
210 break;
211
212 (void) WaitLatch(MyLatch,
213 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
214 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
215
216 ResetLatch(MyLatch);
217 }
218
219 return false;
220}
221
222/*
223 * Wait until the apply worker changes the state of our synchronization
224 * worker to the expected one.
225 *
226 * Used when transitioning from SYNCWAIT state to CATCHUP.
227 *
228 * Returns false if the apply worker has disappeared.
229 */
230static bool
231 wait_for_worker_state_change(char expected_state)
232{
233 int rc;
234
235 for (;;)
236 {
237 LogicalRepWorker *worker;
238
239 CHECK_FOR_INTERRUPTS();
240
241 /*
242 * Done if already in correct state. (We assume this fetch is atomic
243 * enough to not give a misleading answer if we do it with no lock.)
244 */
245 if (MyLogicalRepWorker->relstate == expected_state)
246 return true;
247
248 /*
249 * Bail out if the apply worker has died, else signal it we're
250 * waiting.
251 */
252 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
253 worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
254 InvalidOid, false);
255 if (worker && worker->proc)
256 logicalrep_worker_wakeup_ptr(worker);
257 LWLockRelease(LogicalRepWorkerLock);
258 if (!worker)
259 break;
260
261 /*
262 * Wait. We expect to get a latch signal back from the apply worker,
263 * but use a timeout in case it dies without sending one.
264 */
265 rc = WaitLatch(MyLatch,
266 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
267 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
268
269 if (rc & WL_LATCH_SET)
270 ResetLatch(MyLatch);
271 }
272
273 return false;
274}
275
276/*
277 * Callback from syscache invalidation.
278 */
279void
280 invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
281{
282 table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
283}
284
285/*
286 * Handle table synchronization cooperation from the synchronization
287 * worker.
288 *
289 * If the sync worker is in CATCHUP state and reached (or passed) the
290 * predetermined synchronization point in the WAL stream, mark the table as
291 * SYNCDONE and finish.
292 */
293static void
294 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
295{
296 SpinLockAcquire(&MyLogicalRepWorker->relmutex);
297
298 if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
299 current_lsn >= MyLogicalRepWorker->relstate_lsn)
300 {
301 TimeLineID tli;
302 char syncslotname[NAMEDATALEN] = {0};
303 char originname[NAMEDATALEN] = {0};
304
305 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
306 MyLogicalRepWorker->relstate_lsn = current_lsn;
307
308 SpinLockRelease(&MyLogicalRepWorker->relmutex);
309
310 /*
311 * UpdateSubscriptionRelState must be called within a transaction.
312 */
313 if (!IsTransactionState())
314 StartTransactionCommand();
315
316 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
317 MyLogicalRepWorker->relid,
318 MyLogicalRepWorker->relstate,
319 MyLogicalRepWorker->relstate_lsn,
320 false);
321
322 /*
323 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
324 * the slot.
325 */
326 walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
327
328 /*
329 * Cleanup the tablesync slot.
330 *
331 * This has to be done after updating the state because otherwise if
332 * there is an error while doing the database operations we won't be
333 * able to rollback dropped slot.
334 */
335 ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
336 MyLogicalRepWorker->relid,
337 syncslotname,
338 sizeof(syncslotname));
339
340 /*
341 * It is important to give an error if we are unable to drop the slot,
342 * otherwise, it won't be dropped till the corresponding subscription
343 * is dropped. So passing missing_ok = false.
344 */
345 ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
346
347 CommitTransactionCommand();
348 pgstat_report_stat(false);
349
350 /*
351 * Start a new transaction to clean up the tablesync origin tracking.
352 * This transaction will be ended within the finish_sync_worker().
353 * Now, even, if we fail to remove this here, the apply worker will
354 * ensure to clean it up afterward.
355 *
356 * We need to do this after the table state is set to SYNCDONE.
357 * Otherwise, if an error occurs while performing the database
358 * operation, the worker will be restarted and the in-memory state of
359 * replication progress (remote_lsn) won't be rolled-back which would
360 * have been cleared before restart. So, the restarted worker will use
361 * invalid replication progress state resulting in replay of
362 * transactions that have already been applied.
363 */
364 StartTransactionCommand();
365
366 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
367 MyLogicalRepWorker->relid,
368 originname,
369 sizeof(originname));
370
371 /*
372 * Resetting the origin session removes the ownership of the slot.
373 * This is needed to allow the origin to be dropped.
374 */
375 replorigin_session_reset();
376 replorigin_session_origin = InvalidRepOriginId;
377 replorigin_session_origin_lsn = InvalidXLogRecPtr;
378 replorigin_session_origin_timestamp = 0;
379
380 /*
381 * Drop the tablesync's origin tracking if exists.
382 *
383 * There is a chance that the user is concurrently performing refresh
384 * for the subscription where we remove the table state and its origin
385 * or the apply worker would have removed this origin. So passing
386 * missing_ok = true.
387 */
388 replorigin_drop_by_name(originname, true, false);
389
390 finish_sync_worker();
391 }
392 else
393 SpinLockRelease(&MyLogicalRepWorker->relmutex);
394}
395
396/*
397 * Handle table synchronization cooperation from the apply worker.
398 *
399 * Walk over all subscription tables that are individually tracked by the
400 * apply process (currently, all that have state other than
401 * SUBREL_STATE_READY) and manage synchronization for them.
402 *
403 * If there are tables that need synchronizing and are not being synchronized
404 * yet, start sync workers for them (if there are free slots for sync
405 * workers). To prevent starting the sync worker for the same relation at a
406 * high frequency after a failure, we store its last start time with each sync
407 * state info. We start the sync worker for the same relation after waiting
408 * at least wal_retrieve_retry_interval.
409 *
410 * For tables that are being synchronized already, check if sync workers
411 * either need action from the apply worker or have finished. This is the
412 * SYNCWAIT to CATCHUP transition.
413 *
414 * If the synchronization position is reached (SYNCDONE), then the table can
415 * be marked as READY and is no longer tracked.
416 */
417static void
418 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
419{
420 struct tablesync_start_time_mapping
421 {
422 Oid relid;
423 TimestampTz last_start_time;
424 };
425 static HTAB *last_start_times = NULL;
426 ListCell *lc;
427 bool started_tx = false;
428 bool should_exit = false;
429 Relation rel = NULL;
430
431 Assert(!IsTransactionState());
432
433 /* We need up-to-date sync state info for subscription tables here. */
434 FetchTableStates(&started_tx);
435
436 /*
437 * Prepare a hash table for tracking last start times of workers, to avoid
438 * immediate restarts. We don't need it if there are no tables that need
439 * syncing.
440 */
441 if (table_states_not_ready != NIL && !last_start_times)
442 {
443 HASHCTL ctl;
444
445 ctl.keysize = sizeof(Oid);
446 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
447 last_start_times = hash_create("Logical replication table sync worker start times",
448 256, &ctl, HASH_ELEM | HASH_BLOBS);
449 }
450
451 /*
452 * Clean up the hash table when we're done with all tables (just to
453 * release the bit of memory).
454 */
455 else if (table_states_not_ready == NIL && last_start_times)
456 {
457 hash_destroy(last_start_times);
458 last_start_times = NULL;
459 }
460
461 /*
462 * Process all tables that are being synchronized.
463 */
464 foreach(lc, table_states_not_ready)
465 {
466 SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
467
468 if (rstate->state == SUBREL_STATE_SYNCDONE)
469 {
470 /*
471 * Apply has caught up to the position where the table sync has
472 * finished. Mark the table as ready so that the apply will just
473 * continue to replicate it normally.
474 */
475 if (current_lsn >= rstate->lsn)
476 {
477 char originname[NAMEDATALEN];
478
479 rstate->state = SUBREL_STATE_READY;
480 rstate->lsn = current_lsn;
481 if (!started_tx)
482 {
483 StartTransactionCommand();
484 started_tx = true;
485 }
486
487 /*
488 * Remove the tablesync origin tracking if exists.
489 *
490 * There is a chance that the user is concurrently performing
491 * refresh for the subscription where we remove the table
492 * state and its origin or the tablesync worker would have
493 * already removed this origin. We can't rely on tablesync
494 * worker to remove the origin tracking as if there is any
495 * error while dropping we won't restart it to drop the
496 * origin. So passing missing_ok = true.
497 *
498 * Lock the subscription and origin in the same order as we
499 * are doing during DDL commands to avoid deadlocks. See
500 * AlterSubscription_refresh.
501 */
502 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
503 0, AccessShareLock);
504
505 if (!rel)
506 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
507
508 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
509 rstate->relid,
510 originname,
511 sizeof(originname));
512 replorigin_drop_by_name(originname, true, false);
513
514 /*
515 * Update the state to READY only after the origin cleanup.
516 */
517 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
518 rstate->relid, rstate->state,
519 rstate->lsn, true);
520 }
521 }
522 else
523 {
524 LogicalRepWorker *syncworker;
525
526 /*
527 * Look for a sync worker for this relation.
528 */
529 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
530
531 syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
532 rstate->relid, false);
533
534 if (syncworker)
535 {
536 /* Found one, update our copy of its state */
537 SpinLockAcquire(&syncworker->relmutex);
538 rstate->state = syncworker->relstate;
539 rstate->lsn = syncworker->relstate_lsn;
540 if (rstate->state == SUBREL_STATE_SYNCWAIT)
541 {
542 /*
543 * Sync worker is waiting for apply. Tell sync worker it
544 * can catchup now.
545 */
546 syncworker->relstate = SUBREL_STATE_CATCHUP;
547 syncworker->relstate_lsn =
548 Max(syncworker->relstate_lsn, current_lsn);
549 }
550 SpinLockRelease(&syncworker->relmutex);
551
552 /* If we told worker to catch up, wait for it. */
553 if (rstate->state == SUBREL_STATE_SYNCWAIT)
554 {
555 /* Signal the sync worker, as it may be waiting for us. */
556 if (syncworker->proc)
557 logicalrep_worker_wakeup_ptr(syncworker);
558
559 /* Now safe to release the LWLock */
560 LWLockRelease(LogicalRepWorkerLock);
561
562 if (started_tx)
563 {
564 /*
565 * We must commit the existing transaction to release
566 * the existing locks before entering a busy loop.
567 * This is required to avoid any undetected deadlocks
568 * due to any existing lock as deadlock detector won't
569 * be able to detect the waits on the latch.
570 *
571 * Also close any tables prior to the commit.
572 */
573 if (rel)
574 {
575 table_close(rel, NoLock);
576 rel = NULL;
577 }
578 CommitTransactionCommand();
579 pgstat_report_stat(false);
580 }
581
582 /*
583 * Enter busy loop and wait for synchronization worker to
584 * reach expected state (or die trying).
585 */
586 StartTransactionCommand();
587 started_tx = true;
588
589 wait_for_relation_state_change(rstate->relid,
590 SUBREL_STATE_SYNCDONE);
591 }
592 else
593 LWLockRelease(LogicalRepWorkerLock);
594 }
595 else
596 {
597 /*
598 * If there is no sync worker for this table yet, count
599 * running sync workers for this subscription, while we have
600 * the lock.
601 */
602 int nsyncworkers =
603 logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
604
605 /* Now safe to release the LWLock */
606 LWLockRelease(LogicalRepWorkerLock);
607
608 /*
609 * If there are free sync worker slot(s), start a new sync
610 * worker for the table.
611 */
612 if (nsyncworkers < max_sync_workers_per_subscription)
613 {
614 TimestampTz now = GetCurrentTimestamp();
615 struct tablesync_start_time_mapping *hentry;
616 bool found;
617
618 hentry = hash_search(last_start_times, &rstate->relid,
619 HASH_ENTER, &found);
620
621 if (!found ||
622 TimestampDifferenceExceeds(hentry->last_start_time, now,
623 wal_retrieve_retry_interval))
624 {
625 /*
626 * Set the last_start_time even if we fail to start
627 * the worker, so that we won't retry until
628 * wal_retrieve_retry_interval has elapsed.
629 */
630 hentry->last_start_time = now;
631 (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
632 MyLogicalRepWorker->dbid,
633 MySubscription->oid,
634 MySubscription->name,
635 MyLogicalRepWorker->userid,
636 rstate->relid,
637 DSM_HANDLE_INVALID,
638 false);
639 }
640 }
641 }
642 }
643 }
644
645 /* Close table if opened */
646 if (rel)
647 table_close(rel, NoLock);
648
649
650 if (started_tx)
651 {
652 /*
653 * Even when the two_phase mode is requested by the user, it remains
654 * as 'pending' until all tablesyncs have reached READY state.
655 *
656 * When this happens, we restart the apply worker and (if the
657 * conditions are still ok) then the two_phase tri-state will become
658 * 'enabled' at that time.
659 *
660 * Note: If the subscription has no tables then leave the state as
661 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
662 * work.
663 */
664 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
665 {
666 CommandCounterIncrement(); /* make updates visible */
667 if (AllTablesyncsReady())
668 {
669 ereport(LOG,
670 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
671 MySubscription->name)));
672 should_exit = true;
673 }
674 }
675
676 CommitTransactionCommand();
677 pgstat_report_stat(true);
678 }
679
680 if (should_exit)
681 {
682 /*
683 * Reset the last-start time for this worker so that the launcher will
684 * restart it without waiting for wal_retrieve_retry_interval.
685 */
686 ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
687
688 proc_exit(0);
689 }
690}
691
692/*
693 * Process possible state change(s) of tables that are being synchronized.
694 */
695void
696 process_syncing_tables(XLogRecPtr current_lsn)
697{
698 switch (MyLogicalRepWorker->type)
699 {
700 case WORKERTYPE_PARALLEL_APPLY:
701
702 /*
703 * Skip for parallel apply workers because they only operate on
704 * tables that are in a READY state. See pa_can_start() and
705 * should_apply_changes_for_rel().
706 */
707 break;
708
709 case WORKERTYPE_TABLESYNC:
710 process_syncing_tables_for_sync(current_lsn);
711 break;
712
713 case WORKERTYPE_APPLY:
714 process_syncing_tables_for_apply(current_lsn);
715 break;
716
717 case WORKERTYPE_UNKNOWN:
718 /* Should never happen. */
719 elog(ERROR, "Unknown worker type");
720 }
721}
722
723/*
724 * Create list of columns for COPY based on logical relation mapping.
725 */
726static List *
727 make_copy_attnamelist(LogicalRepRelMapEntry *rel)
728{
729 List *attnamelist = NIL;
730 int i;
731
732 for (i = 0; i < rel->remoterel.natts; i++)
733 {
734 attnamelist = lappend(attnamelist,
735 makeString(rel->remoterel.attnames[i]));
736 }
737
738
739 return attnamelist;
740}
741
742/*
743 * Data source callback for the COPY FROM, which reads from the remote
744 * connection and passes the data back to our local COPY.
745 */
746static int
747 copy_read_data(void *outbuf, int minread, int maxread)
748{
749 int bytesread = 0;
750 int avail;
751
752 /* If there are some leftover data from previous read, use it. */
753 avail = copybuf->len - copybuf->cursor;
754 if (avail)
755 {
756 if (avail > maxread)
757 avail = maxread;
758 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
759 copybuf->cursor += avail;
760 maxread -= avail;
761 bytesread += avail;
762 }
763
764 while (maxread > 0 && bytesread < minread)
765 {
766 pgsocket fd = PGINVALID_SOCKET;
767 int len;
768 char *buf = NULL;
769
770 for (;;)
771 {
772 /* Try read the data. */
773 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
774
775 CHECK_FOR_INTERRUPTS();
776
777 if (len == 0)
778 break;
779 else if (len < 0)
780 return bytesread;
781 else
782 {
783 /* Process the data */
784 copybuf->data = buf;
785 copybuf->len = len;
786 copybuf->cursor = 0;
787
788 avail = copybuf->len - copybuf->cursor;
789 if (avail > maxread)
790 avail = maxread;
791 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
792 outbuf = (char *) outbuf + avail;
793 copybuf->cursor += avail;
794 maxread -= avail;
795 bytesread += avail;
796 }
797
798 if (maxread <= 0 || bytesread >= minread)
799 return bytesread;
800 }
801
802 /*
803 * Wait for more data or latch.
804 */
805 (void) WaitLatchOrSocket(MyLatch,
806 WL_SOCKET_READABLE | WL_LATCH_SET |
807 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
808 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
809
810 ResetLatch(MyLatch);
811 }
812
813 return bytesread;
814}
815
816
817/*
818 * Get information about remote relation in similar fashion the RELATION
819 * message provides during replication.
820 *
821 * This function also returns (a) the relation qualifications to be used in
822 * the COPY command, and (b) whether the remote relation has published any
823 * generated column.
824 */
825static void
826 fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
827 List **qual, bool *gencol_published)
828{
829 WalRcvExecResult *res;
830 StringInfoData cmd;
831 TupleTableSlot *slot;
832 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
833 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
834 Oid qualRow[] = {TEXTOID};
835 bool isnull;
836 int natt;
837 StringInfo pub_names = NULL;
838 Bitmapset *included_cols = NULL;
839 int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
840
841 lrel->nspname = nspname;
842 lrel->relname = relname;
843
844 /* First fetch Oid and replica identity. */
845 initStringInfo(&cmd);
846 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
847 " FROM pg_catalog.pg_class c"
848 " INNER JOIN pg_catalog.pg_namespace n"
849 " ON (c.relnamespace = n.oid)"
850 " WHERE n.nspname = %s"
851 " AND c.relname = %s",
852 quote_literal_cstr(nspname),
853 quote_literal_cstr(relname));
854 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
855 lengthof(tableRow), tableRow);
856
857 if (res->status != WALRCV_OK_TUPLES)
858 ereport(ERROR,
859 (errcode(ERRCODE_CONNECTION_FAILURE),
860 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
861 nspname, relname, res->err)));
862
863 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
864 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
865 ereport(ERROR,
866 (errcode(ERRCODE_UNDEFINED_OBJECT),
867 errmsg("table \"%s.%s\" not found on publisher",
868 nspname, relname)));
869
870 lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
871 Assert(!isnull);
872 lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
873 Assert(!isnull);
874 lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
875 Assert(!isnull);
876
877 ExecDropSingleTupleTableSlot(slot);
878 walrcv_clear_result(res);
879
880
881 /*
882 * Get column lists for each relation.
883 *
884 * We need to do this before fetching info about column names and types,
885 * so that we can skip columns that should not be replicated.
886 */
887 if (server_version >= 150000)
888 {
889 WalRcvExecResult *pubres;
890 TupleTableSlot *tslot;
891 Oid attrsRow[] = {INT2VECTOROID};
892
893 /* Build the pub_names comma-separated string. */
894 pub_names = makeStringInfo();
895 GetPublicationsStr(MySubscription->publications, pub_names, true);
896
897 /*
898 * Fetch info about column lists for the relation (from all the
899 * publications).
900 */
901 resetStringInfo(&cmd);
902 appendStringInfo(&cmd,
903 "SELECT DISTINCT"
904 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
905 " THEN NULL ELSE gpt.attrs END)"
906 " FROM pg_publication p,"
907 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
908 " pg_class c"
909 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
910 " AND p.pubname IN ( %s )",
911 lrel->remoteid,
912 pub_names->data);
913
914 pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
915 lengthof(attrsRow), attrsRow);
916
917 if (pubres->status != WALRCV_OK_TUPLES)
918 ereport(ERROR,
919 (errcode(ERRCODE_CONNECTION_FAILURE),
920 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
921 nspname, relname, pubres->err)));
922
923 /*
924 * We don't support the case where the column list is different for
925 * the same table when combining publications. See comments atop
926 * fetch_table_list. So there should be only one row returned.
927 * Although we already checked this when creating the subscription, we
928 * still need to check here in case the column list was changed after
929 * creating the subscription and before the sync worker is started.
930 */
931 if (tuplestore_tuple_count(pubres->tuplestore) > 1)
932 ereport(ERROR,
933 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
934 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
935 nspname, relname));
936
937 /*
938 * Get the column list and build a single bitmap with the attnums.
939 *
940 * If we find a NULL value, it means all the columns should be
941 * replicated.
942 */
943 tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
944 if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
945 {
946 Datum cfval = slot_getattr(tslot, 1, &isnull);
947
948 if (!isnull)
949 {
950 ArrayType *arr;
951 int nelems;
952 int16 *elems;
953
954 arr = DatumGetArrayTypeP(cfval);
955 nelems = ARR_DIMS(arr)[0];
956 elems = (int16 *) ARR_DATA_PTR(arr);
957
958 for (natt = 0; natt < nelems; natt++)
959 included_cols = bms_add_member(included_cols, elems[natt]);
960 }
961
962 ExecClearTuple(tslot);
963 }
964 ExecDropSingleTupleTableSlot(tslot);
965
966 walrcv_clear_result(pubres);
967 }
968
969 /*
970 * Now fetch column names and types.
971 */
972 resetStringInfo(&cmd);
973 appendStringInfoString(&cmd,
974 "SELECT a.attnum,"
975 " a.attname,"
976 " a.atttypid,"
977 " a.attnum = ANY(i.indkey)");
978
979 /* Generated columns can be replicated since version 18. */
980 if (server_version >= 180000)
981 appendStringInfoString(&cmd, ", a.attgenerated != ''");
982
983 appendStringInfo(&cmd,
984 " FROM pg_catalog.pg_attribute a"
985 " LEFT JOIN pg_catalog.pg_index i"
986 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
987 " WHERE a.attnum > 0::pg_catalog.int2"
988 " AND NOT a.attisdropped %s"
989 " AND a.attrelid = %u"
990 " ORDER BY a.attnum",
991 lrel->remoteid,
992 (server_version >= 120000 && server_version < 180000 ?
993 "AND a.attgenerated = ''" : ""),
994 lrel->remoteid);
995 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
996 server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
997
998 if (res->status != WALRCV_OK_TUPLES)
999 ereport(ERROR,
1000 (errcode(ERRCODE_CONNECTION_FAILURE),
1001 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
1002 nspname, relname, res->err)));
1003
1004 /* We don't know the number of rows coming, so allocate enough space. */
1005 lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
1006 lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
1007 lrel->attkeys = NULL;
1008
1009 /*
1010 * Store the columns as a list of names. Ignore those that are not
1011 * present in the column list, if there is one.
1012 */
1013 natt = 0;
1014 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1015 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1016 {
1017 char *rel_colname;
1018 AttrNumber attnum;
1019
1020 attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
1021 Assert(!isnull);
1022
1023 /* If the column is not in the column list, skip it. */
1024 if (included_cols != NULL && !bms_is_member(attnum, included_cols))
1025 {
1026 ExecClearTuple(slot);
1027 continue;
1028 }
1029
1030 rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1031 Assert(!isnull);
1032
1033 lrel->attnames[natt] = rel_colname;
1034 lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
1035 Assert(!isnull);
1036
1037 if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
1038 lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1039
1040 /* Remember if the remote table has published any generated column. */
1041 if (server_version >= 180000 && !(*gencol_published))
1042 {
1043 *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
1044 Assert(!isnull);
1045 }
1046
1047 /* Should never happen. */
1048 if (++natt >= MaxTupleAttributeNumber)
1049 elog(ERROR, "too many columns in remote table \"%s.%s\"",
1050 nspname, relname);
1051
1052 ExecClearTuple(slot);
1053 }
1054 ExecDropSingleTupleTableSlot(slot);
1055
1056 lrel->natts = natt;
1057
1058 walrcv_clear_result(res);
1059
1060 /*
1061 * Get relation's row filter expressions. DISTINCT avoids the same
1062 * expression of a table in multiple publications from being included
1063 * multiple times in the final expression.
1064 *
1065 * We need to copy the row even if it matches just one of the
1066 * publications, so we later combine all the quals with OR.
1067 *
1068 * For initial synchronization, row filtering can be ignored in following
1069 * cases:
1070 *
1071 * 1) one of the subscribed publications for the table hasn't specified
1072 * any row filter
1073 *
1074 * 2) one of the subscribed publications has puballtables set to true
1075 *
1076 * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1077 * that includes this relation
1078 */
1079 if (server_version >= 150000)
1080 {
1081 /* Reuse the already-built pub_names. */
1082 Assert(pub_names != NULL);
1083
1084 /* Check for row filters. */
1085 resetStringInfo(&cmd);
1086 appendStringInfo(&cmd,
1087 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1088 " FROM pg_publication p,"
1089 " LATERAL pg_get_publication_tables(p.pubname) gpt"
1090 " WHERE gpt.relid = %u"
1091 " AND p.pubname IN ( %s )",
1092 lrel->remoteid,
1093 pub_names->data);
1094
1095 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1096
1097 if (res->status != WALRCV_OK_TUPLES)
1098 ereport(ERROR,
1099 (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1100 nspname, relname, res->err)));
1101
1102 /*
1103 * Multiple row filter expressions for the same table will be combined
1104 * by COPY using OR. If any of the filter expressions for this table
1105 * are null, it means the whole table will be copied. In this case it
1106 * is not necessary to construct a unified row filter expression at
1107 * all.
1108 */
1109 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1110 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1111 {
1112 Datum rf = slot_getattr(slot, 1, &isnull);
1113
1114 if (!isnull)
1115 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1116 else
1117 {
1118 /* Ignore filters and cleanup as necessary. */
1119 if (*qual)
1120 {
1121 list_free_deep(*qual);
1122 *qual = NIL;
1123 }
1124 break;
1125 }
1126
1127 ExecClearTuple(slot);
1128 }
1129 ExecDropSingleTupleTableSlot(slot);
1130
1131 walrcv_clear_result(res);
1132 destroyStringInfo(pub_names);
1133 }
1134
1135 pfree(cmd.data);
1136}
1137
1138/*
1139 * Copy existing data of a table from publisher.
1140 *
1141 * Caller is responsible for locking the local relation.
1142 */
1143static void
1144 copy_table(Relation rel)
1145{
1146 LogicalRepRelMapEntry *relmapentry;
1147 LogicalRepRelation lrel;
1148 List *qual = NIL;
1149 WalRcvExecResult *res;
1150 StringInfoData cmd;
1151 CopyFromState cstate;
1152 List *attnamelist;
1153 ParseState *pstate;
1154 List *options = NIL;
1155 bool gencol_published = false;
1156
1157 /* Get the publisher relation info. */
1158 fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
1159 RelationGetRelationName(rel), &lrel, &qual,
1160 &gencol_published);
1161
1162 /* Put the relation into relmap. */
1163 logicalrep_relmap_update(&lrel);
1164
1165 /* Map the publisher relation to local one. */
1166 relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1167 Assert(rel == relmapentry->localrel);
1168
1169 /* Start copy on the publisher. */
1170 initStringInfo(&cmd);
1171
1172 /* Regular table with no row filter or generated columns */
1173 if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
1174 {
1175 appendStringInfo(&cmd, "COPY %s",
1176 quote_qualified_identifier(lrel.nspname, lrel.relname));
1177
1178 /* If the table has columns, then specify the columns */
1179 if (lrel.natts)
1180 {
1181 appendStringInfoString(&cmd, " (");
1182
1183 /*
1184 * XXX Do we need to list the columns in all cases? Maybe we're
1185 * replicating all columns?
1186 */
1187 for (int i = 0; i < lrel.natts; i++)
1188 {
1189 if (i > 0)
1190 appendStringInfoString(&cmd, ", ");
1191
1192 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1193 }
1194
1195 appendStringInfoChar(&cmd, ')');
1196 }
1197
1198 appendStringInfoString(&cmd, " TO STDOUT");
1199 }
1200 else
1201 {
1202 /*
1203 * For non-tables and tables with row filters, we need to do COPY
1204 * (SELECT ...), but we can't just do SELECT * because we may need to
1205 * copy only subset of columns including generated columns. For tables
1206 * with any row filters, build a SELECT query with OR'ed row filters
1207 * for COPY.
1208 *
1209 * We also need to use this same COPY (SELECT ...) syntax when
1210 * generated columns are published, because copy of generated columns
1211 * is not supported by the normal COPY.
1212 */
1213 appendStringInfoString(&cmd, "COPY (SELECT ");
1214 for (int i = 0; i < lrel.natts; i++)
1215 {
1216 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1217 if (i < lrel.natts - 1)
1218 appendStringInfoString(&cmd, ", ");
1219 }
1220
1221 appendStringInfoString(&cmd, " FROM ");
1222
1223 /*
1224 * For regular tables, make sure we don't copy data from a child that
1225 * inherits the named table as those will be copied separately.
1226 */
1227 if (lrel.relkind == RELKIND_RELATION)
1228 appendStringInfoString(&cmd, "ONLY ");
1229
1230 appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1231 /* list of OR'ed filters */
1232 if (qual != NIL)
1233 {
1234 ListCell *lc;
1235 char *q = strVal(linitial(qual));
1236
1237 appendStringInfo(&cmd, " WHERE %s", q);
1238 for_each_from(lc, qual, 1)
1239 {
1240 q = strVal(lfirst(lc));
1241 appendStringInfo(&cmd, " OR %s", q);
1242 }
1243 list_free_deep(qual);
1244 }
1245
1246 appendStringInfoString(&cmd, ") TO STDOUT");
1247 }
1248
1249 /*
1250 * Prior to v16, initial table synchronization will use text format even
1251 * if the binary option is enabled for a subscription.
1252 */
1253 if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1254 MySubscription->binary)
1255 {
1256 appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1257 options = list_make1(makeDefElem("format",
1258 (Node *) makeString("binary"), -1));
1259 }
1260
1261 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1262 pfree(cmd.data);
1263 if (res->status != WALRCV_OK_COPY_OUT)
1264 ereport(ERROR,
1265 (errcode(ERRCODE_CONNECTION_FAILURE),
1266 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1267 lrel.nspname, lrel.relname, res->err)));
1268 walrcv_clear_result(res);
1269
1270 copybuf = makeStringInfo();
1271
1272 pstate = make_parsestate(NULL);
1273 (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1274 NULL, false, false);
1275
1276 attnamelist = make_copy_attnamelist(relmapentry);
1277 cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1278
1279 /* Do the copy */
1280 (void) CopyFrom(cstate);
1281
1282 logicalrep_rel_close(relmapentry, NoLock);
1283}
1284
1285/*
1286 * Determine the tablesync slot name.
1287 *
1288 * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1289 * on slot name length. We append system_identifier to avoid slot_name
1290 * collision with subscriptions in other clusters. With the current scheme
1291 * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '0円'), the maximum
1292 * length of slot_name will be 50.
1293 *
1294 * The returned slot name is stored in the supplied buffer (syncslotname) with
1295 * the given size.
1296 *
1297 * Note: We don't use the subscription slot name as part of tablesync slot name
1298 * because we are responsible for cleaning up these slots and it could become
1299 * impossible to recalculate what name to cleanup if the subscription slot name
1300 * had changed.
1301 */
1302void
1303 ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1304 char *syncslotname, Size szslot)
1305{
1306 snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1307 relid, GetSystemIdentifier());
1308}
1309
/*
 * Start syncing the table in the sync worker.
 *
 * If nothing needs to be done to sync the table, we exit the worker without
 * any further action.
 *
 * Depending on the state recorded for the relation, this either performs the
 * initial copy (states INIT and DATASYNC) or, when the copy had already been
 * completed by an earlier run (state FINISHEDCOPY), only re-attaches to the
 * existing replication origin.  In both cases the function then sets the
 * in-memory state to SYNCWAIT and blocks until the state becomes CATCHUP.
 *
 * origin_startpos is an output parameter.  It receives either the position
 * handed back by walrcv_create_slot() or, in the FINISHEDCOPY case, the
 * progress recorded for the replication origin.
 *
 * The returned slot name is palloc'ed in current memory context.
 */
static char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
	char	   *slotname;
	char	   *err;
	char		relstate;
	XLogRecPtr	relstate_lsn;
	Relation	rel;
	AclResult	aclresult;
	WalRcvExecResult *res;
	char		originname[NAMEDATALEN];
	RepOriginId originid;
	UserContext ucxt;
	bool		must_use_password;
	bool		run_as_owner;

	/* Check the state of the table synchronization. */
	StartTransactionCommand();
	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
									   MyLogicalRepWorker->relid,
									   &relstate_lsn);
	CommitTransactionCommand();

	/* Is the use of a password mandatory? */
	must_use_password = MySubscription->passwordrequired &&
		!MySubscription->ownersuperuser;

	/* Publish the state just read into this worker's slot, under its mutex. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = relstate;
	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * If synchronization is already done or no longer necessary, exit now
	 * that we've updated shared memory state.
	 */
	switch (relstate)
	{
		case SUBREL_STATE_SYNCDONE:
		case SUBREL_STATE_READY:
		case SUBREL_STATE_UNKNOWN:
			finish_sync_worker();	/* doesn't return */
	}

	/* Calculate the name of the tablesync slot. */
	slotname = (char *) palloc(NAMEDATALEN);
	ReplicationSlotNameForTablesync(MySubscription->oid,
									MyLogicalRepWorker->relid,
									slotname,
									NAMEDATALEN);

	/*
	 * Here we use the slot name instead of the subscription name as the
	 * application_name, so that it is different from the leader apply worker,
	 * so that synchronous replication can distinguish them.
	 */
	LogRepWorkerWalRcvConn =
		walrcv_connect(MySubscription->conninfo, true, true,
					   must_use_password,
					   slotname, &err);
	if (LogRepWorkerWalRcvConn == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
						MySubscription->name, err)));

	/* Only these three states can reach this point; see the switch above. */
	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);

	/* Assign the origin tracking record name. */
	ReplicationOriginNameForLogicalRep(MySubscription->oid,
									   MyLogicalRepWorker->relid,
									   originname,
									   sizeof(originname));

	if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
	{
		/*
		 * We have previously errored out before finishing the copy so the
		 * replication slot might exist. We want to remove the slot if it
		 * already exists and proceed.
		 *
		 * XXX We could instead try to drop the slot at the time of the
		 * failure, but for that we might need to clean up the copy state as
		 * it might be in the middle of fetching the rows. Also, if there is
		 * a network breakdown then it wouldn't have succeeded, so trying it
		 * next time seems like a better bet.
		 */
		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
	}
	else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
	{
		/*
		 * The COPY phase was previously done, but tablesync then crashed
		 * before it was able to finish normally.
		 */
		StartTransactionCommand();

		/*
		 * The origin tracking name must already exist. It was created first
		 * time this tablesync was launched.
		 */
		originid = replorigin_by_name(originname, false);
		replorigin_session_setup(originid, 0);
		replorigin_session_origin = originid;
		*origin_startpos = replorigin_session_get_progress(false);

		CommitTransactionCommand();

		/* Nothing left to copy; skip straight to the SYNCWAIT handshake. */
		goto copy_table_done;
	}

	/* Record in the worker's slot that the data copy is now under way. */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/* Update the state and make it visible to others. */
	StartTransactionCommand();
	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
							   MyLogicalRepWorker->relid,
							   MyLogicalRepWorker->relstate,
							   MyLogicalRepWorker->relstate_lsn,
							   false);
	CommitTransactionCommand();
	pgstat_report_stat(true);

	/*
	 * This local transaction stays open across the whole copy; it is
	 * committed only after the FINISHEDCOPY state has been recorded below.
	 */
	StartTransactionCommand();

	/*
	 * Use a standard write lock here. It might be better to disallow access
	 * to the table while it's being synchronized. But we don't want to block
	 * the main apply process from working and it has to open the relation in
	 * RowExclusiveLock when remapping remote relation id to local one.
	 */
	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);

	/*
	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
	 * ensures that both the replication slot we create (see below) and the
	 * COPY are consistent with each other.
	 */
	res = walrcv_exec(LogRepWorkerWalRcvConn,
					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
					  0, NULL);
	if (res->status != WALRCV_OK_COMMAND)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table copy could not start transaction on publisher: %s",
						res->err)));
	walrcv_clear_result(res);

	/*
	 * Create a new permanent logical decoding slot. This slot will be used
	 * for the catchup phase after COPY is done, so tell it to use the
	 * snapshot to make the final data consistent.
	 */
	walrcv_create_slot(LogRepWorkerWalRcvConn,
					   slotname, false /* permanent */ , false /* two_phase */ ,
					   MySubscription->failover,
					   CRS_USE_SNAPSHOT, origin_startpos);

	/*
	 * Setup replication origin tracking. The purpose of doing this before the
	 * copy is to avoid doing the copy again due to any error in setting up
	 * origin tracking.
	 */
	originid = replorigin_by_name(originname, true);
	if (!OidIsValid(originid))
	{
		/*
		 * Origin tracking does not exist, so create it now.
		 *
		 * Then advance to the LSN got from walrcv_create_slot. This is WAL
		 * logged for the purpose of recovery. Locks are to prevent the
		 * replication origin from vanishing while advancing.
		 */
		originid = replorigin_create(originname);

		LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
		replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
						   true /* go backward */ , true /* WAL log */ );
		UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);

		replorigin_session_setup(originid, 0);
		replorigin_session_origin = originid;
	}
	else
	{
		/* An origin of this name is not expected to exist on this path. */
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_OBJECT),
				 errmsg("replication origin \"%s\" already exists",
						originname)));
	}

	/*
	 * Make sure that the copy command runs as the table owner, unless the
	 * user has opted out of that behaviour.
	 */
	run_as_owner = MySubscription->runasowner;
	if (!run_as_owner)
		SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);

	/*
	 * Check that our table sync worker has permission to insert into the
	 * target table.
	 */
	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
								  ACL_INSERT);
	if (aclresult != ACLCHECK_OK)
		aclcheck_error(aclresult,
					   get_relkind_objtype(rel->rd_rel->relkind),
					   RelationGetRelationName(rel));

	/*
	 * COPY FROM does not honor RLS policies.  That is not a problem for
	 * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
	 * who has it implicitly), but other roles should not be able to
	 * circumvent RLS.  Disallow logical replication into RLS enabled
	 * relations for such roles.
	 */
	if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
						GetUserNameFromId(GetUserId(), true),
						RelationGetRelationName(rel))));

	/* Now do the initial data copy */
	PushActiveSnapshot(GetTransactionSnapshot());
	copy_table(rel);
	PopActiveSnapshot();

	/* End the remote transaction that was opened with BEGIN above. */
	res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
	if (res->status != WALRCV_OK_COMMAND)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_FAILURE),
				 errmsg("table copy could not finish transaction on publisher: %s",
						res->err)));
	walrcv_clear_result(res);

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	/* Close the relation but keep its lock (NoLock). */
	table_close(rel, NoLock);

	/* Make the copy visible. */
	CommandCounterIncrement();

	/*
	 * Update the persisted state to indicate the COPY phase is done; make it
	 * visible to others.
	 */
	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
							   MyLogicalRepWorker->relid,
							   SUBREL_STATE_FINISHEDCOPY,
							   MyLogicalRepWorker->relstate_lsn,
							   false);

	CommitTransactionCommand();

copy_table_done:

	elog(DEBUG1,
		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
		 originname, LSN_FORMAT_ARGS(*origin_startpos));

	/*
	 * We are done with the initial data synchronization, update the state.
	 */
	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
	MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
	SpinLockRelease(&MyLogicalRepWorker->relmutex);

	/*
	 * Finally, wait until the leader apply worker tells us to catch up and
	 * then return to let LogicalRepApplyLoop do it.
	 */
	wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
	return slotname;
}
1601
1602/*
1603 * Common code to fetch the up-to-date sync state info into the static lists.
1604 *
1605 * Returns true if subscription has 1 or more tables, else false.
1606 *
1607 * Note: If this function started the transaction (indicated by the parameter)
1608 * then it is the caller's responsibility to commit it.
1609 */
1610static bool
1611 FetchTableStates(bool *started_tx)
1612{
1613 static bool has_subrels = false;
1614
1615 *started_tx = false;
1616
1617 if (table_states_validity != SYNC_TABLE_STATE_VALID)
1618 {
1619 MemoryContext oldctx;
1620 List *rstates;
1621 ListCell *lc;
1622 SubscriptionRelState *rstate;
1623
1624 table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
1625
1626 /* Clean the old lists. */
1627 list_free_deep(table_states_not_ready);
1628 table_states_not_ready = NIL;
1629
1630 if (!IsTransactionState())
1631 {
1632 StartTransactionCommand();
1633 *started_tx = true;
1634 }
1635
1636 /* Fetch all non-ready tables. */
1637 rstates = GetSubscriptionRelations(MySubscription->oid, true);
1638
1639 /* Allocate the tracking info in a permanent memory context. */
1640 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1641 foreach(lc, rstates)
1642 {
1643 rstate = palloc(sizeof(SubscriptionRelState));
1644 memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1645 table_states_not_ready = lappend(table_states_not_ready, rstate);
1646 }
1647 MemoryContextSwitchTo(oldctx);
1648
1649 /*
1650 * Does the subscription have tables?
1651 *
1652 * If there were not-READY relations found then we know it does. But
1653 * if table_states_not_ready was empty we still need to check again to
1654 * see if there are 0 tables.
1655 */
1656 has_subrels = (table_states_not_ready != NIL) ||
1657 HasSubscriptionRelations(MySubscription->oid);
1658
1659 /*
1660 * If the subscription relation cache has been invalidated since we
1661 * entered this routine, we still use and return the relations we just
1662 * finished constructing, to avoid infinite loops, but we leave the
1663 * table states marked as stale so that we'll rebuild it again on next
1664 * access. Otherwise, we mark the table states as valid.
1665 */
1666 if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
1667 table_states_validity = SYNC_TABLE_STATE_VALID;
1668 }
1669
1670 return has_subrels;
1671}
1672
1673/*
1674 * Execute the initial sync with error handling. Disable the subscription,
1675 * if it's required.
1676 *
1677 * Allocate the slot name in long-lived context on return. Note that we don't
1678 * handle FATAL errors which are probably because of system resource error and
1679 * are not repeatable.
1680 */
1681static void
1682 start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1683{
1684 char *sync_slotname = NULL;
1685
1686 Assert(am_tablesync_worker());
1687
1688 PG_TRY();
1689 {
1690 /* Call initial sync. */
1691 sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1692 }
1693 PG_CATCH();
1694 {
1695 if (MySubscription->disableonerr)
1696 DisableSubscriptionAndExit();
1697 else
1698 {
1699 /*
1700 * Report the worker failed during table synchronization. Abort
1701 * the current transaction so that the stats message is sent in an
1702 * idle state.
1703 */
1704 AbortOutOfAnyTransaction();
1705 pgstat_report_subscription_error(MySubscription->oid, false);
1706
1707 PG_RE_THROW();
1708 }
1709 }
1710 PG_END_TRY();
1711
1712 /* allocate slot name in long-lived context */
1713 *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1714 pfree(sync_slotname);
1715}
1716
1717/*
1718 * Runs the tablesync worker.
1719 *
1720 * It starts syncing tables. After a successful sync, sets streaming options
1721 * and starts streaming to catchup with apply worker.
1722 */
1723static void
1724 run_tablesync_worker()
1725{
1726 char originname[NAMEDATALEN];
1727 XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1728 char *slotname = NULL;
1729 WalRcvStreamOptions options;
1730
1731 start_table_sync(&origin_startpos, &slotname);
1732
1733 ReplicationOriginNameForLogicalRep(MySubscription->oid,
1734 MyLogicalRepWorker->relid,
1735 originname,
1736 sizeof(originname));
1737
1738 set_apply_error_context_origin(originname);
1739
1740 set_stream_options(&options, slotname, &origin_startpos);
1741
1742 walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1743
1744 /* Apply the changes till we catchup with the apply worker. */
1745 start_apply(origin_startpos);
1746}
1747
1748/* Logical Replication Tablesync worker entry point */
1749void
1750 TablesyncWorkerMain(Datum main_arg)
1751{
1752 int worker_slot = DatumGetInt32(main_arg);
1753
1754 SetupApplyOrSyncWorker(worker_slot);
1755
1756 run_tablesync_worker();
1757
1758 finish_sync_worker();
1759}
1760
1761/*
1762 * If the subscription has no tables then return false.
1763 *
1764 * Otherwise, are all tablesyncs READY?
1765 *
1766 * Note: This function is not suitable to be called from outside of apply or
1767 * tablesync workers because MySubscription needs to be already initialized.
1768 */
1769bool
1770 AllTablesyncsReady(void)
1771{
1772 bool started_tx = false;
1773 bool has_subrels = false;
1774
1775 /* We need up-to-date sync state info for subscription tables here. */
1776 has_subrels = FetchTableStates(&started_tx);
1777
1778 if (started_tx)
1779 {
1780 CommitTransactionCommand();
1781 pgstat_report_stat(true);
1782 }
1783
1784 /*
1785 * Return false when there are no tables in subscription or not all tables
1786 * are in ready state; true otherwise.
1787 */
1788 return has_subrels && (table_states_not_ready == NIL);
1789}
1790
/*
 * Tell whether the subscription currently has any relations.
 *
 * Note: In contrast to HasSubscriptionRelations(), the answer comes from the
 * cached subscription relation information.  MySubscription must be
 * initialized first, so do not call this outside of apply or tablesync
 * workers.
 */
bool
HasSubscriptionRelationsCached(void)
{
	bool		tx_opened;
	bool		result;

	/* Make sure the cached subscription table info is current. */
	result = FetchTableStates(&tx_opened);

	if (!tx_opened)
		return result;

	/* FetchTableStates() started a transaction; finish it here. */
	CommitTransactionCommand();
	pgstat_report_stat(true);

	return result;
}
1816
1817/*
1818 * Update the two_phase state of the specified subscription in pg_subscription.
1819 */
1820void
1821 UpdateTwoPhaseState(Oid suboid, char new_state)
1822{
1823 Relation rel;
1824 HeapTuple tup;
1825 bool nulls[Natts_pg_subscription];
1826 bool replaces[Natts_pg_subscription];
1827 Datum values[Natts_pg_subscription];
1828
1829 Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1830 new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1831 new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1832
1833 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1834 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1835 if (!HeapTupleIsValid(tup))
1836 elog(ERROR,
1837 "cache lookup failed for subscription oid %u",
1838 suboid);
1839
1840 /* Form a new tuple. */
1841 memset(values, 0, sizeof(values));
1842 memset(nulls, false, sizeof(nulls));
1843 memset(replaces, false, sizeof(replaces));
1844
1845 /* And update/set two_phase state */
1846 values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1847 replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1848
1849 tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1850 values, nulls, replaces);
1851 CatalogTupleUpdate(rel, &tup->t_self, tup);
1852
1853 heap_freetuple(tup);
1854 table_close(rel, RowExclusiveLock);
1855}
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2652
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4037
#define ARR_DATA_PTR(a)
Definition: array.h:322
#define DatumGetArrayTypeP(X)
Definition: array.h:261
#define ARR_DIMS(a)
Definition: array.h:294
int16 AttrNumber
Definition: attnum.h:21
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:5483
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:5552
void DisableSubscriptionAndExit(void)
Definition: worker.c:5905
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:641
void set_apply_error_context_origin(char *originname)
Definition: worker.c:6260
MemoryContext ApplyContext
Definition: worker.c:472
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:5831
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
Subscription * MySubscription
Definition: worker.c:479
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define pg_noreturn
Definition: c.h:164
#define Max(x, y)
Definition: c.h:997
#define UINT64_FORMAT
Definition: c.h:557
int16_t int16
Definition: c.h:533
uint32_t uint32
Definition: c.h:538
#define lengthof(array)
Definition: c.h:787
#define OidIsValid(objectId)
Definition: c.h:774
size_t Size
Definition: c.h:610
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1529
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:779
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:358
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define LOG
Definition: elog.h:31
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:382
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
void err(int eval, const char *fmt,...)
Definition: err.c:43
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1427
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1443
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
struct Latch * MyLatch
Definition: globals.c:63
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void proc_exit(int code)
Definition: ipc.c:104
i
int i
Definition: isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:223
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:720
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition: launcher.c:317
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:254
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:700
static dshash_table * last_start_times
Definition: launcher.c:91
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
int max_sync_workers_per_subscription
Definition: launcher.c:53
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:874
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1101
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free_deep(List *list)
Definition: list.c:1560
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:107
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2095
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
DefElem * makeDefElem(char *name, Node *arg, int location)
Definition: makefuncs.c:637
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1746
void pfree(void *pointer)
Definition: mcxt.c:1594
void * palloc0(Size size)
Definition: mcxt.c:1395
void * palloc(Size size)
Definition: mcxt.c:1365
MemoryContext CacheMemoryContext
Definition: mcxt.c:169
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
Oid GetUserId(void)
Definition: miscinit.c:469
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:988
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:165
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:257
void replorigin_session_reset(void)
Definition: origin.c:1226
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:439
RepOriginId replorigin_session_origin
Definition: origin.c:163
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:911
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1120
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1273
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:164
#define InvalidRepOriginId
Definition: origin.h:33
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:39
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define ACL_INSERT
Definition: parsenodes.h:76
int16 attnum
Definition: pg_attribute.h:74
void * arg
NameData relname
Definition: pg_class.h:38
#define NAMEDATALEN
const void size_t len
static int server_version
Definition: pg_dumpall.c:109
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
#define for_each_from(cell, lst, N)
Definition: pg_list.h:414
#define linitial(l)
Definition: pg_list.h:178
static char ** options
Definition: pg_recvlogical.c:59
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
bool HasSubscriptionRelations(Oid subid)
static char * buf
Definition: pg_test_fsync.c:72
long pgstat_report_stat(bool force)
Definition: pgstat.c:693
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:239
#define PGINVALID_SOCKET
Definition: port.h:31
static bool DatumGetBool(Datum X)
Definition: postgres.h:100
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:252
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
uint64_t Datum
Definition: postgres.h:70
static char DatumGetChar(Datum X)
Definition: postgres.h:122
static int16 DatumGetInt16(Datum X)
Definition: postgres.h:172
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:212
static Datum CharGetDatum(char X)
Definition: postgres.h:132
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
tree ctl
Definition: radixtree.h:1838
#define RelationGetRelid(relation)
Definition: rel.h:514
#define RelationGetDescr(relation)
Definition: rel.h:540
#define RelationGetRelationName(relation)
Definition: rel.h:548
#define RelationGetNamespace(relation)
Definition: rel.h:555
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:13119
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13035
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:271
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:680
void PopActiveSnapshot(void)
Definition: snapmgr.c:773
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:454
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:504
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:349
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
void destroyStringInfo(StringInfo str)
Definition: stringinfo.c:409
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: array.h:93
Definition: hsearch.h:66
Definition: dynahash.c:222
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
LogicalRepRelation remoterel
LogicalRepRelId remoteid
Definition: logicalproto.h:107
Bitmapset * attkeys
Definition: logicalproto.h:115
XLogRecPtr relstate_lsn
LogicalRepWorkerType type
Definition: nodes.h:135
Definition: rel.h:56
Form_pg_class rd_rel
Definition: rel.h:111
char * data
Definition: stringinfo.h:48
bool passwordrequired
char * conninfo
bool ownersuperuser
char twophasestate
List * publications
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
Definition: oid2name.c:30
Definition: regguts.h:323
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
static List * table_states_not_ready
Definition: tablesync.c:134
bool AllTablesyncsReady(void)
Definition: tablesync.c:1770
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:231
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:280
SyncingTablesState
Definition: tablesync.c:127
@ SYNC_TABLE_STATE_REBUILD_STARTED
Definition: tablesync.c:129
@ SYNC_TABLE_STATE_VALID
Definition: tablesync.c:130
@ SYNC_TABLE_STATE_NEEDS_REBUILD
Definition: tablesync.c:128
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:418
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition: tablesync.c:727
void TablesyncWorkerMain(Datum main_arg)
Definition: tablesync.c:1750
static pg_noreturn void finish_sync_worker(void)
Definition: tablesync.c:143
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:294
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
Definition: tablesync.c:826
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1303
static void run_tablesync_worker()
Definition: tablesync.c:1724
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition: tablesync.c:747
static SyncingTablesState table_states_validity
Definition: tablesync.c:133
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:696
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1319
static void copy_table(Relation rel)
Definition: tablesync.c:1144
static bool wait_for_relation_state_change(Oid relid, char expected_state)
Definition: tablesync.c:183
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
Definition: tablesync.c:1682
static StringInfo copybuf
Definition: tablesync.c:137
bool HasSubscriptionRelationsCached(void)
Definition: tablesync.c:1800
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1611
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1821
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1130
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition: tuplestore.c:580
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:398
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:457
Definition: pg_list.h:46
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87
String * makeString(char *str)
Definition: value.c:63
#define strVal(v)
Definition: value.h:82
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:459
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_server_version(conn)
Definition: walreceiver.h:447
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
Definition: xact.c:387
void CommandCounterIncrement(void)
Definition: xact.c:1100
void StartTransactionCommand(void)
Definition: xact.c:3071
void CommitTransactionCommand(void)
Definition: xact.c:3169
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4874
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4595
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:9495
int wal_retrieve_retry_interval
Definition: xlog.c:135
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2780
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:46
uint16 RepOriginId
Definition: xlogdefs.h:68
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:62

AltStyle によって変換されたページ (->オリジナル) /