/*-------------------------------------------------------------------------
 *
 * pgoutput.c
 *		Logical Replication output plugin
 *
 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/pgoutput/pgoutput.c
 *
 *-------------------------------------------------------------------------
 */
static void pgoutput_message(LogicalDecodingContext *ctx,
							 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
							 bool transactional, const char *prefix,
							 Size sz, const char *message);
/*
 * Only 3 publication actions are used for row filtering ("insert", "update",
 * "delete"). See RelationSyncEntry.exprstate[].
 */
#define NUM_ROWFILTER_PUBACTIONS	(PUBACTION_DELETE+1)
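/*
 * For reference, a sketch of the pubaction indexes the define above relies
 * on (hedged: an equivalent enum is defined immediately before it in the
 * actual file):
 */
enum RowFilterPubAction
{
	PUBACTION_INSERT,
	PUBACTION_UPDATE,
	PUBACTION_DELETE
};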
/*
 * Entry in the map used to remember which relation schemas we sent.
 *
 * The schema_sent flag determines if the current schema record for the
 * relation (and for its ancestor if publish_as_relid is set) was already
 * sent to the subscriber (in which case we don't need to send it again).
 *
 * The schema cache on the downstream side is, however, updated only at
 * commit time, and with streamed transactions the commit order may differ
 * from the order the transactions are sent in. Also, the (sub)transactions
 * might get aborted, so we need to send the schema for each (sub)transaction
 * so that we don't lose the schema information on abort. For handling this,
 * we maintain the list of xids (streamed_txns) for those we have already
 * sent the schema.
 *
 * For partitions, 'pubactions' considers not only the table's own
 * publications, but also those of all of its ancestors.
 */
	/*
	 * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
	 * columns and the 'publish_generated_columns' parameter is set to
	 * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
	 * indicating that no generated columns should be published, unless
	 * explicitly specified in the column list.
	 */
	/* are we publishing this rel? */
	/*
	 * ExprState array for row filter. The expressions for different
	 * publication actions cannot always be combined into one, because
	 * updates and deletes require the columns in the expression to be part
	 * of the replica identity index, whereas inserts have no such
	 * restriction; so there is one ExprState per publication action.
	 */
	/*
	 * OID of the relation to publish changes as. For a partition, this may
	 * be set to one of its ancestors whose schema will be used when
	 * replicating changes, if publish_via_partition_root is set for the
	 * publication.
	 */
	/*
	 * Map used when replicating using an ancestor's schema to convert tuples
	 * from the partition's type to the ancestor's; NULL if publish_as_relid
	 * is the same as 'relid', or if unnecessary because the partition and
	 * the ancestor have identical TupleDescs.
	 */
	/*
	 * Columns included in the publication, or NULL if all columns are
	 * included implicitly. Note that the attnums in this bitmap are not
	 * shifted by FirstLowInvalidHeapAttributeNumber.
	 */
	/*
	 * Private context to store additional data for this entry - state for
	 * the row filter expressions, column list, etc.
	 */
/*
 * Maintain a per-transaction level variable to track whether the transaction
 * has sent BEGIN. BEGIN is only sent when the first change in a transaction
 * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
 * messages for empty transactions, which saves network bandwidth.
 *
 * This optimization is not used for prepared transactions because if the
 * WALSender restarts after the prepare of a transaction and before the
 * commit prepared of the same transaction, then we won't be able to figure
 * out whether we skipped sending BEGIN/PREPARE of a transaction because it
 * was empty. This is because we would have lost the in-memory txndata
 * information that was present prior to the restart. This will result in
 * sending a spurious COMMIT PREPARED without a corresponding prepared
 * transaction at the downstream, which would lead to an error when it tries
 * to process it.
 *
 * XXX We could achieve this optimization by changing the protocol to send
 * additional information so that the downstream can detect that the
 * corresponding prepare has not been sent. However, adding such a check for
 * every transaction in the downstream could be costly, so we might want to
 * do it only once.
 *
 * We also don't have this optimization for streamed transactions because
 * they can contain prepared transactions.
 */
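/*
 * A minimal sketch of the per-transaction state described above, assuming
 * the only field needed for the optimization is the sent-BEGIN flag:
 */
typedef struct PGOutputTxnData
{
	bool		sent_begin_txn; /* false until the first change forces BEGIN */
} PGOutputTxnData;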
/* Map used to remember which relation schemas we sent. */
/* row filter routines */

/* column list routines */
/*
 * Specify output plugin callbacks
 */
	/* transaction streaming */

	/* transaction streaming - two-phase commit */
static void
parse_output_parameters(List *options, PGOutputData *data)
{
	ListCell   *lc;
	bool		protocol_version_given = false;
	bool		publication_names_given = false;
	bool		binary_option_given = false;
	bool		messages_option_given = false;
	bool		streaming_given = false;
	bool		two_phase_option_given = false;
	bool		origin_option_given = false;

	/* Initialize optional parameters to defaults */
	data->binary = false;
	data->streaming = LOGICALREP_STREAM_OFF;
	data->messages = false;
	data->two_phase = false;
	data->publish_no_origin = false;

	foreach(lc, options)
	{
		DefElem    *defel = (DefElem *) lfirst(lc);

		/* Check each param, whether or not we recognize it */
		if (strcmp(defel->defname, "proto_version") == 0)
		{
			unsigned long parsed;
			char	   *endptr;

			if (protocol_version_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			protocol_version_given = true;

			errno = 0;
			parsed = strtoul(strVal(defel->arg), &endptr, 10);
			if (errno != 0 || *endptr != '0円')
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("invalid proto_version")));

			if (parsed > PG_UINT32_MAX)
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("proto_version \"%s\" out of range",
								strVal(defel->arg))));

			data->protocol_version = (uint32) parsed;
		}
		else if (strcmp(defel->defname, "publication_names") == 0)
		{
			if (publication_names_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			publication_names_given = true;

			if (!SplitIdentifierString(strVal(defel->arg), ',',
									   &data->publication_names))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_NAME),
						 errmsg("invalid publication_names syntax")));
		}
		else if (strcmp(defel->defname, "binary") == 0)
		{
			if (binary_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			binary_option_given = true;

			data->binary = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "messages") == 0)
		{
			if (messages_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			messages_option_given = true;

			data->messages = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "streaming") == 0)
		{
			if (streaming_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			streaming_given = true;

			data->streaming = defGetStreamingMode(defel);
		}
		else if (strcmp(defel->defname, "two_phase") == 0)
		{
			if (two_phase_option_given)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options")));
			two_phase_option_given = true;

			data->two_phase = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "origin") == 0)
		{
			char	   *origin;

			if (origin_option_given)
				ereport(ERROR,
						errcode(ERRCODE_SYNTAX_ERROR),
						errmsg("conflicting or redundant options"));
			origin_option_given = true;

			origin = defGetString(defel);
			if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
				data->publish_no_origin = true;
			else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
				data->publish_no_origin = false;
			else
				ereport(ERROR,
						errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						errmsg("unrecognized origin value: \"%s\"", origin));
		}
		else
			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
	}

	/* Check required options */
	if (!protocol_version_given)
		ereport(ERROR,
				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				errmsg("option \"%s\" missing", "proto_version"));
	if (!publication_names_given)
		ereport(ERROR,
				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				errmsg("option \"%s\" missing", "publication_names"));
}
/*
 * Initialize this plugin
 */
	static bool publication_callback_registered = false;

	/* Create our memory context for private allocations. */
	data->context = AllocSetContextCreate(ctx->context,
										  "logical replication output context",
										  ALLOCSET_DEFAULT_SIZES);

	data->cachectx = AllocSetContextCreate(ctx->context,
										   "logical replication cache context",
										   ALLOCSET_DEFAULT_SIZES);

	data->pubctx = AllocSetContextCreate(ctx->context,
										 "logical replication publication list context",
										 ALLOCSET_SMALL_SIZES);
	/* This plugin uses binary protocol. */
	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;

	/*
	 * This is replication start and not slot initialization.
	 *
	 * Parse and validate options passed by the client.
	 */

	/* Parse the params and ERROR if we see any we don't recognize */
	parse_output_parameters(ctx->output_plugin_options, data);

	/* Check if we support requested protocol */
	if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
						data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));

	if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
						data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
	/*
	 * Decide whether to enable streaming. It is disabled by default, in
	 * which case we just update the flag in the decoding context. Otherwise
	 * we only allow it with a sufficient version of the protocol, and when
	 * the output plugin supports it.
	 */
	if (data->streaming == LOGICALREP_STREAM_OFF)
		ctx->streaming = false;
	else if (data->streaming == LOGICALREP_STREAM_ON &&
			 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
						data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
	else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
			 data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
						data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
	else if (!ctx->streaming)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("streaming requested, but not supported by output plugin")));
	/*
	 * Here, we just check whether the two-phase option was passed in and
	 * decide whether to enable it at a later point of time. It remains
	 * enabled if the previous start-up has done so. But we only allow the
	 * option to be passed in with a sufficient version of the protocol, and
	 * when the output plugin supports it.
	 */
	if (!data->two_phase)
		ctx->twophase_opt_given = false;
	else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
						data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
	else if (!ctx->twophase)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("two-phase commit requested, but not supported by output plugin")));
	else
		ctx->twophase_opt_given = true;
	/* Init publication state. */
	data->publications = NIL;
	publications_valid = false;

	/*
	 * Register callback for pg_publication if we didn't already do that
	 * during some previous call in this process.
	 */
	if (!publication_callback_registered)
	{
		CacheRegisterSyscacheCallback(PUBLICATIONOID,
									  publication_invalidation_cb,
									  (Datum) 0);
		publication_callback_registered = true;
	}
	/* Initialize relation schema cache. */
	init_rel_sync_cache(CacheMemoryContext);

	/*
	 * Disable the streaming and prepared transactions during the slot
	 * initialization mode.
	 */
	ctx->streaming = false;
	ctx->twophase = false;
/*
 * BEGIN callback.
 *
 * Don't send the BEGIN message here; instead postpone it until the first
 * change. In logical replication, a common scenario is to replicate a set of
 * tables (instead of all tables), and transactions whose changes were only
 * on unpublished tables will produce empty transactions. These empty
 * transactions would send BEGIN and COMMIT messages to subscribers, using
 * bandwidth on something with little or no use for logical replication.
 */
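/*
 * A hedged sketch of the BEGIN callback this comment describes: it only
 * allocates the per-transaction flag and defers the actual BEGIN message
 * (struct and field names as used elsewhere in this file):
 */
static void
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
	PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
													  sizeof(PGOutputTxnData));

	txn->output_plugin_private = txndata;
}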
/*
 * This is called while processing the first change of the transaction.
 */
	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
					 send_replication_origin);
	/*
	 * We don't need to send the commit message unless some relevant change
	 * from this transaction has been sent to the downstream.
	 */
	elog(DEBUG1, "skipped replication of an empty transaction with XID: %u",
		 txn->xid);
/*
 * BEGIN PREPARE callback
 */
	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
					 send_replication_origin);
/*
 * COMMIT PREPARED callback
 */

/*
 * ROLLBACK PREPARED callback
 */
/*
 * Write the current schema of the relation and its ancestor (if any) if not
 * done yet.
 */
	/*
	 * Remember XID of the (sub)transaction for the change. We don't care if
	 * it's a top-level transaction or not (we have already sent that XID in
	 * the start of the current streaming block).
	 *
	 * If we're not in a streaming block, just use InvalidTransactionId and
	 * the write methods will not include it.
	 */
	/*
	 * Do we need to send the schema? We do track streamed transactions
	 * separately, because those may be applied later (and the regular
	 * transactions won't see their effects until then) and in an order that
	 * we don't know at this point.
	 *
	 * XXX There is scope for optimization here. Currently, we always send
	 * the schema the first time in a streaming transaction, but we can
	 * probably avoid that by checking the 'relentry->schema_sent' flag.
	 * However, before doing that we need to study its impact on the case
	 * where we have a mix of streaming and non-streaming transactions.
	 */
	if (data->in_streaming)
		schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
	else
		schema_sent = relentry->schema_sent;
	/* Nothing to do if we already sent the schema. */
	/*
	 * Send the schema. If the changes will be published using an ancestor's
	 * schema, not the relation's own, send that ancestor's schema before
	 * sending the relation's own (XXX - maybe sending only the former
	 * suffices?).
	 */

	if (data->in_streaming)
		set_schema_sent_in_streamed_txn(relentry, topxid);
	else
		relentry->schema_sent = true;
	/*
	 * Write out type info if needed. We do that only for user-created types.
	 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
	 * objects with hand-assigned OIDs to be "built in", not for instance any
	 * function or type defined in the information_schema. This is important
	 * because only hand-assigned OIDs can be expected to remain stable
	 * across major versions.
	 */

		if (!logicalrep_should_publish_column(att, relentry->columns,
											  include_gencols_type))
			continue;

	logicalrep_write_rel(ctx->out, xid, relation, relentry->columns,
						 include_gencols_type);
/*
 * Executor state preparation for evaluation of row filter expressions for the
 * specified relation.
 */
	rte->relkind = rel->rd_rel->relkind;
/*
 * Evaluates row filter.
 *
 * If the row filter evaluates to NULL, it is taken as false, i.e. the change
 * isn't replicated.
 */
static bool
pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
{
	Datum		ret;
	bool		isnull;

	Assert(state != NULL);

	ret = ExecEvalExprSwitchContext(state, econtext, &isnull);

	elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
		 isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
		 isnull ? "true" : "false");

	/* NULL is treated as "does not match", i.e. false */
	if (isnull)
		return false;

	return DatumGetBool(ret);
}
/*
 * Make sure the per-entry memory context exists.
 */
	/* The context may already exist, in which case bail out. */
	if (entry->entry_cxt)
		return;

	entry->entry_cxt = AllocSetContextCreate(data->cachectx,
											 "entry private context",
											 ALLOCSET_SMALL_SIZES);
/*
 * Initialize the row filter.
 */
	List	   *rfnodes[] = {NIL, NIL, NIL};	/* One per pubaction */
	bool		no_filter[] = {false, false, false};	/* One per pubaction */
	bool		has_filter = true;
	/*
	 * Find if there are any row filters for this relation. If there are,
	 * then prepare the necessary ExprState and cache it in
	 * entry->exprstate. To build an expression state, we need to ensure the
	 * following:
	 *
	 * All the given publication-table mappings must be checked.
	 *
	 * Multiple publications might have multiple row filters for this
	 * relation. Since row filter usage depends on the DML operation, there
	 * are multiple lists (one for each operation) to which row filters will
	 * be appended.
	 *
	 * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use row filter
	 * expression", so they take precedence.
	 */
	foreach(lc, publications)
	{
		Publication *pub = lfirst(lc);
		bool		pub_no_filter = true;
		/*
		 * If the publication is FOR ALL TABLES, or the publication includes
		 * a FOR TABLES IN SCHEMA where the table belongs to the referred
		 * schema, then it is treated the same as if there were no row
		 * filters (even if other publications have a row filter).
		 */

		/*
		 * Check for the presence of a row filter in this publication.
		 */

			/* Null indicates no filter. */
			rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
									  Anum_pg_publication_rel_prqual,
									  &pub_no_filter);

		/*
		 * Quick exit if all the DML actions are publicized via this
		 * publication.
		 */

		/* No additional work for this publication. Next one. */

		/* Form the per-pubaction row filter lists. */
	}							/* loop all subscribed publications */

	/* Clean the row filter */
	/*
	 * Now all the filters for all pubactions are known. Combine them when
	 * their pubactions are the same.
	 */
		foreach(lc, rfnodes[idx])

		/* combine the row filter and cache the ExprState */
	}							/* for each pubaction */
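/*
 * Hedged sketch of one plausible arrangement of the combining step above:
 * the serialized quals collected per pubaction are deserialized, OR-ed into
 * one expression, and compiled once (the estate variable is assumed to come
 * from create_estate_for_relation):
 */
		List	   *filters = NIL;
		Expr	   *rfnode;
		ListCell   *lc;

		foreach(lc, rfnodes[idx])
			filters = lappend(filters,
							  stringToNode(TextDatumGetCString(lfirst(lc))));

		/* OR the collected quals and compile them into a single ExprState */
		rfnode = make_orclause(filters);
		entry->exprstate[idx] = ExecPrepareExpr(rfnode, estate);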
/*
 * If the table contains a generated column, check for any conflicting
 * values of the 'publish_generated_columns' parameter in the publications.
 */
	bool		gencolpresent = false;

	/* Check if there is any generated column present. */
	for (int i = 0; i < desc->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(desc, i);

		if (att->attgenerated)
		{
			gencolpresent = true;
			break;
		}
	}

	/* There are no generated columns to be published. */

	/*
	 * There may be a conflicting value for the 'publish_generated_columns'
	 * parameter in the publications.
	 */

		/*
		 * The column list takes precedence over the
		 * 'publish_generated_columns' parameter. Those will be checked
		 * later, see pgoutput_column_list_init.
		 */

			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
						   get_namespace_name(RelationGetNamespace(relation)),
						   RelationGetRelationName(relation)));
/*
 * Initialize the column list.
 */
	bool		found_pub_collist = false;

	/*
	 * Find if there are any column lists for this relation. If there are,
	 * build a bitmap using the column lists.
	 *
	 * Multiple publications might have multiple column lists for this
	 * relation.
	 *
	 * Note that we don't support the case where the column list is
	 * different for the same table when combining publications. See the
	 * comments atop fetch_table_list. But one can later change the
	 * publication, so we still need to check all the given
	 * publication-table mappings and report an error if any publications
	 * have a different column list.
	 */
	foreach(lc, publications)
	{
		Publication *pub = lfirst(lc);

		/* Retrieve the bitmap of columns for a column list publication. */
		found_pub_collist |= check_and_fetch_column_list(pub,
														 entry->publish_as_relid,
														 entry->entry_cxt,
														 &cols);

		/*
		 * For non-column list publications - e.g. TABLE (without a column
		 * list), ALL TABLES, or ALL TABLES IN SCHEMA - we consider all
		 * columns of the table (including generated columns when the
		 * 'publish_generated_columns' parameter is true).
		 */

			/*
			 * Cache the table columns for the first publication with no
			 * specified column list to detect any publication with a
			 * different column list.
			 */

			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
						   get_namespace_name(RelationGetNamespace(relation)),
						   RelationGetRelationName(relation)));
	}							/* loop all subscribed publications */
	/*
	 * If no column list publications exist, columns to be published will be
	 * computed later according to the 'publish_generated_columns' parameter.
	 */
	if (!found_pub_collist)
		return;
/*
 * Initialize the slot for storing new and old tuples, and build the map that
 * will be used to convert the relation's tuples into the ancestor's format.
 */
	/*
	 * Create tuple table slots. Create a copy of the TupleDesc as it needs
	 * to live as long as the cache remains.
	 */
	oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
	newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));

	entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
	entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);

	/*
	 * Cache the map that will be used to convert the relation's tuples into
	 * the ancestor's format, if needed.
	 */

	/* Map must live as long as the logical decoding context. */
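/*
 * Hedged sketch of the map construction mentioned above: the descriptors
 * are matched by attribute name and the map stays NULL when no conversion
 * is needed (indesc/outdesc assumed to be the partition's and the
 * ancestor's TupleDescs):
 */
	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
	entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
	MemoryContextSwitchTo(oldctx);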
/*
 * Change is checked against the row filter, if any.
 *
 * Returns true if the change is to be replicated, else false.
 *
 * For inserts, evaluate the row filter for the new tuple.
 * For deletes, evaluate the row filter for the old tuple.
 * For updates, evaluate the row filter for the old and new tuples.
 *
 * For updates, if both evaluations are true, we allow sending the UPDATE, and
 * if both evaluations are false, we don't replicate the UPDATE. If only one
 * of the tuples matches the row filter expression, we transform the UPDATE
 * into a DELETE or INSERT to avoid any data inconsistency, based on the
 * following rules:
 *
 * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
 * Case 2: old-row (no match)    new-row (match)     -> INSERT
 * Case 3: old-row (match)       new-row (no match)  -> DELETE
 * Case 4: old-row (match)       new-row (match)     -> UPDATE
 *
 * The new action is updated in the action parameter.
 *
 * The new slot could be updated when transforming the UPDATE into INSERT,
 * because the original new tuple might not have column values from the
 * replica identity.
 *
 * Examples:
 * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
 * Since the old tuple satisfies, the initial table synchronization copied
 * this row (or another method was used to guarantee that there is data
 * consistency). However, after the UPDATE the new tuple doesn't satisfy the
 * row filter, so from a data consistency perspective, that row should be
 * removed on the subscriber. The UPDATE should be transformed into a DELETE
 * statement and be sent to the subscriber. Keeping this row on the subscriber
 * is undesirable because it doesn't reflect what was defined in the row
 * filter expression on the publisher. This row on the subscriber would likely
 * not be modified by replication again. If someone inserted a new row with
 * the same old identifier, replication could stop due to a constraint
 * violation.
 *
 * Let's say the old tuple doesn't match the row filter but the new tuple
 * does. Since the old tuple doesn't satisfy, the initial table
 * synchronization probably didn't copy this row. However, after the UPDATE
 * the new tuple does satisfy the row filter, so from a data consistency
 * perspective, that row should be inserted on the subscriber. Otherwise,
 * subsequent UPDATE or DELETE statements have no effect (they match no row --
 * see apply_handle_update_internal()). So, the UPDATE should be transformed
 * into an INSERT statement and be sent to the subscriber. However, this might
 * surprise someone who expects the data set to satisfy the row filter
 * expression on the publisher.
 */
	/*
	 * We need this map to avoid relying on ReorderBufferChangeType enums
	 * having specific values.
	 */
	static const int map_changetype_pubaction[] = {
		[REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
		[REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
		[REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
	};

	Assert(new_slot || old_slot);

	/* Get the corresponding row filter */
	filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];

	/* Bail out if there is no row filter */
	if (!filter_exprstate)
		return true;

	elog(DEBUG3, "table \"%s.%s\" has row filter",
		 get_namespace_name(RelationGetNamespace(relation)),
		 RelationGetRelationName(relation));
	/*
	 * For the following occasions where there is only one tuple, we can
	 * evaluate the row filter for that tuple and return.
	 *
	 * For inserts, we only have the new tuple.
	 *
	 * For updates, we can have only a new tuple when none of the replica
	 * identity columns changed and none of those columns have external data,
	 * but we still need to evaluate the row filter for the new tuple as the
	 * existing values of those columns might not match the filter. Also,
	 * users can use constant expressions in the row filter, so we anyway
	 * need to evaluate it for the new tuple.
	 *
	 * For deletes, we only have the old tuple.
	 */
	if (!new_slot || !old_slot)
	/*
	 * Both the old and new tuples must be valid only for updates, and they
	 * need to be checked against the row filter.
	 */
	tmp_new_slot = NULL;
	/*
	 * The new tuple might not have all the replica identity columns, in
	 * which case they need to be copied over from the old tuple.
	 */

		/*
		 * If the column in the new tuple or old tuple is null, there is
		 * nothing to do.
		 */

		/*
		 * Unchanged toasted replica identity columns are only logged in the
		 * old tuple. Copy this over to the new tuple. The changed (or
		 * WAL-logged) toast values are always assembled in memory and set as
		 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
		 */

	memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
		   desc->natts * sizeof(bool));
	/*
	 * Case 1: if both tuples don't match the row filter, bail out. Send
	 * nothing.
	 */
	if (!old_matched && !new_matched)
		return false;

	/*
	 * Case 2: if the old tuple doesn't satisfy the row filter but the new
	 * tuple does, transform the UPDATE into INSERT.
	 *
	 * Use the newly transformed tuple that must contain the column values
	 * for all the replica identity columns. This is required to ensure that
	 * while inserting the tuple in the downstream node, we have all the
	 * required column values.
	 */
	if (!old_matched && new_matched)
	{
		*action = REORDER_BUFFER_CHANGE_INSERT;

		if (tmp_new_slot)
			*new_slot_ptr = tmp_new_slot;
	}

	/*
	 * Case 3: if the old tuple satisfies the row filter but the new tuple
	 * doesn't, transform the UPDATE into DELETE.
	 *
	 * This transformation does not require another tuple. The old tuple
	 * will be used for the DELETE.
	 */
	else if (old_matched && !new_matched)
		*action = REORDER_BUFFER_CHANGE_DELETE;

	/*
	 * Case 4: if both tuples match the row filter, no transformation is
	 * required. (*action is the default UPDATE.)
	 */
/*
 * Sends the decoded DML over the wire.
 *
 * This is called both in streaming and non-streaming modes.
 */
	/*
	 * Remember the xid for the change in streaming mode. We need to send
	 * the xid with each change in streaming mode so that the subscriber can
	 * make the association and, on abort, discard the corresponding
	 * changes.
	 */
	if (data->in_streaming)
		xid = change->txn->xid;
	/* First check the table filter */

			/*
			 * This is only possible if deletes are allowed even when no
			 * replica identity is defined for the table. Since the DELETE
			 * action can't be published, we simply return.
			 */
			elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
	/* Avoid leaking memory by using and resetting our own context */

	/* Switch relation if publishing via root. */
	if (relentry->publish_as_relid != RelationGetRelid(relation))
	{
		Assert(relation->rd_rel->relispartition);
		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
		targetrel = ancestor;
	}

	/* Convert tuple if needed. */

	/* Convert tuple if needed. */
	/*
	 * Updates could be transformed to inserts or deletes based on the
	 * results of the row filter for the old and new tuples.
	 */

	/*
	 * Send BEGIN if we haven't yet.
	 *
	 * We send the BEGIN message after ensuring that we will actually send
	 * the change. This avoids sending a pair of BEGIN/COMMIT messages for
	 * empty transactions.
	 */
	if (txndata && !txndata->sent_begin_txn)
		pgoutput_send_begin(ctx, txn);

	/*
	 * The schema should be sent using the original relation because it also
	 * sends the ancestor's relation.
	 */
	maybe_send_schema(ctx, change, relation, relentry);
	/* Drop the new slots that were used to store the converted tuples. */
	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
	if (data->in_streaming)
		xid = change->txn->xid;
	for (i = 0; i < nrelations; i++)
	{
		Relation	relation = relations[i];
		Oid			relid = RelationGetRelid(relation);

		/*
		 * Don't send partitions if the publication wants to send only the
		 * root tables through it.
		 */
		if (relation->rd_rel->relispartition &&
			relentry->publish_as_relid != relid)
			continue;

		relids[nrelids++] = relid;
	/* Send BEGIN if we haven't yet */
	if (txndata && !txndata->sent_begin_txn)
		pgoutput_send_begin(ctx, txn);
static void
pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
				 XLogRecPtr message_lsn, bool transactional,
				 const char *prefix, Size sz, const char *message)
	/*
	 * Remember the xid for the message in streaming mode. See
	 * pgoutput_change.
	 */
	if (data->in_streaming)
		xid = txn->xid;

	/*
	 * Output BEGIN if we haven't yet. Avoid this for non-transactional
	 * messages.
	 */
	if (transactional)
	{
		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;

		/* Send BEGIN if we haven't yet */
		if (txndata && !txndata->sent_begin_txn)
			pgoutput_send_begin(ctx, txn);
	}
/*
 * Return true if the data is associated with an origin and the user has
 * requested the changes that don't have an origin, false otherwise.
 */
/*
 * Shutdown the output plugin.
 *
 * Note, we don't need to clean data->context, data->cachectx, and
 * data->pubctx as they are child contexts of ctx->context, so they will be
 * cleaned up by the logical decoding machinery.
 */
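/*
 * A hedged sketch of the shutdown callback described above: the only
 * explicit cleanup needed is dropping the relation schema cache.
 */
static void
pgoutput_shutdown(LogicalDecodingContext *ctx)
{
	if (RelationSyncCache)
	{
		hash_destroy(RelationSyncCache);
		RelationSyncCache = NULL;
	}
}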
/*
 * Load publications from the list of publication names.
 *
 * Here, we skip the publications that don't exist yet. This allows us to
 * silently continue replication even when a specified publication is
 * missing. This is required because we allow users to create publications
 * after they have specified the required publications at the time
 * replication starts.
 */
	foreach(lc, pubnames)
	{
		char	   *pubname = (char *) lfirst(lc);
		Publication *pub = GetPublicationByName(pubname, true);

		if (pub)
			result = lappend(result, pub);
		else
			ereport(WARNING,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("skipped loading publication \"%s\"", pubname),
					errdetail("The publication does not exist at this point in the WAL."),
					errhint("Create the publication if it does not exist."));
	}
/*
 * Publication syscache invalidation callback.
 *
 * Called for invalidations on pg_publication.
 */
/*
 * START STREAM callback
 */
	/* We can't nest streaming of transactions. */
	Assert(!data->in_streaming);

	/*
	 * If we already sent the first stream for this transaction then don't
	 * send the origin id in the subsequent streams.
	 */
	if (rbtxn_is_streamed(txn))
		send_replication_origin = false;

	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
					 send_replication_origin);

	/* We're streaming a chunk of transaction now. */
	data->in_streaming = true;
/*
 * STOP STREAM callback
 */
	/* We should be streaming a transaction. */
	Assert(data->in_streaming);

	/* We've stopped streaming a transaction. */
	data->in_streaming = false;
/*
 * Notify the downstream to discard the streamed transaction (along with all
 * its subtransactions, if it's a toplevel transaction).
 */
	bool		write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);

	/*
	 * The abort should happen outside the streaming block, even for streamed
	 * transactions. The transaction has to be marked as streamed, though.
	 */

	/* determine the toplevel transaction */
	toptxn = rbtxn_get_toptxn(txn);
/*
 * Notify the downstream to apply the streamed transaction (along with all
 * its subtransactions).
 */
	/*
	 * The commit should happen outside the streaming block, even for
	 * streamed transactions. The transaction has to be marked as streamed,
	 * though.
	 */
/*
 * PREPARE callback (for streaming two-phase commit).
 *
 * Notify the downstream to prepare the transaction.
 */
/*
 * Initialize the relation schema sync cache for a decoding session.
 *
 * The hash table is destroyed at the end of a decoding session. While
 * relcache invalidations still exist and will still be invoked, they will
 * just see the null hash table global and take no action.
 */
	static bool relation_callbacks_registered = false;

	/* Nothing to do if hash table already exists */
	if (RelationSyncCache != NULL)
		return;

	/* Make a new hash table for the cache */
	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(RelationSyncEntry);
	ctl.hcxt = cachectx;

	RelationSyncCache = hash_create("logical replication output relation cache",
									128, &ctl,
									HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
	/* No more to do if we already registered callbacks */
	if (relation_callbacks_registered)
		return;

	/* We must update the cache entry for a relation after a relcache flush */
	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);

	/*
	 * Flush all cache entries after a pg_namespace change, in case it was a
	 * schema rename affecting a relation being replicated.
	 *
	 * XXX: It is not a good idea to invalidate all the relation entries in
	 * RelationSyncCache on a schema rename. We can optimize this to
	 * invalidate only the required relations by either having a specific
	 * invalidation message containing the impacted relations, or by having
	 * schema information in each RelationSyncCache entry and using the
	 * hashvalue of pg_namespace.oid passed to the callback.
	 */
	CacheRegisterSyscacheCallback(NAMESPACEOID,
								  rel_sync_cache_publication_cb,
								  (Datum) 0);

	relation_callbacks_registered = true;
/*
 * We expect a relatively small number of streamed transactions.
 */

/*
 * Add the xid in the rel sync entry for which we have already sent the
 * schema of the relation.
 */
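/*
 * Hedged sketch of the xid bookkeeping helpers these comments describe: a
 * membership test on the entry's streamed_txns list, and an append done in
 * a long-lived context so the list survives the current callback.
 */
static bool
get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
	return list_member_xid(entry->streamed_txns, xid);
}

static void
set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
	MemoryContext oldctx = MemoryContextSwitchTo(CacheMemoryContext);

	entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
	MemoryContextSwitchTo(oldctx);
}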
/*
 * Find or create an entry in the relation schema cache.
 *
 * This looks up publications that the given relation is directly or
 * indirectly part of (the latter if it's really the relation's ancestor that
 * is part of a publication) and fills up the found entry with information
 * about which operations to publish and whether to use an ancestor's schema
 * when publishing.
 */
	/* Find cached relation info, creating it if not found */
	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
											  &relid,
											  HASH_ENTER, &found);

	/* initialize entry, if it's new */

	/* Validate the entry */

		/*
		 * We don't acquire a lock on the namespace system table as we build
		 * the cache entry using a historic snapshot, and all the later
		 * changes are absorbed while decoding WAL.
		 */
		Oid			publish_as_relid = relid;
		int			publish_ancestor_level = 0;

		/* Reload publications if needed before use. */
		/*
		 * Reset schema_sent status as the relation definition may have
		 * changed. Also reset pubactions to empty in case the rel was
		 * dropped from a publication. Also free any objects that depended
		 * on the earlier definition.
		 */
		/*
		 * Tuple slots cleanups. (Will be rebuilt later if needed.)
		 */

			/*
			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc,
			 * so do it now to avoid any leaks.
			 */

			/*
			 * ExecDropSingleTupleTableSlot() would not free the TupleDesc,
			 * so do it now to avoid any leaks.
			 */

		/*
		 * Row filter cache cleanups.
		 */
		/*
		 * Build the publication cache. We can't use the one provided by
		 * relcache as relcache considers all publications that the given
		 * relation is in, but here we only need to consider the ones that
		 * the subscriber requested.
		 */
		foreach(lc, data->publications)
		{
			Publication *pub = lfirst(lc);
			bool		publish = false;
			/*
			 * Under what relid should we publish changes in this
			 * publication? We'll use the top-most relid across all
			 * publications. Also track the ancestor level for this
			 * publication.
			 */
			Oid			pub_relid = relid;
			int			ancestor_level = 0;
			/*
			 * If this is a FOR ALL TABLES publication, pick the partition
			 * root and set the ancestor level accordingly.
			 */
				bool		ancestor_published = false;

				/*
				 * For a partition, check if any of the ancestors are
				 * published. If so, note down the topmost ancestor that is
				 * published via this publication, which will be used as the
				 * relation via which to publish the partition's changes.
				 */
				ancestor = GetTopMostAncestorInPublication(pub->oid,
														   ancestors,
														   &level);
				if (ancestor != InvalidOid)
				{
					ancestor_published = true;
					pub_relid = ancestor;
					ancestor_level = level;
				}
			/*
			 * If the relation is to be published, determine the actions to
			 * publish, and the list of columns, if appropriate.
			 *
			 * Don't publish changes for partitioned tables, because
			 * publishing those of its partitions suffices, unless partition
			 * changes won't be published due to pubviaroot being set.
			 */
			if (publish &&
				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
				/*
				 * We want to publish the changes as the top-most ancestor
				 * across all publications. So we need to check if the
				 * already-calculated level is higher than the new one. If
				 * yes, we can ignore the new value (as it's a child).
				 * Otherwise the new value is an ancestor, so we keep it.
				 */
				if (publish_ancestor_level > ancestor_level)
					continue;

				/*
				 * If we found an ancestor higher up in the tree, discard the
				 * list of publications through which we replicate it, and
				 * use the new ancestor.
				 */
				if (publish_ancestor_level < ancestor_level)
				{
					publish_as_relid = pub_relid;
					publish_ancestor_level = ancestor_level;

					/* reset the publication list for this relation */
					rel_publications = NIL;
				}
				else
				{
					/* Same ancestor level, has to be the same OID. */
					Assert(publish_as_relid == pub_relid);
				}

				/* Track publications for this ancestor. */
				rel_publications = lappend(rel_publications, pub);
		/*
		 * Initialize the tuple slot, map, and row filter. These are only
		 * used when publishing inserts, updates, or deletes.
		 */
		if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
			entry->pubactions.pubdelete)
		{
			/* Initialize the tuple slot and map */
			init_tuple_slot(data, relation, entry);

			/* Initialize the row filter */
			pgoutput_row_filter_init(data, rel_publications, entry);

			/* Check whether to publish generated columns. */
			check_and_init_gencol(data, rel_publications, entry);

			/* Initialize the column list */
			pgoutput_column_list_init(data, rel_publications, entry);
		}
/*
 * Cleanup the list of streamed transactions and update the schema_sent flag.
 *
 * When a streamed transaction commits or aborts, we need to remove the
 * toplevel XID from the schema cache. If the transaction aborted, the
 * subscriber will simply throw away the schema records we streamed, so
 * we don't need to do anything else.
 *
 * If the transaction committed, the subscriber will update the relation
 * cache - so tweak the schema_sent flag accordingly.
 */
	/*
	 * We can set the schema_sent flag for an entry that has a committed xid
	 * in the list, as that ensures that the subscriber would have the
	 * corresponding schema and we don't need to send it unless there is any
	 * invalidation for that relation.
	 */
	foreach_xid(streamed_txn, entry->streamed_txns)
	{
		if (xid == streamed_txn)
		{
			if (is_commit)
				entry->schema_sent = true;

			entry->streamed_txns =
				foreach_delete_current(entry->streamed_txns, streamed_txn);
			break;
		}
	}
/*
 * Relcache invalidation callback
 */
	/*
	 * We can get here if the plugin was used in the SQL interface, as the
	 * RelationSyncCache is destroyed when the decoding finishes, but there
	 * is no way to unregister the relcache invalidation callback.
	 */
	if (RelationSyncCache == NULL)
		return;

	/*
	 * Nobody keeps pointers to entries in this hash table around outside
	 * logical decoding callback calls - but invalidation events can come in
	 * *during* a callback if we do any syscache access in the callback.
	 * Because of that we must mark the cache entry as invalid but not
	 * damage any of its substructure here. The next get_rel_sync_entry()
	 * call will rebuild it all.
	 */

	/*
	 * Getting invalidations for relations that aren't in the table is
	 * entirely normal. So we don't care if it's found or not.
	 */
	/* Whole cache must be flushed. */
/*
 * Publication relation/schema map syscache invalidation callback
 *
 * Called for invalidations on pg_namespace.
 */
	/*
	 * We can get here if the plugin was used in the SQL interface, as the
	 * RelationSyncCache is destroyed when the decoding finishes, but there
	 * is no way to unregister the invalidation callbacks.
	 */
	if (RelationSyncCache == NULL)
		return;

	/*
	 * We have no easy way to identify which cache entries this invalidation
	 * event might have affected, so just mark them all invalid.
	 */
/* Send Replication origin */
	/*----------
	 * XXX: which behaviour do we want here?
	 *
	 * Alternatives:
	 *	- don't send origin message if origin name not found
	 *	  (that's what we do now)
	 *	- throw error - that will break replication, not good
	 *	- send some special "unknown" origin
	 *----------
	 */

	/* Message boundary */
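/*
 * Hedged sketch of the control flow around the boundary above: resolve the
 * origin name and skip the origin message entirely when the name is not
 * found (the first alternative in the XXX list).
 */
	if (replorigin_by_oid(origin_id, true, &origin))
	{
		OutputPluginWrite(ctx, false);
		OutputPluginPrepareWrite(ctx, true);
		logicalrep_write_origin(ctx->out, origin, origin_lsn);
	}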