index c8c7bc5045a18ad41f9c64f0b6c6529f299711dc..27fa607da41d108bfc121d49eee72628a77f1b19 100644 (file)
=pod
+=item $node->wait_for_subscription_sync(publisher, subname, dbname)
+
+Wait for all tables in pg_subscription_rel to complete the initial
+synchronization (i.e to be either in 'syncdone' or 'ready' state).
+
+If the publisher node is given, additionally, check if the subscriber has
+caught up to what has been committed on the primary. This is useful to
+ensure that the initial data synchronization has been completed after
+creating a new subscription.
+
+If there is no active replication connection from this peer, wait until
+poll_query_until timeout.
+
+This is not a test. It die()s on failure.
+
+=cut
+
+sub wait_for_subscription_sync
+{
+ my ($self, $publisher, $subname, $dbname) = @_;
+ my $name = $self->name;
+
+ $dbname = defined($dbname) ? $dbname : 'postgres';
+
+ # Wait for all tables to finish initial sync.
+ print "Waiting for all subscriptions in \"$name\" to synchronize data\n";
+ my $query =
+ qq[SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');];
+ $self->poll_query_until($dbname, $query)
+ or croak "timed out waiting for subscriber to synchronize data";
+
+ # Then, wait for the replication to catchup if required.
+ if (defined($publisher))
+ {
+ croak 'subscription name must be specified' unless defined($subname);
+ $publisher->wait_for_catchup($subname);
+ }
+
+ print "done\n";
+ return;
+}
+
+=pod
+
=item $node->wait_for_log(regexp, offset)
Waits for the contents of the server log file, starting at the given offset, to
index f53b3b7db0c5f60fe16c455a5489a6169c0fc92b..c5b5be419cc2732ab56d2c5575a9621b565cfb85 100644 (file)
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
);
-$node_publisher->wait_for_catchup('tap_sub');
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
"CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2"
);
-$node_publisher->wait_for_catchup('tap_sub_temp1');
-
-# Also wait for initial table sync to finish
-$synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_temp1');
# Subscriber table will have no rows initially
$result =
index 3f1f00f7c8d9d338d86406c156fccaba355eadf0..d6c6f4932720e943163f1d30cc7d13a70141b817 100644 (file)
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
);
-$node_publisher->wait_for_catchup('tap_sub');
-
# Wait for initial sync to finish as well
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
# Insert initial test data
$node_publisher->safe_psql(
index cf61fc1e0f605e8573728802b6f57b5d33708848..fd4bf7bacd1af4072ff77f676ebcf03e315f35f8 100644 (file)
@@ -39,13 +39,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);
-$node_publisher->wait_for_catchup('tap_sub');
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
@@ -71,8 +66,7 @@ $node_subscriber->poll_query_until('postgres', $started_query)
$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
# wait for sync to finish this time
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
# check that all data is synced
$result =
);
# and wait for data sync to finish again
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
# check that all data is synced
$result =
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
# wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM tab_rep_next");
index 38a74a897f805b4cf8451903ad93c9d72f166dc5..3ee0522460b78fb081b9d70c6be94aa241122b5f 100644 (file)
@@ -32,13 +32,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
);
-$node_publisher->wait_for_catchup('mysub');
-
-# Wait for initial sync to finish as well
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
$node_publisher->safe_psql('postgres',
q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
index c924ff35f7174457a8fc7eb9b54b39d0972da6a4..fdcb3f811ce00520ea9254e0dd2099ceea9fe508 100644 (file)
@@ -28,13 +28,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
);
-$node_publisher->wait_for_catchup('mysub');
-
-# Wait for initial sync to finish as well
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
$node_publisher->safe_psql('postgres',
q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
index 01df54229c4b1f446b80fceea1a35932e94d5110..8882addc18f85e9c85ed1f4e9ce6c65e6dbd3b79 100644 (file)
m/WARNING: publication "non_existent_pub" does not exist in the publisher/,
"Create subscription throws warning for non-existent publication");
-$node_publisher->wait_for_catchup('mysub1');
-
-# Also wait for initial table sync to finish.
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish.
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub1');
# Specifying non-existent publication along with add publication.
($ret, $stdout, $stderr) = $node_subscriber->psql('postgres',
index 67b4026afa407f10e68890aced4559a9cc09a597..b4d44a200bb6a6f8ccc83a178b39a9859213acc4 100644 (file)
@@ -38,13 +38,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);
-$node_publisher->wait_for_catchup('tap_sub');
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
my $result =
$node_subscriber->safe_psql('postgres',
@@ -105,8 +100,7 @@ $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)");
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
# Add replica identity column. (The serial is not necessary, but it's
# a convenient way to get a default on the new column so that rows
index d51924943136696b0db27e69b3fc621dd133feb0..a6fe82a71f2e2049480f54162b852d6f26468dcc 100644 (file)
@@ -67,10 +67,7 @@ $node_subscriber->safe_psql('postgres',
);
# Wait for initial sync of all subscriptions
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
# insert data to truncate
);
# wait for initial data sync
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
# insert data to truncate
index e991a0803217384a437573cb68887015dc15c90d..3d96f6f30f825c1bf6c1b184a3a0efe4015db815 100644 (file)
@@ -40,10 +40,7 @@ $node_subscriber->safe_psql('postgres',
);
# Wait for initial sync of all subscriptions
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1");
is( $result, qq(1|22
index 0dfbbabc3b0365a532925725920b9f225d14876a..8b33e4e7ae15afc10f84d49cb280f8342b149116 100644 (file)
@@ -153,12 +153,8 @@ ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger;
});
# Wait for initial sync of all subscriptions
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber1->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
-$node_subscriber2->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber1->wait_for_subscription_sync;
+$node_subscriber2->wait_for_subscription_sync;
# Tests for replication using leaf partition identity and schema
"ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
# Wait for initial sync of all subscriptions
-$node_subscriber1->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
-$node_subscriber2->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber1->wait_for_subscription_sync;
+$node_subscriber2->wait_for_subscription_sync;
# check that data is synced correctly
$result = $node_subscriber1->safe_psql('postgres', "SELECT c, a FROM tab2");
@@ -568,8 +562,7 @@ $node_subscriber2->safe_psql('postgres',
# make sure the subscription on the second subscriber is synced, before
# continuing
-$node_subscriber2->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->wait_for_subscription_sync;
# Insert a change into the leaf partition, should be replicated through
# the partition root (thanks to the FOR ALL TABLES partition).
$node_subscriber2->safe_psql('postgres',
"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
-$node_subscriber2->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->wait_for_subscription_sync;
# Make partition map cache
$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
index a1f03e7adc2ad24199659667e22eeec837e9bea4..8d8b35721fcb88a714186bb8379f936094001a47 100644 (file)
@@ -46,10 +46,7 @@ $node_subscriber->safe_psql('postgres',
. "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
# Ensure nodes are in sync with each other
-$node_publisher->wait_for_catchup('tsub');
-$node_subscriber->poll_query_until('postgres',
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"
-) or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
# Insert some content and make sure it's replicated across
$node_publisher->safe_psql(
index 6561b189de8ba552ce1d81cae9fd6e2ef014d643..cbaa327e441d96f574efbae1bf28c104dd8ffc3e 100644 (file)
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
my $result =
$node_subscriber->safe_psql('postgres',
index f27f1694f291dc27cfeef7ea2f911f4f985c06f9..bc0a9cd0531a4b9c56ff9fda79a5c22d1022a0ea 100644 (file)
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
my $result =
$node_subscriber->safe_psql('postgres',
index 0bce63b7167c2b918c08175cfecd456b45156acd..866f1512e47b4f1462f1134b8df3460a145b87d9 100644 (file)
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
my $result =
$node_subscriber->safe_psql('postgres',
index 7155442e76cb234bb23f74df409cb508cc5ec800..551f16df6ddbc9d108559fe3facfae1b895c3bef 100644 (file)
@@ -40,13 +40,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
my $result =
$node_subscriber->safe_psql('postgres',
index dbd0fca4d1e7a991de437f3512d42b31d8a76453..4d7da82b7a8021582b43318a099d8682793913b0 100644 (file)
@@ -41,13 +41,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
my $result =
$node_subscriber->safe_psql('postgres',
index c3e9857f7ce88100d4716d2859f6ad17d4371e90..caa90897ec0f26525e9304ecdd725e13e50a7634 100644 (file)
PUBLICATION tap_pub
WITH (two_phase = on)");
-# Wait for subscriber to finish initialization
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for two-phase to be enabled
my $twophase_query =
PUBLICATION tap_pub_copy
WITH (two_phase=on, copy_data=false);");
-# Wait for subscriber to finish initialization
-$node_publisher->wait_for_catchup($appname_copy);
-
-# Also wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
# Also wait for two-phase to be enabled
$node_subscriber->poll_query_until('postgres', $twophase_query)
index d8475d25a497bac2c6928dd49ecc3eb1d47b8ca7..9b454106bdf3f20a6e77815785017a2ba4967eff 100644 (file)
PUBLICATION tap_pub
WITH (streaming = on, two_phase = on)");
-# Wait for subscriber to finish initialization
-$node_publisher->wait_for_catchup($appname);
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for two-phase to be enabled
my $twophase_query =
index 246f8c923724cfbbacc21e6ac47a619f4265228c..eaf47e66f1acb9f06df2292fbbf915ce7bf9a83b 100644 (file)
@@ -37,13 +37,7 @@ $node_subscriber->safe_psql('postgres',
);
# Wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
-
-$node_publisher->wait_for_catchup('tap_sub');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
# Check the initial data of tab_1 is copied to subscriber
my $result = $node_subscriber->safe_psql('postgres',
@@ -67,10 +61,7 @@ $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1");
# Wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
-
-$node_publisher->wait_for_catchup('tap_sub');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
# Check the initial data of tab_drop_refresh was copied to subscriber
$result = $node_subscriber->safe_psql('postgres',
@@ -82,10 +73,7 @@ $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1");
# Wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
-
-$node_publisher->wait_for_catchup('tap_sub');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
# Check the initial data of tab_1 was copied to subscriber again
$result = $node_subscriber->safe_psql('postgres',
index 5ce275cf72545f6996f0717473a50c0f91eef0a9..627c63b529a70173798642a6e923828ae7350fb5 100644 (file)
@@ -62,13 +62,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema"
);
-$node_publisher->wait_for_catchup('tap_sub_schema');
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_schema');
# Check the schema table data is synced up
my $result = $node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
# Wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql('postgres', "INSERT INTO sch1.tab3 VALUES(11)");
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
# Wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
# Wait for sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')"
index e8f3e4bba11582da7d20c9ede7580ef9308279c2..f9f6117dfb49a5e7f6c8b7c11f2000d5ec6acf2c 100644 (file)
CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
));
-$node_publisher->wait_for_catchup('admin_sub');
-
-# Wait for initial sync to finish as well
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'admin_sub');
# Verify that "regress_admin" can replicate into the tables
#
index b1fb2d7cae4126b7f428be634b704d6ed4775d84..f5f8a670920ff4329d1d8f51473401a723398b92 100644 (file)
@@ -17,9 +17,6 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
my $appname = 'tap_sub';
@@ -48,10 +45,8 @@ $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_forall"
);
-$node_publisher->wait_for_catchup($appname);
# wait for initial table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# The subscription of the FOR ALL TABLES publication means there should be no
# filtering on the tablesync COPY, so all expect all 5 will be present.
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_x, tap_pub_allinschema"
);
-$node_publisher->wait_for_catchup($appname);
# wait for initial table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# The subscription of the ALL TABLES IN SCHEMA publication means there should be
# no filtering on the tablesync COPY, so expect all 5 will be present.
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
);
-$node_publisher->wait_for_catchup($appname);
-
# wait for initial table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Check expected replicated rows for tab_rowfilter_1
# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
# wait for table synchronization to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)"
index 303e8ec3fc2461a5a824fd3d9cd541f827eb2033..05daa77c58a3c0593d7ea0e05914e62bd71768ff 100644 (file)
@@ -124,10 +124,7 @@ $node_subscriber->safe_psql('postgres', "TRUNCATE tbl");
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
# Wait for the data to replicate.
-$node_publisher->wait_for_catchup('sub');
-$node_subscriber->poll_query_until('postgres',
- "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
-);
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
# Confirm that we have finished the table sync.
my $result =
index e9241d29966af328a5c6f7862a928a25a914c358..b297a51f7c7288496fdd2806a41b700da3164914 100644 (file)
PUBLICATION tap_pub_B
WITH (origin = none, copy_data = off)");
-# Wait for subscribers to finish initialization
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
-
-# Also wait for initial table sync to finish
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_A->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
-$node_B->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_A->wait_for_subscription_sync($node_B, $appname_A);
+$node_B->wait_for_subscription_sync($node_A, $appname_B1);
is(1, 1, 'Bidirectional replication setup is complete');
PUBLICATION tap_pub_C
WITH (origin = none)");
-$node_C->wait_for_catchup($appname_B2);
-
-$node_B->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_B->wait_for_subscription_sync($node_C, $appname_B2);
# insert a record
$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
index 9fa6e0b35ffb923a54a832c0ce9ef08cce358fc4..b6644556cf4931187abacf7015fd47cbcb4dff90 100644 (file)
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
my $offset = 0;
-sub wait_for_subscription_sync
-{
- my ($node) = @_;
-
- # Also wait for initial table sync to finish
- my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-
- $node->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
-}
-
# setup tables on both nodes
# tab1: simple 1:1 replication
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
# tab1: only (a,b) is replicated
$result =
# wait for the tablesync to complete, add a bit more data and then check
# the results of the replication
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub3
));
-wait_for_subscription_sync($node_subscriber);
-
-$node_publisher->wait_for_catchup('sub1');
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub1');
# insert data and make sure the columns in column list get fully replicated
$node_publisher->safe_psql(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub5
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub6
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub7
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8;
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
TRUNCATE test_part_c;
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub9
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION;
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6;
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub1, pub2;
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1;
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub3;
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
ALTER SUBSCRIPTION sub1 SET PUBLICATION pub4;
));
-wait_for_subscription_sync($node_subscriber);
+$node_subscriber->wait_for_subscription_sync;
$node_publisher->safe_psql(
'postgres', qq(
index 11ba47371596c939842aad4e5328dbe31d2b1b5f..6247aa773014ae796560074a54008824dfbab32a 100644 (file)
# We cannot rely solely on wait_for_catchup() here; it isn't sufficient
# when tablesync workers might still be running. So in addition to that,
# verify that tables are synced.
-# XXX maybe this should be integrated in wait_for_catchup() itself.
-$node_twoways->wait_for_catchup('testsub');
-my $synced_query =
- "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_twoways->poll_query_until('d2', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+$node_twoways->wait_for_subscription_sync($node_twoways, 'testsub', 'd2');
is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
$rows * 2, "2x$rows rows in t");
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);
-$node_publisher->wait_for_catchup('tap_sub');
-
-# Also wait for initial table sync to finish
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
is( $node_subscriber->safe_psql(
'postgres', "SELECT * FROM tab_replidentity_index"),