1/*-------------------------------------------------------------------------
3 * receivelog.c - receive WAL files using the streaming
4 * replication protocol.
6 * Author: Magnus Hagander <magnus@hagander.net>
8 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
11 * src/bin/pg_basebackup/receivelog.c
12 *-------------------------------------------------------------------------
28/* currently open WAL file */
33 static bool still_sending =
true;
/* feedback still needs to be sent? */
59 snprintf(tmppath,
sizeof(tmppath),
"archive_status/%s.done",
66 pg_log_error(
"could not create archive status file \"%s\": %s",
73 pg_log_error(
"could not close archive status file \"%s\": %s",
82 * Open a new WAL file in the specified directory.
84 * Returns true if OK; on failure, returns false after printing an error msg.
85 * On success, 'walfile' is set to the opened WAL file.
87 * Unless compression is used, the file will be padded to a full WalSegSz segment with zeroes.
101 /* Note that this considers the compression used if necessary */
107 * When streaming to files, if an existing file exists we verify that it's
108 * either empty (just created), or a complete WalSegSz segment (in which
109 * case it has been created and padded). Anything else indicates a corrupt
110 * file. Compressed files have no need for padding, so just ignore this
113 * When streaming to tar, no file with this name will exist before, so we
114 * never have to verify a size.
122 pg_log_error(
"could not get size of write-ahead log file \"%s\": %s",
129 /* Already padded file. Open it for use */
133 pg_log_error(
"could not open existing write-ahead log file \"%s\": %s",
139 /* fsync file in case of a previous crash */
142 pg_log_error(
"could not fsync existing write-ahead log file \"%s\": %s",
154 /* if write didn't set errno, assume problem is no disk space */
158 "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
164 /* File existed and was empty, so fall through and open */
167 /* No file existed, so create one */
175 pg_log_error(
"could not open write-ahead log file \"%s\": %s",
187 * Close the current WAL file (if open), and rename it to the correct
188 * filename if it's complete. On failure, prints an error message to stderr
189 * and returns false, otherwise returns true.
205 /* Note that this considers the compression used if necessary */
216 pg_log_info(
"not renaming \"%s\", segment is not complete",
fn);
237 * Mark file as archived if requested by the caller - pg_basebackup needs
238 * to do so as files can otherwise get archived again after promotion of a
239 * new node. This is in line with walreceiver.c always doing a
240 * XLogArchiveForceDone() after a complete segment.
244 /* writes error message if failed */
255 * Check if a timeline history file exists.
263 * Timeline 1 never has a history file. We treat that as if it existed,
264 * since we never need to stream it.
277 int size = strlen(content);
282 * Check that the server's idea of how timeline history files should be
283 * named matches ours.
286 if (strcmp(histfname,
filename) != 0)
288 pg_log_error(
"server reported unexpected history file name for timeline %u: %s",
294 histfname,
".tmp", 0);
297 pg_log_error(
"could not create timeline history file \"%s\": %s",
304 pg_log_error(
"could not write timeline history file \"%s\": %s",
308 * If we fail to make the file, delete it to release disk space
322 /* Maintain archive_status, check close_walfile() for details. */
325 /* writes error message if failed */
334 * Send a Standby Status Update message to server.
339 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
355 replybuf[
len] = replyRequested ? 1 : 0;
/* replyRequested */
369 * Check that the server version we're connected to is supported by
370 * ReceiveXlogStream().
372 * If it's not, an error message is printed to stderr, and false is returned.
382 * The message format used in streaming replication changed in 9.3, so we
383 * cannot stream from older servers. And we don't support servers newer
384 * than the client; it might work, but we don't know, so err on the safe
387 minServerMajor = 903;
388 maxServerMajor = PG_VERSION_NUM / 100;
390 if (serverMajor < minServerMajor)
394 pg_log_error(
"incompatible server version %s; client does not support streaming from server versions older than %s",
395 serverver ? serverver :
"'unknown'",
399 else if (serverMajor > maxServerMajor)
403 pg_log_error(
"incompatible server version %s; client does not support streaming from server versions newer than %s",
404 serverver ? serverver :
"'unknown'",
412 * Receive a log stream starting at the specified position.
414 * Individual parameters are passed through the StreamCtl structure.
416 * If sysidentifier is specified, validate that both the system
417 * identifier and the timeline match the specified ones
418 * (by sending an extra IDENTIFY_SYSTEM command)
420 * All received segments will be written to the directory
421 * specified by basedir. This will also fetch any missing timeline history
424 * The stream_stop callback will be called every time data
425 * is received, and whenever a segment is completed. If it returns
426 * true, the streaming will stop and the function
427 * returns. As long as it returns false, streaming will continue
430 * If stream_stop() checks for external input, stop_socket should be set to
431 * the FD it checks. This will allow such input to be detected promptly
432 * rather than after standby_message_timeout (which might be indefinite).
433 * Note that signals will interrupt waits for input as well, but that is
434 * race-y since a signal received while busy won't interrupt the wait.
436 * standby_message_timeout controls how often we send a message
437 * back to the primary letting it know our progress, in milliseconds.
438 * Zero means no messages are sent.
439 * This message will only contain the write location, and never
442 * If 'partial_suffix' is not NULL, files are initially created with the
443 * given suffix, and the suffix is removed once the file is finished. That
444 * allows you to tell the difference between partial and completed files,
445 * so that you can continue later where you left off.
447 * If 'synchronous' is true, the received WAL is flushed as soon as written,
448 * otherwise only when the WAL file is closed.
450 * Note: The WAL location *must* be at a log segment start!
461 * The caller should've checked the server version already, but doesn't do
462 * any harm to check it here too.
468 * Decide whether we want to report the flush position. If we report the
469 * flush position, the primary will know what WAL we'll possibly
470 * re-request, and it can then remove older WAL safely. We must always do
471 * that when we are using slots.
473 * Reporting the flush position makes one eligible as a synchronous
474 * replica. People shouldn't include generic names in
475 * synchronous_standby_names, but we've protected them against it so far,
476 * so let's continue to do so unless specifically requested.
494 char *sysidentifier = NULL;
498 * Get the server system identifier and timeline, and validate them.
508 pg_log_error(
"system identifier does not match between base backup and streaming connection");
516 pg_log_error(
"starting timeline %u is not present in the server",
523 * Initialize flush position to starting point; it's the caller's
524 * responsibility to ensure that it's sane.
531 * Fetch the timeline history file for this timeline, if we don't have
532 * it already. When streaming log to tar, this will always return
533 * false, as we are never streaming into an existing file and
534 * therefore there can be no pre-existing timeline history file.
542 /* FIXME: we might send it ok, but get an error */
543 pg_log_error(
"could not send replication command \"%s\": %s",
550 * The response to TIMELINE_HISTORY is a single row result set
551 * with two fields: filename and content
555 pg_log_warning(
"unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
559 /* Write the history file to disk */
568 * Before we start streaming from the requested location, check if the
569 * callback tells us to stop here.
574 /* Initiate the replication stream at specified location */
575 snprintf(query,
sizeof(query),
"START_REPLICATION %s%X/%08X TIMELINE %u",
582 pg_log_error(
"could not send replication command \"%s\": %s",
595 * Streaming finished.
597 * There are two possible reasons for that: a controlled shutdown, or
598 * we reached the end of the current timeline. In case of
599 * end-of-timeline, the server sends a result set after Copy has
600 * finished, containing information about the next timeline. Read
601 * that, and restart streaming from the next timeline. In case of
602 * controlled shutdown, stop here.
607 * End-of-timeline. Read the next timeline's ID and starting
608 * position. Usually, the starting position will match the end of
609 * the previous timeline, but there are corner cases like if the
610 * server had sent us half of a WAL record, when it was promoted.
611 * The new timeline will begin at the end of the last complete
612 * record in that case, overlapping the partial WAL record on the
623 /* Sanity check the values the server gave us */
624 if (newtimeline <= stream->timeline)
626 pg_log_error(
"server reported unexpected next timeline %u, following timeline %u",
632 pg_log_error(
"server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X",
638 /* Read the final result, which should be CommandComplete. */
642 pg_log_error(
"unexpected termination of replication stream: %s",
650 * Loop back to start streaming from the new timeline. Always
651 * start streaming at the beginning of a segment.
663 * End of replication (ie. controlled shut down of the server).
665 * Check if the callback thinks it's OK to stop here. If not,
672 pg_log_error(
"replication stream was terminated before stop point");
678 /* Server returned an error. */
679 pg_log_error(
"unexpected termination of replication stream: %s",
695 * Helper function to parse the result set returned by server after streaming
696 * has finished. On failure, prints an error to stderr and returns false.
705 * The result set consists of one row and two columns, e.g:
707 * next_tli | next_tli_startpos
708 * ----------+-------------------
711 * next_tli is the timeline ID of the next timeline after the one that
712 * just finished streaming. next_tli_startpos is the WAL location where
713 * the server switched to it.
718 pg_log_error(
"unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
724 if (sscanf(
PQgetvalue(res, 0, 1),
"%X/%08X", &startpos_xlogid,
725 &startpos_xrecoff) != 2)
727 pg_log_error(
"could not parse next timeline's starting point \"%s\"",
737 * The main loop of ReceiveXlogStream. Handles the COPY stream after
738 * initiating streaming with the START_REPLICATION command.
740 * If the COPY ends (not necessarily successfully) due to a message from the
741 * server, returns a PGresult and sets *stoppos to the last byte written.
742 * On any other sort of error, returns NULL.
761 * Check if we should continue streaming, or abort at this point.
769 * If the synchronous option is true, issue a sync as soon as there
770 * is WAL data that has not been flushed yet.
775 pg_fatal(
"could not fsync file \"%s\": %s",
780 * Send feedback so that the server sees the latest WAL locations
789 * Potentially send a status message to the primary
795 /* Time to send feedback! */
802 * Calculate how long send/receive loops should sleep
807 /* Done with any prior message */
826 /* Check the message type. */
839 * Check if we should continue streaming, or abort at this
852 /* Done with that message */
857 * Process the received data, and any subsequent data we can read
870 * Wait until we can read a CopyData message,
871 * or timeout, or occurrence of a signal or input on the stop_socket.
872 * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
874 * Returns 1 if data has become available for reading, 0 if timed out
875 * or interrupted by signal or stop_socket input, and -1 on an error.
884 struct timeval timeout;
885 struct timeval *timeoutptr;
894 FD_ZERO(&input_mask);
895 FD_SET(connsocket, &input_mask);
899 FD_SET(stop_socket, &input_mask);
900 maxfd =
Max(maxfd, stop_socket);
907 timeout.tv_sec = timeout_ms / 1000L;
908 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
909 timeoutptr = &timeout;
912 ret =
select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
917 return 0;
/* Got a signal, so not an error */
921 if (ret > 0 && FD_ISSET(connsocket, &input_mask))
922 return 1;
/* Got input on connection socket */
924 return 0;
/* Got timeout or input on stop_socket */
928 * Receive CopyData message available from XLOG stream, blocking for
929 * maximum of 'timeout' ms.
931 * If data was received, returns the length of the data. *buffer is set to
932 * point to a buffer holding the received message. The caller must eventually
933 * free the buffer with PQfreemem().
935 * Returns 0 if no data was available within timeout, or if wait was
936 * interrupted by signal or stop_socket input.
937 * -1 on error. -2 if the server ended the COPY.
946 /* Caller should have cleared any prior buffer */
949 /* Try to receive a CopyData message */
956 * No data available. Wait for some to appear, but not longer than
957 * the specified timeout, so that we can ping the server. Also stop
958 * waiting if input appears on stop_socket.
964 /* Now there is actually data on the socket */
967 pg_log_error(
"could not receive data from WAL stream: %s",
972 /* Now that we've consumed some input, try again */
977 if (rawlen == -1)
/* end-of-streaming or error */
985 /* Return received messages to caller */
991 * Process the keepalive message.
1002 * Parse the keepalive message, enclosed in the CopyData message. We just
1003 * check if the server requested a reply, and ignore the rest.
1005 pos = 1;
/* skip msgtype PqReplMsg_Keepalive */
1006 pos += 8;
/* skip walEnd */
1007 pos += 8;
/* skip sendTime */
1014 replyRequested =
copybuf[pos];
1016 /* If the server requested an immediate reply, send one. */
1023 * If a valid flush location needs to be reported, flush the
1024 * current WAL file so that the latest flush location is sent back
1025 * to the server. This is necessary to see whether the last WAL
1026 * data has been successfully replicated or not, at the normal
1027 * shutdown of the server.
1030 pg_fatal(
"could not fsync file \"%s\": %s",
1045 * Process WALData message.
1057 * Once we've decided we don't want to receive any more, just ignore any
1058 * subsequent WALData messages.
1064 * Read the header of the WALData message, enclosed in the CopyData
1065 * message. We only need the WAL location field (dataStart), the rest of
1066 * the header is ignored.
1068 hdr_len = 1;
/* msgtype PqReplMsg_WALData */
1069 hdr_len += 8;
/* dataStart */
1070 hdr_len += 8;
/* walEnd */
1071 hdr_len += 8;
/* sendTime */
1079 /* Extract WAL location for this block */
1083 * Verify that the initial location in the stream matches where we think
1088 /* No file open yet */
1091 pg_log_error(
"received write-ahead log record for offset %u with no file open",
1098 /* More data in existing segment */
1101 pg_log_error(
"got WAL data offset %08x, expected %08x",
1107 bytes_left =
len - hdr_len;
1115 * If crossing a WAL boundary, only write up until we reach wal
1118 if (xlogoff + bytes_left >
WalSegSz)
1119 bytes_to_write =
WalSegSz - xlogoff;
1121 bytes_to_write = bytes_left;
1127 /* Error logged by open_walfile */
1133 copybuf + hdr_len + bytes_written,
1134 bytes_to_write) != bytes_to_write)
1136 pg_log_error(
"could not write %d bytes to WAL file \"%s\": %s",
1142 /* Write was successful, advance our position */
1143 bytes_written += bytes_to_write;
1144 bytes_left -= bytes_to_write;
1145 *blockpos += bytes_to_write;
1146 xlogoff += bytes_to_write;
1148 /* Did we reach the end of a WAL segment? */
1152 /* Error message written in close_walfile() */
1166 return true;
/* ignore the rest of this WALData packet */
1170 /* No more data left to write, receive next copy packet */
1176 * Handle end of the copy stream.
1185 * The server closed its end of the copy stream. If we haven't closed
1186 * ours already, we need to do so now, unless the server threw an error,
1187 * in which case we don't.
1193 /* Error message written in close_walfile() */
1210 *stoppos = blockpos;
1215 * Check if we should continue streaming, or abort at this point.
1224 /* Potential error message is written by close_walfile */
1240 * Calculate how long send/receive loops should sleep
1250 status_targettime = last_status +
1253 if (status_targettime > 0)
1262 /* Always sleep at least 1 sec */
1269 sleeptime = secs * 1000 + usecs / 1000;
Datum now(PG_FUNCTION_ARGS)
#define ngettext(s, p, n)
int PQserverVersion(const PGconn *conn)
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQflush(PGconn *conn)
void PQfreemem(void *ptr)
int PQputCopyEnd(PGconn *conn, const char *errormsg)
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
int PQconsumeInput(PGconn *conn)
PGresult * PQexec(PGconn *conn, const char *query)
int PQgetCopyData(PGconn *conn, char **buffer, int async)
Assert(PointerIsAligned(start, uint64))
#define PQresultErrorMessage
#define pg_log_error(...)
static int standby_message_timeout
static XLogRecPtr startpos
#define pg_log_warning(...)
size_t strlcpy(char *dst, const char *src, size_t siz)
#define PqReplMsg_WALData
#define PqReplMsg_Keepalive
#define PqReplMsg_StandbyStatusUpdate
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
static bool open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
static bool mark_file_as_archived(StreamCtl *stream, const char *fname)
static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer)
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status)
static bool reportFlushPosition
static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr *blockpos)
static bool close_walfile(StreamCtl *stream, XLogRecPtr pos)
static bool existsTimeLineHistoryFile(StreamCtl *stream)
static bool still_sending
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
bool ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
static PGresult * HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos)
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout, TimestampTz last_status)
bool CheckServerVersionForStreaming(PGconn *conn)
static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
static XLogRecPtr lastFlushPosition
static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
int64 fe_recvint64(char *buf)
TimestampTz feGetCurrentTimestamp(void)
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
void fe_sendint64(int64 i, char *buf)
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
stream_stop_callback stream_stop
int standby_message_timeout
WalWriteMethod * walmethod
bool(* existsfile)(WalWriteMethod *wwmethod, const char *pathname)
ssize_t(* write)(Walfile *f, const void *buf, size_t count)
ssize_t(* get_file_size)(WalWriteMethod *wwmethod, const char *pathname)
int(* close)(Walfile *f, WalCloseMethod method)
char *(* get_file_name)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
const WalWriteMethodOps * ops
pg_compress_algorithm compression_algorithm
static StringInfo copybuf
static void * fn(void *arg)
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
#define select(n, r, w, e, timeout)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr