PostgreSQL Source Code: src/backend/replication/pgoutput/pgoutput.c (git master)

1/*-------------------------------------------------------------------------
2 *
3 * pgoutput.c
4 * Logical Replication output plugin
5 *
6 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/pgoutput/pgoutput.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "access/tupconvert.h"
16#include "catalog/partition.h"
17#include "catalog/pg_publication.h"
18#include "catalog/pg_publication_rel.h"
19#include "catalog/pg_subscription.h"
20#include "commands/defrem.h"
21#include "commands/subscriptioncmds.h"
22#include "executor/executor.h"
23#include "fmgr.h"
24#include "nodes/makefuncs.h"
25#include "parser/parse_relation.h"
26#include "replication/logical.h"
27#include "replication/logicalproto.h"
28#include "replication/origin.h"
29#include "replication/pgoutput.h"
30#include "rewrite/rewriteHandler.h"
31#include "utils/builtins.h"
32#include "utils/inval.h"
33#include "utils/lsyscache.h"
34#include "utils/memutils.h"
35#include "utils/rel.h"
36#include "utils/syscache.h"
37#include "utils/varlena.h"
38
39 PG_MODULE_MAGIC_EXT(
40 .name = "pgoutput",
41 .version = PG_VERSION
42);
43
44static void pgoutput_startup(LogicalDecodingContext *ctx,
45 OutputPluginOptions *opt, bool is_init);
46static void pgoutput_shutdown(LogicalDecodingContext *ctx);
47static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
48 ReorderBufferTXN *txn);
49static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
50 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
51static void pgoutput_change(LogicalDecodingContext *ctx,
52 ReorderBufferTXN *txn, Relation relation,
53 ReorderBufferChange *change);
54static void pgoutput_truncate(LogicalDecodingContext *ctx,
55 ReorderBufferTXN *txn, int nrelations, Relation relations[],
56 ReorderBufferChange *change);
57static void pgoutput_message(LogicalDecodingContext *ctx,
58 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
59 bool transactional, const char *prefix,
60 Size sz, const char *message);
61static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
62 RepOriginId origin_id);
63static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
64 ReorderBufferTXN *txn);
65static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
66 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
67static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
68 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
69static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
70 ReorderBufferTXN *txn,
71 XLogRecPtr prepare_end_lsn,
72 TimestampTz prepare_time);
73static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
74 ReorderBufferTXN *txn);
75static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
76 ReorderBufferTXN *txn);
77static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
78 ReorderBufferTXN *txn,
79 XLogRecPtr abort_lsn);
80static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
81 ReorderBufferTXN *txn,
82 XLogRecPtr commit_lsn);
83static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
84 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
85
86 static bool publications_valid;
87
88static List *LoadPublications(List *pubnames);
89static void publication_invalidation_cb(Datum arg, int cacheid,
90 uint32 hashvalue);
91static void send_repl_origin(LogicalDecodingContext *ctx,
92 RepOriginId origin_id, XLogRecPtr origin_lsn,
93 bool send_origin);
94
95/*
96 * Only 3 publication actions are used for row filtering ("insert", "update",
97 * "delete"). See RelationSyncEntry.exprstate[].
98 */
99 enum RowFilterPubAction
100{
101 PUBACTION_INSERT,
102 PUBACTION_UPDATE,
103 PUBACTION_DELETE,
104};
105
106 #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107
108/*
109 * Entry in the map used to remember which relation schemas we sent.
110 *
111 * The schema_sent flag determines if the current schema record for the
112 * relation (and for its ancestor if publish_as_relid is set) was already
113 * sent to the subscriber (in which case we don't need to send it again).
114 *
115 * The schema cache on downstream is however updated only at commit time,
116 * and with streamed transactions the commit order may be different from
117 * the order the transactions are sent in. Also, the (sub) transactions
118 * might get aborted so we need to send the schema for each (sub) transaction
119 * so that we don't lose the schema information on abort. For handling this,
120 * we maintain the list of xids (streamed_txns) for those we have already sent
121 * the schema.
122 *
123 * For partitions, 'pubactions' considers not only the table's own
124 * publications, but also those of all of its ancestors.
125 */
126 typedef struct RelationSyncEntry
127{
128 Oid relid; /* relation oid */
129
130 bool replicate_valid; /* overall validity flag for entry */
131
132 bool schema_sent;
133
134 /*
135 * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
136 * columns and the 'publish_generated_columns' parameter is set to
137 * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
138 * indicating that no generated columns should be published, unless
139 * explicitly specified in the column list.
140 */
141 PublishGencolsType include_gencols_type;
142 List *streamed_txns; /* streamed toplevel transactions with this
143 * schema */
144
145 /* are we publishing this rel? */
146 PublicationActions pubactions;
147
148 /*
149 * ExprState array for row filter. The expressions for different publication
150 * actions cannot always be combined into one, because updates and deletes
151 * restrict the columns in the expression to those covered by the replica
152 * identity index, whereas inserts have no such restriction; so there is one
153 * ExprState per publication action.
154 */
155 ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
156 EState *estate; /* executor state used for row filter */
157 TupleTableSlot *new_slot; /* slot for storing new tuple */
158 TupleTableSlot *old_slot; /* slot for storing old tuple */
159
160 /*
161 * OID of the relation to publish changes as. For a partition, this may
162 * be set to one of its ancestors whose schema will be used when
163 * replicating changes, if publish_via_partition_root is set for the
164 * publication.
165 */
166 Oid publish_as_relid;
167
168 /*
169 * Map used when replicating using an ancestor's schema to convert tuples
170 * from partition's type to the ancestor's; NULL if publish_as_relid is
171 * same as 'relid' or if unnecessary due to partition and the ancestor
172 * having identical TupleDesc.
173 */
174 AttrMap *attrmap;
175
176 /*
177 * Columns included in the publication, or NULL if all columns are
178 * included implicitly. Note that the attnums in this bitmap are not
179 * shifted by FirstLowInvalidHeapAttributeNumber.
180 */
181 Bitmapset *columns;
182
183 /*
184 * Private context to store additional data for this entry - state for the
185 * row filter expressions, column list, etc.
186 */
187 MemoryContext entry_cxt;
188 } RelationSyncEntry;
189
190/*
191 * Maintain a per-transaction level variable to track whether the transaction
192 * has sent BEGIN. BEGIN is only sent when the first change in a transaction
193 * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
194 * messages for empty transactions which saves network bandwidth.
195 *
196 * This optimization is not used for prepared transactions because if the
197 * WALSender restarts after prepare of a transaction and before commit prepared
198 * of the same transaction then we won't be able to figure out if we have
199 * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
200 * because we would have lost the in-memory txndata information that was
201 * present prior to the restart. This will result in sending a spurious
202 * COMMIT PREPARED without a corresponding prepared transaction at the
203 * downstream which would lead to an error when it tries to process it.
204 *
205 * XXX We could achieve this optimization by changing protocol to send
206 * additional information so that downstream can detect that the corresponding
207 * prepare has not been sent. However, adding such a check for every
208 * transaction in the downstream could be costly so we might want to do it
209 * optionally.
210 *
211 * We also don't have this optimization for streamed transactions because
212 * they can contain prepared transactions.
213 */
214 typedef struct PGOutputTxnData
215{
216 bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
217 } PGOutputTxnData;
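
/*
 * Illustrative sketch (not part of the upstream file): how the lazy-BEGIN
 * optimization described above plays out.  A change callback checks the
 * per-transaction flag before emitting its first message, roughly:
 *
 *		PGOutputTxnData *txndata = txn->output_plugin_private;
 *
 *		if (txndata && !txndata->sent_begin_txn)
 *			pgoutput_send_begin(ctx, txn);
 *
 * so a transaction that touches only unpublished tables never produces
 * BEGIN/COMMIT traffic; pgoutput_commit_txn() then skips the COMMIT as well.
 */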
218
219/* Map used to remember which relation schemas we sent. */
220 static HTAB *RelationSyncCache = NULL;
221
222static void init_rel_sync_cache(MemoryContext cachectx);
223static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
224static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
225 Relation relation);
226static void send_relation_and_attrs(Relation relation, TransactionId xid,
227 LogicalDecodingContext *ctx,
228 RelationSyncEntry *relentry);
229static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
230static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
231 uint32 hashvalue);
232static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
233 TransactionId xid);
234static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
235 TransactionId xid);
236static void init_tuple_slot(PGOutputData *data, Relation relation,
237 RelationSyncEntry *entry);
238
239/* row filter routines */
240static EState *create_estate_for_relation(Relation rel);
241static void pgoutput_row_filter_init(PGOutputData *data,
242 List *publications,
243 RelationSyncEntry *entry);
244static bool pgoutput_row_filter_exec_expr(ExprState *state,
245 ExprContext *econtext);
246static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
247 TupleTableSlot **new_slot_ptr,
248 RelationSyncEntry *entry,
249 ReorderBufferChangeType *action);
250
251/* column list routines */
252static void pgoutput_column_list_init(PGOutputData *data,
253 List *publications,
254 RelationSyncEntry *entry);
255
256/*
257 * Specify output plugin callbacks
258 */
259void
260 _PG_output_plugin_init(OutputPluginCallbacks *cb)
261{
262 cb->startup_cb = pgoutput_startup;
263 cb->begin_cb = pgoutput_begin_txn;
264 cb->change_cb = pgoutput_change;
265 cb->truncate_cb = pgoutput_truncate;
266 cb->message_cb = pgoutput_message;
267 cb->commit_cb = pgoutput_commit_txn;
268
269 cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
270 cb->prepare_cb = pgoutput_prepare_txn;
271 cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
272 cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
273 cb->filter_by_origin_cb = pgoutput_origin_filter;
274 cb->shutdown_cb = pgoutput_shutdown;
275
276 /* transaction streaming */
277 cb->stream_start_cb = pgoutput_stream_start;
278 cb->stream_stop_cb = pgoutput_stream_stop;
279 cb->stream_abort_cb = pgoutput_stream_abort;
280 cb->stream_commit_cb = pgoutput_stream_commit;
281 cb->stream_change_cb = pgoutput_change;
282 cb->stream_message_cb = pgoutput_message;
283 cb->stream_truncate_cb = pgoutput_truncate;
284 /* transaction streaming - two-phase commit */
285 cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
286}
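
/*
 * Illustrative example (not part of the upstream file; slot and publication
 * names are made up): a subscriber's walsender session typically activates
 * this plugin with a command along the lines of
 *
 *		START_REPLICATION SLOT "sub1" LOGICAL 0/0
 *			(proto_version '4', publication_names '"pub1","pub2"',
 *			 streaming 'parallel', messages 'true');
 *
 * Each option arrives as a DefElem in ctx->output_plugin_options and is
 * parsed by parse_output_parameters() below.
 */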
287
288static void
289 parse_output_parameters(List *options, PGOutputData *data)
290{
291 ListCell *lc;
292 bool protocol_version_given = false;
293 bool publication_names_given = false;
294 bool binary_option_given = false;
295 bool messages_option_given = false;
296 bool streaming_given = false;
297 bool two_phase_option_given = false;
298 bool origin_option_given = false;
299
300 /* Initialize optional parameters to defaults */
301 data->binary = false;
302 data->streaming = LOGICALREP_STREAM_OFF;
303 data->messages = false;
304 data->two_phase = false;
305 data->publish_no_origin = false;
306
307 foreach(lc, options)
308 {
309 DefElem *defel = (DefElem *) lfirst(lc);
310
311 Assert(defel->arg == NULL || IsA(defel->arg, String));
312
313 /* Check each param, whether or not we recognize it */
314 if (strcmp(defel->defname, "proto_version") == 0)
315 {
316 unsigned long parsed;
317 char *endptr;
318
319 if (protocol_version_given)
320 ereport(ERROR,
321 (errcode(ERRCODE_SYNTAX_ERROR),
322 errmsg("conflicting or redundant options")));
323 protocol_version_given = true;
324
325 errno = 0;
326 parsed = strtoul(strVal(defel->arg), &endptr, 10);
327 if (errno != 0 || *endptr != '0円')
328 ereport(ERROR,
329 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
330 errmsg("invalid proto_version")));
331
332 if (parsed > PG_UINT32_MAX)
333 ereport(ERROR,
334 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
335 errmsg("proto_version \"%s\" out of range",
336 strVal(defel->arg))));
337
338 data->protocol_version = (uint32) parsed;
339 }
340 else if (strcmp(defel->defname, "publication_names") == 0)
341 {
342 if (publication_names_given)
343 ereport(ERROR,
344 (errcode(ERRCODE_SYNTAX_ERROR),
345 errmsg("conflicting or redundant options")));
346 publication_names_given = true;
347
348 if (!SplitIdentifierString(strVal(defel->arg), ',',
349 &data->publication_names))
350 ereport(ERROR,
351 (errcode(ERRCODE_INVALID_NAME),
352 errmsg("invalid publication_names syntax")));
353 }
354 else if (strcmp(defel->defname, "binary") == 0)
355 {
356 if (binary_option_given)
357 ereport(ERROR,
358 (errcode(ERRCODE_SYNTAX_ERROR),
359 errmsg("conflicting or redundant options")));
360 binary_option_given = true;
361
362 data->binary = defGetBoolean(defel);
363 }
364 else if (strcmp(defel->defname, "messages") == 0)
365 {
366 if (messages_option_given)
367 ereport(ERROR,
368 (errcode(ERRCODE_SYNTAX_ERROR),
369 errmsg("conflicting or redundant options")));
370 messages_option_given = true;
371
372 data->messages = defGetBoolean(defel);
373 }
374 else if (strcmp(defel->defname, "streaming") == 0)
375 {
376 if (streaming_given)
377 ereport(ERROR,
378 (errcode(ERRCODE_SYNTAX_ERROR),
379 errmsg("conflicting or redundant options")));
380 streaming_given = true;
381
382 data->streaming = defGetStreamingMode(defel);
383 }
384 else if (strcmp(defel->defname, "two_phase") == 0)
385 {
386 if (two_phase_option_given)
387 ereport(ERROR,
388 (errcode(ERRCODE_SYNTAX_ERROR),
389 errmsg("conflicting or redundant options")));
390 two_phase_option_given = true;
391
392 data->two_phase = defGetBoolean(defel);
393 }
394 else if (strcmp(defel->defname, "origin") == 0)
395 {
396 char *origin;
397
398 if (origin_option_given)
399 ereport(ERROR,
400 errcode(ERRCODE_SYNTAX_ERROR),
401 errmsg("conflicting or redundant options"));
402 origin_option_given = true;
403
404 origin = defGetString(defel);
405 if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
406 data->publish_no_origin = true;
407 else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
408 data->publish_no_origin = false;
409 else
410 ereport(ERROR,
411 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
412 errmsg("unrecognized origin value: \"%s\"", origin));
413 }
414 else
415 elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
416 }
417
418 /* Check required options */
419 if (!protocol_version_given)
420 ereport(ERROR,
421 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
422 errmsg("option \"%s\" missing", "proto_version"));
423 if (!publication_names_given)
424 ereport(ERROR,
425 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
426 errmsg("option \"%s\" missing", "publication_names"));
427}
428
429/*
430 * Initialize this plugin
431 */
432static void
433 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
434 bool is_init)
435{
436 PGOutputData *data = palloc0(sizeof(PGOutputData));
437 static bool publication_callback_registered = false;
438
439 /* Create our memory context for private allocations. */
440 data->context = AllocSetContextCreate(ctx->context,
441 "logical replication output context",
442 ALLOCSET_DEFAULT_SIZES);
443
444 data->cachectx = AllocSetContextCreate(ctx->context,
445 "logical replication cache context",
446 ALLOCSET_DEFAULT_SIZES);
447
448 data->pubctx = AllocSetContextCreate(ctx->context,
449 "logical replication publication list context",
450 ALLOCSET_SMALL_SIZES);
451
452 ctx->output_plugin_private = data;
453
454 /* This plugin uses binary protocol. */
455 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
456
457 /*
458 * This is replication start and not slot initialization.
459 *
460 * Parse and validate options passed by the client.
461 */
462 if (!is_init)
463 {
464 /* Parse the params and ERROR if we see any we don't recognize */
465 parse_output_parameters(ctx->output_plugin_options, data);
466
467 /* Check if we support requested protocol */
468 if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
469 ereport(ERROR,
470 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
471 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
472 data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
473
474 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
475 ereport(ERROR,
476 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
477 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
478 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
479
480 /*
481 * Decide whether to enable streaming. It is disabled by default, in
482 * which case we just update the flag in decoding context. Otherwise
483 * we only allow it with sufficient version of the protocol, and when
484 * the output plugin supports it.
485 */
486 if (data->streaming == LOGICALREP_STREAM_OFF)
487 ctx->streaming = false;
488 else if (data->streaming == LOGICALREP_STREAM_ON &&
489 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
490 ereport(ERROR,
491 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
492 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
493 data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
494 else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
495 data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
496 ereport(ERROR,
497 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
498 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
499 data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
500 else if (!ctx->streaming)
501 ereport(ERROR,
502 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
503 errmsg("streaming requested, but not supported by output plugin")));
504
505 /*
506 * Here, we just check whether the two-phase option is passed by the
507 * client and decide whether to enable it at a later point in time. It
508 * remains enabled if the previous start-up has done so. But we only
509 * allow the option to be passed in with sufficient version of the
510 * protocol, and when the output plugin supports it.
511 */
512 if (!data->two_phase)
513 ctx->twophase_opt_given = false;
514 else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
515 ereport(ERROR,
516 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
517 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
518 data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
519 else if (!ctx->twophase)
520 ereport(ERROR,
521 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
522 errmsg("two-phase commit requested, but not supported by output plugin")));
523 else
524 ctx->twophase_opt_given = true;
525
526 /* Init publication state. */
527 data->publications = NIL;
528 publications_valid = false;
529
530 /*
531 * Register callback for pg_publication if we didn't already do that
532 * during some previous call in this process.
533 */
534 if (!publication_callback_registered)
535 {
536 CacheRegisterSyscacheCallback(PUBLICATIONOID,
537 publication_invalidation_cb,
538 (Datum) 0);
539 CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
540 (Datum) 0);
541 publication_callback_registered = true;
542 }
543
544 /* Initialize relation schema cache. */
545 init_rel_sync_cache(CacheMemoryContext);
546 }
547 else
548 {
549 /*
550 * Disable the streaming and prepared transactions during the slot
551 * initialization mode.
552 */
553 ctx->streaming = false;
554 ctx->twophase = false;
555 }
556}
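
/*
 * Illustrative note (not part of the upstream file): with the checks above,
 * a client that requests streaming 'parallel' with proto_version '2' is
 * rejected, because parallel streaming needs
 * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM (currently 4) or higher,
 * while streaming 'on' with proto_version '2' is accepted since plain
 * streaming only needs LOGICALREP_PROTO_STREAM_VERSION_NUM (currently 2).
 */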
557
558/*
559 * BEGIN callback.
560 *
561 * Don't send the BEGIN message here; instead, postpone it until the first
562 * change. In logical replication, a common scenario is to replicate a set of
563 * tables (instead of all tables) and transactions whose changes were on
564 * the table(s) that are not published will produce empty transactions. These
565 * empty transactions will send BEGIN and COMMIT messages to subscribers,
566 * using bandwidth on something with little/no use for logical replication.
567 */
568static void
569 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
570{
571 PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
572 sizeof(PGOutputTxnData));
573
574 txn->output_plugin_private = txndata;
575}
576
577/*
578 * Send BEGIN.
579 *
580 * This is called while processing the first change of the transaction.
581 */
582static void
583 pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
584{
585 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
586 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
587
588 Assert(txndata);
589 Assert(!txndata->sent_begin_txn);
590
591 OutputPluginPrepareWrite(ctx, !send_replication_origin);
592 logicalrep_write_begin(ctx->out, txn);
593 txndata->sent_begin_txn = true;
594
595 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
596 send_replication_origin);
597
598 OutputPluginWrite(ctx, true);
599}
600
601/*
602 * COMMIT callback
603 */
604static void
605 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
606 XLogRecPtr commit_lsn)
607{
608 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
609 bool sent_begin_txn;
610
611 Assert(txndata);
612
613 /*
614 * We don't need to send the commit message unless some relevant change
615 * from this transaction has been sent to the downstream.
616 */
617 sent_begin_txn = txndata->sent_begin_txn;
618 OutputPluginUpdateProgress(ctx, !sent_begin_txn);
619 pfree(txndata);
620 txn->output_plugin_private = NULL;
621
622 if (!sent_begin_txn)
623 {
624 elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
625 return;
626 }
627
628 OutputPluginPrepareWrite(ctx, true);
629 logicalrep_write_commit(ctx->out, txn, commit_lsn);
630 OutputPluginWrite(ctx, true);
631}
632
633/*
634 * BEGIN PREPARE callback
635 */
636static void
637 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
638{
639 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
640
641 OutputPluginPrepareWrite(ctx, !send_replication_origin);
642 logicalrep_write_begin_prepare(ctx->out, txn);
643
644 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
645 send_replication_origin);
646
647 OutputPluginWrite(ctx, true);
648}
649
650/*
651 * PREPARE callback
652 */
653static void
654 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
655 XLogRecPtr prepare_lsn)
656{
657 OutputPluginUpdateProgress(ctx, false);
658
659 OutputPluginPrepareWrite(ctx, true);
660 logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
661 OutputPluginWrite(ctx, true);
662}
663
664/*
665 * COMMIT PREPARED callback
666 */
667static void
668 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
669 XLogRecPtr commit_lsn)
670{
671 OutputPluginUpdateProgress(ctx, false);
672
673 OutputPluginPrepareWrite(ctx, true);
674 logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
675 OutputPluginWrite(ctx, true);
676}
677
678/*
679 * ROLLBACK PREPARED callback
680 */
681static void
682 pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
683 ReorderBufferTXN *txn,
684 XLogRecPtr prepare_end_lsn,
685 TimestampTz prepare_time)
686{
687 OutputPluginUpdateProgress(ctx, false);
688
689 OutputPluginPrepareWrite(ctx, true);
690 logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
691 prepare_time);
692 OutputPluginWrite(ctx, true);
693}
694
695/*
696 * Write the current schema of the relation and its ancestor (if any) if not
697 * done yet.
698 */
699static void
700 maybe_send_schema(LogicalDecodingContext *ctx,
701 ReorderBufferChange *change,
702 Relation relation, RelationSyncEntry *relentry)
703{
704 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
705 bool schema_sent;
706 TransactionId xid = InvalidTransactionId;
707 TransactionId topxid = InvalidTransactionId;
708
709 /*
710 * Remember XID of the (sub)transaction for the change. We don't care if
711 * it's top-level transaction or not (we have already sent that XID in
712 * start of the current streaming block).
713 *
714 * If we're not in a streaming block, just use InvalidTransactionId and
715 * the write methods will not include it.
716 */
717 if (data->in_streaming)
718 xid = change->txn->xid;
719
720 if (rbtxn_is_subtxn(change->txn))
721 topxid = rbtxn_get_toptxn(change->txn)->xid;
722 else
723 topxid = xid;
724
725 /*
726 * Do we need to send the schema? We do track streamed transactions
727 * separately, because those may be applied later (and the regular
728 * transactions won't see their effects until then) and in an order that
729 * we don't know at this point.
730 *
731 * XXX There is scope for optimization here. Currently, we always send
732 * the schema first time in a streaming transaction but we can probably
733 * avoid that by checking 'relentry->schema_sent' flag. However, before
734 * doing that we need to study its impact on the case where we have a mix
735 * of streaming and non-streaming transactions.
736 */
737 if (data->in_streaming)
738 schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
739 else
740 schema_sent = relentry->schema_sent;
741
742 /* Nothing to do if we already sent the schema. */
743 if (schema_sent)
744 return;
745
746 /*
747 * Send the schema. If the changes will be published using an ancestor's
748 * schema, not the relation's own, send that ancestor's schema before
749 * sending the relation's own (XXX - maybe sending only the former suffices?).
750 */
751 if (relentry->publish_as_relid != RelationGetRelid(relation))
752 {
753 Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
754
755 send_relation_and_attrs(ancestor, xid, ctx, relentry);
756 RelationClose(ancestor);
757 }
758
759 send_relation_and_attrs(relation, xid, ctx, relentry);
760
761 if (data->in_streaming)
762 set_schema_sent_in_streamed_txn(relentry, topxid);
763 else
764 relentry->schema_sent = true;
765}
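
/*
 * Illustrative note (not part of the upstream file): in a non-streamed
 * transaction the schema for a relation is sent at most once per decoding
 * session (schema_sent stays true across transactions until a cache
 * invalidation resets it).  In streamed mode a Relation ('R') message is
 * re-sent once per streamed top-level xid, because the subscriber only
 * updates its schema cache at commit time and streamed transactions may
 * commit in a different order or abort.
 */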
766
767/*
768 * Sends a relation
769 */
770static void
771 send_relation_and_attrs(Relation relation, TransactionId xid,
772 LogicalDecodingContext *ctx,
773 RelationSyncEntry *relentry)
774{
775 TupleDesc desc = RelationGetDescr(relation);
776 Bitmapset *columns = relentry->columns;
777 PublishGencolsType include_gencols_type = relentry->include_gencols_type;
778 int i;
779
780 /*
781 * Write out type info if needed. We do that only for user-created types.
782 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
783 * objects with hand-assigned OIDs to be "built in", not for instance any
784 * function or type defined in the information_schema. This is important
785 * because only hand-assigned OIDs can be expected to remain stable across
786 * major versions.
787 */
788 for (i = 0; i < desc->natts; i++)
789 {
790 Form_pg_attribute att = TupleDescAttr(desc, i);
791
792 if (!logicalrep_should_publish_column(att, columns,
793 include_gencols_type))
794 continue;
795
796 if (att->atttypid < FirstGenbkiObjectId)
797 continue;
798
799 OutputPluginPrepareWrite(ctx, false);
800 logicalrep_write_typ(ctx->out, xid, att->atttypid);
801 OutputPluginWrite(ctx, false);
802 }
803
804 OutputPluginPrepareWrite(ctx, false);
805 logicalrep_write_rel(ctx->out, xid, relation, columns,
806 include_gencols_type);
807 OutputPluginWrite(ctx, false);
808}
809
810/*
811 * Executor state preparation for evaluation of row filter expressions for the
812 * specified relation.
813 */
814static EState *
815 create_estate_for_relation(Relation rel)
816{
817 EState *estate;
818 RangeTblEntry *rte;
819 List *perminfos = NIL;
820
821 estate = CreateExecutorState();
822
823 rte = makeNode(RangeTblEntry);
824 rte->rtekind = RTE_RELATION;
825 rte->relid = RelationGetRelid(rel);
826 rte->relkind = rel->rd_rel->relkind;
827 rte->rellockmode = AccessShareLock;
828
829 addRTEPermissionInfo(&perminfos, rte);
830
831 ExecInitRangeTable(estate, list_make1(rte), perminfos,
832 bms_make_singleton(1));
833
834 estate->es_output_cid = GetCurrentCommandId(false);
835
836 return estate;
837}
838
839/*
840 * Evaluates row filter.
841 *
842 * If the row filter evaluates to NULL, it is taken as false i.e. the change
843 * isn't replicated.
844 */
845static bool
846 pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
847{
848 Datum ret;
849 bool isnull;
850
851 Assert(state != NULL);
852
853 ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
854
855 elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
856 isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
857 isnull ? "true" : "false");
858
859 if (isnull)
860 return false;
861
862 return DatumGetBool(ret);
863}
864
865/*
866 * Make sure the per-entry memory context exists.
867 */
868static void
869 pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
870{
871 Relation relation;
872
873 /* The context may already exist, in which case bail out. */
874 if (entry->entry_cxt)
875 return;
876
877 relation = RelationIdGetRelation(entry->publish_as_relid);
878
879 entry->entry_cxt = AllocSetContextCreate(data->cachectx,
880 "entry private context",
881 ALLOCSET_SMALL_SIZES);
882
883 MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
884 RelationGetRelationName(relation));
885}
886
887/*
888 * Initialize the row filter.
889 */
890static void
891 pgoutput_row_filter_init(PGOutputData *data, List *publications,
892 RelationSyncEntry *entry)
893{
894 ListCell *lc;
895 List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
896 bool no_filter[] = {false, false, false}; /* One per pubaction */
897 MemoryContext oldctx;
898 int idx;
899 bool has_filter = true;
900 Oid schemaid = get_rel_namespace(entry->publish_as_relid);
901
902 /*
903 * Find if there are any row filters for this relation. If there are, then
904 * prepare the necessary ExprState and cache it in entry->exprstate. To
905 * build an expression state, we need to ensure the following:
906 *
907 * All the given publication-table mappings must be checked.
908 *
909 * Multiple publications might have multiple row filters for this
910 * relation. Since row filter usage depends on the DML operation, there
911 * are multiple lists (one for each operation) to which row filters will
912 * be appended.
913 *
914 * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use row filter
915 * expression", so they take precedence.
916 */
917 foreach(lc, publications)
918 {
919 Publication *pub = lfirst(lc);
920 HeapTuple rftuple = NULL;
921 Datum rfdatum = 0;
922 bool pub_no_filter = true;
923
924 /*
925 * If the publication is FOR ALL TABLES, or the publication includes a
926 * FOR TABLES IN SCHEMA where the table belongs to the referred
927 * schema, then it is treated the same as if there are no row filters
928 * (even if other publications have a row filter).
929 */
930 if (!pub->alltables &&
931 !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
932 ObjectIdGetDatum(schemaid),
933 ObjectIdGetDatum(pub->oid)))
934 {
935 /*
936 * Check for the presence of a row filter in this publication.
937 */
938 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
939 ObjectIdGetDatum(entry->publish_as_relid),
940 ObjectIdGetDatum(pub->oid));
941
942 if (HeapTupleIsValid(rftuple))
943 {
944 /* Null indicates no filter. */
945 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
946 Anum_pg_publication_rel_prqual,
947 &pub_no_filter);
948 }
949 }
950
951 if (pub_no_filter)
952 {
953 if (rftuple)
954 ReleaseSysCache(rftuple);
955
956 no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
957 no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
958 no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
959
960 /*
961 * Quick exit if all the DML actions are published via this
962 * publication.
963 */
964 if (no_filter[PUBACTION_INSERT] &&
965 no_filter[PUBACTION_UPDATE] &&
966 no_filter[PUBACTION_DELETE])
967 {
968 has_filter = false;
969 break;
970 }
971
972 /* No additional work for this publication. Next one. */
973 continue;
974 }
975
976 /* Form the per pubaction row filter lists. */
977 if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
978 rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
979 TextDatumGetCString(rfdatum));
980 if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
981 rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
982 TextDatumGetCString(rfdatum));
983 if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
984 rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
985 TextDatumGetCString(rfdatum));
986
987 ReleaseSysCache(rftuple);
988 } /* loop all subscribed publications */
989
990 /* Clean the row filter */
991 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
992 {
993 if (no_filter[idx])
994 {
995 list_free_deep(rfnodes[idx]);
996 rfnodes[idx] = NIL;
997 }
998 }
999
1000 if (has_filter)
1001 {
1002 Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1003
1004 pgoutput_ensure_entry_cxt(data, entry);
1005
1006 /*
1007 * Now all the filters for all pubactions are known. Combine them when
1008 * their pubactions are the same.
1009 */
1010 oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1011 entry->estate = create_estate_for_relation(relation);
1012 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1013 {
1014 List *filters = NIL;
1015 Expr *rfnode;
1016
1017 if (rfnodes[idx] == NIL)
1018 continue;
1019
1020 foreach(lc, rfnodes[idx])
1021 filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1022
1023 /* combine the row filter and cache the ExprState */
1024 rfnode = make_orclause(filters);
1025 entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1026 } /* for each pubaction */
1027 MemoryContextSwitchTo(oldctx);
1028
1029 RelationClose(relation);
1030 }
1031}
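
/*
 * Illustrative example (not part of the upstream file; table and publication
 * names are made up): given
 *
 *		CREATE PUBLICATION pub_a FOR TABLE t WHERE (a > 10);
 *		CREATE PUBLICATION pub_b FOR TABLE t WHERE (b = 0) WITH (publish = 'insert');
 *
 * a subscription to both builds, for the insert action, the OR of the two
 * quals, while update/delete use only pub_a's qual.  Adding a publication
 * FOR ALL TABLES would clear the filters for the actions it publishes, per
 * the "no filter" handling above.
 */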
1032
1033/*
1034 * If the table contains a generated column, check for any conflicting
1035 * values of 'publish_generated_columns' parameter in the publications.
1036 */
1037static void
1038 check_and_init_gencol(PGOutputData *data, List *publications,
1039 RelationSyncEntry *entry)
1040{
1041 Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1042 TupleDesc desc = RelationGetDescr(relation);
1043 bool gencolpresent = false;
1044 bool first = true;
1045
1046 /* Check if there is any generated column present. */
1047 for (int i = 0; i < desc->natts; i++)
1048 {
1049 Form_pg_attribute att = TupleDescAttr(desc, i);
1050
1051 if (att->attgenerated)
1052 {
1053 gencolpresent = true;
1054 break;
1055 }
1056 }
1057
1058 /* There are no generated columns to be published. */
1059 if (!gencolpresent)
1060 {
1061 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1062 return;
1063 }
1064
1065 /*
1066 * There may be a conflicting value for 'publish_generated_columns'
1067 * parameter in the publications.
1068 */
1069 foreach_ptr(Publication, pub, publications)
1070 {
1071 /*
1072 * The column list takes precedence over the
1073 * 'publish_generated_columns' parameter. Those will be checked later,
1074 * see pgoutput_column_list_init.
1075 */
1076 if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1077 continue;
1078
1079 if (first)
1080 {
1081 entry->include_gencols_type = pub->pubgencols_type;
1082 first = false;
1083 }
1084 else if (entry->include_gencols_type != pub->pubgencols_type)
1085 ereport(ERROR,
1086 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1087 errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1088 get_namespace_name(RelationGetNamespace(relation)),
1089 RelationGetRelationName(relation)));
1090 }
1091}
1092
1093/*
1094 * Initialize the column list.
1095 */
1096static void
1097 pgoutput_column_list_init(PGOutputData *data, List *publications,
1098 RelationSyncEntry *entry)
1099{
1100 ListCell *lc;
1101 bool first = true;
1102 Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1103 bool found_pub_collist = false;
1104 Bitmapset *relcols = NULL;
1105
1106 pgoutput_ensure_entry_cxt(data, entry);
1107
1108 /*
1109 * Find if there are any column lists for this relation. If there are,
1110 * build a bitmap using the column lists.
1111 *
1112 * Multiple publications might have multiple column lists for this
1113 * relation.
1114 *
1115 * Note that we don't support the case where the column list is different
1116 * for the same table when combining publications. See comments atop
1117 * fetch_table_list. But one can later change the publication so we still
1118 * need to check all the given publication-table mappings and report an
1119 * error if any publications have a different column list.
1120 */
1121 foreach(lc, publications)
1122 {
1123 Publication *pub = lfirst(lc);
1124 Bitmapset *cols = NULL;
1125
1126 /* Retrieve the bitmap of columns for a column list publication. */
1127 found_pub_collist |= check_and_fetch_column_list(pub,
1128 entry->publish_as_relid,
1129 entry->entry_cxt, &cols);
1130
1131 /*
1132 * For non-column list publications — e.g. TABLE (without a column
1133 * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1134 * of the table (including generated columns when
1135 * 'publish_generated_columns' parameter is true).
1136 */
1137 if (!cols)
1138 {
1139 /*
1140 * Cache the table columns for the first publication with no
1141 * specified column list to detect publication with a different
1142 * column list.
1143 */
1144 if (!relcols && (list_length(publications) > 1))
1145 {
1146 MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
1147
1148 relcols = pub_form_cols_map(relation,
1149 entry->include_gencols_type);
1150 MemoryContextSwitchTo(oldcxt);
1151 }
1152
1153 cols = relcols;
1154 }
1155
1156 if (first)
1157 {
1158 entry->columns = cols;
1159 first = false;
1160 }
1161 else if (!bms_equal(entry->columns, cols))
1162 ereport(ERROR,
1163 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1164 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1165 get_namespace_name(RelationGetNamespace(relation)),
1166 RelationGetRelationName(relation)));
1167 } /* loop all subscribed publications */
1168
1169 /*
1170 * If no column list publications exist, columns to be published will be
1171 * computed later according to the 'publish_generated_columns' parameter.
1172 */
1173 if (!found_pub_collist)
1174 entry->columns = NULL;
1175
1176 RelationClose(relation);
1177}
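
/*
 * Illustrative example (not part of the upstream file; names are made up):
 * if pub_x is defined FOR TABLE t (a, b) and pub_y FOR TABLE t (a, c),
 * combining both in one subscription raises the "cannot use different
 * column lists" error above.  Combining pub_x with a publication that has
 * no column list for t is likewise rejected unless the implicit
 * all-columns set happens to equal (a, b).
 */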
1178
1179/*
1180 * Initialize the slot for storing new and old tuples, and build the map that
1181 * will be used to convert the relation's tuples into the ancestor's format.
1182 */
1183static void
1184 init_tuple_slot(PGOutputData *data, Relation relation,
1185 RelationSyncEntry *entry)
1186{
1187 MemoryContext oldctx;
1188 TupleDesc oldtupdesc;
1189 TupleDesc newtupdesc;
1190
1191 oldctx = MemoryContextSwitchTo(data->cachectx);
1192
1193 /*
1194 * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1195 * live as long as the cache remains.
1196 */
1197 oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1198 newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1199
1200 entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1201 entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1202
1203 MemoryContextSwitchTo(oldctx);
1204
1205 /*
1206 * Cache the map that will be used to convert the relation's tuples into
1207 * the ancestor's format, if needed.
1208 */
1209 if (entry->publish_as_relid != RelationGetRelid(relation))
1210 {
1211 Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1212 TupleDesc indesc = RelationGetDescr(relation);
1213 TupleDesc outdesc = RelationGetDescr(ancestor);
1214
1215 /* Map must live as long as the logical decoding context. */
1216 oldctx = MemoryContextSwitchTo(data->cachectx);
1217
1218 entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1219
1220 MemoryContextSwitchTo(oldctx);
1221 RelationClose(ancestor);
1222 }
1223}
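
/*
 * Illustrative note (not part of the upstream file): the attrmap built above
 * matters when publish_via_partition_root is in use and a partition has a
 * different column order than the root (e.g. after ALTER TABLE ... DROP/ADD
 * COLUMN on the partition); execute_attr_map_slot() in pgoutput_change()
 * then rearranges the partition's tuple into the root's descriptor before
 * it is written out.
 */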
1224
1225/*
1226 * Change is checked against the row filter if any.
1227 *
1228 * Returns true if the change is to be replicated, else false.
1229 *
1230 * For inserts, evaluate the row filter for new tuple.
1231 * For deletes, evaluate the row filter for old tuple.
1232 * For updates, evaluate the row filter for old and new tuple.
1233 *
1234 * For updates, if both evaluations are true, we send the UPDATE; if both
1235 * evaluations are false, the UPDATE is not replicated. Now, if
1236 * only one of the tuples matches the row filter expression, we transform
1237 * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1238 * following rules:
1239 *
1240 * Case 1: old-row (no match) new-row (no match) -> (drop change)
1241 * Case 2: old-row (no match) new row (match) -> INSERT
1242 * Case 3: old-row (match) new-row (no match) -> DELETE
1243 * Case 4: old-row (match) new row (match) -> UPDATE
1244 *
1245 * The new action is updated in the action parameter.
1246 *
1247 * The new slot could be updated when transforming the UPDATE into INSERT,
1248 * because the original new tuple might not have column values from the replica
1249 * identity.
1250 *
1251 * Examples:
1252 * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1253 * Since the old tuple satisfies, the initial table synchronization copied this
1254 * row (or another method was used to guarantee that there is data
1255 * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1256 * row filter, so from a data consistency perspective, that row should be
1257 * removed on the subscriber. The UPDATE should be transformed into a DELETE
1258 * statement and be sent to the subscriber. Keeping this row on the subscriber
1259 * is undesirable because it doesn't reflect what was defined in the row filter
1260 * expression on the publisher. This row on the subscriber would likely not be
1261 * modified by replication again. If someone inserted a new row with the same
1262 * old identifier, replication could stop due to a constraint violation.
1263 *
1264 * Let's say the old tuple doesn't match the row filter but the new tuple does.
1265 * Since the old tuple doesn't satisfy, the initial table synchronization
1266 * probably didn't copy this row. However, after the UPDATE the new tuple does
1267 * satisfy the row filter, so from a data consistency perspective, that row
1268 * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1269 * statements have no effect (it matches no row -- see
1270 * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1271 * INSERT statement and be sent to the subscriber. However, this might surprise
1272 * someone who expects the data set to satisfy the row filter expression on the
1273 * provider.
1274 */
1275static bool
1276 pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1277 TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1278 ReorderBufferChangeType *action)
1279{
1280 TupleDesc desc;
1281 int i;
1282 bool old_matched,
1283 new_matched,
1284 result;
1285 TupleTableSlot *tmp_new_slot;
1286 TupleTableSlot *new_slot = *new_slot_ptr;
1287 ExprContext *ecxt;
1288 ExprState *filter_exprstate;
1289
1290 /*
1291 * We need this map to avoid relying on ReorderBufferChangeType enums
1292 * having specific values.
1293 */
1294 static const int map_changetype_pubaction[] = {
1295 [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1296 [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1297 [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1298 };
1299
1300 Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
1301 *action == REORDER_BUFFER_CHANGE_UPDATE ||
1302 *action == REORDER_BUFFER_CHANGE_DELETE);
1303
1304 Assert(new_slot || old_slot);
1305
1306 /* Get the corresponding row filter */
1307 filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1308
1309 /* Bail out if there is no row filter */
1310 if (!filter_exprstate)
1311 return true;
1312
1313 elog(DEBUG3, "table \"%s.%s\" has row filter",
1314 get_namespace_name(RelationGetNamespace(relation)),
1315 RelationGetRelationName(relation));
1316
1317 ResetPerTupleExprContext(entry->estate);
1318
1319 ecxt = GetPerTupleExprContext(entry->estate);
1320
1321 /*
1322 * For the following cases where there is only one tuple, we can
1323 * evaluate the row filter for that tuple and return.
1324 *
1325 * For inserts, we only have the new tuple.
1326 *
1327 * For updates, we can have only a new tuple when none of the replica
1328 * identity columns changed and none of those columns have external data
1329 * but we still need to evaluate the row filter for the new tuple as the
1330 * existing values of those columns might not match the filter. Also,
1331 * users can use constant expressions in the row filter, so we anyway need
1332 * to evaluate it for the new tuple.
1333 *
1334 * For deletes, we only have the old tuple.
1335 */
1336 if (!new_slot || !old_slot)
1337 {
1338 ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1339 result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1340
1341 return result;
1342 }
1343
1344 /*
1345 * Only updates can have both the old and new tuples valid; both need to
1346 * be checked against the row filter.
1347 */
1348 Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1349
1350 slot_getallattrs(new_slot);
1351 slot_getallattrs(old_slot);
1352
1353 tmp_new_slot = NULL;
1354 desc = RelationGetDescr(relation);
1355
1356 /*
1357 * The new tuple might not have all the replica identity columns, in which
1358 * case it needs to be copied over from the old tuple.
1359 */
1360 for (i = 0; i < desc->natts; i++)
1361 {
1362 CompactAttribute *att = TupleDescCompactAttr(desc, i);
1363
1364 /*
1365 * if the column in the new tuple or old tuple is null, nothing to do
1366 */
1367 if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1368 continue;
1369
1370 /*
1371 * Unchanged toasted replica identity columns are only logged in the
1372 * old tuple. Copy this over to the new tuple. The changed (or WAL
1373 * Logged) toast values are always assembled in memory and set as
1374 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1375 */
1376 if (att->attlen == -1 &&
1377 VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
1378 !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
1379 {
1380 if (!tmp_new_slot)
1381 {
1382 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1383 ExecClearTuple(tmp_new_slot);
1384
1385 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1386 desc->natts * sizeof(Datum));
1387 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1388 desc->natts * sizeof(bool));
1389 }
1390
1391 tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1392 tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1393 }
1394 }
1395
1396 ecxt->ecxt_scantuple = old_slot;
1397 old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1398
1399 if (tmp_new_slot)
1400 {
1401 ExecStoreVirtualTuple(tmp_new_slot);
1402 ecxt->ecxt_scantuple = tmp_new_slot;
1403 }
1404 else
1405 ecxt->ecxt_scantuple = new_slot;
1406
1407 new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1408
1409 /*
1410 * Case 1: if both tuples don't match the row filter, bailout. Send
1411 * nothing.
1412 */
1413 if (!old_matched && !new_matched)
1414 return false;
1415
1416 /*
1417 * Case 2: if the old tuple doesn't satisfy the row filter but the new
1418 * tuple does, transform the UPDATE into INSERT.
1419 *
1420 * Use the newly transformed tuple, which must contain the column values
1421 * for all the replica identity columns. This is required to ensure that,
1422 * while inserting the tuple in the downstream node, we have all the
1423 * required column values.
1424 */
1425 if (!old_matched && new_matched)
1426 {
1427 *action = REORDER_BUFFER_CHANGE_INSERT;
1428
1429 if (tmp_new_slot)
1430 *new_slot_ptr = tmp_new_slot;
1431 }
1432
1433 /*
1434 * Case 3: if the old tuple satisfies the row filter but the new tuple
1435 * doesn't, transform the UPDATE into DELETE.
1436 *
1437 * This transformation does not require another tuple. The old tuple will
1438 * be used for DELETE.
1439 */
1440 else if (old_matched && !new_matched)
1441 *action = REORDER_BUFFER_CHANGE_DELETE;
1442
1443 /*
1444 * Case 4: if both tuples match the row filter, transformation isn't
1445 * required. (*action is default UPDATE).
1446 */
1447
1448 return true;
1449}
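
/*
 * Illustrative example (not part of the upstream file): with a row filter
 * of (price > 100), an UPDATE that changes a row from price = 200 to
 * price = 50 has a matching old tuple and a non-matching new tuple, so it
 * is sent to the subscriber as a DELETE (Case 3 above); the reverse change
 * would be sent as an INSERT (Case 2).
 */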
1450
1451/*
1452 * Sends the decoded DML over wire.
1453 *
1454 * This is called both in streaming and non-streaming modes.
1455 */
1456static void
1457 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1458 Relation relation, ReorderBufferChange *change)
1459{
1460 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1461 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1462 MemoryContext old;
1463 RelationSyncEntry *relentry;
1464 TransactionId xid = InvalidTransactionId;
1465 Relation ancestor = NULL;
1466 Relation targetrel = relation;
1467 ReorderBufferChangeType action = change->action;
1468 TupleTableSlot *old_slot = NULL;
1469 TupleTableSlot *new_slot = NULL;
1470
1471 if (!is_publishable_relation(relation))
1472 return;
1473
1474 /*
1475 * Remember the xid for the change in streaming mode. We need to send the
1476 * xid with each change in streaming mode so that the subscriber can
1477 * associate the changes with the transaction and, on abort, discard the
1478 * corresponding changes.
1479 */
1480 if (data->in_streaming)
1481 xid = change->txn->xid;
1482
1483 relentry = get_rel_sync_entry(data, relation);
1484
1485 /* First check the table filter */
1486 switch (action)
1487 {
1488 case REORDER_BUFFER_CHANGE_INSERT:
1489 if (!relentry->pubactions.pubinsert)
1490 return;
1491 break;
1492 case REORDER_BUFFER_CHANGE_UPDATE:
1493 if (!relentry->pubactions.pubupdate)
1494 return;
1495 break;
1496 case REORDER_BUFFER_CHANGE_DELETE:
1497 if (!relentry->pubactions.pubdelete)
1498 return;
1499
1500 /*
1501 * This is only possible if deletes are allowed even when replica
1502 * identity is not defined for a table. Since the DELETE action
1503 * can't be published, we simply return.
1504 */
1505 if (!change->data.tp.oldtuple)
1506 {
1507 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1508 return;
1509 }
1510 break;
1511 default:
1512 Assert(false);
1513 }
1514
1515 /* Avoid leaking memory by using and resetting our own context */
1516 old = MemoryContextSwitchTo(data->context);
1517
1518 /* Switch relation if publishing via root. */
1519 if (relentry->publish_as_relid != RelationGetRelid(relation))
1520 {
1521 Assert(relation->rd_rel->relispartition);
1522 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1523 targetrel = ancestor;
1524 }
1525
1526 if (change->data.tp.oldtuple)
1527 {
1528 old_slot = relentry->old_slot;
1529 ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1530
1531 /* Convert tuple if needed. */
1532 if (relentry->attrmap)
1533 {
1534 TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1535 &TTSOpsVirtual);
1536
1537 old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1538 }
1539 }
1540
1541 if (change->data.tp.newtuple)
1542 {
1543 new_slot = relentry->new_slot;
1544 ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1545
1546 /* Convert tuple if needed. */
1547 if (relentry->attrmap)
1548 {
1549 TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
1550 &TTSOpsVirtual);
1551
1552 new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1553 }
1554 }
1555
1556 /*
1557 * Check row filter.
1558 *
1559 * Updates could be transformed to inserts or deletes based on the results
1560 * of the row filter for old and new tuple.
1561 */
1562 if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1563 goto cleanup;
1564
1565 /*
1566 * Send BEGIN if we haven't yet.
1567 *
1568 * We send the BEGIN message after ensuring that we will actually send the
1569 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1570 * transactions.
1571 */
1572 if (txndata && !txndata->sent_begin_txn)
1573 pgoutput_send_begin(ctx, txn);
1574
1575 /*
1576 * Schema should be sent using the original relation because it also sends
1577 * the ancestor's relation.
1578 */
1579 maybe_send_schema(ctx, change, relation, relentry);
1580
1581 OutputPluginPrepareWrite(ctx, true);
1582
1583 /* Send the data */
1584 switch (action)
1585 {
1586 case REORDER_BUFFER_CHANGE_INSERT:
1587 logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1588 data->binary, relentry->columns,
1589 relentry->include_gencols_type);
1590 break;
1591 case REORDER_BUFFER_CHANGE_UPDATE:
1592 logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1593 new_slot, data->binary, relentry->columns,
1594 relentry->include_gencols_type);
1595 break;
1596 case REORDER_BUFFER_CHANGE_DELETE:
1597 logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1598 data->binary, relentry->columns,
1599 relentry->include_gencols_type);
1600 break;
1601 default:
1602 Assert(false);
1603 }
1604
1605 OutputPluginWrite(ctx, true);
1606
1607cleanup:
1608 if (RelationIsValid(ancestor))
1609 {
1610 RelationClose(ancestor);
1611 ancestor = NULL;
1612 }
1613
1614 /* Drop the new slots that were used to store the converted tuples. */
1615 if (relentry->attrmap)
1616 {
1617 if (old_slot)
1618 ExecDropSingleTupleTableSlot(old_slot);
1619
1620 if (new_slot)
1621 ExecDropSingleTupleTableSlot(new_slot);
1622 }
1623
1624 MemoryContextSwitchTo(old);
1625 MemoryContextReset(data->context);
1626}
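
/*
 * Illustrative wire trace (not part of the upstream file): a small
 * non-streamed transaction inserting one published row typically produces
 * the message sequence
 *
 *		'B' (begin)  'R' (relation schema)  'I' (insert)  'C' (commit)
 *
 * with an extra 'Y' (type) message before 'R' for any user-defined column
 * types, and no messages at all if every change is filtered out.
 */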
1627
1628static void
1629 pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1630 int nrelations, Relation relations[], ReorderBufferChange *change)
1631{
1632 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1633 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1634 MemoryContext old;
1635 RelationSyncEntry *relentry;
1636 int i;
1637 int nrelids;
1638 Oid *relids;
1639 TransactionId xid = InvalidTransactionId;
1640
1641 /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1642 if (data->in_streaming)
1643 xid = change->txn->xid;
1644
1645 old = MemoryContextSwitchTo(data->context);
1646
1647 relids = palloc0(nrelations * sizeof(Oid));
1648 nrelids = 0;
1649
1650 for (i = 0; i < nrelations; i++)
1651 {
1652 Relation relation = relations[i];
1653 Oid relid = RelationGetRelid(relation);
1654
1655 if (!is_publishable_relation(relation))
1656 continue;
1657
1658 relentry = get_rel_sync_entry(data, relation);
1659
1660 if (!relentry->pubactions.pubtruncate)
1661 continue;
1662
1663 /*
1664 * Don't send partitions if the publication wants to send only the
1665 * root tables through it.
1666 */
1667 if (relation->rd_rel->relispartition &&
1668 relentry->publish_as_relid != relid)
1669 continue;
1670
1671 relids[nrelids++] = relid;
1672
1673 /* Send BEGIN if we haven't yet */
1674 if (txndata && !txndata->sent_begin_txn)
1675 pgoutput_send_begin(ctx, txn);
1676
1677 maybe_send_schema(ctx, change, relation, relentry);
1678 }
1679
1680 if (nrelids > 0)
1681 {
1682 OutputPluginPrepareWrite(ctx, true);
1683 logicalrep_write_truncate(ctx->out,
1684 xid,
1685 nrelids,
1686 relids,
1687 change->data.truncate.cascade,
1688 change->data.truncate.restart_seqs);
1689 OutputPluginWrite(ctx, true);
1690 }
1691
1692 MemoryContextSwitchTo(old);
1693 MemoryContextReset(data->context);
1694}
1695
1696static void
1697 pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1698 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1699 const char *message)
1700{
1701 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1702 TransactionId xid = InvalidTransactionId;
1703
1704 if (!data->messages)
1705 return;
1706
1707 /*
1708 * Remember the xid for the message in streaming mode. See
1709 * pgoutput_change.
1710 */
1711 if (data->in_streaming)
1712 xid = txn->xid;
1713
1714 /*
1715 * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1716 */
1717 if (transactional)
1718 {
1719 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1720
1721 /* Send BEGIN if we haven't yet */
1722 if (txndata && !txndata->sent_begin_txn)
1723 pgoutput_send_begin(ctx, txn);
1724 }
1725
1726 OutputPluginPrepareWrite(ctx, true);
1727 logicalrep_write_message(ctx->out,
1728 xid,
1729 message_lsn,
1730 transactional,
1731 prefix,
1732 sz,
1733 message);
1734 OutputPluginWrite(ctx, true);
1735}
1736
1737/*
1738 * Return true if the change is associated with a replication origin and the
1739 * user has requested only changes that have no origin; false otherwise.
1740 */
1741static bool
1742 pgoutput_origin_filter(LogicalDecodingContext *ctx,
1743 RepOriginId origin_id)
1744{
1745 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1746
1747 if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1748 return true;
1749
1750 return false;
1751}
1752
1753/*
1754 * Shutdown the output plugin.
1755 *
1756 * Note, we don't need to clean the data->context, data->cachectx, and
1757 * data->pubctx as they are child contexts of the ctx->context so they
1758 * will be cleaned up by logical decoding machinery.
1759 */
1760static void
1761 pgoutput_shutdown(LogicalDecodingContext *ctx)
1762{
1763 if (RelationSyncCache)
1764 {
1765 hash_destroy(RelationSyncCache);
1766 RelationSyncCache = NULL;
1767 }
1768}
1769
1770/*
1771 * Load publications from the list of publication names.
1772 *
1773 * Here, we skip publications that don't exist yet. This allows replication
1774 * to continue silently even when a specified publication is missing, which
1775 * is needed because users are allowed to create a publication after it has
1776 * already been specified at the time replication was started.
1777 */
1778static List *
1779 LoadPublications(List *pubnames)
1780{
1781 List *result = NIL;
1782 ListCell *lc;
1783
1784 foreach(lc, pubnames)
1785 {
1786 char *pubname = (char *) lfirst(lc);
1787 Publication *pub = GetPublicationByName(pubname, true);
1788
1789 if (pub)
1790 result = lappend(result, pub);
1791 else
1792 ereport(WARNING,
1793 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1794 errmsg("skipped loading publication \"%s\"", pubname),
1795 errdetail("The publication does not exist at this point in the WAL."),
1796 errhint("Create the publication if it does not exist."));
1797 }
1798
1799 return result;
1800}
1801
1802/*
1803 * Publication syscache invalidation callback.
1804 *
1805 * Called for invalidations on pg_publication.
1806 */
1807static void
1808 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1809{
1810 publications_valid = false;
1811}
1812
1813/*
1814 * START STREAM callback
1815 */
1816static void
1817 pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1818 ReorderBufferTXN *txn)
1819{
1820 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1821 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1822
1823 /* we can't nest streaming of transactions */
1824 Assert(!data->in_streaming);
1825
1826 /*
1827 * If we already sent the first stream for this transaction then don't
1828 * send the origin id in the subsequent streams.
1829 */
1830 if (rbtxn_is_streamed(txn))
1831 send_replication_origin = false;
1832
1833 OutputPluginPrepareWrite(ctx, !send_replication_origin);
1834 logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1835
1836 send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1837 send_replication_origin);
1838
1839 OutputPluginWrite(ctx, true);
1840
1841 /* we're streaming a chunk of transaction now */
1842 data->in_streaming = true;
1843}
1844
1845/*
1846 * STOP STREAM callback
1847 */
1848static void
1849 pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1850 ReorderBufferTXN *txn)
1851{
1852 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1853
1854 /* we should be streaming a transaction */
1855 Assert(data->in_streaming);
1856
1857 OutputPluginPrepareWrite(ctx, true);
1858 logicalrep_write_stream_stop(ctx->out);
1859 OutputPluginWrite(ctx, true);
1860
1861 /* we've stopped streaming a transaction */
1862 data->in_streaming = false;
1863}
1864
1865/*
1866 * Notify downstream to discard the streamed transaction (along with all
1867 * its subtransactions, if it's a toplevel transaction).
1868 */
1869static void
1870 pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1871 ReorderBufferTXN *txn,
1872 XLogRecPtr abort_lsn)
1873{
1874 ReorderBufferTXN *toptxn;
1875 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1876 bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1877
1878 /*
1879 * The abort should happen outside a streaming block, even for streamed
1880 * transactions. The transaction has to be marked as streamed, though.
1881 */
1882 Assert(!data->in_streaming);
1883
1884 /* determine the toplevel transaction */
1885 toptxn = rbtxn_get_toptxn(txn);
1886
1887 Assert(rbtxn_is_streamed(toptxn));
1888
1889 OutputPluginPrepareWrite(ctx, true);
1890 logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1891 txn->abort_time, write_abort_info);
1892
1893 OutputPluginWrite(ctx, true);
1894
1895 cleanup_rel_sync_cache(toptxn->xid, false);
1896}
1897
1898/*
1899 * Notify downstream to apply the streamed transaction (along with all
1900 * its subtransactions).
1901 */
1902static void
1903 pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1904 ReorderBufferTXN *txn,
1905 XLogRecPtr commit_lsn)
1906{
1907 PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1908
1909 /*
1910 * The commit should happen outside a streaming block, even for streamed
1911 * transactions. The transaction has to be marked as streamed, though.
1912 */
1913 Assert(!data->in_streaming);
1914 Assert(rbtxn_is_streamed(txn));
1915
1916 OutputPluginUpdateProgress(ctx, false);
1917
1918 OutputPluginPrepareWrite(ctx, true);
1919 logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1920 OutputPluginWrite(ctx, true);
1921
1922 cleanup_rel_sync_cache(txn->xid, true);
1923}
1924
1925/*
1926 * PREPARE callback (for streaming two-phase commit).
1927 *
1928 * Notify the downstream to prepare the transaction.
1929 */
1930static void
1931 pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1932 ReorderBufferTXN *txn,
1933 XLogRecPtr prepare_lsn)
1934{
1935 Assert(rbtxn_is_streamed(txn));
1936
1937 OutputPluginUpdateProgress(ctx, false);
1938 OutputPluginPrepareWrite(ctx, true);
1939 logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1940 OutputPluginWrite(ctx, true);
1941}
1942
1943/*
1944 * Initialize the relation schema sync cache for a decoding session.
1945 *
1946 * The hash table is destroyed at the end of a decoding session. While
1947 * relcache invalidations still exist and will still be invoked, they
1948 * will just see the null hash table global and take no action.
1949 */
1950static void
1951 init_rel_sync_cache(MemoryContext cachectx)
1952{
1953 HASHCTL ctl;
1954 static bool relation_callbacks_registered = false;
1955
1956 /* Nothing to do if hash table already exists */
1957 if (RelationSyncCache != NULL)
1958 return;
1959
1960 /* Make a new hash table for the cache */
1961 ctl.keysize = sizeof(Oid);
1962 ctl.entrysize = sizeof(RelationSyncEntry);
1963 ctl.hcxt = cachectx;
1964
1965 RelationSyncCache = hash_create("logical replication output relation cache",
1966 128, &ctl,
1967 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1968
1969 Assert(RelationSyncCache != NULL);
1970
1971 /* No more to do if we already registered callbacks */
1972 if (relation_callbacks_registered)
1973 return;
1974
1975 /* We must update the cache entry for a relation after a relcache flush */
1976 CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1977
1978 /*
1979 * Flush all cache entries after a pg_namespace change, in case it was a
1980 * schema rename affecting a relation being replicated.
1981 *
1982 * XXX: It is not a good idea to invalidate all the relation entries in
1983 * RelationSyncCache on schema rename. We can optimize it to invalidate
1984 * only the required relations by either having a specific invalidation
1985 * message containing impacted relations or by having schema information
1986 * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
1987 * passed to the callback.
1988 */
1989 CacheRegisterSyscacheCallback(NAMESPACEOID,
1990 rel_sync_cache_publication_cb,
1991 (Datum) 0);
1992
1993 relation_callbacks_registered = true;
1994}
1995
1996/*
1997 * We expect a relatively small number of streamed transactions.
1998 */
1999static bool
2000 get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2001{
2002 return list_member_xid(entry->streamed_txns, xid);
2003}
2004
2005/*
2006 * Add the xid to the rel sync entry for which we have already sent the
2007 * schema of the relation.
2008 */
2009static void
2010 set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2011{
2012 MemoryContext oldctx;
2013
2014 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2015
2016 entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2017
2018 MemoryContextSwitchTo(oldctx);
2019}
2020
2021/*
2022 * Find or create entry in the relation schema cache.
2023 *
2024 * This looks up publications that the given relation is directly or
2025 * indirectly part of (the latter if it's really the relation's ancestor that
2026 * is part of a publication) and fills up the found entry with the information
2027 * about which operations to publish and whether to use an ancestor's schema
2028 * when publishing.
2029 */
2030static RelationSyncEntry *
2031 get_rel_sync_entry(PGOutputData *data, Relation relation)
2032{
2033 RelationSyncEntry *entry;
2034 bool found;
2035 MemoryContext oldctx;
2036 Oid relid = RelationGetRelid(relation);
2037
2038 Assert(RelationSyncCache != NULL);
2039
2040 /* Find cached relation info, creating if not found */
2041 entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2042 &relid,
2043 HASH_ENTER, &found);
2044 Assert(entry != NULL);
2045
2046 /* initialize entry, if it's new */
2047 if (!found)
2048 {
2049 entry->replicate_valid = false;
2050 entry->schema_sent = false;
2051 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2052 entry->streamed_txns = NIL;
2053 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2054 entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2055 entry->new_slot = NULL;
2056 entry->old_slot = NULL;
2057 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2058 entry->entry_cxt = NULL;
2059 entry->publish_as_relid = InvalidOid;
2060 entry->columns = NULL;
2061 entry->attrmap = NULL;
2062 }
2063
2064 /* Validate the entry */
2065 if (!entry->replicate_valid)
2066 {
2067 Oid schemaId = get_rel_namespace(relid);
2068 List *pubids = GetRelationPublications(relid);
2069
2070 /*
2071 * We don't acquire a lock on the namespace system table as we build
2072 * the cache entry using a historic snapshot and all the later changes
2073 * are absorbed while decoding WAL.
2074 */
2075 List *schemaPubids = GetSchemaPublications(schemaId);
2076 ListCell *lc;
2077 Oid publish_as_relid = relid;
2078 int publish_ancestor_level = 0;
2079 bool am_partition = get_rel_relispartition(relid);
2080 char relkind = get_rel_relkind(relid);
2081 List *rel_publications = NIL;
2082
2083 /* Reload publications if needed before use. */
2084 if (!publications_valid)
2085 {
2086 MemoryContextReset(data->pubctx);
2087
2088 oldctx = MemoryContextSwitchTo(data->pubctx);
2089 data->publications = LoadPublications(data->publication_names);
2090 MemoryContextSwitchTo(oldctx);
2091 publications_valid = true;
2092 }
2093
2094 /*
2095 * Reset schema_sent status as the relation definition may have
2096 * changed. Also reset pubactions to empty in case rel was dropped
2097 * from a publication. Also free any objects that depended on the
2098 * earlier definition.
2099 */
2100 entry->schema_sent = false;
2101 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2102 list_free(entry->streamed_txns);
2103 entry->streamed_txns = NIL;
2104 bms_free(entry->columns);
2105 entry->columns = NULL;
2106 entry->pubactions.pubinsert = false;
2107 entry->pubactions.pubupdate = false;
2108 entry->pubactions.pubdelete = false;
2109 entry->pubactions.pubtruncate = false;
2110
2111 /*
2112 * Tuple slot cleanups. (They will be rebuilt later if needed.)
2113 */
2114 if (entry->old_slot)
2115 {
2116 TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2117
2118 Assert(desc->tdrefcount == -1);
2119
2120 ExecDropSingleTupleTableSlot(entry->old_slot);
2121
2122 /*
2123 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2124 * do it now to avoid any leaks.
2125 */
2126 FreeTupleDesc(desc);
2127 }
2128 if (entry->new_slot)
2129 {
2130 TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2131
2132 Assert(desc->tdrefcount == -1);
2133
2134 ExecDropSingleTupleTableSlot(entry->new_slot);
2135
2136 /*
2137 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2138 * do it now to avoid any leaks.
2139 */
2140 FreeTupleDesc(desc);
2141 }
2142
2143 entry->old_slot = NULL;
2144 entry->new_slot = NULL;
2145
2146 if (entry->attrmap)
2147 free_attrmap(entry->attrmap);
2148 entry->attrmap = NULL;
2149
2150 /*
2151 * Row filter cache cleanups.
2152 */
2153 if (entry->entry_cxt)
2154 MemoryContextDelete(entry->entry_cxt);
2155
2156 entry->entry_cxt = NULL;
2157 entry->estate = NULL;
2158 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2159
2160 /*
2161 * Build publication cache. We can't use one provided by relcache as
2162 * relcache considers all publications that the given relation is in,
2163 * but here we only need to consider ones that the subscriber
2164 * requested.
2165 */
2166 foreach(lc, data->publications)
2167 {
2168 Publication *pub = lfirst(lc);
2169 bool publish = false;
2170
2171 /*
2172 * Under what relid should we publish changes in this publication?
2173 * We'll use the top-most relid across all publications. Also
2174 * track the ancestor level for this publication.
2175 */
2176 Oid pub_relid = relid;
2177 int ancestor_level = 0;
2178
2179 /*
2180 * If this is a FOR ALL TABLES publication, pick the partition
2181 * root and set the ancestor level accordingly.
2182 */
2183 if (pub->alltables)
2184 {
2185 publish = true;
2186 if (pub->pubviaroot && am_partition)
2187 {
2188 List *ancestors = get_partition_ancestors(relid);
2189
2190 pub_relid = llast_oid(ancestors);
2191 ancestor_level = list_length(ancestors);
2192 }
2193 }
2194
2195 if (!publish)
2196 {
2197 bool ancestor_published = false;
2198
2199 /*
2200 * For a partition, check if any of the ancestors are
2201 * published. If so, note down the topmost ancestor that is
2202 * published via this publication, which will be used as the
2203 * relation via which to publish the partition's changes.
2204 */
2205 if (am_partition)
2206 {
2207 Oid ancestor;
2208 int level;
2209 List *ancestors = get_partition_ancestors(relid);
2210
2211 ancestor = GetTopMostAncestorInPublication(pub->oid,
2212 ancestors,
2213 &level);
2214
2215 if (ancestor != InvalidOid)
2216 {
2217 ancestor_published = true;
2218 if (pub->pubviaroot)
2219 {
2220 pub_relid = ancestor;
2221 ancestor_level = level;
2222 }
2223 }
2224 }
2225
2226 if (list_member_oid(pubids, pub->oid) ||
2227 list_member_oid(schemaPubids, pub->oid) ||
2228 ancestor_published)
2229 publish = true;
2230 }
2231
2232 /*
2233 * If the relation is to be published, determine actions to
2234 * publish, and list of columns, if appropriate.
2235 *
2236 * Don't publish changes for partitioned tables, because
2237 * publishing those of its partitions suffices, unless partition
2238 * changes won't be published due to pubviaroot being set.
2239 */
2240 if (publish &&
2241 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2242 {
2243 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2244 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2245 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2246 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2247
2248 /*
2249 * We want to publish the changes as the top-most ancestor
2250 * across all publications. So we need to check if the already
2251 * calculated level is higher than the new one. If yes, we can
2252 * ignore the new value (as it's a child). Otherwise the new
2253 * value is an ancestor, so we keep it.
2254 */
2255 if (publish_ancestor_level > ancestor_level)
2256 continue;
2257
2258 /*
2259 * If we found an ancestor higher up in the tree, discard the
2260 * list of publications through which we replicate it, and use
2261 * the new ancestor.
2262 */
2263 if (publish_ancestor_level < ancestor_level)
2264 {
2265 publish_as_relid = pub_relid;
2266 publish_ancestor_level = ancestor_level;
2267
2268 /* reset the publication list for this relation */
2269 rel_publications = NIL;
2270 }
2271 else
2272 {
2273 /* Same ancestor level, has to be the same OID. */
2274 Assert(publish_as_relid == pub_relid);
2275 }
2276
2277 /* Track publications for this ancestor. */
2278 rel_publications = lappend(rel_publications, pub);
2279 }
2280 }
2281
2282 entry->publish_as_relid = publish_as_relid;
2283
2284 /*
2285 * Initialize the tuple slot, map, and row filter. These are only used
2286 * when publishing inserts, updates, or deletes.
2287 */
2288 if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2289 entry->pubactions.pubdelete)
2290 {
2291 /* Initialize the tuple slot and map */
2292 init_tuple_slot(data, relation, entry);
2293
2294 /* Initialize the row filter */
2295 pgoutput_row_filter_init(data, rel_publications, entry);
2296
2297 /* Check whether to publish generated columns. */
2298 check_and_init_gencol(data, rel_publications, entry);
2299
2300 /* Initialize the column list */
2301 pgoutput_column_list_init(data, rel_publications, entry);
2302 }
2303
2304 list_free(pubids);
2305 list_free(schemaPubids);
2306 list_free(rel_publications);
2307
2308 entry->replicate_valid = true;
2309 }
2310
2311 return entry;
2312}
2313
2314/*
2315 * Cleanup list of streamed transactions and update the schema_sent flag.
2316 *
2317 * When a streamed transaction commits or aborts, we need to remove the
2318 * toplevel XID from the schema cache. If the transaction aborted, the
2319 * subscriber will simply throw away the schema records we streamed, so
2320 * we don't need to do anything else.
2321 *
2322 * If the transaction is committed, the subscriber will update the relation
2323 * cache - so tweak the schema_sent flag accordingly.
2324 */
2325static void
2326 cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2327{
2328 HASH_SEQ_STATUS hash_seq;
2329 RelationSyncEntry *entry;
2330
2331 Assert(RelationSyncCache != NULL);
2332
2333 hash_seq_init(&hash_seq, RelationSyncCache);
2334 while ((entry = hash_seq_search(&hash_seq)) != NULL)
2335 {
2336 /*
2337 * We can set the schema_sent flag for an entry whose list contains the
2338 * committed xid, as that ensures the subscriber already has the
2339 * corresponding schema; we then don't need to send it again unless there
2340 * is an invalidation for that relation.
2341 */
2342 foreach_xid(streamed_txn, entry->streamed_txns)
2343 {
2344 if (xid == streamed_txn)
2345 {
2346 if (is_commit)
2347 entry->schema_sent = true;
2348
2349 entry->streamed_txns =
2350 foreach_delete_current(entry->streamed_txns, streamed_txn);
2351 break;
2352 }
2353 }
2354 }
2355}
2356
2357/*
2358 * Relcache invalidation callback
2359 */
2360static void
2361 rel_sync_cache_relation_cb(Datum arg, Oid relid)
2362{
2363 RelationSyncEntry *entry;
2364
2365 /*
2366 * We can get here if the plugin was used in SQL interface as the
2367 * RelationSyncCache is destroyed when the decoding finishes, but there is
2368 * no way to unregister the relcache invalidation callback.
2369 */
2370 if (RelationSyncCache == NULL)
2371 return;
2372
2373 /*
2374 * Nobody keeps pointers to entries in this hash table around outside
2375 * logical decoding callback calls - but invalidation events can come in
2376 * *during* a callback if we do any syscache access in the callback.
2377 * Because of that we must mark the cache entry as invalid but not damage
2378 * any of its substructure here. The next get_rel_sync_entry() call will
2379 * rebuild it all.
2380 */
2381 if (OidIsValid(relid))
2382 {
2383 /*
2384 * Getting invalidations for relations that aren't in the table is
2385 * entirely normal. So we don't care if it's found or not.
2386 */
2387 entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2388 HASH_FIND, NULL);
2389 if (entry != NULL)
2390 entry->replicate_valid = false;
2391 }
2392 else
2393 {
2394 /* Whole cache must be flushed. */
2395 HASH_SEQ_STATUS status;
2396
2397 hash_seq_init(&status, RelationSyncCache);
2398 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2399 {
2400 entry->replicate_valid = false;
2401 }
2402 }
2403}
2404
2405/*
2406 * Publication relation/schema map syscache invalidation callback
2407 *
2408 * Called for invalidations on pg_namespace.
2409 */
2410static void
2411 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2412{
2413 HASH_SEQ_STATUS status;
2414 RelationSyncEntry *entry;
2415
2416 /*
2417 * We can get here if the plugin was used in SQL interface as the
2418 * RelationSyncCache is destroyed when the decoding finishes, but there is
2419 * no way to unregister the invalidation callbacks.
2420 */
2421 if (RelationSyncCache == NULL)
2422 return;
2423
2424 /*
2425 * We have no easy way to identify which cache entries this invalidation
2426 * event might have affected, so just mark them all invalid.
2427 */
2428 hash_seq_init(&status, RelationSyncCache);
2429 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2430 {
2431 entry->replicate_valid = false;
2432 }
2433}
2434
2435/* Send Replication origin */
2436static void
2437 send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2438 XLogRecPtr origin_lsn, bool send_origin)
2439{
2440 if (send_origin)
2441 {
2442 char *origin;
2443
2444 /*----------
2445 * XXX: which behaviour do we want here?
2446 *
2447 * Alternatives:
2448 * - don't send origin message if origin name not found
2449 * (that's what we do now)
2450 * - throw error - that will break replication, not good
2451 * - send some special "unknown" origin
2452 *----------
2453 */
2454 if (replorigin_by_oid(origin_id, true, &origin))
2455 {
2456 /* Message boundary */
2457 OutputPluginWrite(ctx, false);
2458 OutputPluginPrepareWrite(ctx, true);
2459
2460 logicalrep_write_origin(ctx->out, origin, origin_lsn);
2461 }
2462 }
2463}
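
For orientation, the callbacks defined throughout this file are handed to the logical decoding machinery through the standard output-plugin entry point. The skeleton below is a minimal, hypothetical sketch of that registration pattern — the toy_ names are invented and the callback bodies are deliberately trivial — showing only the shape that pgoutput's own _PG_output_plugin_init() fills in with the functions above, not pgoutput's actual initialization code.

#include "postgres.h"

#include "lib/stringinfo.h"
#include "replication/logical.h"
#include "replication/output_plugin.h"
#include "utils/rel.h"

PG_MODULE_MAGIC;

/*
 * Startup callback: declare textual output.  A real plugin would also parse
 * its options here, as pgoutput_startup() does.
 */
static void
toy_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
			bool is_init)
{
	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
}

static void
toy_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
	/* nothing to do in this toy */
}

/* Change callback: emit one text line naming the modified relation. */
static void
toy_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
		   Relation relation, ReorderBufferChange *change)
{
	OutputPluginPrepareWrite(ctx, true);
	appendStringInfo(ctx->out, "change in \"%s\"",
					 RelationGetRelationName(relation));
	OutputPluginWrite(ctx, true);
}

static void
toy_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
			   XLogRecPtr commit_lsn)
{
	/* nothing to do in this toy */
}

static void
toy_shutdown(LogicalDecodingContext *ctx)
{
	/* nothing to do in this toy */
}

/*
 * Entry point: logical decoding looks this function up in the shared library
 * and calls it to collect the plugin's callbacks.
 */
void
_PG_output_plugin_init(OutputPluginCallbacks *cb)
{
	cb->startup_cb = toy_startup;
	cb->begin_cb = toy_begin_txn;
	cb->change_cb = toy_change;
	cb->commit_cb = toy_commit_txn;
	cb->shutdown_cb = toy_shutdown;
}

Built as a shared library, such a plugin would be exercised the same way pgoutput is: by creating a logical replication slot that names it and then consuming changes from that slot.
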