1/*-------------------------------------------------------------------------
4 * replication subscriptions
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/catalog/pg_subscription.c
12 *-------------------------------------------------------------------------
29#include "utils/fmgroids.h"
38 * Add a comma-separated list of publication names to the 'dest' string.
48 foreach(lc, publications)
69 * Fetch the subscription from the syscache.
87 elog(
ERROR,
"cache lookup failed for subscription %u", subid);
94 sub->
dbid = subform->subdbid;
95 sub->
skiplsn = subform->subskiplsn;
97 sub->
owner = subform->subowner;
98 sub->
enabled = subform->subenabled;
99 sub->
binary = subform->subbinary;
100 sub->
stream = subform->substream;
105 sub->
failover = subform->subfailover;
113 Anum_pg_subscription_subconninfo);
119 Anum_pg_subscription_subslotname,
129 Anum_pg_subscription_subsynccommit);
132 /* Get publications */
135 Anum_pg_subscription_subpublications);
141 Anum_pg_subscription_suborigin);
144 /* Is the subscription owner a superuser? */
153 * Return number of subscriptions defined in given database.
154 * Used by dropdb() to check if database can indeed be dropped.
168 Anum_pg_subscription_subdbid,
186 * Free memory allocated by subscription struct.
200 * Disable the given subscription.
206 bool nulls[Natts_pg_subscription];
207 bool replaces[Natts_pg_subscription];
211 /* Look up the subscription in the catalog */
216 elog(
ERROR,
"cache lookup failed for subscription %u", subid);
220 /* Form a new tuple. */
222 memset(nulls,
false,
sizeof(nulls));
223 memset(replaces,
false,
sizeof(replaces));
225 /* Set the subscription to disabled. */
227 replaces[Anum_pg_subscription_subenabled - 1] =
true;
229 /* Update the catalog */
239 * Convert text array to list of strings.
241 * Note: the resulting list of strings is pallocated here.
256 for (
i = 0;
i < nelems;
i++)
263 * Add new state record for a subscription table.
265 * If retain_lock is true, then don't release the locks taken in this function.
266 * We normally release the locks at the end of transaction but in binary-upgrade
267 * mode, we expect to release those immediately.
275 bool nulls[Natts_pg_subscription_rel];
282 /* Try finding existing mapping. */
287 elog(
ERROR,
"subscription table %u in subscription %u already exists",
290 /* Form the tuple. */
292 memset(nulls,
false,
sizeof(nulls));
299 nulls[Anum_pg_subscription_rel_srsublsn - 1] =
true;
303 /* Insert tuple into catalog. */
321 * Update the state of a subscription table.
329 bool nulls[Natts_pg_subscription_rel];
331 bool replaces[Natts_pg_subscription_rel];
335#ifdef USE_ASSERT_CHECKING
352 /* Try finding existing mapping. */
357 elog(
ERROR,
"subscription table %u in subscription %u does not exist",
360 /* Update the tuple. */
362 memset(nulls,
false,
sizeof(nulls));
363 memset(replaces,
false,
sizeof(replaces));
365 replaces[Anum_pg_subscription_rel_srsubstate - 1] =
true;
368 replaces[Anum_pg_subscription_rel_srsublsn - 1] =
true;
372 nulls[Anum_pg_subscription_rel_srsublsn - 1] =
true;
377 /* Update the catalog. */
385 * Get state of subscription table.
387 * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
399 * This is to avoid the race condition with AlterSubscription which tries
400 * to remove this relstate.
404 /* Try finding the mapping. */
413 return SUBREL_STATE_UNKNOWN;
421 Anum_pg_subscription_rel_srsublsn, &isnull);
436 * Drop subscription relation mapping. These can be for a particular
437 * subscription, or for a particular relation, or both.
453 Anum_pg_subscription_rel_srsubid,
462 Anum_pg_subscription_rel_srrelid,
468 /* Do the search and delete what we found. */
477 * We don't allow dropping the relation mapping when the table
478 * synchronization is in progress unless the caller updates the
479 * corresponding subscription as well. This is to ensure that we don't
480 * leave tablesync slots or origins in the system when the
481 * corresponding table is dropped.
483 if (!
OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
486 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
487 errmsg(
"could not drop relation mapping for subscription \"%s\"",
489 errdetail(
"Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
493 * translator: first %s is a SQL ALTER command and second %s is a
496 errhint(
"Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
497 "ALTER SUBSCRIPTION ... ENABLE",
498 "DROP SUBSCRIPTION ...")));
509 * Does the subscription have any relations?
511 * Use this function only to know true/false, and when you have no need for the
512 * List returned by GetSubscriptionRelations.
525 Anum_pg_subscription_rel_srsubid,
532 /* If even a single tuple exists then the subscription has tables. */
543 * Get the relations for the subscription.
545 * If not_ready is true, return only the relations that are not in a ready
546 * state, otherwise return all the relations of the subscription. The
547 * returned list is palloc'ed in the current memory context.
562 Anum_pg_subscription_rel_srsubid,
568 Anum_pg_subscription_rel_srsubstate,
585 relstate->
relid = subrel->srrelid;
586 relstate->
state = subrel->srsubstate;
588 Anum_pg_subscription_rel_srsublsn, &isnull);
605 * Update the dead tuple retention status for the given subscription.
611 bool nulls[Natts_pg_subscription];
612 bool replaces[Natts_pg_subscription];
616 /* Look up the subscription in the catalog */
621 elog(
ERROR,
"cache lookup failed for subscription %u", subid);
625 /* Form a new tuple. */
627 memset(nulls,
false,
sizeof(nulls));
628 memset(replaces,
false,
sizeof(replaces));
630 /* Set the dead tuple retention status to the requested value. */
631 values[Anum_pg_subscription_subretentionactive - 1] = active;
632 replaces[Anum_pg_subscription_subretentionactive - 1] =
true;
634 /* Update the catalog */
#define DatumGetArrayTypeP(X)
void deconstruct_array_builtin(ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void systable_endscan(SysScanDesc sysscan)
HeapTuple systable_getnext(SysScanDesc sysscan)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode, bool orstronger)
#define SET_LOCKTAG_OBJECT(locktag, dboid, classoid, objoid, objsubid)
char * get_rel_name(Oid relid)
char * get_subscription_name(Oid subid, bool missing_ok)
char * pstrdup(const char *in)
void pfree(void *pointer)
static Datum LSNGetDatum(XLogRecPtr X)
static XLogRecPtr DatumGetLSN(Datum X)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
int CountDBSubscriptions(Oid dbid)
void FreeSubscription(Subscription *sub)
void DisableSubscription(Oid subid)
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
static List * textarray_to_stringlist(ArrayType *textarray)
bool HasSubscriptionRelations(Oid subid)
void UpdateDeadTupleRetentionStatus(Oid subid, bool active)
Subscription * GetSubscription(Oid subid, bool missing_ok)
FormData_pg_subscription * Form_pg_subscription
FormData_pg_subscription_rel * Form_pg_subscription_rel
static Name DatumGetName(Datum X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum CharGetDatum(char X)
char * quote_literal_cstr(const char *rawstr)
Datum quote_literal(PG_FUNCTION_ARGS)
#define RelationGetDescr(relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
#define BTEqualStrategyNumber
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
bool superuser_arg(Oid roleid)
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheCopy2(cacheId, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, ScanKeyData *key)
static void table_endscan(TableScanDesc scan)
String * makeString(char *str)
#define InvalidXLogRecPtr