454{
455 char query[128];
456 char slotcmd[128];
459
460 /*
461 * The caller should've checked the server version already, but doesn't do
462 * any harm to check it here too.
463 */
465 return false;
466
467 /*
468 * Decide whether we want to report the flush position. If we report the
469 * flush position, the primary will know what WAL we'll possibly
470 * re-request, and it can then remove older WAL safely. We must always do
471 * that when we are using slots.
472 *
473 * Reporting the flush position makes one eligible as a synchronous
474 * replica. People shouldn't include generic names in
475 * synchronous_standby_names, but we've protected them against it so far,
476 * so let's continue to do so unless specifically requested.
477 */
479 {
482 }
483 else
484 {
487 else
489 slotcmd[0] = 0;
490 }
491
493 {
494 char *sysidentifier = NULL;
496
497 /*
498 * Get the server system identifier and timeline, and validate them.
499 */
501 {
503 return false;
504 }
505
507 {
508 pg_log_error(
"system identifier does not match between base backup and streaming connection");
510 return false;
511 }
513
515 {
516 pg_log_error(
"starting timeline %u is not present in the server",
518 return false;
519 }
520 }
521
522 /*
523 * initialize flush position to starting point, it's the caller's
524 * responsibility that that's sane.
525 */
527
528 while (1)
529 {
530 /*
531 * Fetch the timeline history file for this timeline, if we don't have
532 * it already. When streaming log to tar, this will always return
533 * false, as we are never streaming into an existing file and
534 * therefore there can be no pre-existing timeline history file.
535 */
537 {
541 {
542 /* FIXME: we might send it ok, but get an error */
543 pg_log_error(
"could not send replication command \"%s\": %s",
546 return false;
547 }
548
549 /*
550 * The response to TIMELINE_HISTORY is a single row result set
551 * with two fields: filename and content
552 */
554 {
555 pg_log_warning(
"unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
557 }
558
559 /* Write the history file to disk */
563
565 }
566
567 /*
568 * Before we start streaming from the requested location, check if the
569 * callback tells us to stop here.
570 */
572 return true;
573
574 /* Initiate the replication stream at specified location */
575 snprintf(query,
sizeof(query),
"START_REPLICATION %s%X/%08X TIMELINE %u",
576 slotcmd,
581 {
582 pg_log_error(
"could not send replication command \"%s\": %s",
585 return false;
586 }
588
589 /* Stream the WAL */
591 if (res == NULL)
593
594 /*
595 * Streaming finished.
596 *
597 * There are two possible reasons for that: a controlled shutdown, or
598 * we reached the end of the current timeline. In case of
599 * end-of-timeline, the server sends a result set after Copy has
600 * finished, containing information about the next timeline. Read
601 * that, and restart streaming from the next timeline. In case of
602 * controlled shutdown, stop here.
603 */
605 {
606 /*
607 * End-of-timeline. Read the next timeline's ID and starting
608 * position. Usually, the starting position will match the end of
609 * the previous timeline, but there are corner cases like if the
610 * server had sent us half of a WAL record, when it was promoted.
611 * The new timeline will begin at the end of the last complete
612 * record in that case, overlapping the partial WAL record on the
613 * old timeline.
614 */
616 bool parsed;
617
620 if (!parsed)
622
623 /* Sanity check the values the server gave us */
624 if (newtimeline <= stream->timeline)
625 {
626 pg_log_error(
"server reported unexpected next timeline %u, following timeline %u",
629 }
631 {
632 pg_log_error(
"server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X",
636 }
637
638 /* Read the final result, which should be CommandComplete. */
641 {
642 pg_log_error(
"unexpected termination of replication stream: %s",
646 }
648
649 /*
650 * Loop back to start streaming from the new timeline. Always
651 * start streaming at the beginning of a segment.
652 */
656 continue;
657 }
659 {
661
662 /*
663 * End of replication (ie. controlled shut down of the server).
664 *
665 * Check if the callback thinks it's OK to stop here. If not,
666 * complain.
667 */
669 return true;
670 else
671 {
672 pg_log_error(
"replication stream was terminated before stop point");
674 }
675 }
676 else
677 {
678 /* Server returned an error. */
679 pg_log_error(
"unexpected termination of replication stream: %s",
683 }
684 }
685
691 return false;
692}
PGresult * PQexec(PGconn *conn, const char *query)
#define PQresultErrorMessage
#define pg_log_warning(...)
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
static bool reportFlushPosition
static bool existsTimeLineHistoryFile(StreamCtl *stream)
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
bool CheckServerVersionForStreaming(PGconn *conn)
static XLogRecPtr lastFlushPosition
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
stream_stop_callback stream_stop
WalWriteMethod * walmethod
int(* close)(Walfile *f, WalCloseMethod method)
const WalWriteMethodOps * ops
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)