index 7918176fc588e873169f08aecf890ec1c5729aa8..3e20f4487872e498b480660c7044fc32fb386e8c 100644 (file)
return found;
}
+
+/*
+ * TwoPhaseGetXidByLockingProc
+ * Return the oldest transaction ID from prepared transactions that are
+ * currently in the commit critical section.
+ *
+ * This function only considers transactions in the currently connected
+ * database. If no matching transactions are found, it returns
+ * InvalidTransactionId.
+ */
+TransactionId
+TwoPhaseGetOldestXidInCommit(void)
+{
+ TransactionId oldestRunningXid = InvalidTransactionId;
+
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+ for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGPROC *commitproc;
+ TransactionId xid;
+
+ if (!gxact->valid)
+ continue;
+
+ if (gxact->locking_backend == INVALID_PROC_NUMBER)
+ continue;
+
+ /*
+ * Get the backend that is handling the transaction. It's safe to
+ * access this backend while holding TwoPhaseStateLock, as the backend
+ * can only be destroyed after either removing or unlocking the
+ * current global transaction, both of which require an exclusive
+ * TwoPhaseStateLock.
+ */
+ commitproc = GetPGProcByNumber(gxact->locking_backend);
+
+ if (MyDatabaseId != commitproc->databaseId)
+ continue;
+
+ if ((commitproc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
+ continue;
+
+ xid = XidFromFullTransactionId(gxact->fxid);
+
+ if (!TransactionIdIsValid(oldestRunningXid) ||
+ TransactionIdPrecedes(xid, oldestRunningXid))
+ oldestRunningXid = xid;
+ }
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ return oldestRunningXid;
+}
index 82cf65fae737a5d1d934a074654ff60e249b1a12..750d262fccade6a8df9d5ea7ad16e4b94e985ada 100644 (file)
@@ -854,7 +854,17 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
pgstat_create_subscription(subid);
- if (opts.enabled)
+ /*
+ * Notify the launcher to start the apply worker if the subscription is
+ * enabled, or to create the conflict detection slot if retain_dead_tuples
+ * is enabled.
+ *
+ * Creating the conflict detection slot is essential even when the
+ * subscription is not enabled. This ensures that dead tuples are
+ * retained, which is necessary for accurately identifying the type of
+ * conflict during replication.
+ */
+ if (opts.enabled || opts.retaindeadtuples)
ApplyLauncherWakeupAtCommit();
ObjectAddressSet(myself, SubscriptionRelationId, subid);
index d3356bc84ee0cddc14cab9ecf60989308e078c46..e6da4028d392e89fe72508cf82db363547bfb084 100644 (file)
return has_subrels && (table_states_not_ready == NIL);
}
+/*
+ * Return whether the subscription currently has any relations.
+ *
+ * Note: Unlike HasSubscriptionRelations(), this function relies on cached
+ * information for subscription relations. Additionally, it should not be
+ * invoked outside of apply or tablesync workers, as MySubscription must be
+ * initialized first.
+ */
+bool
+HasSubscriptionRelationsCached(void)
+{
+ bool started_tx;
+ bool has_subrels;
+
+ /* We need up-to-date subscription tables info here */
+ has_subrels = FetchTableStates(&started_tx);
+
+ if (started_tx)
+ {
+ CommitTransactionCommand();
+ pgstat_report_stat(true);
+ }
+
+ return has_subrels;
+}
+
/*
* Update the two_phase state of the specified subscription in pg_subscription.
*/
index f1ebd63e792eed17b1eb4a232b29881474c6004c..c0f6bef5c282cec380ac3ccba89b56a0a087d30e 100644 (file)
* workers is complex and not worth the effort, so we simply return if not
* all tables are in the READY state.
*
- * It is safe to add new tables with initial states to the subscription
- * after this check because any changes applied to these tables should
- * have a WAL position greater than the rdt_data->remote_lsn.
+ * Advancing the transaction ID is necessary even when no tables are
+ * currently subscribed, to avoid retaining dead tuples unnecessarily.
+ * While it might seem safe to skip all phases and directly assign
+ * candidate_xid to oldest_nonremovable_xid during the
+ * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
+ * concurrently add tables to the subscription, the apply worker may not
+ * process invalidations in time. Consequently,
+ * HasSubscriptionRelationsCached() might miss the new tables, leading to
+ * premature advancement of oldest_nonremovable_xid.
+ *
+ * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
+ * invalidations are guaranteed to be processed before applying changes
+ * from newly added tables while waiting for the local flush to reach
+ * remote_lsn.
+ *
+ * Additionally, even if we check for subscription tables during
+ * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
+ * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
+ * subscription tables at this stage to prevent unnecessary tuple
+ * retention.
*/
- if (!AllTablesyncsReady())
+ if (HasSubscriptionRelationsCached() && !AllTablesyncsReady())
{
TimestampTz now;
index e3dce9dc68d04ff0d96a04fa8606881536c344c0..59822f22b8d06055d08f38660e2fc3a53c4e15ac 100644 (file)
#include "access/timeline.h"
#include "access/transam.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
{
XLogRecPtr lsn = InvalidXLogRecPtr;
TransactionId oldestXidInCommit;
+ TransactionId oldestGXidInCommit;
FullTransactionId nextFullXid;
FullTransactionId fullOldestXidInCommit;
WalSnd *walsnd = MyWalSnd;
* ones replicated.
*/
oldestXidInCommit = GetOldestActiveTransactionId(true, false);
+ oldestGXidInCommit = TwoPhaseGetOldestXidInCommit();
+
+ /*
+ * Update the oldest xid for standby transmission if an older prepared
+ * transaction exists and is currently in commit phase.
+ */
+ if (TransactionIdIsValid(oldestGXidInCommit) &&
+ TransactionIdPrecedes(oldestGXidInCommit, oldestXidInCommit))
+ oldestXidInCommit = oldestGXidInCommit;
+
nextFullXid = ReadNextFullTransactionId();
fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
oldestXidInCommit);
index 509bdad9a5d557cb2912bbfc38846dd7fc451655..64463e9f4afb44294e51ea4431794045703feac9 100644 (file)
@@ -68,4 +68,6 @@ extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res,
int szgid);
extern bool LookupGXactBySubid(Oid subid);
+extern TransactionId TwoPhaseGetOldestXidInCommit(void);
+
#endif /* TWOPHASE_H */
index 62ea1a0058081b162fd0c670b258e44627d89ed0..de003802612790b6f6cd8b9693b9c647ee4c2e0f 100644 (file)
@@ -272,6 +272,7 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
char *originname, Size szoriginname);
extern bool AllTablesyncsReady(void);
+extern bool HasSubscriptionRelationsCached(void);
extern void UpdateTwoPhaseState(Oid suboid, char new_state);
extern void process_syncing_tables(XLogRecPtr current_lsn);
index 51b23a39fa9359d6791a4d37522740d6c3cbc993..e06429c288fe656de4b132df1267bc21a1de9bb5 100644 (file)
.*Remote row \(2, 4\); replica identity full \(2, 2\)/,
'update target row was deleted in tab');
+###############################################################################
+# Check that the xmin value of the conflict detection slot can be advanced when
+# the subscription has no tables.
+###############################################################################
+
+# Remove the table from the publication
+$node_B->safe_psql('postgres', "ALTER PUBLICATION tap_pub_B DROP TABLE tab");
+
+$node_A->safe_psql('postgres',
+ "ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION");
+
+# Remember the next transaction ID to be assigned
+$next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
+
+# Confirm that the xmin value is advanced to the latest nextXid. If no
+# transactions are running, the apply worker selects nextXid as the candidate
+# for the non-removable xid. See GetOldestActiveTransactionId().
+ok( $node_A->poll_query_until(
+ 'postgres',
+ "SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+ ),
+ "the xmin value of slot 'pg_conflict_detection' is updated on Node A");
+
+# Re-add the table to the publication for further tests
+$node_B->safe_psql('postgres', "ALTER PUBLICATION tap_pub_B ADD TABLE tab");
+
+$node_A->safe_psql('postgres',
+ "ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION WITH (copy_data = false)");
+
###############################################################################
# Check that dead tuple retention stops due to the wait time surpassing
# max_retention_duration.