index c18db4fd042cc7096acabc357419b95d035f3f57..e3a0e92d363930017034c8d7744bb6ab02df3fc0 100644 (file)
#include <unistd.h>
-/* Size of the streaming replication protocol header */
-#define STREAMING_HEADER_SIZE (1+8+8+8)
+/* Size of the streaming replication protocol headers */
+#define STREAMING_HEADER_SIZE (1+sizeof(WalDataMessageHeader))
+#define STREAMING_KEEPALIVE_SIZE (1+sizeof(PrimaryKeepaliveMessage))
const XLogRecPtr InvalidXLogRecPtr = {0, 0};
@@ -374,18 +375,33 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
progname, PQerrorMessage(conn));
return false;
}
- if (r < STREAMING_HEADER_SIZE + 1)
+ if (copybuf[0] == 'k')
{
- fprintf(stderr, _("%s: streaming header too small: %i\n"),
- progname, r);
- return false;
+ /*
+ * keepalive message, sent in 9.2 and newer. We just ignore
+ * this message completely, but need to forward past it
+ * in our reading.
+ */
+ if (r != STREAMING_KEEPALIVE_SIZE)
+ {
+ fprintf(stderr, _("%s: keepalive message is incorrect size: %i\n"),
+ progname, r);
+ return false;
+ }
+ continue;
}
- if (copybuf[0] != 'w')
+ else if (copybuf[0] != 'w')
{
fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
progname, copybuf[0]);
return false;
}
+ if (r < STREAMING_HEADER_SIZE + 1)
+ {
+ fprintf(stderr, _("%s: streaming header too small: %i\n"),
+ progname, r);
+ return false;
+ }
/* Extract WAL location for this block */
memcpy(&blockpos, copybuf + 1, 8);