1/*-------------------------------------------------------------------------
3 * PostgreSQL logical decoding coordination
5 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8 * src/backend/replication/logical/logical.c
11 * This file coordinates interaction between the various modules that
12 * together provide logical decoding, primarily by providing so
13 * called LogicalDecodingContexts. The goal is to encapsulate most of the
14 * internal complexity for consumers of logical decoding, so they can
15 * create and consume a changestream with a low amount of code. Builtin
16 * consumers are the walsender and SQL SRF interface, but it's possible to
17 * add further ones without changing core code, e.g. to consume changes in
20 * The idea is that a consumer provides three callbacks, one to read WAL,
21 * one to prepare a data write, and a final one for actually writing since
22 * their implementation depends on the type of consumer. Check
23 * logicalfuncs.c for an example implementation of a fairly simple consumer
24 * and an implementation of a WAL reading callback that's suitable for
26 *-------------------------------------------------------------------------
49/* data for errcontext callback */
57/* wrappers around output plugin callbacks */
78 const char *prefix,
Size message_size,
const char *message);
80/* streaming callbacks */
95 const char *prefix,
Size message_size,
const char *message);
99/* callback to update txn's progress */
107 * Make sure the current settings & environment are capable of doing logical
116 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
117 * needs the same check.
122 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
123 errmsg(
"logical decoding requires \"wal_level\" >= \"logical\"")));
127 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
128 errmsg(
"logical decoding requires a database connection")));
133 * This check may have race conditions, but whenever
134 * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
135 * verify that there are no existing logical replication slots. And to
136 * avoid races around creating a new slot,
137 * CheckLogicalDecodingRequirements() is called once before creating
138 * the slot, and once when logical decoding is initially starting up.
142 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
143 errmsg(
"logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
148 * Helper function for CreateInitDecodingContext() and
149 * CreateDecodingContext() performing common tasks.
155 bool need_full_snapshot,
168 /* shorter lines... */
172 "Logical decoding context",
180 * (re-)load output plugins, so we detect a bad (removed) output plugin
187 * Now that the slot's xmin has been set, we can announce ourselves as a
188 * logical decoding backend which doesn't need to be checked individually
189 * when computing the xmin horizon because the xmin is enforced via
192 * We can only do so if we're outside of a transaction (i.e. the case when
193 * streaming changes via walsender), otherwise an already setup
194 * snapshot/xid would end up being ignored. That's not a particularly
195 * bothersome restriction since the SQL interface can't be used for
211 (
errcode(ERRCODE_OUT_OF_MEMORY),
213 errdetail(
"Failed while allocating a WAL reading processor.")));
222 /* wrap output plugin callbacks, so we can add error context information */
230 * To support streaming, we require start/stop/abort/commit/change
231 * callbacks. The message and truncate callbacks are optional, similar to
232 * regular output plugins. We however enable streaming when at least one
233 * of the methods is enabled so that we can easily identify missing
236 * We decide it here, but only check it later in the wrappers.
247 * streaming callbacks
249 * stream_message and stream_truncate callbacks are optional, so we do not
250 * fail with ERROR when missing, but the wrappers simply do nothing. We
251 * must set the ReorderBuffer callbacks to something, otherwise the calls
252 * from there will crash (we don't want to move the checks there).
265 * To support two-phase logical decoding, we require
266 * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
267 * filter_prepare callback is optional. We however enable two-phase
268 * logical decoding when at least one of the methods is enabled so that we
269 * can easily identify missing methods.
271 * We decide it here, but only check it later in the wrappers.
281 * Callback to support decoding at prepare time.
289 * Callback to support updating progress during sending data of a
290 * transaction (and its subtransactions) to the output plugin.
296 ctx->
write = do_write;
309 * Create a new decoding context, for a new logical slot.
311 * plugin -- contains the name of the output plugin
312 * output_plugin_options -- contains options passed to the output plugin
313 * need_full_snapshot -- if true, must obtain a snapshot able to read all
314 * tables; if false, one that can read only catalogs is acceptable.
315 * restart_lsn -- if given as invalid, it's this routine's responsibility to
316 * mark WAL as reserved by setting a convenient restart_lsn for the slot.
317 * Otherwise, we set for decoding to start from the given LSN without
318 * marking WAL reserved beforehand. In that scenario, it's up to the
319 * caller to guarantee that WAL remains available.
320 * xl_routine -- XLogReaderRoutine for underlying XLogReader
321 * prepare_write, do_write, update_progress --
322 * callbacks that perform the use-case dependent, actual, work.
324 * Needs to be called while in a memory context that's at least as long lived
325 * as the decoding context because further memory contexts will be created
328 * Returns an initialized decoding context after calling the output plugin's
333 List *output_plugin_options,
334 bool need_full_snapshot,
348 * On a standby, this check is also required while creating the slot.
349 * Check the comments in the function.
353 /* shorter lines... */
356 /* first some sanity checks that are unlikely to be violated */
358 elog(
ERROR,
"cannot perform logical decoding without an acquired slot");
361 elog(
ERROR,
"cannot initialize logical decoding without a specified plugin");
363 /* Make sure the passed slot is suitable. These are user facing errors. */
366 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
367 errmsg(
"cannot use physical replication slot for logical decoding")));
371 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
372 errmsg(
"replication slot \"%s\" was not created in this database",
378 (
errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
379 errmsg(
"cannot create logical replication slot in transaction that has performed writes")));
382 * Register output plugin name with slot. We need the mutex to avoid
383 * concurrent reading of a partially copied string. But we don't want any
384 * complicated code while holding a spinlock, so do namestrcpy() outside.
401 * This is a bit tricky: We need to determine a safe xmin horizon to start
402 * decoding from, to avoid starting from a running xacts record referring
403 * to xids whose rows have been vacuumed or pruned
404 * already. GetOldestSafeDecodingTransactionId() returns such a value, but
405 * without further interlock its return value might immediately be out of
408 * So we have to acquire the ProcArrayLock to prevent computation of new
409 * xmin horizons by other backends, get the safe decoding xid, and inform
410 * the slot machinery about the new limit. Once that's done the
411 * ProcArrayLock can be released as the slot machinery now is
412 * protecting against vacuum.
414 * Note that, temporarily, the data, not just the catalog, xmin has to be
415 * reserved if a data snapshot is to be exported. Otherwise the initial
416 * data snapshot created here is not guaranteed to be valid. After that
417 * the data xmin doesn't need to be managed anymore and the global xmin
418 * should be recomputed. As we are fine with losing the pegged data xmin
419 * after crash - no chance a snapshot would get exported anymore - we can
420 * get away with just setting the slot's
421 * effective_xmin. ReplicationSlotRelease will reset it again.
432 if (need_full_snapshot)
444 need_full_snapshot,
false,
true,
445 xl_routine, prepare_write, do_write,
448 /* call output plugin initialization callback */
455 * We allow decoding of prepared transactions when the two_phase is
456 * enabled at the time of slot creation, or when the two_phase option is
457 * given at the streaming start, provided the plugin supports all the
458 * callbacks for two-phase.
468 * Create a new decoding context, for a logical slot that has previously been
472 * The LSN at which to start decoding. If InvalidXLogRecPtr, restart
473 * from the slot's confirmed_flush; otherwise, start from the specified
474 * location (but move it forwards to confirmed_flush if it's older than
477 * output_plugin_options
478 * options passed to the output plugin.
481 * bypass the generation of logical changes.
484 * XLogReaderRoutine used by underlying xlogreader
486 * prepare_write, do_write, update_progress
487 * callbacks that have to be filled to perform the use-case dependent,
490 * Needs to be called while in a memory context that's at least as long lived
491 * as the decoding context because further memory contexts will be created
494 * Returns an initialized decoding context after calling the output plugin's
499 List *output_plugin_options,
510 /* shorter lines... */
513 /* first some sanity checks that are unlikely to be violated */
515 elog(
ERROR,
"cannot perform logical decoding without an acquired slot");
517 /* make sure the passed slot is suitable, these are user facing errors */
520 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
521 errmsg(
"cannot use physical replication slot for logical decoding")));
524 * We need to access the system tables during decoding to build the
525 * logical changes unless we are in fast_forward mode where no changes are
530 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
531 errmsg(
"replication slot \"%s\" was not created in this database",
535 * The slots being synced from the primary can't be used for decoding as
536 * they are used after failover. However, we do allow advancing the LSNs
537 * during the synchronization of slots. See update_local_synced_slot.
541 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
542 errmsg(
"cannot use replication slot \"%s\" for logical decoding",
544 errdetail(
"This replication slot is being synchronized from the primary server."),
545 errhint(
"Specify another replication slot."));
547 /* slot must be valid to allow decoding */
553 /* continue from last position */
556 else if (start_lsn < slot->
data.confirmed_flush)
559 * It might seem like we should error out in this case, but it's
560 * pretty common for a client to acknowledge a LSN it doesn't have to
561 * do anything for, and thus didn't store persistently, because the
562 * xlog records didn't result in anything relevant for logical
563 * decoding. Clients have to be able to do that to support synchronous
566 * Starting at a different LSN than requested might not catch certain
567 * kinds of client errors; so the client may wish to check that
568 * confirmed_flush_lsn matches its expectations.
570 elog(
LOG,
"%X/%08X has been already streamed, forwarding to %X/%08X",
579 fast_forward,
false, xl_routine, prepare_write,
580 do_write, update_progress);
582 /* call output plugin initialization callback */
589 * We allow decoding of prepared transactions when the two_phase is
590 * enabled at the time of slot creation, or when the two_phase option is
591 * given at the streaming start, provided the plugin supports all the
592 * callbacks for two-phase.
596 /* Mark slot to allow two_phase decoding if not already marked */
611 (
errmsg(
"starting logical decoding for slot \"%s\"",
613 errdetail(
"Streaming transactions committing after %X/%08X, reading WAL from %X/%08X.",
621 * Returns true if a consistent initial decoding snapshot has been built.
630 * Read from the decoding slot, until it is ready to start extracting changes.
637 /* Initialize from where to start reading WAL. */
640 elog(
DEBUG1,
"searching for logical decoding starting point, starting at %X/%08X",
643 /* Wait for a consistent starting point */
649 /* the read_page callback waits for new WAL */
652 elog(
ERROR,
"could not find logical decoding starting point: %s",
err);
654 elog(
ERROR,
"could not find logical decoding starting point");
658 /* only continue till we found a consistent spot */
673 * Free a previously allocated decoding context, invoking the shutdown
674 * callback if necessary.
689 * Prepare a write using the context's output routine.
695 elog(
ERROR,
"writes are only accepted in commit, begin and change callbacks");
702 * Perform a write using the context's output routine.
708 elog(
ERROR,
"OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
715 * Update progress tracking (if supported).
729 * Load the output plugin, lookup its output plugin init function, and check
730 * that it provides the required callbacks.
740 if (plugin_init == NULL)
741 elog(
ERROR,
"output plugins have to declare the _PG_output_plugin_init symbol");
743 /* ask the output plugin to fill the callback struct */
744 plugin_init(callbacks);
747 elog(
ERROR,
"output plugins have to register a begin callback");
749 elog(
ERROR,
"output plugins have to register a change callback");
751 elog(
ERROR,
"output plugins have to register a commit callback");
759 /* not all callbacks have an associated LSN */
761 errcontext(
"slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%08X",
764 state->callback_name,
767 errcontext(
"slot \"%s\", output plugin \"%s\", in the %s callback",
770 state->callback_name);
781 /* Push callback + info on the error context stack */
783 state.callback_name =
"startup";
790 /* set output state */
794 /* do the actual work: call callback */
797 /* Pop the error context stack */
809 /* Push callback + info on the error context stack */
811 state.callback_name =
"shutdown";
818 /* set output state */
822 /* do the actual work: call callback */
825 /* Pop the error context stack */
831 * Callbacks for ReorderBuffer which add in some more information and then call
832 * output_plugin.h plugins.
843 /* Push callback + info on the error context stack */
845 state.callback_name =
"begin";
852 /* set output state */
858 /* do the actual work: call callback */
861 /* Pop the error context stack */
875 /* Push callback + info on the error context stack */
877 state.callback_name =
"commit";
878 state.report_location = txn->
final_lsn;
/* beginning of commit record */
884 /* set output state */
890 /* do the actual work: call callback */
893 /* Pop the error context stack */
898 * The functionality of begin_prepare is quite similar to begin with the
899 * exception that this will have gid (global transaction id) information which
900 * can be used by plugin. Now, we thought about extending the existing begin
901 * but that would break the replication protocol and additionally this looks
913 /* We're only supposed to call this when two-phase commits are supported */
916 /* Push callback + info on the error context stack */
918 state.callback_name =
"begin_prepare";
925 /* set output state */
932 * If the plugin supports two-phase commits then begin prepare callback is
937 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
938 errmsg(
"logical replication at prepare time requires a %s callback",
939 "begin_prepare_cb")));
941 /* do the actual work: call callback */
944 /* Pop the error context stack */
958 /* We're only supposed to call this when two-phase commits are supported */
961 /* Push callback + info on the error context stack */
963 state.callback_name =
"prepare";
964 state.report_location = txn->
final_lsn;
/* beginning of prepare record */
970 /* set output state */
977 * If the plugin supports two-phase commits then prepare callback is
982 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
983 errmsg(
"logical replication at prepare time requires a %s callback",
986 /* do the actual work: call callback */
989 /* Pop the error context stack */
1003 /* We're only supposed to call this when two-phase commits are supported */
1006 /* Push callback + info on the error context stack */
1008 state.callback_name =
"commit_prepared";
1009 state.report_location = txn->
final_lsn;
/* beginning of commit record */
1015 /* set output state */
1022 * If the plugin supports two-phase commits then commit prepared callback
1027 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1028 errmsg(
"logical replication at prepare time requires a %s callback",
1029 "commit_prepared_cb")));
1031 /* do the actual work: call callback */
1034 /* Pop the error context stack */
1049 /* We're only supposed to call this when two-phase commits are supported */
1052 /* Push callback + info on the error context stack */
1054 state.callback_name =
"rollback_prepared";
1055 state.report_location = txn->
final_lsn;
/* beginning of commit record */
1061 /* set output state */
1068 * If the plugin supports two-phase commits then rollback prepared callback
1073 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1074 errmsg(
"logical replication at prepare time requires a %s callback",
1075 "rollback_prepared_cb")));
1077 /* do the actual work: call callback */
1081 /* Pop the error context stack */
1095 /* Push callback + info on the error context stack */
1097 state.callback_name =
"change";
1098 state.report_location = change->
lsn;
1104 /* set output state */
1109 * Report this change's lsn so replies from clients can give an up-to-date
1110 * answer. This won't ever be enough (and shouldn't be!) to confirm
1111 * receipt of this transaction, but it might allow another transaction's
1112 * commit to be confirmed with one message.
1120 /* Pop the error context stack */
1137 /* Push callback + info on the error context stack */
1139 state.callback_name =
"truncate";
1140 state.report_location = change->
lsn;
1146 /* set output state */
1151 * Report this change's lsn so replies from clients can give an up-to-date
1152 * answer. This won't ever be enough (and shouldn't be!) to confirm
1153 * receipt of this transaction, but it might allow another transaction's
1154 * commit to be confirmed with one message.
1162 /* Pop the error context stack */
1176 /* Push callback + info on the error context stack */
1178 state.callback_name =
"filter_prepare";
1185 /* set output state */
1189 /* do the actual work: call callback */
1192 /* Pop the error context stack */
1207 /* Push callback + info on the error context stack */
1209 state.callback_name =
"filter_by_origin";
1216 /* set output state */
1220 /* do the actual work: call callback */
1223 /* Pop the error context stack */
1232 const char *prefix,
Size message_size,
const char *message)
1243 /* Push callback + info on the error context stack */
1245 state.callback_name =
"message";
1246 state.report_location = message_lsn;
1252 /* set output state */
1258 /* do the actual work: call callback */
1260 message_size, message);
1262 /* Pop the error context stack */
1276 /* We're only supposed to call this when streaming is supported. */
1279 /* Push callback + info on the error context stack */
1281 state.callback_name =
"stream_start";
1282 state.report_location = first_lsn;
1288 /* set output state */
1293 * Report this message's lsn so replies from clients can give an
1294 * up-to-date answer. This won't ever be enough (and shouldn't be!) to
1295 * confirm receipt of this transaction, but it might allow another
1296 * transaction's commit to be confirmed with one message.
1302 /* in streaming mode, stream_start_cb is required */
1305 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1306 errmsg(
"logical streaming requires a %s callback",
1307 "stream_start_cb")));
1311 /* Pop the error context stack */
1325 /* We're only supposed to call this when streaming is supported. */
1328 /* Push callback + info on the error context stack */
1330 state.callback_name =
"stream_stop";
1331 state.report_location = last_lsn;
1337 /* set output state */
1342 * Report this message's lsn so replies from clients can give an
1343 * up-to-date answer. This won't ever be enough (and shouldn't be!) to
1344 * confirm receipt of this transaction, but it might allow another
1345 * transaction's commit to be confirmed with one message.
1351 /* in streaming mode, stream_stop_cb is required */
1354 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1355 errmsg(
"logical streaming requires a %s callback",
1356 "stream_stop_cb")));
1360 /* Pop the error context stack */
1374 /* We're only supposed to call this when streaming is supported. */
1377 /* Push callback + info on the error context stack */
1379 state.callback_name =
"stream_abort";
1380 state.report_location = abort_lsn;
1386 /* set output state */
1392 /* in streaming mode, stream_abort_cb is required */
1395 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1396 errmsg(
"logical streaming requires a %s callback",
1397 "stream_abort_cb")));
1401 /* Pop the error context stack */
1416 * We're only supposed to call this when streaming and two-phase commits
1422 /* Push callback + info on the error context stack */
1424 state.callback_name =
"stream_prepare";
1431 /* set output state */
1437 /* in streaming mode with two-phase commits, stream_prepare_cb is required */
1440 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1441 errmsg(
"logical streaming at prepare time requires a %s callback",
1442 "stream_prepare_cb")));
1446 /* Pop the error context stack */
1460 /* We're only supposed to call this when streaming is supported. */
1463 /* Push callback + info on the error context stack */
1465 state.callback_name =
"stream_commit";
1472 /* set output state */
1478 /* in streaming mode, stream_commit_cb is required */
1481 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1482 errmsg(
"logical streaming requires a %s callback",
1483 "stream_commit_cb")));
1487 /* Pop the error context stack */
1501 /* We're only supposed to call this when streaming is supported. */
1504 /* Push callback + info on the error context stack */
1506 state.callback_name =
"stream_change";
1507 state.report_location = change->
lsn;
1513 /* set output state */
1518 * Report this change's lsn so replies from clients can give an up-to-date
1519 * answer. This won't ever be enough (and shouldn't be!) to confirm
1520 * receipt of this transaction, but it might allow another transaction's
1521 * commit to be confirmed with one message.
1527 /* in streaming mode, stream_change_cb is required */
1530 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1531 errmsg(
"logical streaming requires a %s callback",
1532 "stream_change_cb")));
1536 /* Pop the error context stack */
1543 const char *prefix,
Size message_size,
const char *message)
1551 /* We're only supposed to call this when streaming is supported. */
1554 /* this callback is optional */
1558 /* Push callback + info on the error context stack */
1560 state.callback_name =
"stream_message";
1561 state.report_location = message_lsn;
1567 /* set output state */
1573 /* do the actual work: call callback */
1575 message_size, message);
1577 /* Pop the error context stack */
1583 int nrelations,
Relation relations[],
1592 /* We're only supposed to call this when streaming is supported. */
1595 /* this callback is optional */
1599 /* Push callback + info on the error context stack */
1601 state.callback_name =
"stream_truncate";
1602 state.report_location = change->
lsn;
1608 /* set output state */
1613 * Report this change's lsn so replies from clients can give an up-to-date
1614 * answer. This won't ever be enough (and shouldn't be!) to confirm
1615 * receipt of this transaction, but it might allow another transaction's
1616 * commit to be confirmed with one message.
1624 /* Pop the error context stack */
1638 /* Push callback + info on the error context stack */
1640 state.callback_name =
"update_progress_txn";
1641 state.report_location = lsn;
1647 /* set output state */
1652 * Report this change's lsn so replies from clients can give an up-to-date
1653 * answer. This won't ever be enough (and shouldn't be!) to confirm
1654 * receipt of this transaction, but it might allow another transaction's
1655 * commit to be confirmed with one message.
1663 /* Pop the error context stack */
1668 * Set the required catalog xmin horizon for historic snapshots in the current
1671 * Note that in most cases, we won't be able to immediately use the xmin
1672 * to increase the xmin horizon: we need to wait till the client has confirmed
1673 * receiving current_lsn with LogicalConfirmReceivedLocation().
1678 bool updated_xmin =
false;
1680 bool got_new_xmin =
false;
1689 * don't overwrite if we already have a newer xmin. This can happen if we
1690 * restart decoding in a slot.
1697 * If the client has already confirmed up to this lsn, we directly can
1698 * mark this as accepted. This can happen if we restart decoding in a
1701 else if (current_lsn <= slot->
data.confirmed_flush)
1706 /* our candidate can directly be used */
1707 updated_xmin =
true;
1711 * Only increase if the previous values have been applied, otherwise we
1712 * might never end up updating if the receiver acks too slowly.
1720 * Log new xmin at an appropriate log level after releasing the
1723 got_new_xmin =
true;
1728 elog(
DEBUG1,
"got new catalog xmin %u at %X/%08X", xmin,
1731 /* candidate already valid with the current flush position, apply */
1737 * Mark the minimal LSN (restart_lsn) we need to read to replay all
1738 * transactions that have not yet committed at current_lsn.
1740 * Just like LogicalIncreaseXminForSlot this only takes effect when the
1741 * client has confirmed to have received current_lsn.
1746 bool updated_lsn =
false;
1757 /* don't overwrite if we have a newer restart lsn */
1758 if (restart_lsn <= slot->
data.restart_lsn)
1764 * We might have already flushed far enough to directly accept this lsn;
1765 * in this case there is no need to check for existing candidate LSNs
1767 else if (current_lsn <= slot->
data.confirmed_flush)
1773 /* our candidate can directly be used */
1778 * Only increase if the previous values have been applied, otherwise we
1779 * might never end up updating if the receiver acks too slowly. A missed
1780 * value here will just cause some extra effort after reconnecting.
1788 elog(
DEBUG1,
"got new restart lsn %X/%08X at %X/%08X",
1803 elog(
DEBUG1,
"failed to increase restart lsn: proposed %X/%08X, after %X/%08X, current candidate %X/%08X, current after %X/%08X, flushed up to %X/%08X",
1811 /* candidates are already valid with the current flush position, apply */
1817 * Handle a consumer's confirmation having received all changes up to lsn.
1824 /* Do an unlocked check for candidate_lsn first. */
1828 bool updated_xmin =
false;
1829 bool updated_restart =
false;
1834 /* remember the old restart lsn */
1838 * Prevent moving the confirmed_flush backwards, as this could lead to
1839 * data duplication issues caused by replicating already replicated
1842 * This can happen when a client acknowledges an LSN it doesn't have
1843 * to do anything for, and thus didn't store persistently. After a
1844 * restart, the client can send the prior LSN that it stored
1845 * persistently as an acknowledgement, but we need to ignore such an
1846 * LSN. See similar case handling in CreateDecodingContext.
1851 /* if we're past the location required for bumping xmin, do so */
1856 * We have to write the changed xmin to disk *before* we change
1857 * the in-memory value, otherwise after a crash we wouldn't know
1858 * that some catalog tuples might have been removed already.
1860 * Ensure that by first writing to ->xmin and only update
1861 * ->effective_xmin once the new state is synced to disk. After a
1862 * crash ->effective_xmin is set to ->xmin.
1870 updated_xmin =
true;
1882 updated_restart =
true;
1887 /* first write new xmin to disk, so we know what's up after a crash */
1888 if (updated_xmin || updated_restart)
1890#ifdef USE_INJECTION_POINTS
1897 /* trigger injection point, but only if segment changes */
1904 elog(
DEBUG1,
"updated xmin: %u restart: %u", updated_xmin, updated_restart);
1908 * Now the new xmin is safely on disk, we can let the global value
1909 * advance. We do not take ProcArrayLock or similar since we only
1910 * advance xmin here and there's not much harm done by a concurrent
1911 * computation missing that.
1928 * Prevent moving the confirmed_flush backwards. See comments above
1939 * Clear logical streaming state during (sub)transaction abort.
1949 * Report stats for a slot.
1957 /* Nothing to do if we don't have any replication stats to be sent. */
1961 elog(
DEBUG2,
"UpdateDecodingStats: updating stats %p %" PRId64
" %" PRId64
" %" PRId64
" %" PRId64
" %" PRId64
" %" PRId64
" %" PRId64
" %" PRId64,
1994 * Read up to the end of WAL starting from the decoding slot's restart_lsn.
1995 * Return true if any meaningful/decodable WAL records are encountered,
2001 bool has_pending_wal =
false;
2010 * Create our decoding context in fast_forward mode, passing start_lsn
2011 * as InvalidXLogRecPtr, so that we start processing from the slot's
2016 true,
/* fast_forward */
2023 * Start reading at the slot's restart_lsn, which we know points to a
2028 /* Invalidate non-timetravel entries */
2031 /* Loop until the end of WAL or some changes are processed */
2040 elog(
ERROR,
"could not find record for logical decoding: %s", errm);
2056 /* clear all timetravel entries */
2063 return has_pending_wal;
2067 * Helper function for advancing our logical replication slot forward.
2069 * The slot's restart_lsn is used as start point for reading records, while
2070 * confirmed_flush is used as base point for the decoding context.
2072 * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
2073 * because we need to digest WAL to advance restart_lsn allowing to recycle
2074 * WAL and removal of old catalog tuples. As decoding is done in fast_forward
2075 * mode, no changes are generated anyway.
2077 * *found_consistent_snapshot will be true if the initial decoding snapshot has
2078 * been built; otherwise, it will be false.
2082 bool *found_consistent_snapshot)
2090 if (found_consistent_snapshot)
2091 *found_consistent_snapshot =
false;
2096 * Create our decoding context in fast_forward mode, passing start_lsn
2097 * as InvalidXLogRecPtr, so that we start processing from my slot's
2102 true,
/* fast_forward */
2109 * Wait for specified streaming replication standby servers (if any)
2110 * to confirm receipt of WAL up to moveto lsn.
2115 * Start reading at the slot's restart_lsn, which we know to point to
2120 /* invalidate non-timetravel entries */
2123 /* Decode records until we reach the requested target */
2130 * Read records. No changes are generated in fast_forward mode,
2131 * but snapbuilder/slot statuses are updated properly.
2135 elog(
ERROR,
"could not find record while advancing replication slot: %s",
2139 * Process the record. Storage-level changes are ignored in
2140 * fast_forward mode, but other modules (such as snapbuilder)
2141 * might still have critical updates to do.
2148 * We used to have bugs where logical decoding would fail to
2149 * preserve the resource owner. That's important here, so
2150 * verify that that doesn't happen anymore. XXX this could be
2151 * removed once it's been battle-tested.
2160 *found_consistent_snapshot =
true;
2167 * If only the confirmed_flush LSN has changed the slot won't get
2168 * marked as dirty by the above. Callers on the walsender
2169 * interface are expected to keep track of their own progress and
2170 * don't need it written out. But SQL-interface users cannot
2171 * specify their own start positions and it's harder for them to
2172 * keep track of their progress, so we should make more of an
2173 * effort to save it for them.
2175 * Dirty the slot so it is written out at the next checkpoint. The
2176 * LSN position advanced to may still be lost on a crash but this
2177 * makes the data consistent after a clean shutdown.
2184 /* free context, call shutdown callback */
2191 /* clear all timetravel entries */
#define pg_attribute_unused()
#define PG_USED_FOR_ASSERTS_ONLY
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
Assert(PointerIsAligned(start, uint64))
#define INJECTION_POINT(name, arg)
void InvalidateSystemCaches(void)
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr lsn)
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
void FreeDecodingContext(LogicalDecodingContext *ctx)
bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
static void output_plugin_error_callback(void *arg)
static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
bool DecodingContextReady(LogicalDecodingContext *ctx)
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
void UpdateDecodingStats(LogicalDecodingContext *ctx)
void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
void ResetLogicalStreamingState(void)
void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
static LogicalDecodingContext * StartupDecodingContext(List *output_plugin_options, XLogRecPtr start_lsn, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, bool in_create, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
struct LogicalErrorCallbackState LogicalErrorCallbackState
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message)
static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx)
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
void CheckLogicalDecodingRequirements(void)
bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message)
void(* LogicalOutputPluginWriterUpdateProgress)(struct LogicalDecodingContext *lr, XLogRecPtr Ptr, TransactionId xid, bool skipped_xact)
void(* LogicalOutputPluginWriterWrite)(struct LogicalDecodingContext *lr, XLogRecPtr Ptr, TransactionId xid, bool last_write)
LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
void namestrcpy(Name name, const char *str)
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static const char * plugin
void pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat)
#define PROC_IN_LOGICAL_DECODING
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
ReorderBuffer * ReorderBufferAllocate(void)
void ReorderBufferFree(ReorderBuffer *rb)
ResourceOwner CurrentResourceOwner
void ReplicationSlotMarkDirty(void)
void ReplicationSlotReserveWal(void)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotSave(void)
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
void ReplicationSlotsComputeRequiredLSN(void)
void CheckSlotRequirements(void)
#define SlotIsPhysical(slot)
bool IsSyncingReplicationSlots(void)
void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, bool need_full_snapshot, bool in_slot_creation, XLogRecPtr two_phase_at)
void FreeSnapshotBuilder(SnapBuild *builder)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
StringInfo makeStringInfo(void)
struct ErrorContextCallback * previous
void(* callback)(void *arg)
OutputPluginOptions options
struct SnapBuild * snapshot_builder
XLogRecPtr write_location
LogicalOutputPluginWriterPrepareWrite prepare_write
OutputPluginCallbacks callbacks
List * output_plugin_options
LogicalOutputPluginWriterWrite write
struct ReorderBuffer * reorder
LogicalOutputPluginWriterUpdateProgress update_progress
XLogRecPtr report_location
LogicalDecodingContext * ctx
const char * callback_name
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterPrepareCB filter_prepare_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
PgStat_Counter stream_count
PgStat_Counter total_txns
PgStat_Counter total_bytes
PgStat_Counter spill_txns
PgStat_Counter stream_txns
PgStat_Counter spill_count
PgStat_Counter stream_bytes
PgStat_Counter spill_bytes
ReorderBufferStreamMessageCB stream_message
ReorderBufferStreamChangeCB stream_change
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferMessageCB message
ReorderBufferRollbackPreparedCB rollback_prepared
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferApplyChangeCB apply_change
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamAbortCB stream_abort
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferStreamCommitCB stream_commit
ReorderBufferApplyTruncateCB apply_truncate
ReorderBufferBeginCB begin
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
ReplicationSlotInvalidationCause invalidated
XLogRecPtr candidate_xmin_lsn
TransactionId effective_catalog_xmin
XLogRecPtr candidate_restart_valid
TransactionId effective_xmin
XLogRecPtr candidate_restart_lsn
TransactionId candidate_catalog_xmin
ReplicationSlotPersistentData data
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
bool IsTransactionOrTransactionBlock(void)
TransactionId CheckXidAlive
bool IsTransactionState(void)
TransactionId GetTopTransactionIdIfAny(void)
bool RecoveryInProgress(void)
WalLevel GetActiveWalLevelOnStandby(void)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
void XLogReaderFree(XLogReaderState *state)
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
void wal_segment_close(XLogReaderState *state)
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)