PostgreSQL Source Code: contrib/test_decoding/test_decoding.c Source File

PostgreSQL Source Code git master
test_decoding.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * test_decoding.c
4 * example logical decoding output plugin
5 *
6 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * contrib/test_decoding/test_decoding.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "catalog/pg_type.h"
16
17#include "replication/logical.h"
18#include "replication/origin.h"
19
20#include "utils/builtins.h"
21#include "utils/lsyscache.h"
22#include "utils/memutils.h"
23#include "utils/rel.h"
24
25 PG_MODULE_MAGIC_EXT(
26 .name = "test_decoding",
27 .version = PG_VERSION
28);
29
30 typedef struct
31{
32 MemoryContext context;
33 bool include_xids;
34 bool include_timestamp;
35 bool skip_empty_xacts;
36 bool only_local;
37} TestDecodingData;
38
/*
 * Per-transaction flags recording whether the transaction and/or the
 * current stream have written any changes.
 *
 * In streaming mode a transaction can be decoded in several streams, so
 * besides tracking whether the transaction as a whole has written changes we
 * also track whether the current stream has.  That lets us skip empty
 * streams when the user asked to skip empty transactions, even though the
 * transaction itself has already written some changes.
 */
typedef struct
{
    /* true once any change or transactional message was seen */
    bool        xact_wrote_changes;
    /* same, but cleared again at every stream start */
    bool        stream_wrote_changes;
} TestDecodingTxnData;
53
54static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
55 bool is_init);
56static void pg_decode_shutdown(LogicalDecodingContext *ctx);
57static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
58 ReorderBufferTXN *txn);
59static void pg_output_begin(LogicalDecodingContext *ctx,
60 TestDecodingData *data,
61 ReorderBufferTXN *txn,
62 bool last_write);
63static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
64 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
65static void pg_decode_change(LogicalDecodingContext *ctx,
66 ReorderBufferTXN *txn, Relation relation,
67 ReorderBufferChange *change);
68static void pg_decode_truncate(LogicalDecodingContext *ctx,
69 ReorderBufferTXN *txn,
70 int nrelations, Relation relations[],
71 ReorderBufferChange *change);
72static bool pg_decode_filter(LogicalDecodingContext *ctx,
73 RepOriginId origin_id);
74static void pg_decode_message(LogicalDecodingContext *ctx,
75 ReorderBufferTXN *txn, XLogRecPtr lsn,
76 bool transactional, const char *prefix,
77 Size sz, const char *message);
78static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
79 TransactionId xid,
80 const char *gid);
81static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
82 ReorderBufferTXN *txn);
83static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
84 ReorderBufferTXN *txn,
85 XLogRecPtr prepare_lsn);
86static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
87 ReorderBufferTXN *txn,
88 XLogRecPtr commit_lsn);
89static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
90 ReorderBufferTXN *txn,
91 XLogRecPtr prepare_end_lsn,
92 TimestampTz prepare_time);
93static void pg_decode_stream_start(LogicalDecodingContext *ctx,
94 ReorderBufferTXN *txn);
95static void pg_output_stream_start(LogicalDecodingContext *ctx,
96 TestDecodingData *data,
97 ReorderBufferTXN *txn,
98 bool last_write);
99static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
100 ReorderBufferTXN *txn);
101static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
102 ReorderBufferTXN *txn,
103 XLogRecPtr abort_lsn);
104static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
105 ReorderBufferTXN *txn,
106 XLogRecPtr prepare_lsn);
107static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
108 ReorderBufferTXN *txn,
109 XLogRecPtr commit_lsn);
110static void pg_decode_stream_change(LogicalDecodingContext *ctx,
111 ReorderBufferTXN *txn,
112 Relation relation,
113 ReorderBufferChange *change);
114static void pg_decode_stream_message(LogicalDecodingContext *ctx,
115 ReorderBufferTXN *txn, XLogRecPtr lsn,
116 bool transactional, const char *prefix,
117 Size sz, const char *message);
118static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
119 ReorderBufferTXN *txn,
120 int nrelations, Relation relations[],
121 ReorderBufferChange *change);
122
/*
 * test_decoding has nothing to set up here, so the body is intentionally
 * empty; other plugins can perform their initialization in this function.
 */
void
_PG_init(void)
{
}
128
129/* specify output plugin callbacks */
130void
131 _PG_output_plugin_init(OutputPluginCallbacks *cb)
132{
133 cb->startup_cb = pg_decode_startup;
134 cb->begin_cb = pg_decode_begin_txn;
135 cb->change_cb = pg_decode_change;
136 cb->truncate_cb = pg_decode_truncate;
137 cb->commit_cb = pg_decode_commit_txn;
138 cb->filter_by_origin_cb = pg_decode_filter;
139 cb->shutdown_cb = pg_decode_shutdown;
140 cb->message_cb = pg_decode_message;
141 cb->filter_prepare_cb = pg_decode_filter_prepare;
142 cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
143 cb->prepare_cb = pg_decode_prepare_txn;
144 cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
145 cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
146 cb->stream_start_cb = pg_decode_stream_start;
147 cb->stream_stop_cb = pg_decode_stream_stop;
148 cb->stream_abort_cb = pg_decode_stream_abort;
149 cb->stream_prepare_cb = pg_decode_stream_prepare;
150 cb->stream_commit_cb = pg_decode_stream_commit;
151 cb->stream_change_cb = pg_decode_stream_change;
152 cb->stream_message_cb = pg_decode_stream_message;
153 cb->stream_truncate_cb = pg_decode_stream_truncate;
154}
155
156
157/* initialize this plugin */
158static void
159 pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
160 bool is_init)
161{
162 ListCell *option;
163 TestDecodingData *data;
164 bool enable_streaming = false;
165
166 data = palloc0(sizeof(TestDecodingData));
167 data->context = AllocSetContextCreate(ctx->context,
168 "text conversion context",
169 ALLOCSET_DEFAULT_SIZES);
170 data->include_xids = true;
171 data->include_timestamp = false;
172 data->skip_empty_xacts = false;
173 data->only_local = false;
174
175 ctx->output_plugin_private = data;
176
177 opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
178 opt->receive_rewrites = false;
179
180 foreach(option, ctx->output_plugin_options)
181 {
182 DefElem *elem = lfirst(option);
183
184 Assert(elem->arg == NULL || IsA(elem->arg, String));
185
186 if (strcmp(elem->defname, "include-xids") == 0)
187 {
188 /* if option does not provide a value, it means its value is true */
189 if (elem->arg == NULL)
190 data->include_xids = true;
191 else if (!parse_bool(strVal(elem->arg), &data->include_xids))
192 ereport(ERROR,
193 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
194 errmsg("could not parse value \"%s\" for parameter \"%s\"",
195 strVal(elem->arg), elem->defname)));
196 }
197 else if (strcmp(elem->defname, "include-timestamp") == 0)
198 {
199 if (elem->arg == NULL)
200 data->include_timestamp = true;
201 else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
202 ereport(ERROR,
203 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
204 errmsg("could not parse value \"%s\" for parameter \"%s\"",
205 strVal(elem->arg), elem->defname)));
206 }
207 else if (strcmp(elem->defname, "force-binary") == 0)
208 {
209 bool force_binary;
210
211 if (elem->arg == NULL)
212 continue;
213 else if (!parse_bool(strVal(elem->arg), &force_binary))
214 ereport(ERROR,
215 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
216 errmsg("could not parse value \"%s\" for parameter \"%s\"",
217 strVal(elem->arg), elem->defname)));
218
219 if (force_binary)
220 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
221 }
222 else if (strcmp(elem->defname, "skip-empty-xacts") == 0)
223 {
224
225 if (elem->arg == NULL)
226 data->skip_empty_xacts = true;
227 else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
228 ereport(ERROR,
229 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
230 errmsg("could not parse value \"%s\" for parameter \"%s\"",
231 strVal(elem->arg), elem->defname)));
232 }
233 else if (strcmp(elem->defname, "only-local") == 0)
234 {
235
236 if (elem->arg == NULL)
237 data->only_local = true;
238 else if (!parse_bool(strVal(elem->arg), &data->only_local))
239 ereport(ERROR,
240 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
241 errmsg("could not parse value \"%s\" for parameter \"%s\"",
242 strVal(elem->arg), elem->defname)));
243 }
244 else if (strcmp(elem->defname, "include-rewrites") == 0)
245 {
246
247 if (elem->arg == NULL)
248 continue;
249 else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
250 ereport(ERROR,
251 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
252 errmsg("could not parse value \"%s\" for parameter \"%s\"",
253 strVal(elem->arg), elem->defname)));
254 }
255 else if (strcmp(elem->defname, "stream-changes") == 0)
256 {
257 if (elem->arg == NULL)
258 continue;
259 else if (!parse_bool(strVal(elem->arg), &enable_streaming))
260 ereport(ERROR,
261 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
262 errmsg("could not parse value \"%s\" for parameter \"%s\"",
263 strVal(elem->arg), elem->defname)));
264 }
265 else
266 {
267 ereport(ERROR,
268 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
269 errmsg("option \"%s\" = \"%s\" is unknown",
270 elem->defname,
271 elem->arg ? strVal(elem->arg) : "(null)")));
272 }
273 }
274
275 ctx->streaming &= enable_streaming;
276}
277
278/* cleanup this plugin's resources */
279static void
280 pg_decode_shutdown(LogicalDecodingContext *ctx)
281{
282 TestDecodingData *data = ctx->output_plugin_private;
283
284 /* cleanup our own resources via memory context reset */
285 MemoryContextDelete(data->context);
286}
287
288/* BEGIN callback */
289static void
290 pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
291{
292 TestDecodingData *data = ctx->output_plugin_private;
293 TestDecodingTxnData *txndata =
294 MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
295
296 txndata->xact_wrote_changes = false;
297 txn->output_plugin_private = txndata;
298
299 /*
300 * If asked to skip empty transactions, we'll emit BEGIN at the point
301 * where the first operation is received for this transaction.
302 */
303 if (data->skip_empty_xacts)
304 return;
305
306 pg_output_begin(ctx, data, txn, true);
307}
308
309static void
310 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
311{
312 OutputPluginPrepareWrite(ctx, last_write);
313 if (data->include_xids)
314 appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
315 else
316 appendStringInfoString(ctx->out, "BEGIN");
317 OutputPluginWrite(ctx, last_write);
318}
319
320/* COMMIT callback */
321static void
322 pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
323 XLogRecPtr commit_lsn)
324{
325 TestDecodingData *data = ctx->output_plugin_private;
326 TestDecodingTxnData *txndata = txn->output_plugin_private;
327 bool xact_wrote_changes = txndata->xact_wrote_changes;
328
329 pfree(txndata);
330 txn->output_plugin_private = NULL;
331
332 if (data->skip_empty_xacts && !xact_wrote_changes)
333 return;
334
335 OutputPluginPrepareWrite(ctx, true);
336 if (data->include_xids)
337 appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
338 else
339 appendStringInfoString(ctx->out, "COMMIT");
340
341 if (data->include_timestamp)
342 appendStringInfo(ctx->out, " (at %s)",
343 timestamptz_to_str(txn->commit_time));
344
345 OutputPluginWrite(ctx, true);
346}
347
348/* BEGIN PREPARE callback */
349static void
350 pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
351{
352 TestDecodingData *data = ctx->output_plugin_private;
353 TestDecodingTxnData *txndata =
354 MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
355
356 txndata->xact_wrote_changes = false;
357 txn->output_plugin_private = txndata;
358
359 /*
360 * If asked to skip empty transactions, we'll emit BEGIN at the point
361 * where the first operation is received for this transaction.
362 */
363 if (data->skip_empty_xacts)
364 return;
365
366 pg_output_begin(ctx, data, txn, true);
367}
368
369/* PREPARE callback */
370static void
371 pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
372 XLogRecPtr prepare_lsn)
373{
374 TestDecodingData *data = ctx->output_plugin_private;
375 TestDecodingTxnData *txndata = txn->output_plugin_private;
376
377 /*
378 * If asked to skip empty transactions, we'll emit PREPARE at the point
379 * where the first operation is received for this transaction.
380 */
381 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
382 return;
383
384 OutputPluginPrepareWrite(ctx, true);
385
386 appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
387 quote_literal_cstr(txn->gid));
388
389 if (data->include_xids)
390 appendStringInfo(ctx->out, ", txid %u", txn->xid);
391
392 if (data->include_timestamp)
393 appendStringInfo(ctx->out, " (at %s)",
394 timestamptz_to_str(txn->prepare_time));
395
396 OutputPluginWrite(ctx, true);
397}
398
399/* COMMIT PREPARED callback */
400static void
401 pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
402 XLogRecPtr commit_lsn)
403{
404 TestDecodingData *data = ctx->output_plugin_private;
405
406 OutputPluginPrepareWrite(ctx, true);
407
408 appendStringInfo(ctx->out, "COMMIT PREPARED %s",
409 quote_literal_cstr(txn->gid));
410
411 if (data->include_xids)
412 appendStringInfo(ctx->out, ", txid %u", txn->xid);
413
414 if (data->include_timestamp)
415 appendStringInfo(ctx->out, " (at %s)",
416 timestamptz_to_str(txn->commit_time));
417
418 OutputPluginWrite(ctx, true);
419}
420
421/* ROLLBACK PREPARED callback */
422static void
423 pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
424 ReorderBufferTXN *txn,
425 XLogRecPtr prepare_end_lsn,
426 TimestampTz prepare_time)
427{
428 TestDecodingData *data = ctx->output_plugin_private;
429
430 OutputPluginPrepareWrite(ctx, true);
431
432 appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
433 quote_literal_cstr(txn->gid));
434
435 if (data->include_xids)
436 appendStringInfo(ctx->out, ", txid %u", txn->xid);
437
438 if (data->include_timestamp)
439 appendStringInfo(ctx->out, " (at %s)",
440 timestamptz_to_str(txn->commit_time));
441
442 OutputPluginWrite(ctx, true);
443}
444
445/*
446 * Filter out two-phase transactions.
447 *
448 * Each plugin can implement its own filtering logic. Here we demonstrate a
449 * simple logic by checking the GID. If the GID contains the "_nodecode"
450 * substring, then we filter it out.
451 */
452static bool
453 pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
454 const char *gid)
455{
456 if (strstr(gid, "_nodecode") != NULL)
457 return true;
458
459 return false;
460}
461
462static bool
463 pg_decode_filter(LogicalDecodingContext *ctx,
464 RepOriginId origin_id)
465{
466 TestDecodingData *data = ctx->output_plugin_private;
467
468 if (data->only_local && origin_id != InvalidRepOriginId)
469 return true;
470 return false;
471}
472
473/*
474 * Print literal `outputstr' already represented as string of type `typid'
475 * into stringbuf `s'.
476 *
477 * Some builtin types aren't quoted, the rest is quoted. Escaping is done as
478 * if standard_conforming_strings were enabled.
479 */
480static void
481 print_literal(StringInfo s, Oid typid, char *outputstr)
482{
483 const char *valptr;
484
485 switch (typid)
486 {
487 case INT2OID:
488 case INT4OID:
489 case INT8OID:
490 case OIDOID:
491 case FLOAT4OID:
492 case FLOAT8OID:
493 case NUMERICOID:
494 /* NB: We don't care about Inf, NaN et al. */
495 appendStringInfoString(s, outputstr);
496 break;
497
498 case BITOID:
499 case VARBITOID:
500 appendStringInfo(s, "B'%s'", outputstr);
501 break;
502
503 case BOOLOID:
504 if (strcmp(outputstr, "t") == 0)
505 appendStringInfoString(s, "true");
506 else
507 appendStringInfoString(s, "false");
508 break;
509
510 default:
511 appendStringInfoChar(s, '\'');
512 for (valptr = outputstr; *valptr; valptr++)
513 {
514 char ch = *valptr;
515
516 if (SQL_STR_DOUBLE(ch, false))
517 appendStringInfoChar(s, ch);
518 appendStringInfoChar(s, ch);
519 }
520 appendStringInfoChar(s, '\'');
521 break;
522 }
523}
524
525/* print the tuple 'tuple' into the StringInfo s */
526static void
527 tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
528{
529 int natt;
530
531 /* print all columns individually */
532 for (natt = 0; natt < tupdesc->natts; natt++)
533 {
534 Form_pg_attribute attr; /* the attribute itself */
535 Oid typid; /* type of current attribute */
536 Oid typoutput; /* output function */
537 bool typisvarlena;
538 Datum origval; /* possibly toasted Datum */
539 bool isnull; /* column is null? */
540
541 attr = TupleDescAttr(tupdesc, natt);
542
543 /*
544 * don't print dropped columns, we can't be sure everything is
545 * available for them
546 */
547 if (attr->attisdropped)
548 continue;
549
550 /*
551 * Don't print system columns, oid will already have been printed if
552 * present.
553 */
554 if (attr->attnum < 0)
555 continue;
556
557 typid = attr->atttypid;
558
559 /* get Datum from tuple */
560 origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
561
562 if (isnull && skip_nulls)
563 continue;
564
565 /* print attribute name */
566 appendStringInfoChar(s, ' ');
567 appendStringInfoString(s, quote_identifier(NameStr(attr->attname)));
568
569 /* print attribute type */
570 appendStringInfoChar(s, '[');
571 appendStringInfoString(s, format_type_be(typid));
572 appendStringInfoChar(s, ']');
573
574 /* query output function */
575 getTypeOutputInfo(typid,
576 &typoutput, &typisvarlena);
577
578 /* print separator */
579 appendStringInfoChar(s, ':');
580
581 /* print data */
582 if (isnull)
583 appendStringInfoString(s, "null");
584 else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(origval)))
585 appendStringInfoString(s, "unchanged-toast-datum");
586 else if (!typisvarlena)
587 print_literal(s, typid,
588 OidOutputFunctionCall(typoutput, origval));
589 else
590 {
591 Datum val; /* definitely detoasted Datum */
592
593 val = PointerGetDatum(PG_DETOAST_DATUM(origval));
594 print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
595 }
596 }
597}
598
599/*
600 * callback for individual changed tuples
601 */
602static void
603 pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
604 Relation relation, ReorderBufferChange *change)
605{
606 TestDecodingData *data;
607 TestDecodingTxnData *txndata;
608 Form_pg_class class_form;
609 TupleDesc tupdesc;
610 MemoryContext old;
611
612 data = ctx->output_plugin_private;
613 txndata = txn->output_plugin_private;
614
615 /* output BEGIN if we haven't yet */
616 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
617 {
618 pg_output_begin(ctx, data, txn, false);
619 }
620 txndata->xact_wrote_changes = true;
621
622 class_form = RelationGetForm(relation);
623 tupdesc = RelationGetDescr(relation);
624
625 /* Avoid leaking memory by using and resetting our own context */
626 old = MemoryContextSwitchTo(data->context);
627
628 OutputPluginPrepareWrite(ctx, true);
629
630 appendStringInfoString(ctx->out, "table ");
631 appendStringInfoString(ctx->out,
632 quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
633 class_form->relrewrite ?
634 get_rel_name(class_form->relrewrite) :
635 NameStr(class_form->relname)));
636 appendStringInfoChar(ctx->out, ':');
637
638 switch (change->action)
639 {
640 case REORDER_BUFFER_CHANGE_INSERT:
641 appendStringInfoString(ctx->out, " INSERT:");
642 if (change->data.tp.newtuple == NULL)
643 appendStringInfoString(ctx->out, " (no-tuple-data)");
644 else
645 tuple_to_stringinfo(ctx->out, tupdesc,
646 change->data.tp.newtuple,
647 false);
648 break;
649 case REORDER_BUFFER_CHANGE_UPDATE:
650 appendStringInfoString(ctx->out, " UPDATE:");
651 if (change->data.tp.oldtuple != NULL)
652 {
653 appendStringInfoString(ctx->out, " old-key:");
654 tuple_to_stringinfo(ctx->out, tupdesc,
655 change->data.tp.oldtuple,
656 true);
657 appendStringInfoString(ctx->out, " new-tuple:");
658 }
659
660 if (change->data.tp.newtuple == NULL)
661 appendStringInfoString(ctx->out, " (no-tuple-data)");
662 else
663 tuple_to_stringinfo(ctx->out, tupdesc,
664 change->data.tp.newtuple,
665 false);
666 break;
667 case REORDER_BUFFER_CHANGE_DELETE:
668 appendStringInfoString(ctx->out, " DELETE:");
669
670 /* if there was no PK, we only know that a delete happened */
671 if (change->data.tp.oldtuple == NULL)
672 appendStringInfoString(ctx->out, " (no-tuple-data)");
673 /* In DELETE, only the replica identity is present; display that */
674 else
675 tuple_to_stringinfo(ctx->out, tupdesc,
676 change->data.tp.oldtuple,
677 true);
678 break;
679 default:
680 Assert(false);
681 }
682
683 MemoryContextSwitchTo(old);
684 MemoryContextReset(data->context);
685
686 OutputPluginWrite(ctx, true);
687}
688
689static void
690 pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
691 int nrelations, Relation relations[], ReorderBufferChange *change)
692{
693 TestDecodingData *data;
694 TestDecodingTxnData *txndata;
695 MemoryContext old;
696 int i;
697
698 data = ctx->output_plugin_private;
699 txndata = txn->output_plugin_private;
700
701 /* output BEGIN if we haven't yet */
702 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
703 {
704 pg_output_begin(ctx, data, txn, false);
705 }
706 txndata->xact_wrote_changes = true;
707
708 /* Avoid leaking memory by using and resetting our own context */
709 old = MemoryContextSwitchTo(data->context);
710
711 OutputPluginPrepareWrite(ctx, true);
712
713 appendStringInfoString(ctx->out, "table ");
714
715 for (i = 0; i < nrelations; i++)
716 {
717 if (i > 0)
718 appendStringInfoString(ctx->out, ", ");
719
720 appendStringInfoString(ctx->out,
721 quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
722 NameStr(relations[i]->rd_rel->relname)));
723 }
724
725 appendStringInfoString(ctx->out, ": TRUNCATE:");
726
727 if (change->data.truncate.restart_seqs
728 || change->data.truncate.cascade)
729 {
730 if (change->data.truncate.restart_seqs)
731 appendStringInfoString(ctx->out, " restart_seqs");
732 if (change->data.truncate.cascade)
733 appendStringInfoString(ctx->out, " cascade");
734 }
735 else
736 appendStringInfoString(ctx->out, " (no-flags)");
737
738 MemoryContextSwitchTo(old);
739 MemoryContextReset(data->context);
740
741 OutputPluginWrite(ctx, true);
742}
743
744static void
745 pg_decode_message(LogicalDecodingContext *ctx,
746 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
747 const char *prefix, Size sz, const char *message)
748{
749 TestDecodingData *data = ctx->output_plugin_private;
750 TestDecodingTxnData *txndata;
751
752 txndata = transactional ? txn->output_plugin_private : NULL;
753
754 /* output BEGIN if we haven't yet for transactional messages */
755 if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
756 pg_output_begin(ctx, data, txn, false);
757
758 if (transactional)
759 txndata->xact_wrote_changes = true;
760
761 OutputPluginPrepareWrite(ctx, true);
762 appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
763 transactional, prefix, sz);
764 appendBinaryStringInfo(ctx->out, message, sz);
765 OutputPluginWrite(ctx, true);
766}
767
768static void
769 pg_decode_stream_start(LogicalDecodingContext *ctx,
770 ReorderBufferTXN *txn)
771{
772 TestDecodingData *data = ctx->output_plugin_private;
773 TestDecodingTxnData *txndata = txn->output_plugin_private;
774
775 /*
776 * Allocate the txn plugin data for the first stream in the transaction.
777 */
778 if (txndata == NULL)
779 {
780 txndata =
781 MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
782 txndata->xact_wrote_changes = false;
783 txn->output_plugin_private = txndata;
784 }
785
786 txndata->stream_wrote_changes = false;
787 if (data->skip_empty_xacts)
788 return;
789 pg_output_stream_start(ctx, data, txn, true);
790}
791
792static void
793 pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
794{
795 OutputPluginPrepareWrite(ctx, last_write);
796 if (data->include_xids)
797 appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
798 else
799 appendStringInfoString(ctx->out, "opening a streamed block for transaction");
800 OutputPluginWrite(ctx, last_write);
801}
802
803static void
804 pg_decode_stream_stop(LogicalDecodingContext *ctx,
805 ReorderBufferTXN *txn)
806{
807 TestDecodingData *data = ctx->output_plugin_private;
808 TestDecodingTxnData *txndata = txn->output_plugin_private;
809
810 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
811 return;
812
813 OutputPluginPrepareWrite(ctx, true);
814 if (data->include_xids)
815 appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
816 else
817 appendStringInfoString(ctx->out, "closing a streamed block for transaction");
818 OutputPluginWrite(ctx, true);
819}
820
821static void
822 pg_decode_stream_abort(LogicalDecodingContext *ctx,
823 ReorderBufferTXN *txn,
824 XLogRecPtr abort_lsn)
825{
826 TestDecodingData *data = ctx->output_plugin_private;
827
828 /*
829 * stream abort can be sent for an individual subtransaction but we
830 * maintain the output_plugin_private only under the toptxn so if this is
831 * not the toptxn then fetch the toptxn.
832 */
833 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
834 TestDecodingTxnData *txndata = toptxn->output_plugin_private;
835 bool xact_wrote_changes = txndata->xact_wrote_changes;
836
837 if (rbtxn_is_toptxn(txn))
838 {
839 Assert(txn->output_plugin_private != NULL);
840 pfree(txndata);
841 txn->output_plugin_private = NULL;
842 }
843
844 if (data->skip_empty_xacts && !xact_wrote_changes)
845 return;
846
847 OutputPluginPrepareWrite(ctx, true);
848 if (data->include_xids)
849 appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
850 else
851 appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
852 OutputPluginWrite(ctx, true);
853}
854
855static void
856 pg_decode_stream_prepare(LogicalDecodingContext *ctx,
857 ReorderBufferTXN *txn,
858 XLogRecPtr prepare_lsn)
859{
860 TestDecodingData *data = ctx->output_plugin_private;
861 TestDecodingTxnData *txndata = txn->output_plugin_private;
862
863 if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
864 return;
865
866 OutputPluginPrepareWrite(ctx, true);
867
868 if (data->include_xids)
869 appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u",
870 quote_literal_cstr(txn->gid), txn->xid);
871 else
872 appendStringInfo(ctx->out, "preparing streamed transaction %s",
873 quote_literal_cstr(txn->gid));
874
875 if (data->include_timestamp)
876 appendStringInfo(ctx->out, " (at %s)",
877 timestamptz_to_str(txn->prepare_time));
878
879 OutputPluginWrite(ctx, true);
880}
881
882static void
883 pg_decode_stream_commit(LogicalDecodingContext *ctx,
884 ReorderBufferTXN *txn,
885 XLogRecPtr commit_lsn)
886{
887 TestDecodingData *data = ctx->output_plugin_private;
888 TestDecodingTxnData *txndata = txn->output_plugin_private;
889 bool xact_wrote_changes = txndata->xact_wrote_changes;
890
891 pfree(txndata);
892 txn->output_plugin_private = NULL;
893
894 if (data->skip_empty_xacts && !xact_wrote_changes)
895 return;
896
897 OutputPluginPrepareWrite(ctx, true);
898
899 if (data->include_xids)
900 appendStringInfo(ctx->out, "committing streamed transaction TXN %u", txn->xid);
901 else
902 appendStringInfoString(ctx->out, "committing streamed transaction");
903
904 if (data->include_timestamp)
905 appendStringInfo(ctx->out, " (at %s)",
906 timestamptz_to_str(txn->commit_time));
907
908 OutputPluginWrite(ctx, true);
909}
910
911/*
912 * In streaming mode, we don't display the changes as the transaction can abort
913 * at a later point in time. We don't want users to see the changes until the
914 * transaction is committed.
915 */
916static void
917 pg_decode_stream_change(LogicalDecodingContext *ctx,
918 ReorderBufferTXN *txn,
919 Relation relation,
920 ReorderBufferChange *change)
921{
922 TestDecodingData *data = ctx->output_plugin_private;
923 TestDecodingTxnData *txndata = txn->output_plugin_private;
924
925 /* output stream start if we haven't yet */
926 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
927 {
928 pg_output_stream_start(ctx, data, txn, false);
929 }
930 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
931
932 OutputPluginPrepareWrite(ctx, true);
933 if (data->include_xids)
934 appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
935 else
936 appendStringInfoString(ctx->out, "streaming change for transaction");
937 OutputPluginWrite(ctx, true);
938}
939
940/*
941 * In streaming mode, we don't display the contents for transactional messages
942 * as the transaction can abort at a later point in time. We don't want users to
943 * see the message contents until the transaction is committed.
944 */
945static void
946 pg_decode_stream_message(LogicalDecodingContext *ctx,
947 ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
948 const char *prefix, Size sz, const char *message)
949{
950 /* Output stream start if we haven't yet for transactional messages. */
951 if (transactional)
952 {
953 TestDecodingData *data = ctx->output_plugin_private;
954 TestDecodingTxnData *txndata = txn->output_plugin_private;
955
956 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
957 {
958 pg_output_stream_start(ctx, data, txn, false);
959 }
960 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
961 }
962
963 OutputPluginPrepareWrite(ctx, true);
964
965 if (transactional)
966 {
967 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu",
968 transactional, prefix, sz);
969 }
970 else
971 {
972 appendStringInfo(ctx->out, "streaming message: transactional: %d prefix: %s, sz: %zu content:",
973 transactional, prefix, sz);
974 appendBinaryStringInfo(ctx->out, message, sz);
975 }
976
977 OutputPluginWrite(ctx, true);
978}
979
980/*
981 * In streaming mode, we don't display the detailed information of Truncate.
982 * See pg_decode_stream_change.
983 */
984static void
985 pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
986 int nrelations, Relation relations[],
987 ReorderBufferChange *change)
988{
989 TestDecodingData *data = ctx->output_plugin_private;
990 TestDecodingTxnData *txndata = txn->output_plugin_private;
991
992 if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
993 {
994 pg_output_stream_start(ctx, data, txn, false);
995 }
996 txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
997
998 OutputPluginPrepareWrite(ctx, true);
999 if (data->include_xids)
1000 appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
1001 else
1002 appendStringInfoString(ctx->out, "streaming truncate for transaction");
1003 OutputPluginWrite(ctx, true);
1004}
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1862
bool parse_bool(const char *value, bool *result)
Definition: bool.c:31
#define NameStr(name)
Definition: c.h:751
#define SQL_STR_DOUBLE(ch, escape_backslash)
Definition: c.h:1162
uint32 TransactionId
Definition: c.h:657
size_t Size
Definition: c.h:610
int64 TimestampTz
Definition: timestamp.h:39
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
char * OidOutputFunctionCall(Oid functionId, Datum val)
Definition: fmgr.c:1763
#define PG_DETOAST_DATUM(datum)
Definition: fmgr.h:240
char * format_type_be(Oid type_oid)
Definition: format_type.c:343
Assert(PointerIsAligned(start, uint64))
static Datum heap_getattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:904
long val
Definition: informix.c:689
i
int i
Definition: isn.c:77
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:705
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:692
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2095
void getTypeOutputInfo(Oid type, Oid *typOutput, bool *typIsVarlena)
Definition: lsyscache.c:3074
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:2119
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1263
void pfree(void *pointer)
Definition: mcxt.c:1594
void * palloc0(Size size)
Definition: mcxt.c:1395
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define IsA(nodeptr, _type_)
Definition: nodes.h:164
#define InvalidRepOriginId
Definition: origin.h:33
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
@ OUTPUT_PLUGIN_TEXTUAL_OUTPUT
Definition: output_plugin.h:20
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
FormData_pg_class * Form_pg_class
Definition: pg_class.h:156
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:332
uint64_t Datum
Definition: postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:322
unsigned int Oid
Definition: postgres_ext.h:32
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
#define RelationGetForm(relation)
Definition: rel.h:508
#define RelationGetRelid(relation)
Definition: rel.h:514
#define RelationGetDescr(relation)
Definition: rel.h:540
#define rbtxn_is_toptxn(txn)
Definition: reorderbuffer.h:276
#define rbtxn_get_toptxn(txn)
Definition: reorderbuffer.h:288
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:13119
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:13035
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:281
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
char * defname
Definition: parsenodes.h:843
Node * arg
Definition: parsenodes.h:844
MemoryContext context
Definition: logical.h:36
StringInfo out
Definition: logical.h:71
void * output_plugin_private
Definition: logical.h:76
List * output_plugin_options
Definition: logical.h:59
LogicalDecodeStreamChangeCB stream_change_cb
Definition: output_plugin.h:240
LogicalDecodeMessageCB message_cb
Definition: output_plugin.h:223
LogicalDecodeStreamTruncateCB stream_truncate_cb
Definition: output_plugin.h:242
LogicalDecodeStreamMessageCB stream_message_cb
Definition: output_plugin.h:241
LogicalDecodeFilterPrepareCB filter_prepare_cb
Definition: output_plugin.h:228
LogicalDecodeFilterByOriginCB filter_by_origin_cb
Definition: output_plugin.h:224
LogicalDecodeTruncateCB truncate_cb
Definition: output_plugin.h:221
LogicalDecodeStreamStopCB stream_stop_cb
Definition: output_plugin.h:236
LogicalDecodeStreamCommitCB stream_commit_cb
Definition: output_plugin.h:239
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
Definition: output_plugin.h:232
LogicalDecodeStreamPrepareCB stream_prepare_cb
Definition: output_plugin.h:238
LogicalDecodeCommitPreparedCB commit_prepared_cb
Definition: output_plugin.h:231
LogicalDecodeStreamStartCB stream_start_cb
Definition: output_plugin.h:235
LogicalDecodePrepareCB prepare_cb
Definition: output_plugin.h:230
LogicalDecodeStartupCB startup_cb
Definition: output_plugin.h:218
LogicalDecodeCommitCB commit_cb
Definition: output_plugin.h:222
LogicalDecodeBeginCB begin_cb
Definition: output_plugin.h:219
LogicalDecodeStreamAbortCB stream_abort_cb
Definition: output_plugin.h:237
LogicalDecodeBeginPrepareCB begin_prepare_cb
Definition: output_plugin.h:229
LogicalDecodeChangeCB change_cb
Definition: output_plugin.h:220
LogicalDecodeShutdownCB shutdown_cb
Definition: output_plugin.h:225
OutputPluginOutputType output_type
Definition: output_plugin.h:28
Definition: rel.h:56
struct ReorderBufferChange::@114::@116 truncate
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
HeapTuple newtuple
Definition: reorderbuffer.h:106
struct ReorderBufferChange::@114::@115 tp
union ReorderBufferChange::@114 data
HeapTuple oldtuple
Definition: reorderbuffer.h:104
TimestampTz commit_time
Definition: reorderbuffer.h:359
void * output_plugin_private
Definition: reorderbuffer.h:467
TimestampTz prepare_time
Definition: reorderbuffer.h:360
TransactionId xid
Definition: reorderbuffer.h:299
Definition: value.h:64
bool skip_empty_xacts
Definition: test_decoding.c:35
MemoryContext context
Definition: test_decoding.c:32
bool include_timestamp
Definition: test_decoding.c:34
int natts
Definition: tupdesc.h:137
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: test_decoding.c:371
static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: test_decoding.c:769
static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: test_decoding.c:423
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: test_decoding.c:401
void _PG_init(void)
Definition: test_decoding.c:124
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
Definition: test_decoding.c:527
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: test_decoding.c:290
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: test_decoding.c:603
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: test_decoding.c:350
static void pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
Definition: test_decoding.c:310
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: test_decoding.c:322
static void print_literal(StringInfo s, Oid typid, char *outputstr)
Definition: test_decoding.c:481
static void pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: test_decoding.c:917
PG_MODULE_MAGIC_EXT(.name="test_decoding",.version=PG_VERSION)
static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: test_decoding.c:745
static void pg_decode_shutdown(LogicalDecodingContext *ctx)
Definition: test_decoding.c:280
static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: test_decoding.c:822
static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: test_decoding.c:856
static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: test_decoding.c:883
static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: test_decoding.c:463
static void pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
Definition: test_decoding.c:793
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid)
Definition: test_decoding.c:453
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: test_decoding.c:985
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: test_decoding.c:131
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: test_decoding.c:690
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: test_decoding.c:159
static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: test_decoding.c:804
static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: test_decoding.c:946
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
Definition: pg_list.h:46
#define strVal(v)
Definition: value.h:82
static bool VARATT_IS_EXTERNAL_ONDISK(const void *PTR)
Definition: varatt.h:361
const char * name
uint16 RepOriginId
Definition: xlogdefs.h:68
uint64 XLogRecPtr
Definition: xlogdefs.h:21

AltStyle によって変換されたページ (->オリジナル) /