/*-------------------------------------------------------------------------
 * tablesync.c
 *    PostgreSQL logical replication: initial table data synchronization
 *
 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *    src/backend/replication/logical/tablesync.c
 *
 * NOTES
 *
 * This file contains code for initial table data synchronization for
 * logical replication.
 *
 * The initial data synchronization is done separately for each table,
 * in a separate apply worker that only fetches the initial snapshot data
 * from the publisher and then synchronizes the position in the stream with
 * the leader apply worker.
 *
 * There are several reasons for doing the synchronization this way:
 *  - It allows us to parallelize the initial data synchronization
 *    which lowers the time needed for it to happen.
 *  - The initial synchronization does not have to hold the xid and LSN
 *    for the time it takes to copy data of all tables, causing less
 *    bloat and lower disk consumption compared to doing the
 *    synchronization in a single process for the whole database.
 *  - It allows us to synchronize any tables added after the initial
 *    synchronization has finished.
 *
 * The stream position synchronization works in multiple steps:
 *  - The apply worker requests a tablesync worker to start, setting the new
 *    table state to INIT.
 *  - The tablesync worker starts; it changes the table state from INIT to
 *    DATASYNC while copying.
 *  - The tablesync worker does the initial table copy; there is a
 *    FINISHEDCOPY (sync worker specific) state to indicate when the copy
 *    phase has completed, so if the worker crashes with this (non-memory)
 *    state then the copy will not be re-attempted.
 *  - The tablesync worker then sets the table state to SYNCWAIT and waits
 *    for a state change.
 *  - The apply worker periodically checks for tables in SYNCWAIT state.
 *    When any appear, it sets the table state to CATCHUP and starts
 *    loop-waiting until either the table state is set to SYNCDONE or the
 *    sync worker exits.
 *  - After the sync worker has seen the state change to CATCHUP, it will
 *    read the stream and apply changes (acting like an apply worker) until
 *    it catches up to the specified stream position.  Then it sets the
 *    state to SYNCDONE.  There might be zero changes applied between
 *    CATCHUP and SYNCDONE, because the sync worker might be ahead of the
 *    apply worker.
 *  - Once the state is set to SYNCDONE, the apply worker will continue
 *    tracking the table until it reaches the SYNCDONE stream position, at
 *    which point it sets the state to READY and stops tracking.  Again,
 *    there might be zero changes in between.
 *
 * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
 * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
 *
 * The catalog pg_subscription_rel is used to keep information about
 * subscribed tables and their state.  The catalog holds all states
 * except SYNCWAIT and CATCHUP which are only in shared memory.
 *
 * Example flows look like this:
 *  - Apply is in front:
 *      sync worker (copy finished at LSN 8):
 *        -> set in catalog FINISHEDCOPY
 *        -> set in memory SYNCWAIT
 *      apply worker (at LSN 10):
 *        -> set in memory CATCHUP
 *        -> enter wait-loop
 *      sync worker (catches up to LSN 10):
 *        -> set in catalog SYNCDONE
 *        -> exit
 *      apply worker:
 *        -> exit wait-loop
 *        -> continue replication
 *      apply worker (reaches LSN 11):
 *        -> set in catalog READY
 *
 *  - Sync is in front:
 *      sync worker (copy finished at LSN 10):
 *        -> set in catalog FINISHEDCOPY
 *        -> set in memory SYNCWAIT
 *      apply worker (at LSN 8):
 *        -> set in memory CATCHUP
 *        -> continue per-table filtering
 *      sync worker:
 *        -> set in catalog SYNCDONE
 *        -> exit
 *      apply worker (reaches LSN 10):
 *        -> set in catalog READY
 *        -> stop per-table filtering
 *        -> continue replication
 *
 *-------------------------------------------------------------------------
 */
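/*
 * Illustrative helper, not part of the original file: a minimal sketch that
 * encodes which states from the progression above are persisted in
 * pg_subscription_rel versus kept only in shared memory.  It assumes the
 * SUBREL_STATE_* constants from src/include/catalog/pg_subscription_rel.h.
 */
#ifdef TABLESYNC_STATE_SKETCH
static inline bool
tablesync_state_is_persistent(char state)
{
    switch (state)
    {
        case SUBREL_STATE_INIT:         /* 'i': sync requested, nothing done */
        case SUBREL_STATE_DATASYNC:     /* 'd': initial copy in progress */
        case SUBREL_STATE_FINISHEDCOPY: /* 'f': copy done, catchup pending */
        case SUBREL_STATE_SYNCDONE:     /* 's': sync worker caught up */
        case SUBREL_STATE_READY:        /* 'r': normal replication */
            return true;
        case SUBREL_STATE_SYNCWAIT:     /* 'w': shared memory only */
        case SUBREL_STATE_CATCHUP:      /* 'c': shared memory only */
        default:
            return false;
    }
}
#endif                          /* TABLESYNC_STATE_SKETCH */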
/*
 * Exit routine for synchronization worker.
 */

    /*
     * Commit any outstanding transaction.  This is the usual case, unless
     * there was nothing to do for the table.
     */

    /* And flush all writes. */

    ereport(LOG,
            (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
                    MySubscription->name,
                    get_rel_name(MyLogicalRepWorker->relid))));

    /* Find the leader apply worker and signal it. */

    /* Stop gracefully */
/*
 * Wait until the relation sync state is set in the catalog to the expected
 * one; return true when it happens.
 *
 * Returns false if the table sync worker or the table itself have
 * disappeared, or the table state has been reset.
 *
 * Currently, this is used in the apply worker when transitioning from
 * CATCHUP state to SYNCDONE.
 */

        if (state == SUBREL_STATE_UNKNOWN)
            break;

        if (state == expected_state)
            return true;

        /* Check if the sync worker is still running and bail if not. */

        (void) WaitLatch(MyLatch,
                         WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                         1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
/*
 * Wait until the apply worker changes the state of our synchronization
 * worker to the expected one.
 *
 * Used when transitioning from SYNCWAIT state to CATCHUP.
 *
 * Returns false if the apply worker has disappeared.
 */

        /*
         * Done if already in correct state.  (We assume this fetch is atomic
         * enough to not give a misleading answer if we do it with no lock.)
         */

        /*
         * Bail out if the apply worker has died, else signal it we're
         * waiting.
         */
        if (worker && worker->proc)
            logicalrep_worker_wakeup_ptr(worker);

        /*
         * Wait.  We expect to get a latch signal back from the apply worker,
         * but use a timeout in case it dies without sending one.
         */
        (void) WaitLatch(MyLatch,
                         WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                         1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
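/*
 * Illustrative sketch, not in the original file, of the polling pattern both
 * wait functions above follow: re-check the state, then sleep on the process
 * latch with a 1-second timeout so a missed wakeup cannot hang the worker
 * forever.  The helper name and the fetch_current_state callback are
 * hypothetical.
 */
#ifdef TABLESYNC_WAIT_SKETCH
static bool
wait_for_state_sketch(char expected_state, char (*fetch_current_state) (void))
{
    for (;;)
    {
        char        state;

        CHECK_FOR_INTERRUPTS();

        state = fetch_current_state();
        if (state == expected_state)
            return true;
        if (state == SUBREL_STATE_UNKNOWN)
            return false;       /* table or worker has gone away */

        /* Sleep until signaled, but never longer than 1 second. */
        (void) WaitLatch(MyLatch,
                         WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                         1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
        ResetLatch(MyLatch);
    }
}
#endif                          /* TABLESYNC_WAIT_SKETCH */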
/*
 * Callback from syscache invalidation.
 */
/*
 * Handle table synchronization cooperation from the synchronization worker.
 *
 * If the sync worker is in CATCHUP state and reached (or passed) the
 * predetermined synchronization point in the WAL stream, mark the table as
 * SYNCDONE and finish.
 */

        /*
         * UpdateSubscriptionRelState must be called within a transaction.
         */

        /*
         * End streaming so that LogRepWorkerWalRcvConn can be used to drop
         * the slot.
         */

        /*
         * Cleanup the tablesync slot.
         *
         * This has to be done after updating the state because otherwise, if
         * there is an error while doing the database operations, we won't be
         * able to roll back the dropped slot.
         */
        ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
                                        MyLogicalRepWorker->relid,
                                        syncslotname,
                                        sizeof(syncslotname));

        /*
         * It is important to give an error if we are unable to drop the
         * slot; otherwise, it won't be dropped until the corresponding
         * subscription is dropped.  So passing missing_ok = false.
         */

        /*
         * Start a new transaction to clean up the tablesync origin tracking.
         * This transaction will be ended within the finish_sync_worker().
         * Even if we fail to remove this here, the apply worker will ensure
         * to clean it up afterward.
         *
         * We need to do this after the table state is set to SYNCDONE.
         * Otherwise, if an error occurs while performing the database
         * operation, the worker will be restarted and the in-memory state of
         * replication progress (remote_lsn) won't be rolled back, which
         * would have been cleared before restart.  So the restarted worker
         * would use an invalid replication progress state, resulting in
         * replay of transactions that have already been applied.
         */

        /*
         * Resetting the origin session removes the ownership of the slot.
         * This is needed to allow the origin to be dropped.
         */

        /*
         * Drop the tablesync's origin tracking if it exists.
         *
         * There is a chance that the user is concurrently performing a
         * refresh for the subscription where we remove the table state and
         * its origin, or the apply worker would have already removed this
         * origin.  So passing missing_ok = true.
         */
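/*
 * Illustrative sketch, not the original code, of the cleanup order described
 * above: persist SYNCDONE first, then drop the remote tablesync slot, then
 * drop the origin tracking in a fresh transaction (which finish_sync_worker()
 * commits later).  current_lsn stands in for the worker's relstate_lsn.
 */
#ifdef TABLESYNC_SYNCDONE_SKETCH
static void
tablesync_syncdone_cleanup_sketch(XLogRecPtr current_lsn)
{
    char        syncslotname[NAMEDATALEN] = {0};
    char        originname[NAMEDATALEN] = {0};

    /* 1. Persist SYNCDONE (must be done inside a transaction). */
    StartTransactionCommand();
    UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               MyLogicalRepWorker->relid,
                               SUBREL_STATE_SYNCDONE, current_lsn,
                               false);
    CommitTransactionCommand();

    /* 2. Drop the remote tablesync slot; error out if that fails. */
    ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
                                    MyLogicalRepWorker->relid,
                                    syncslotname, sizeof(syncslotname));
    ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);

    /* 3. Drop the origin tracking, tolerating a concurrent drop. */
    StartTransactionCommand();
    ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                       MyLogicalRepWorker->relid,
                                       originname, sizeof(originname));
    replorigin_session_reset();
    replorigin_drop_by_name(originname, true, false);
}
#endif                          /* TABLESYNC_SYNCDONE_SKETCH */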
/*
 * Handle table synchronization cooperation from the apply worker.
 *
 * Walk over all subscription tables that are individually tracked by the
 * apply process (currently, all that have state other than
 * SUBREL_STATE_READY) and manage synchronization for them.
 *
 * If there are tables that need synchronizing and are not being synchronized
 * yet, start sync workers for them (if there are free slots for sync
 * workers).  To prevent starting the sync worker for the same relation at a
 * high frequency after a failure, we store its last start time with each sync
 * state info.  We start the sync worker for the same relation after waiting
 * at least wal_retrieve_retry_interval.
 *
 * For tables that are being synchronized already, check if sync workers
 * either need action from the apply worker or have finished.  This is the
 * SYNCWAIT to CATCHUP transition.
 *
 * If the synchronization position is reached (SYNCDONE), then the table can
 * be marked as READY and is no longer tracked.
 */
    struct tablesync_start_time_mapping
    {
        Oid         relid;
        TimestampTz last_start_time;
    };

    bool        started_tx = false;
    bool        should_exit = false;

    /* We need up-to-date sync state info for subscription tables here. */

    /*
     * Prepare a hash table for tracking last start times of workers, to
     * avoid immediate restarts.  We don't need it if there are no tables
     * that need syncing.
     */
        ctl.keysize = sizeof(Oid);
        ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
        last_start_times = hash_create("Logical replication table sync worker start times",
                                       256, &ctl, HASH_ELEM | HASH_BLOBS);

    /*
     * Clean up the hash table when we're done with all tables (just to
     * release the bit of memory).
     */
    /*
     * Process all tables that are being synchronized.
     */
        if (rstate->state == SUBREL_STATE_SYNCDONE)

            /*
             * Apply has caught up to the position where the table sync has
             * finished.  Mark the table as ready so that the apply will just
             * continue to replicate it normally.
             */
            if (current_lsn >= rstate->lsn)

                rstate->state = SUBREL_STATE_READY;
                rstate->lsn = current_lsn;

                /*
                 * Remove the tablesync origin tracking if it exists.
                 *
                 * There is a chance that the user is concurrently performing
                 * a refresh for the subscription where we remove the table
                 * state and its origin, or the tablesync worker would have
                 * already removed this origin.  We can't rely on the
                 * tablesync worker to remove the origin tracking because, if
                 * there is any error while dropping, we won't restart it to
                 * drop the origin.  So passing missing_ok = true.
                 *
                 * Lock the subscription and origin in the same order as we
                 * do during DDL commands to avoid deadlocks.  See
                 * AlterSubscription_refresh.
                 */

                /*
                 * Update the state to READY only after the origin cleanup.
                 */
            /*
             * Look for a sync worker for this relation.
             */
            syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                                rstate->relid, false);

            /* Found one, update our copy of its state */

                if (rstate->state == SUBREL_STATE_SYNCWAIT)

                    /*
                     * Sync worker is waiting for apply.  Tell sync worker it
                     * can catch up now.
                     */
                    syncworker->relstate = SUBREL_STATE_CATCHUP;

                /* If we told worker to catch up, wait for it. */
                if (rstate->state == SUBREL_STATE_SYNCWAIT)

                    /* Signal the sync worker, as it may be waiting for us. */
                    if (syncworker->proc)
                        logicalrep_worker_wakeup_ptr(syncworker);

                    /* Now safe to release the LWLock */

                    /*
                     * We must commit the existing transaction to release the
                     * existing locks before entering a busy loop.  This is
                     * required to avoid any undetected deadlocks due to any
                     * existing lock, as the deadlock detector won't be able
                     * to detect the waits on the latch.
                     *
                     * Also close any tables prior to the commit.
                     */

                    /*
                     * Enter busy loop and wait for synchronization worker to
                     * reach the expected state (or die trying).
                     */
                    wait_for_relation_state_change(rstate->relid,
                                                   SUBREL_STATE_SYNCDONE);
            /*
             * If there is no sync worker for this table yet, count running
             * sync workers for this subscription, while we have the lock.
             */

            /* Now safe to release the LWLock */

            /*
             * If there are free sync worker slot(s), start a new sync worker
             * for the table.
             */
                struct tablesync_start_time_mapping *hentry;

                /*
                 * Set the last_start_time even if we fail to start the
                 * worker, so that we won't retry until
                 * wal_retrieve_retry_interval has elapsed.
                 */
                hentry->last_start_time = now;
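/*
 * Illustrative sketch, not the original code, of the restart rate-limiting
 * described above: look up the relation in the last-start-times hash and only
 * launch a new tablesync worker if wal_retrieve_retry_interval has elapsed
 * since the previous attempt.  Names mirror the fragments above.
 */
#ifdef TABLESYNC_LAUNCH_SKETCH
static void
maybe_launch_sync_worker_sketch(HTAB *start_times, Oid relid)
{
    struct tablesync_start_time_mapping
    {
        Oid         relid;
        TimestampTz last_start_time;
    }          *hentry;
    bool        found;
    TimestampTz now = GetCurrentTimestamp();

    hentry = hash_search(start_times, &relid, HASH_ENTER, &found);

    if (!found ||
        TimestampDifferenceExceeds(hentry->last_start_time, now,
                                   wal_retrieve_retry_interval))
    {
        /*
         * Set the last_start_time even if we fail to start the worker, so
         * that we won't retry until wal_retrieve_retry_interval has elapsed.
         */
        hentry->last_start_time = now;
        logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
                                 MyLogicalRepWorker->dbid,
                                 MySubscription->oid,
                                 MySubscription->name,
                                 MyLogicalRepWorker->userid,
                                 relid,
                                 DSM_HANDLE_INVALID,
                                 false);
    }
}
#endif                          /* TABLESYNC_LAUNCH_SKETCH */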
    /* Close table if opened */

    /*
     * Even when the two_phase mode is requested by the user, it remains as
     * 'pending' until all tablesyncs have reached READY state.
     *
     * When this happens, we restart the apply worker and (if the conditions
     * are still ok) the two_phase tri-state will become 'enabled' at that
     * time.
     *
     * Note: If the subscription has no tables then leave the state as
     * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
     * work.
     */
        ereport(LOG,
                (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
                        MySubscription->name)));

    /*
     * Reset the last-start time for this worker so that the launcher will
     * restart it without waiting for wal_retrieve_retry_interval.
     */
/*
 * Process possible state change(s) of tables that are being synchronized.
 */

    /*
     * Skip for parallel apply workers because they only operate on tables
     * that are in a READY state.  See pa_can_start() and
     * should_apply_changes_for_rel().
     */

    /* Should never happen. */
/*
 * Create list of columns for COPY based on logical relation mapping.
 */
        attnamelist = lappend(attnamelist,
                              makeString(rel->remoterel.attnames[i]));
/*
 * Data source callback for the COPY FROM, which reads from the remote
 * connection and passes the data back to our local COPY.
 */

    /* If there is some leftover data from a previous read, use it. */

    while (maxread > 0 && bytesread < minread)

            /* Try to read the data. */

                /* Process the data */

                outbuf = (char *) outbuf + avail;

            if (maxread <= 0 || bytesread >= minread)
                return bytesread;

            /*
             * Wait for more data or latch.
             */
            (void) WaitLatchOrSocket(MyLatch,
                                     WL_SOCKET_READABLE | WL_LATCH_SET |
                                     WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                     fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
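/*
 * Illustrative sketch, not the original copy_table() body, of how a
 * data-source callback such as copy_read_data() is wired into the local COPY
 * machinery: BeginCopyFrom() is given the callback instead of a file, and
 * CopyFrom() then pulls all rows through it.  The relation, attribute list,
 * and options are assumed to come from the logical relation mapping.
 */
#ifdef TABLESYNC_COPYFROM_SKETCH
static void
run_local_copy_sketch(Relation rel, List *attnamelist, List *options)
{
    ParseState *pstate = make_parsestate(NULL);
    CopyFromState cstate;

    (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
                                         NULL, false, false);

    /* Hand our read callback to COPY FROM instead of a file. */
    cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false,
                           copy_read_data, attnamelist, options);

    /* Do the copy; rows arrive via copy_read_data(). */
    (void) CopyFrom(cstate);
}
#endif                          /* TABLESYNC_COPYFROM_SKETCH */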
/*
 * Get information about a remote relation, in a similar fashion to how the
 * RELATION message provides it during replication.
 *
 * This function also returns (a) the relation qualifications to be used in
 * the COPY command, and (b) whether the remote relation has published any
 * generated column.
 */
static void
fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
                        List **qual, bool *gencol_published)

    Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
    Oid         attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
    Oid         qualRow[] = {TEXTOID};

    /* First fetch Oid and replica identity. */
847 " FROM pg_catalog.pg_class c"
848 " INNER JOIN pg_catalog.pg_namespace n"
849 " ON (c.relnamespace = n.oid)"
850 " WHERE n.nspname = %s"
851 " AND c.relname = %s",
859 (
errcode(ERRCODE_CONNECTION_FAILURE),
860 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
866 (
errcode(ERRCODE_UNDEFINED_OBJECT),
867 errmsg(
"table \"%s.%s\" not found on publisher",
    /*
     * Get column lists for each relation.
     *
     * We need to do this before fetching info about column names and types,
     * so that we can skip columns that should not be replicated.
     */
        Oid         attrsRow[] = {INT2VECTOROID};

        /* Build the pub_names comma-separated string. */

        /*
         * Fetch info about column lists for the relation (from all the
         * publications).
         */
        appendStringInfo(&cmd,
                         "SELECT DISTINCT"
                         "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
                         "   THEN NULL ELSE gpt.attrs END)"
                         "  FROM pg_publication p,"
                         "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
                         "  pg_class c"
                         " WHERE gpt.relid = %u AND c.oid = gpt.relid"
                         "   AND p.pubname IN ( %s )",
                         lrel->remoteid,
                         pub_names->data);

        if (res->status != WALRCV_OK_TUPLES)
            ereport(ERROR,
                    (errcode(ERRCODE_CONNECTION_FAILURE),
                     errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
                            nspname, relname, res->err)));
        /*
         * We don't support the case where the column list is different for
         * the same table when combining publications.  See comments atop
         * fetch_table_list.  So there should be only one row returned.
         * Although we already checked this when creating the subscription,
         * we still need to check here in case the column list was changed
         * after creating the subscription and before the sync worker is
         * started.
         */
        if (tuplestore_tuple_count(res->tuplestore) > 1)
            ereport(ERROR,
                    errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                    errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
                           nspname, relname));
        /*
         * Get the column list and build a single bitmap with the attnums.
         *
         * If we find a NULL value, it means all the columns should be
         * replicated.
         */
            for (natt = 0; natt < nelems; natt++)
                included_cols = bms_add_member(included_cols, elems[natt]);
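/*
 * Illustrative sketch, not the original code, of turning the int2vector of
 * attribute numbers returned by the query above into a Bitmapset, as the
 * comment describes.  Variable names are assumptions.
 */
#ifdef TABLESYNC_ATTLIST_SKETCH
static Bitmapset *
column_list_to_bitmapset_sketch(Datum attrsdatum, bool isnull)
{
    Bitmapset  *included_cols = NULL;

    if (!isnull)
    {
        ArrayType  *arr = DatumGetArrayTypeP(attrsdatum);
        int         nelems = ARR_DIMS(arr)[0];
        int16      *elems = (int16 *) ARR_DATA_PTR(arr);

        for (int natt = 0; natt < nelems; natt++)
            included_cols = bms_add_member(included_cols, elems[natt]);
    }

    /* NULL means every column is published, so return an empty filter. */
    return included_cols;
}
#endif                          /* TABLESYNC_ATTLIST_SKETCH */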
    /* Now fetch column names and types. */
    resetStringInfo(&cmd);
    appendStringInfo(&cmd,
                     "SELECT a.attnum,"
                     "       a.attname,"
                     "       a.atttypid,"
                     "       a.attnum = ANY(i.indkey)");

    /* Generated columns can be replicated since version 18. */
    if (server_version >= 180000)
        appendStringInfoString(&cmd, ", a.attgenerated != ''");

    appendStringInfo(&cmd,
                     " FROM pg_catalog.pg_attribute a"
                     "  LEFT JOIN pg_catalog.pg_index i"
                     "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
                     " WHERE a.attnum > 0::pg_catalog.int2"
                     "   AND NOT a.attisdropped %s"
                     "   AND a.attrelid = %u"
                     " ORDER BY a.attnum",
                     lrel->remoteid,
                     (server_version >= 120000 && server_version < 180000 ?
                      "AND a.attgenerated = ''" : ""),
                     lrel->remoteid);
    if (res->status != WALRCV_OK_TUPLES)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
                        nspname, relname, res->err)));

    /* We don't know the number of rows coming, so allocate enough space. */

    /*
     * Store the columns as a list of names.  Ignore those that are not
     * present in the column list, if there is one.
     */

        /* If the column is not in the column list, skip it. */

        lrel->attnames[natt] = rel_colname;

    /* Remember if the remote table has published any generated column. */

        /* Should never happen. */
        if (++natt >= MaxTupleAttributeNumber)
            elog(ERROR, "too many columns in remote table \"%s.%s\"",
                 nspname, relname);
    /*
     * Get relation's row filter expressions.  DISTINCT avoids the same
     * expression of a table in multiple publications from being included
     * multiple times in the final expression.
     *
     * We need to copy the row even if it matches just one of the
     * publications, so we later combine all the quals with OR.
     *
     * For initial synchronization, row filtering can be ignored in the
     * following cases:
     *
     * 1) one of the subscribed publications for the table hasn't specified
     * any row filter
     *
     * 2) one of the subscribed publications has puballtables set to true
     *
     * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
     * that includes this relation
     */
        /* Reuse the already-built pub_names. */
        Assert(pub_names != NULL);

        /* Check for row filters. */
        resetStringInfo(&cmd);
        appendStringInfo(&cmd,
                         "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
                         "  FROM pg_publication p,"
                         "  LATERAL pg_get_publication_tables(p.pubname) gpt"
                         " WHERE gpt.relid = %u"
                         "   AND p.pubname IN ( %s )",
                         lrel->remoteid,
                         pub_names->data);

        if (res->status != WALRCV_OK_TUPLES)
            ereport(ERROR,
                    (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
                            nspname, relname, res->err)));

        /*
         * Multiple row filter expressions for the same table will be
         * combined by COPY using OR.  If any of the filter expressions for
         * this table are null, it means the whole table will be copied.  In
         * this case it is not necessary to construct a unified row filter
         * expression at all.
         */

        /* Ignore filters and cleanup as necessary. */
/*
 * Copy existing data of a table from the publisher.
 *
 * Caller is responsible for locking the local relation.
 */

    bool        gencol_published = false;

    /* Get the publisher relation info. */

    /* Put the relation into relmap. */

    /* Map the publisher relation to local one. */

    /* Start copy on the publisher. */

    /* Regular table with no row filter or generated columns */
    if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)

        /* If the table has columns, then specify the columns */

        /*
         * XXX Do we need to list the columns in all cases?  Maybe we're
         * replicating all columns?
         */
        for (int i = 0; i < lrel.natts; i++)
    /*
     * For non-tables and tables with row filters, we need to do COPY
     * (SELECT ...), but we can't just do SELECT * because we may need to
     * copy only a subset of columns including generated columns.  For tables
     * with any row filters, build a SELECT query with OR'ed row filters for
     * the COPY.
     *
     * We also need to use this same COPY (SELECT ...) syntax when generated
     * columns are published, because copy of generated columns is not
     * supported by the normal COPY.
     */
        for (int i = 0; i < lrel.natts; i++)

        /*
         * For regular tables, make sure we don't copy data from a child that
         * inherits the named table, as those will be copied separately.
         */
        if (lrel.relkind == RELKIND_RELATION)
            appendStringInfoString(&cmd, "ONLY ");

        /* list of OR'ed filters */

    /*
     * Prior to v16, initial table synchronization will use text format even
     * if the binary option is enabled for a subscription.
     */

    if (res->status != WALRCV_OK_COPY_OUT)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
                        lrel.nspname, lrel.relname, res->err)));

    (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
                                         NULL, false, false);
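/*
 * For illustration (this comment is not part of the original file), the
 * command copy_table() sends to the publisher takes one of two shapes,
 * following the logic above:
 *
 *   -- plain table, no row filter, no published generated columns:
 *   COPY public.tab (a, b, c) TO STDOUT
 *
 *   -- other relkinds, row filters, or published generated columns:
 *   COPY (SELECT a, b, c FROM ONLY public.tab
 *         WHERE (filter_1) OR (filter_2)) TO STDOUT
 *
 * where "ONLY" is used just for regular tables, and on version 16 or later
 * publishers " WITH (FORMAT binary)" is appended when the subscription was
 * created with binary = true.
 */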
/*
 * Determine the tablesync slot name.
 *
 * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
 * on slot name length.  We append the system_identifier to avoid slot_name
 * collisions with subscriptions in other clusters.  With the current scheme
 * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '0円'), the maximum
 * length of slot_name will be 50.
 *
 * The returned slot name is stored in the supplied buffer (syncslotname) with
 * the given size.
 *
 * Note: We don't use the subscription slot name as part of the tablesync slot
 * name because we are responsible for cleaning up these slots and it could
 * become impossible to recalculate what name to clean up if the subscription
 * slot name had changed.
 */
void
ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
                                char *syncslotname, Size szslot)
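/*
 * A minimal sketch of the naming scheme described above (the authoritative
 * format string lives in the function body itself):
 *
 *   snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT,
 *            suboid, relid, GetSystemIdentifier());
 */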
/*
 * Start syncing the table in the sync worker.
 *
 * If nothing needs to be done to sync the table, we exit the worker without
 * any further action.
 *
 * The returned slot name is palloc'ed in the current memory context.
 */

    bool        must_use_password;

    /* Check the state of the table synchronization. */

    /* Is the use of a password mandatory? */

    /*
     * If synchronization is already done or no longer necessary, exit now
     * that we've updated shared memory state.
     */
        case SUBREL_STATE_SYNCDONE:
        case SUBREL_STATE_READY:
        case SUBREL_STATE_UNKNOWN:
            finish_sync_worker();   /* doesn't return */

    /* Calculate the name of the tablesync slot. */

    /*
     * Here we use the slot name instead of the subscription name as the
     * application_name, so that it is different from the leader apply worker
     * and synchronous replication can distinguish them.
     */
    if (LogRepWorkerWalRcvConn == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
                        MySubscription->name, err)));

    /* Assign the origin tracking record name. */
    ReplicationOriginNameForLogicalRep(MySubscription->oid,
                                       MyLogicalRepWorker->relid,
                                       originname, sizeof(originname));
        /*
         * We have previously errored out before finishing the copy, so the
         * replication slot might exist.  We want to remove the slot if it
         * already exists and proceed.
         *
         * XXX We could instead try to drop the slot we failed to drop last
         * time, but for that we might need to clean up the copy state, as it
         * might be in the middle of fetching the rows.  Also, if there is a
         * network breakdown, then it wouldn't have succeeded, so trying it
         * next time seems like a better bet.
         */

        /*
         * The COPY phase was previously done, but tablesync then crashed
         * before it was able to finish normally.
         */

        /*
         * The origin tracking name must already exist.  It was created the
         * first time this tablesync was launched.
         */

        goto copy_table_done;
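/*
 * For illustration (not the original control flow verbatim), the relstate
 * read at the start of LogicalRepSyncTableStart() decides how much work
 * remains:
 *
 *   SUBREL_STATE_SYNCDONE / READY / UNKNOWN  -> finish_sync_worker(),
 *                                               nothing left to do
 *   SUBREL_STATE_FINISHEDCOPY                -> copy already done in a
 *                                               previous attempt; reuse the
 *                                               existing origin and jump to
 *                                               the catchup phase
 *   SUBREL_STATE_INIT / DATASYNC             -> drop any leftover slot,
 *                                               (re)create it, and do the
 *                                               initial copy
 */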
    /* Update the state and make it visible to others. */

    /*
     * Use a standard write lock here.  It might be better to disallow access
     * to the table while it's being synchronized.  But we don't want to
     * block the main apply process from working, and it has to open the
     * relation in RowExclusiveLock when remapping remote relation id to the
     * local one.
     */

    /*
     * Start a transaction in the remote node in REPEATABLE READ mode.  This
     * ensures that both the replication slot we create (see below) and the
     * COPY are consistent with each other.
     */
    res = walrcv_exec(LogRepWorkerWalRcvConn,
                      "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
                      0, NULL);
    if (res->status != WALRCV_OK_COMMAND)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("table copy could not start transaction on publisher: %s",
                        res->err)));

    /*
     * Create a new permanent logical decoding slot.  This slot will be used
     * for the catchup phase after COPY is done, so tell it to use the
     * snapshot to make the final data consistent.
     */
    walrcv_create_slot(LogRepWorkerWalRcvConn,
                       slotname, false /* permanent */ , false /* two_phase */ ,
                       MySubscription->failover,
                       CRS_USE_SNAPSHOT, origin_startpos);
    /*
     * Setup replication origin tracking.  The purpose of doing this before
     * the copy is to avoid doing the copy again due to any error in setting
     * up origin tracking.
     */

        /*
         * Origin tracking does not exist, so create it now.
         *
         * Then advance to the LSN got from walrcv_create_slot.  This is WAL
         * logged for the purpose of recovery.  Locks are to prevent the
         * replication origin from vanishing while advancing.
         */
        originid = replorigin_create(originname);
        replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
                           true /* go backward */ , true /* WAL log */ );

        ereport(ERROR,
                (errcode(ERRCODE_DUPLICATE_OBJECT),
                 errmsg("replication origin \"%s\" already exists",
                        originname)));
    /*
     * Make sure that the copy command runs as the table owner, unless the
     * user has opted out of that behaviour.
     */

    /*
     * Check that our table sync worker has permission to insert into the
     * target table.
     */

    /*
     * COPY FROM does not honor RLS policies.  That is not a problem for
     * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
     * who has it implicitly), but other roles should not be able to
     * circumvent RLS.  Disallow logical replication into RLS enabled
     * relations for such roles.
     */
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
                        GetUserNameFromId(GetUserId(), true),
                        RelationGetRelationName(rel))));
    /* Now do the initial data copy */

    if (res->status != WALRCV_OK_COMMAND)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("table copy could not finish transaction on publisher: %s",
                        res->err)));

    /* Make the copy visible. */

    /*
     * Update the persisted state to indicate the COPY phase is done; make it
     * visible to others.
     */
    UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               MyLogicalRepWorker->relid,
                               SUBREL_STATE_FINISHEDCOPY,
                               MyLogicalRepWorker->relstate_lsn, false);

    elog(DEBUG1, "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
         originname, LSN_FORMAT_ARGS(*origin_startpos));

    /*
     * We are done with the initial data synchronization, update the state.
     */

    /*
     * Finally, wait until the leader apply worker tells us to catch up and
     * then return to let LogicalRepApplyLoop do it.
     */
/*
 * Common code to fetch the up-to-date sync state info into the static lists.
 *
 * Returns true if the subscription has one or more tables, else false.
 *
 * Note: If this function started the transaction (indicated by the parameter)
 * then it is the caller's responsibility to commit it.
 */

    static bool has_subrels = false;

    *started_tx = false;

    /* Clean the old lists. */

    /* Fetch all non-ready tables. */

    /* Allocate the tracking info in a permanent memory context. */
    foreach(lc, rstates)
    /*
     * Does the subscription have tables?
     *
     * If there were not-READY relations found then we know it does.  But if
     * table_states_not_ready was empty we still need to check again to see
     * if there are 0 tables.
     */

    /*
     * If the subscription relation cache has been invalidated since we
     * entered this routine, we still use and return the relations we just
     * finished constructing, to avoid infinite loops, but we leave the table
     * states marked as stale so that we'll rebuild it again on next access.
     * Otherwise, we mark the table states as valid.
     */

/*
 * Execute the initial sync with error handling.  Disable the subscription,
 * if it's required.
 *
 * Allocate the slot name in a long-lived context on return.  Note that we
 * don't handle FATAL errors, which are probably because of system resource
 * errors and are not repeatable.
 */
    char       *sync_slotname = NULL;

    /* Call initial sync. */

        /*
         * Report the worker failed during table synchronization.  Abort the
         * current transaction so that the stats message is sent in an idle
         * state.
         */

    /* Allocate slot name in long-lived context. */
    *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
    pfree(sync_slotname);
/*
 * Runs the tablesync worker.
 *
 * It starts syncing tables.  After a successful sync, sets streaming options
 * and starts streaming to catch up with the apply worker.
 */

    char       *slotname = NULL;

    ReplicationOriginNameForLogicalRep(MySubscription->oid,
                                       MyLogicalRepWorker->relid,
                                       originname, sizeof(originname));

    /* Apply the changes till we catch up with the apply worker. */
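/*
 * Illustrative sketch, condensed from the description above (not the exact
 * original body), of the tablesync worker's top-level sequence: copy first,
 * then stream from the publisher until the apply worker's position is
 * reached.
 */
#ifdef TABLESYNC_RUN_SKETCH
static void
run_tablesync_worker_sketch(void)
{
    char        originname[NAMEDATALEN];
    XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
    char       *slotname = NULL;
    WalRcvStreamOptions options;

    /* Do (or skip) the initial copy and get the slot/origin start point. */
    start_table_sync(&origin_startpos, &slotname);

    ReplicationOriginNameForLogicalRep(MySubscription->oid,
                                       MyLogicalRepWorker->relid,
                                       originname, sizeof(originname));
    set_apply_error_context_origin(originname);

    /* Stream changes until we catch up with the apply worker. */
    set_stream_options(&options, slotname, &origin_startpos);
    walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
    start_apply(origin_startpos);
}
#endif                          /* TABLESYNC_RUN_SKETCH */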
/* Logical Replication Tablesync worker entry point */
/*
 * If the subscription has no tables then return false.
 *
 * Otherwise, are all tablesyncs READY?
 *
 * Note: This function is not suitable to be called from outside of apply or
 * tablesync workers because MySubscription needs to be already initialized.
 */

    bool        started_tx = false;
    bool        has_subrels = false;

    /* We need up-to-date sync state info for subscription tables here. */

    /*
     * Return false when there are no tables in the subscription or not all
     * tables are in ready state; true otherwise.
     */
/*
 * Return whether the subscription currently has any relations.
 *
 * Note: Unlike HasSubscriptionRelations(), this function relies on cached
 * information for subscription relations.  Additionally, it should not be
 * invoked outside of apply or tablesync workers, as MySubscription must be
 * initialized first.
 */

    /* We need up-to-date subscription tables info here */
/*
 * Update the two_phase state of the specified subscription in
 * pg_subscription.
 */

    bool        nulls[Natts_pg_subscription];
    bool        replaces[Natts_pg_subscription];

    Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
           new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
           new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);

    if (!HeapTupleIsValid(tup))
        elog(ERROR, "cache lookup failed for subscription oid %u", suboid);

    /* Form a new tuple. */
    memset(nulls, false, sizeof(nulls));
    memset(replaces, false, sizeof(replaces));

    /* And update/set two_phase state */
    values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
    replaces[Anum_pg_subscription_subtwophasestate - 1] = true;

    tup = heap_modify_tuple(tup, RelationGetDescr(rel),
                            values, nulls, replaces);