1/*-------------------------------------------------------------------------
4 * publication manipulation
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/commands/publicationcmds.c
12 *-------------------------------------------------------------------------
52 * Information used to validate the columns in the row filter expression. See
53 * contain_invalid_rfcolumn_walker for details.
59 * relation's row filter */
81 bool *publish_via_partition_root_given,
82 bool *publish_via_partition_root,
83 bool *publish_generated_columns_given,
84 char *publish_generated_columns)
88 *publish_given =
false;
89 *publish_via_partition_root_given =
false;
90 *publish_generated_columns_given =
false;
97 *publish_via_partition_root =
false;
98 *publish_generated_columns = PUBLISH_GENCOLS_NONE;
105 if (strcmp(defel->
defname,
"publish") == 0)
115 * If the publish option was given, only the explicitly listed actions
116 * should be published.
123 *publish_given =
true;
128 (
errcode(ERRCODE_SYNTAX_ERROR),
129 errmsg(
"invalid list syntax in parameter \"%s\"",
132 /* Process the option list. */
133 foreach(lc2, publish_list)
135 char *publish_opt = (
char *)
lfirst(lc2);
137 if (strcmp(publish_opt,
"insert") == 0)
139 else if (strcmp(publish_opt,
"update") == 0)
141 else if (strcmp(publish_opt,
"delete") == 0)
143 else if (strcmp(publish_opt,
"truncate") == 0)
147 (
errcode(ERRCODE_SYNTAX_ERROR),
148 errmsg(
"unrecognized value for publication option \"%s\": \"%s\"",
149 "publish", publish_opt)));
152 else if (strcmp(defel->
defname,
"publish_via_partition_root") == 0)
154 if (*publish_via_partition_root_given)
156 *publish_via_partition_root_given =
true;
159 else if (strcmp(defel->
defname,
"publish_generated_columns") == 0)
161 if (*publish_generated_columns_given)
163 *publish_generated_columns_given =
true;
168 (
errcode(ERRCODE_SYNTAX_ERROR),
169 errmsg(
"unrecognized publication parameter: \"%s\"", defel->
defname)));
174 * Convert the PublicationObjSpecType list into schema oid list and
175 * PublicationTable list.
184 if (!pubobjspec_list)
187 foreach(cell, pubobjspec_list)
202 /* Filter out duplicates if user specifies "sch1, sch1" */
207 if (search_path ==
NIL)
/* nothing valid in search_path? */
209 errcode(ERRCODE_UNDEFINED_SCHEMA),
210 errmsg(
"no schema has been selected for CURRENT_SCHEMA"));
215 /* Filter out duplicates if user specifies "sch1, sch1" */
219 /* shouldn't happen */
227 * Returns true if any of the columns used in the row filter WHERE expression is
228 * not part of REPLICA IDENTITY, false otherwise.
242 * If pubviaroot is true, we are validating the row filter of the
243 * parent table, but the bitmap contains the replica identity
244 * information of the child table. So, get the column number of the
245 * child table as parent and child column order could be different.
264 * Check if all columns referenced in the filter expression are part of the
265 * REPLICA IDENTITY index or not.
267 * Returns true if any invalid column is found.
281 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
282 * allowed in the row filter and we can skip the validation.
284 if (relation->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
288 * For a partition, if pubviaroot is true, find the topmost ancestor that
289 * is published via this publication as we need to use its row filter
290 * expression to filter the partition's changes.
292 * Note that even though the row filter used is for an ancestor, the
293 * REPLICA IDENTITY used will be for the actual child table.
295 if (pubviaroot && relation->
rd_rel->relispartition)
301 publish_as_relid = relid;
312 Anum_pg_publication_rel_prqual,
322 context.
parentid = publish_as_relid;
323 context.
relid = relid;
325 /* Remember columns that are part of the REPLICA IDENTITY */
340 * Check for invalid columns in the publication table definition.
342 * This function evaluates two conditions:
344 * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
345 * by the column list. If any column is missing, *invalid_column_list is set
347 * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
348 * are published, either by being explicitly named in the column list or, if
349 * no column list is specified, by setting the option
350 * publish_generated_columns to stored. If any unpublished
351 * generated column is found, *invalid_gen_col is set to true.
353 * Returns true if any of the above conditions are not met.
357 bool pubviaroot,
char pubgencols_type,
358 bool *invalid_column_list,
359 bool *invalid_gen_col)
369 *invalid_column_list =
false;
370 *invalid_gen_col =
false;
373 * For a partition, if pubviaroot is true, find the topmost ancestor that
374 * is published via this publication as we need to use its column list for
377 * Note that even though the column list used is for an ancestor, the
378 * REPLICA IDENTITY used will be for the actual child table.
380 if (pubviaroot && relation->
rd_rel->relispartition)
385 publish_as_relid = relid;
388 /* Fetch the column list */
392 if (relation->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
394 /* With REPLICA IDENTITY FULL, no column list is allowed. */
395 *invalid_column_list = (columns != NULL);
398 * As we don't allow a column list with REPLICA IDENTITY FULL, the
399 * publish_generated_columns option must be set to stored if the table
400 * has any stored generated columns.
402 if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
405 *invalid_gen_col =
true;
408 * Virtual generated columns are currently not supported for logical
409 * replication at all.
413 *invalid_gen_col =
true;
415 if (*invalid_gen_col && *invalid_column_list)
419 /* Remember columns that are part of the REPLICA IDENTITY */
424 * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
425 * (to handle system columns the usual way), while column list does not
426 * use offset, so we can't do bms_is_subset(). Instead, we have to loop
427 * over the idattrs and check all of them are in the list.
438 * The publish_generated_columns option must be set to stored if
439 * the REPLICA IDENTITY contains any stored generated column.
441 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
443 *invalid_gen_col =
true;
448 * The equivalent setting for virtual generated columns does not
451 if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
453 *invalid_gen_col =
true;
457 /* Skip validating the column list since it is not defined */
462 * If pubviaroot is true, we are validating the column list of the
463 * parent table, but the bitmap contains the replica identity
464 * information of the child table. The parent/child attnums may not
465 * match, so translate them to the parent - get the attname from the
466 * child, and look it up in the parent.
470 /* attribute name in the child table */
474 * Determine the attnum for the attribute name in parent (we are
475 * using the column list defined on the parent).
480 /* replica identity column, not covered by the column list */
483 if (*invalid_column_list && *invalid_gen_col)
490 return *invalid_column_list || *invalid_gen_col;
494 * Invalidate entries in the RelationSyncCache for relations included in the
495 * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
497 * If 'puballtables' is true, invalidate all cache entries.
512 * For partitioned tables, we must invalidate all partitions and
513 * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
514 * a target. However, WAL records for TRUNCATE specify both a root and
524 /* Invalidate the relsync cache */
532/* check_functions_in_node callback */
541 * The row filter walker checks if the row filter expression is a "simple
544 * It allows only simple or compound expressions such as:
547 * - (Var Op Const) AND/OR (Var Op Const)
549 * (where Var is a column of the table this filter belongs to)
551 * The simple expression has the following restrictions:
552 * - User-defined operators are not allowed;
553 * - User-defined functions are not allowed;
554 * - User-defined types are not allowed;
555 * - User-defined collations are not allowed;
556 * - Non-immutable built-in functions are not allowed;
557 * - System columns are not allowed.
561 * We don't allow user-defined functions/operators/types/collations because
562 * (a) if a user drops a user-defined object used in a row filter expression or
563 * if there is any other error while using it, the logical decoding
564 * infrastructure won't be able to recover from such an error even if the
565 * object is recreated again because a historic snapshot is used to evaluate
567 * (b) a user-defined function can be used to access tables that could have
568 * unpleasant results because a historic snapshot is used. That's why only
569 * immutable built-in functions are allowed in row filter expressions.
571 * We don't allow system columns because currently, we don't have that
572 * information in the tuple passed to downstream. Also, as we don't replicate
573 * those to subscribers, there doesn't seem to be a need for a filter on those
576 * We can allow other node types after more analysis and testing.
581 char *errdetail_msg = NULL;
589 /* System columns are not allowed. */
591 errdetail_msg =
_(
"System columns are not allowed.");
596 /* OK, except user-defined operators are not allowed. */
598 errdetail_msg =
_(
"User-defined operators are not allowed.");
600 case T_ScalarArrayOpExpr:
601 /* OK, except user-defined operators are not allowed. */
603 errdetail_msg =
_(
"User-defined operators are not allowed.");
606 * We don't need to check the hashfuncid and negfuncid of
607 * ScalarArrayOpExpr as those functions are only built for a
611 case T_RowCompareExpr:
615 /* OK, except user-defined operators are not allowed. */
620 errdetail_msg =
_(
"User-defined operators are not allowed.");
644 errdetail_msg =
_(
"Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
649 * For all the supported nodes, if we haven't already found a problem,
650 * check the types, functions, and collations used in it. We check List
651 * by walking through each element.
653 if (!errdetail_msg && !
IsA(node,
List))
656 errdetail_msg =
_(
"User-defined types are not allowed.");
659 errdetail_msg =
_(
"User-defined or built-in mutable functions are not allowed.");
662 errdetail_msg =
_(
"User-defined collations are not allowed.");
666 * If we found a problem in this node, throw error now. Otherwise keep
671 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
672 errmsg(
"invalid publication WHERE expression"),
681 * Check if the row filter expression is a "simple expression".
683 * See check_simple_rowfilter_expr_walker for details.
692 * Transform the publication WHERE expression for all the relations in the list,
693 * ensuring it is coerced to boolean and necessary collation information is
694 * added if required, and add a new nsitem/RTE for the associated relation to
695 * the ParseState's namespace list.
697 * Also check the publication row filter expression and throw an error if
698 * anything not permitted or unexpected is encountered.
709 Node *whereclause = NULL;
717 * If the publication doesn't publish changes via the root partitioned
718 * table, the partition's row filter will be used. So disallow using
719 * WHERE clause on partitioned table in this case.
724 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
725 errmsg(
"cannot use publication WHERE clause for relation \"%s\"",
727 errdetail(
"WHERE clause cannot be used for a partitioned table when %s is false.",
728 "publish_via_partition_root")));
731 * A fresh pstate is required so that we only have "this" table in its
744 "PUBLICATION WHERE");
746 /* Fix up collation information */
752 * We allow only simple expressions in row filters. See
753 * check_simple_rowfilter_expr_walker.
765 * Given a list of tables that are going to be added to a publication,
766 * verify that they fulfill the necessary preconditions, namely: no tables
767 * have a column list if any schema is published; and partitioned tables do
768 * not have column lists if publish_via_partition_root is not set.
770 * 'publish_schema' indicates that the publication contains any TABLES IN
771 * SCHEMA elements (newly added in this command, or preexisting).
772 * 'pubviaroot' is the value of publish_via_partition_root.
776 bool publish_schema,
bool pubviaroot)
788 * Disallow specifying column list if any schema is in the
791 * XXX We could instead just forbid the case when the publication
792 * tries to publish the table with a column list and a schema for that
793 * table. However, if we do that then we need a restriction during
794 * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
795 * seem to be a good idea.
799 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
800 errmsg(
"cannot use column list for relation \"%s.%s\" in publication \"%s\"",
803 errdetail(
"Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
806 * If the publication doesn't publish changes via the root partitioned
807 * table, the partition's column list will be used. So disallow using
808 * a column list on the partitioned table in this case.
813 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
814 errmsg(
"cannot use column list for relation \"%s.%s\" in publication \"%s\"",
817 errdetail(
"Column lists cannot be specified for partitioned tables when %s is false.",
818 "publish_via_partition_root")));
823 * Create new publication.
831 bool nulls[Natts_pg_publication];
836 bool publish_via_partition_root_given;
837 bool publish_via_partition_root;
838 bool publish_generated_columns_given;
839 char publish_generated_columns;
844 /* must have CREATE privilege on database */
850 /* FOR ALL TABLES requires superuser */
853 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
854 errmsg(
"must be superuser to create FOR ALL TABLES publication")));
858 /* Check if name is used */
864 errmsg(
"publication \"%s\" already exists",
869 memset(nulls,
false,
sizeof(nulls));
871 values[Anum_pg_publication_pubname - 1] =
877 &publish_given, &pubactions,
878 &publish_via_partition_root_given,
879 &publish_via_partition_root,
880 &publish_generated_columns_given,
881 &publish_generated_columns);
884 Anum_pg_publication_oid);
886 values[Anum_pg_publication_puballtables - 1] =
888 values[Anum_pg_publication_pubinsert - 1] =
890 values[Anum_pg_publication_pubupdate - 1] =
892 values[Anum_pg_publication_pubdelete - 1] =
894 values[Anum_pg_publication_pubtruncate - 1] =
896 values[Anum_pg_publication_pubviaroot - 1] =
898 values[Anum_pg_publication_pubgencols - 1] =
903 /* Insert tuple into catalog. */
911 /* Make the changes visible. */
914 /* Associate objects with the publication. */
915 if (
stmt->for_all_tables)
917 /* Invalidate relcache so that publication info is rebuilt. */
925 /* FOR TABLES IN SCHEMA requires superuser */
928 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
929 errmsg(
"must be superuser to create FOR TABLES IN SCHEMA publication"));
931 if (relations !=
NIL)
937 publish_via_partition_root);
941 publish_via_partition_root);
947 if (schemaidlist !=
NIL)
950 * Schema lock is held until the publication is created to prevent
951 * concurrent schema deletion.
964 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
965 errmsg(
"\"wal_level\" is insufficient to publish logical changes"),
966 errhint(
"Set \"wal_level\" to \"logical\" before creating subscriptions.")));
972 * Change options of a publication.
978 bool nulls[Natts_pg_publication];
979 bool replaces[Natts_pg_publication];
983 bool publish_via_partition_root_given;
984 bool publish_via_partition_root;
985 bool publish_generated_columns_given;
986 char publish_generated_columns;
994 &publish_given, &pubactions,
995 &publish_via_partition_root_given,
996 &publish_via_partition_root,
997 &publish_generated_columns_given,
998 &publish_generated_columns);
1003 * If the publication doesn't publish changes via the root partitioned
1004 * table, the partition's row filter and column list will be used. So
1005 * disallow using WHERE clause and column lists on partitioned table in
1008 if (!pubform->puballtables && publish_via_partition_root_given &&
1009 !publish_via_partition_root)
1012 * Lock the publication so nobody else can do anything with it. This
1013 * prevents concurrent alter to add partitioned table(s) with WHERE
1014 * clause(s) and/or column lists which we don't allow when not
1015 * publishing via root.
1023 foreach(lc, root_relids)
1033 * Beware: we don't have lock on the relations, so cope silently
1034 * with the cache lookups returning NULL.
1042 has_rowfilter = !
heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1043 has_collist = !
heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1044 if (!has_rowfilter && !has_collist)
1051 if (relkind != RELKIND_PARTITIONED_TABLE)
1057 if (
relname == NULL)
/* table concurrently dropped */
1065 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1066 errmsg(
"cannot set parameter \"%s\" to false for publication \"%s\"",
1067 "publish_via_partition_root",
1069 errdetail(
"The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1070 relname,
"publish_via_partition_root")));
1073 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1074 errmsg(
"cannot set parameter \"%s\" to false for publication \"%s\"",
1075 "publish_via_partition_root",
1077 errdetail(
"The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1078 relname,
"publish_via_partition_root")));
1082 /* Everything ok, form a new tuple. */
1084 memset(nulls,
false,
sizeof(nulls));
1085 memset(replaces,
false,
sizeof(replaces));
1090 replaces[Anum_pg_publication_pubinsert - 1] =
true;
1093 replaces[Anum_pg_publication_pubupdate - 1] =
true;
1096 replaces[Anum_pg_publication_pubdelete - 1] =
true;
1099 replaces[Anum_pg_publication_pubtruncate - 1] =
true;
1102 if (publish_via_partition_root_given)
1104 values[Anum_pg_publication_pubviaroot - 1] =
BoolGetDatum(publish_via_partition_root);
1105 replaces[Anum_pg_publication_pubviaroot - 1] =
true;
1108 if (publish_generated_columns_given)
1110 values[Anum_pg_publication_pubgencols - 1] =
CharGetDatum(publish_generated_columns);
1111 replaces[Anum_pg_publication_pubgencols - 1] =
true;
1117 /* Update the catalog. */
1124 /* Invalidate the relcache. */
1125 if (pubform->puballtables)
1135 * For any partitioned tables contained in the publication, we must
1136 * invalidate all partitions contained in the respective partition
1137 * trees, not just those explicitly mentioned in the publication.
1139 if (root_relids ==
NIL)
1145 * We already got tables explicitly mentioned in the publication.
1146 * Now get all partitions for the partitioned table in the list.
1148 foreach(lc, root_relids)
1169 * Invalidate the relations.
1175 * We don't want to send too many individual messages, at some point it's
1176 * cheaper to just reset whole relcache.
1190 * Add or remove table to/from publication.
1194 List *tables,
const char *queryString,
1195 bool publish_schema)
1199 Oid pubid = pubform->oid;
1202 * Nothing to do if no objects, except in SET: for that it is quite
1203 * possible that user has not specified any tables in which case we need
1204 * to remove all the existing tables.
1218 pubform->pubviaroot);
1224 else /* AP_SetObjects */
1234 pubform->pubviaroot);
1237 * To recreate the relation list for the publication, look for
1238 * existing relations that do not need to be dropped.
1240 foreach(oldlc, oldrelids)
1247 Node *oldrelwhereclause = NULL;
1250 /* look up the cache for the old relmap */
1256 * See if the existing relation currently has a WHERE clause or a
1257 * column list. We need to compare those too.
1262 Datum whereClauseDatum;
1263 Datum columnListDatum;
1265 /* Load the WHERE clause for this table. */
1267 Anum_pg_publication_rel_prqual,
1272 /* Transform the int2vector column list to a bitmap. */
1274 Anum_pg_publication_rel_prattrs,
1283 foreach(newlc, rels)
1293 * Validate the column list. If the column list or WHERE
1294 * clause changes, then the validation done here will be
1295 * duplicated inside PublicationAddTables(). The validation
1296 * is cheap enough that that seems harmless.
1302 * Check if any of the new set of relations matches with the
1303 * existing relations in the publication. Additionally, if the
1304 * relation has an associated WHERE clause, check the WHERE
1305 * expressions also match. Same for the column list. Drop the
1308 if (newrelid == oldrelid)
1320 * Add the non-matched relations to a list so that they can be
1330 delrels =
lappend(delrels, oldrel);
1334 /* And drop them. */
1338 * Don't bother calculating the difference for adding, we'll catch and
1339 * skip existing ones when doing catalog update.
1350 * Alter the publication schemas.
1352 * Add or remove schemas to/from publication.
1361 * Nothing to do if no objects, except in SET: for that it is quite
1362 * possible that user has not specified any schemas in which case we need
1363 * to remove all the existing schemas.
1369 * Schema lock is held until the publication is altered to prevent
1370 * concurrent schema deletion.
1380 foreach(lc, reloids)
1392 * Disallow adding schema if column list is already part of the
1393 * publication. See CheckPubRelationColumnList.
1395 if (!
heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1397 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1398 errmsg(
"cannot add schema to publication \"%s\"",
1400 errdetail(
"Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1409 else /* AP_SetObjects */
1414 /* Identify which schemas should be dropped */
1418 * Schema lock is held until the publication is altered to prevent
1419 * concurrent schema deletion.
1427 * Don't bother calculating the difference for adding, we'll catch and
1428 * skip existing ones when doing catalog update.
1435 * Check if relations and schemas can be in a given publication and throw
1436 * appropriate error if not.
1447 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1448 errmsg(
"must be superuser to add or set schemas")));
1451 * Check that user is allowed to manipulate the publication tables in
1454 if (schemaidlist && pubform->puballtables)
1456 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1457 errmsg(
"publication \"%s\" is defined as FOR ALL TABLES",
1459 errdetail(
"Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1461 /* Check that user is allowed to manipulate the publication tables. */
1462 if (tables && pubform->puballtables)
1464 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1465 errmsg(
"publication \"%s\" is defined as FOR ALL TABLES",
1467 errdetail(
"Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1471 * Alter the existing publication.
1473 * This is the dispatcher function for AlterPublicationOptions,
1474 * AlterPublicationSchemas and AlterPublicationTables.
1490 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1491 errmsg(
"publication \"%s\" does not exist",
1507 Oid pubid = pubform->oid;
1516 /* Lock the publication so nobody else can do anything with it. */
1521 * It is possible that by the time we acquire the lock on publication,
1522 * concurrent DDL has removed it. We can test this by checking the
1523 * existence of publication. We get the tuple again to avoid the risk
1524 * of any publication option getting changed.
1529 errcode(ERRCODE_UNDEFINED_OBJECT),
1530 errmsg(
"publication \"%s\" does not exist",
1534 schemaidlist !=
NIL);
1544 * Remove relation from publication by mapping OID.
1559 elog(
ERROR,
"cache lookup failed for publication table %u",
1565 * Invalidate relcache so that publication info is rebuilt.
1567 * For the partitioned tables, we must invalidate all partitions contained
1568 * in the respective partition hierarchies, not just the one explicitly
1569 * mentioned in the publication. This is required because we implicitly
1570 * publish the child tables when the parent table is published.
1585 * Remove the publication by mapping OID.
1598 elog(
ERROR,
"cache lookup failed for publication %u", pubid);
1602 /* Invalidate relcache so that publication info is rebuilt. */
1603 if (pubform->puballtables)
1614 * Remove schema from publication by mapping OID.
1629 elog(
ERROR,
"cache lookup failed for publication schema %u", psoid);
1634 * Invalidate relcache so that publication info is rebuilt. See
1635 * RemovePublicationRelById for why we need to consider all the
1650 * Open relations specified by a PublicationTable list.
1651 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1652 * add them to a publication.
1661 List *relids_with_collist =
NIL;
1664 * Open, share-lock, and check all the explicitly-specified relations
1674 /* Allow query cancel in case this takes a long time */
1681 * Filter out duplicates if user specifies "foo, foo".
1683 * Note that this algorithm is known to not be very efficient (O(N^2)),
1684 * but given that it only works on the list of tables given to us by the
1685 * user, it's deemed acceptable.
1689 /* Disallow duplicate tables if there are any with row filters. */
1693 errmsg(
"conflicting or redundant WHERE clauses for table \"%s\"",
1696 /* Disallow duplicate tables if there are any with column lists. */
1700 errmsg(
"conflicting or redundant column lists for table \"%s\"",
1711 rels =
lappend(rels, pub_rel);
1715 relids_with_rf =
lappend_oid(relids_with_rf, myrelid);
1718 relids_with_collist =
lappend_oid(relids_with_collist, myrelid);
1721 * Add children of this rel, if requested, so that they too are added
1722 * to the publication. A partitioned table can't have any inheritance
1723 * children other than its partitions, which need not be explicitly
1724 * added to the publication.
1726 if (recurse && rel->
rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1734 foreach(child, children)
1738 /* Allow query cancel in case this takes a long time */
1742 * Skip duplicates if user specified both parent and child
1748 * We don't allow specifying a row filter for both the parent
1749 * and child table at the same time, as it is not very
1750 * clear which one should be given preference.
1752 if (childrelid != myrelid &&
1756 errmsg(
"conflicting or redundant WHERE clauses for table \"%s\"",
1760 * We don't allow specifying a column list for both the parent
1761 * and child table at the same time, as it is not very
1762 * clear which one should be given preference.
1764 if (childrelid != myrelid &&
1768 errmsg(
"conflicting or redundant column lists for table \"%s\"",
1774 /* find_all_inheritors already got lock */
1778 /* child inherits WHERE clause from parent */
1781 /* child inherits column list from parent */
1783 rels =
lappend(rels, pub_rel);
1787 relids_with_rf =
lappend_oid(relids_with_rf, childrelid);
1790 relids_with_collist =
lappend_oid(relids_with_collist, childrelid);
1802 * Close all relations in the list.
1821 * Lock the schemas specified in the schema list in AccessShareLock mode in
1822 * order to prevent concurrent schema deletion.
1829 foreach(lc, schemalist)
1833 /* Allow query cancel in case this takes a long time */
1838 * It is possible that by the time we acquire the lock on schema,
1839 * concurrent DDL has removed it. We can test this by checking the
1840 * existence of schema.
1844 errcode(ERRCODE_UNDEFINED_SCHEMA),
1845 errmsg(
"schema with OID %u does not exist", schemaid));
1850 * Add listed tables to the publication.
1864 /* Must be owner of the table or superuser. */
1882 * Remove listed tables from the publication.
1899 errcode(ERRCODE_SYNTAX_ERROR),
1900 errmsg(
"column list must not be specified in ALTER PUBLICATION ... DROP"));
1902 prid =
GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1911 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1912 errmsg(
"relation \"%s\" is not part of the publication",
1918 (
errcode(ERRCODE_SYNTAX_ERROR),
1919 errmsg(
"cannot use a WHERE clause when removing a table from a publication")));
1927 * Add listed schemas to the publication.
1935 foreach(lc, schemas)
1953 * Remove listed schemas from the publication.
1962 foreach(lc, schemas)
1967 Anum_pg_publication_namespace_oid,
1976 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1977 errmsg(
"tables from schema \"%s\" are not part of the publication",
1987 * Internal workhorse for changing a publication owner
1996 if (form->pubowner == newOwnerId)
2008 /* Must be able to become new owner */
2011 /* New owner must have CREATE privilege on database */
2019 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2020 errmsg(
"permission denied to change owner of publication \"%s\"",
2022 errhint(
"The owner of a FOR ALL TABLES publication must be a superuser.")));
2026 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2027 errmsg(
"permission denied to change owner of publication \"%s\"",
2029 errhint(
"The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
2032 form->pubowner = newOwnerId;
2035 /* Update owner dependency reference */
2045 * Change publication owner -- by name
2062 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2063 errmsg(
"publication \"%s\" does not exist",
name)));
2066 pubid = pubform->oid;
2080 * Change publication owner -- by OID
2094 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2095 errmsg(
"publication with OID %u does not exist", pubid)));
2105 * Extract the publish_generated_columns option value from a DefElem. "stored"
2106 * and "none" values are accepted.
2114 * A parameter value is required.
2121 return PUBLISH_GENCOLS_NONE;
2123 return PUBLISH_GENCOLS_STORED;
2127 errcode(ERRCODE_SYNTAX_ERROR),
2128 errmsg(
"invalid value for publication parameter \"%s\": \"%s\"", def->
defname, sval),
2129 errdetail(
"Valid values are \"%s\" and \"%s\".",
"none",
"stored"));
2131 return PUBLISH_GENCOLS_NONE;
/* keep compiler quiet */
void check_can_set_role(Oid member, Oid role)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
#define InvalidAttrNumber
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
int bms_next_member(const Bitmapset *a, int prevbit)
void bms_free(Bitmapset *a)
bool bms_is_member(int x, const Bitmapset *a)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
char * defGetString(DefElem *def)
bool defGetBoolean(DefElem *def)
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
bool equal(const void *a, const void *b)
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define DirectFunctionCall1(func, arg1)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
void CacheInvalidateRelSyncAll(void)
void CacheInvalidateRelcacheByRelid(Oid relid)
void CacheInvalidateRelSync(Oid relid)
void CacheInvalidateRelcacheAll(void)
if(TABLE==NULL||TABLE_index==NULL)
List * list_concat_unique_oid(List *list1, const List *list2)
List * lappend(List *list, void *datum)
List * list_difference_oid(const List *list1, const List *list2)
List * lappend_oid(List *list, Oid datum)
List * list_append_unique_oid(List *list, Oid datum)
void list_free(List *list)
bool list_member_oid(const List *list, Oid datum)
void list_free_deep(List *list)
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
#define ShareUpdateExclusiveLock
char * get_rel_name(Oid relid)
AttrNumber get_attnum(Oid relid, const char *attname)
char * get_database_name(Oid dbid)
char get_rel_relkind(Oid relid)
char func_volatile(Oid funcid)
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
char * get_namespace_name(Oid nspid)
#define CHECK_FOR_INTERRUPTS()
Datum namein(PG_FUNCTION_ARGS)
List * fetch_search_path(bool includeImplicit)
Oid get_namespace_oid(const char *nspname, bool missing_ok)
Oid exprType(const Node *expr)
Oid exprInputCollation(const Node *expr)
bool check_functions_in_node(Node *node, check_function_callback checker, void *context)
Oid exprCollation(const Node *expr)
int exprLocation(const Node *expr)
#define expression_tree_walker(n, w, c)
#define IsA(nodeptr, _type_)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
ObjectType get_relkind_objtype(char relkind)
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
Node * transformWhereClause(ParseState *pstate, Node *clause, ParseExprKind exprKind, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
void free_parsestate(ParseState *pstate)
int parser_errposition(ParseState *pstate, int location)
ParseState * make_parsestate(ParseState *parentParseState)
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
@ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
@ PUBLICATIONOBJ_TABLES_IN_SCHEMA
FormData_pg_attribute * Form_pg_attribute
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst_node(type, lc)
static int list_length(const List *l)
#define foreach_oid(var, lst)
Bitmapset * pub_collist_validate(Relation targetrel, List *columns)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
bool is_schema_publication(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
List * GetPublicationSchemas(Oid pubid)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Publication * GetPublication(Oid pubid)
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
FormData_pg_publication * Form_pg_publication
FormData_pg_publication_namespace * Form_pg_publication_namespace
FormData_pg_publication_rel * Form_pg_publication_rel
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
int pg_strcasecmp(const char *s1, const char *s2)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum CStringGetDatum(const char *X)
static Datum CharGetDatum(char X)
struct rf_context rf_context
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
bool pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot, char pubgencols_type, bool *invalid_column_list, bool *invalid_gen_col)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
void InvalidatePublicationRels(List *relids)
static void CloseTableList(List *rels)
void RemovePublicationSchemaById(Oid psoid)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
void InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context)
void RemovePublicationById(Oid pubid)
static List * OpenTableList(List *tables)
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
static bool check_simple_rowfilter_expr(Node *node, ParseState *pstate)
static char defGetGeneratedColsOption(DefElem *def)
void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root, bool *publish_generated_columns_given, char *publish_generated_columns)
void RemovePublicationRelById(Oid proid)
ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId)
static void CheckPubRelationColumnList(char *pubname, List *tables, bool publish_schema, bool pubviaroot)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, const char *queryString, bool publish_schema)
static void LockSchemaList(List *schemalist)
static bool check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
#define MAX_RELCACHE_INVAL_MSGS
void * stringToNode(const char *str)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Node * expand_generated_columns_in_expr(Node *node, Relation rel, int rt_index)
#define ERRCODE_DUPLICATE_OBJECT
const char * p_sourcetext
PublicationObjSpecType pubobjtype
PublicationTable * pubtable
bool has_generated_virtual
bool has_generated_stored
Bitmapset * bms_replident
bool superuser_arg(Oid roleid)
#define FirstLowInvalidHeapAttributeNumber
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheExists1(cacheId, key1)
#define GetSysCacheOid1(cacheId, oidcol, key1)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
#define FirstNormalObjectId
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
void CommandCounterIncrement(void)