PostgreSQL Source Code git master
Data Structures | Typedefs | Functions
receivelog.h File Reference
#include "access/xlogdefs.h"
#include "libpq-fe.h"
#include "walmethods.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct   StreamCtl
 

Typedefs

typedef bool(*  stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
 
typedef struct StreamCtl  StreamCtl
 

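Functions

bool  CheckServerVersionForStreaming (PGconn *conn)
 
bool  ReceiveXlogStream (PGconn *conn, StreamCtl *stream)
 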

Typedef Documentation

stream_stop_callback

typedef bool(* stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 23 of file receivelog.h.

StreamCtl

typedef struct StreamCtl StreamCtl

Function Documentation

CheckServerVersionForStreaming()

bool CheckServerVersionForStreaming ( PGconn * conn )

Definition at line 375 of file receivelog.c.

376{
377 int minServerMajor,
378 maxServerMajor;
379 int serverMajor;
380
381 /*
382 * The message format used in streaming replication changed in 9.3, so we
383 * cannot stream from older servers. And we don't support servers newer
384 * than the client; it might work, but we don't know, so err on the safe
385 * side.
386 */
387 minServerMajor = 903;
388 maxServerMajor = PG_VERSION_NUM / 100;
389 serverMajor = PQserverVersion(conn) / 100;
390 if (serverMajor < minServerMajor)
391 {
392 const char *serverver = PQparameterStatus(conn, "server_version");
393
394 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
395 serverver ? serverver : "'unknown'",
396 "9.3");
397 return false;
398 }
399 else if (serverMajor > maxServerMajor)
400 {
401 const char *serverver = PQparameterStatus(conn, "server_version");
402
403 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
404 serverver ? serverver : "'unknown'",
405 PG_VERSION);
406 return false;
407 }
408 return true;
409}
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:7669
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:7634
#define pg_log_error(...)
Definition: logging.h:106
PGconn * conn
Definition: streamutil.c:52

References conn, pg_log_error, PQparameterStatus(), and PQserverVersion().

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

ReceiveXlogStream()

bool ReceiveXlogStream ( PGconn * conn,
StreamCtl * stream
)

Definition at line 453 of file receivelog.c.

454{
455 char query[128];
456 char slotcmd[128];
457 PGresult *res;
458 XLogRecPtr stoppos;
459
460 /*
461 * The caller should've checked the server version already, but doesn't do
462 * any harm to check it here too.
463 */
464 if (!CheckServerVersionForStreaming(conn))
465 return false;
466
467 /*
468 * Decide whether we want to report the flush position. If we report the
469 * flush position, the primary will know what WAL we'll possibly
470 * re-request, and it can then remove older WAL safely. We must always do
471 * that when we are using slots.
472 *
473 * Reporting the flush position makes one eligible as a synchronous
474 * replica. People shouldn't include generic names in
475 * synchronous_standby_names, but we've protected them against it so far,
476 * so let's continue to do so unless specifically requested.
477 */
478 if (stream->replication_slot != NULL)
479 {
480 reportFlushPosition = true;
481 sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
482 }
483 else
484 {
485 if (stream->synchronous)
486 reportFlushPosition = true;
487 else
488 reportFlushPosition = false;
489 slotcmd[0] = 0;
490 }
491
492 if (stream->sysidentifier != NULL)
493 {
494 char *sysidentifier = NULL;
495 TimeLineID servertli;
496
497 /*
498 * Get the server system identifier and timeline, and validate them.
499 */
500 if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
501 {
502 pg_free(sysidentifier);
503 return false;
504 }
505
506 if (strcmp(stream->sysidentifier, sysidentifier) != 0)
507 {
508 pg_log_error("system identifier does not match between base backup and streaming connection");
509 pg_free(sysidentifier);
510 return false;
511 }
512 pg_free(sysidentifier);
513
514 if (stream->timeline > servertli)
515 {
516 pg_log_error("starting timeline %u is not present in the server",
517 stream->timeline);
518 return false;
519 }
520 }
521
522 /*
523 * initialize flush position to starting point, it's the caller's
524 * responsibility that that's sane.
525 */
526 lastFlushPosition = stream->startpos;
527
528 while (1)
529 {
530 /*
531 * Fetch the timeline history file for this timeline, if we don't have
532 * it already. When streaming log to tar, this will always return
533 * false, as we are never streaming into an existing file and
534 * therefore there can be no pre-existing timeline history file.
535 */
536 if (!existsTimeLineHistoryFile(stream))
537 {
538 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
539 res = PQexec(conn, query);
540 if (PQresultStatus(res) != PGRES_TUPLES_OK)
541 {
542 /* FIXME: we might send it ok, but get an error */
543 pg_log_error("could not send replication command \"%s\": %s",
544 "TIMELINE_HISTORY", PQresultErrorMessage(res));
545 PQclear(res);
546 return false;
547 }
548
549 /*
550 * The response to TIMELINE_HISTORY is a single row result set
551 * with two fields: filename and content
552 */
553 if (PQnfields(res) != 2 || PQntuples(res) != 1)
554 {
555 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
556 PQntuples(res), PQnfields(res), 1, 2);
557 }
558
559 /* Write the history file to disk */
560 writeTimeLineHistoryFile(stream,
561 PQgetvalue(res, 0, 0),
562 PQgetvalue(res, 0, 1));
563
564 PQclear(res);
565 }
566
567 /*
568 * Before we start streaming from the requested location, check if the
569 * callback tells us to stop here.
570 */
571 if (stream->stream_stop(stream->startpos, stream->timeline, false))
572 return true;
573
574 /* Initiate the replication stream at specified location */
575 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%08X TIMELINE %u",
576 slotcmd,
577 LSN_FORMAT_ARGS(stream->startpos),
578 stream->timeline);
579 res = PQexec(conn, query);
580 if (PQresultStatus(res) != PGRES_COPY_BOTH)
581 {
582 pg_log_error("could not send replication command \"%s\": %s",
583 "START_REPLICATION", PQresultErrorMessage(res));
584 PQclear(res);
585 return false;
586 }
587 PQclear(res);
588
589 /* Stream the WAL */
590 res = HandleCopyStream(conn, stream, &stoppos);
591 if (res == NULL)
592 goto error;
593
594 /*
595 * Streaming finished.
596 *
597 * There are two possible reasons for that: a controlled shutdown, or
598 * we reached the end of the current timeline. In case of
599 * end-of-timeline, the server sends a result set after Copy has
600 * finished, containing information about the next timeline. Read
601 * that, and restart streaming from the next timeline. In case of
602 * controlled shutdown, stop here.
603 */
604 if (PQresultStatus(res) == PGRES_TUPLES_OK)
605 {
606 /*
607 * End-of-timeline. Read the next timeline's ID and starting
608 * position. Usually, the starting position will match the end of
609 * the previous timeline, but there are corner cases like if the
610 * server had sent us half of a WAL record, when it was promoted.
611 * The new timeline will begin at the end of the last complete
612 * record in that case, overlapping the partial WAL record on the
613 * old timeline.
614 */
615 uint32 newtimeline;
616 bool parsed;
617
618 parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
619 PQclear(res);
620 if (!parsed)
621 goto error;
622
623 /* Sanity check the values the server gave us */
624 if (newtimeline <= stream->timeline)
625 {
626 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
627 newtimeline, stream->timeline);
628 goto error;
629 }
630 if (stream->startpos > stoppos)
631 {
632 pg_log_error("server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X",
633 stream->timeline, LSN_FORMAT_ARGS(stoppos),
634 newtimeline, LSN_FORMAT_ARGS(stream->startpos));
635 goto error;
636 }
637
638 /* Read the final result, which should be CommandComplete. */
639 res = PQgetResult(conn);
640 if (PQresultStatus(res) != PGRES_COMMAND_OK)
641 {
642 pg_log_error("unexpected termination of replication stream: %s",
643 PQresultErrorMessage(res));
644 PQclear(res);
645 goto error;
646 }
647 PQclear(res);
648
649 /*
650 * Loop back to start streaming from the new timeline. Always
651 * start streaming at the beginning of a segment.
652 */
653 stream->timeline = newtimeline;
654 stream->startpos = stream->startpos -
655 XLogSegmentOffset(stream->startpos, WalSegSz);
656 continue;
657 }
658 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
659 {
660 PQclear(res);
661
662 /*
663 * End of replication (ie. controlled shut down of the server).
664 *
665 * Check if the callback thinks it's OK to stop here. If not,
666 * complain.
667 */
668 if (stream->stream_stop(stoppos, stream->timeline, false))
669 return true;
670 else
671 {
672 pg_log_error("replication stream was terminated before stop point");
673 goto error;
674 }
675 }
676 else
677 {
678 /* Server returned an error. */
679 pg_log_error("unexpected termination of replication stream: %s",
680 PQresultErrorMessage(res));
681 PQclear(res);
682 goto error;
683 }
684 }
685
686error:
687 if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
688 pg_log_error("could not close file \"%s\": %s",
689 walfile->pathname, GetLastWalMethodError(stream->walmethod));
690 walfile = NULL;
691 return false;
692}
uint32_t uint32
Definition: c.h:538
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2273
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define PQresultErrorMessage
#define PQgetvalue
Definition: libpq-be-fe.h:253
#define PQgetResult
Definition: libpq-be-fe.h:246
#define PQclear
Definition: libpq-be-fe.h:245
#define PQnfields
Definition: libpq-be-fe.h:252
#define PQresultStatus
Definition: libpq-be-fe.h:247
#define PQntuples
Definition: libpq-be-fe.h:251
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:137
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
#define pg_log_warning(...)
Definition: pgfnames.c:24
#define sprintf
Definition: port.h:241
#define snprintf
Definition: port.h:239
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
Definition: receivelog.c:745
static bool reportFlushPosition
Definition: receivelog.c:30
static bool existsTimeLineHistoryFile(StreamCtl *stream)
Definition: receivelog.c:258
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
Definition: receivelog.c:275
static Walfile * walfile
Definition: receivelog.c:29
bool CheckServerVersionForStreaming(PGconn *conn)
Definition: receivelog.c:375
static XLogRecPtr lastFlushPosition
Definition: receivelog.c:31
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
Definition: receivelog.c:699
static void error(void)
Definition: sql-dyntest.c:147
int WalSegSz
Definition: streamutil.c:32
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:409
char * sysidentifier
Definition: receivelog.h:33
TimeLineID timeline
Definition: receivelog.h:32
stream_stop_callback stream_stop
Definition: receivelog.h:41
char * replication_slot
Definition: receivelog.h:48
XLogRecPtr startpos
Definition: receivelog.h:31
WalWriteMethod * walmethod
Definition: receivelog.h:46
bool synchronous
Definition: receivelog.h:36
int(* close)(Walfile *f, WalCloseMethod method)
Definition: walmethods.h:55
const WalWriteMethodOps * ops
Definition: walmethods.h:105
char * pathname
Definition: walmethods.h:21
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
Definition: walmethods.c:1383
@ CLOSE_NO_RENAME
Definition: walmethods.h:35
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
Definition: xlog_internal.h:106
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:46
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:62

References CheckServerVersionForStreaming(), WalWriteMethodOps::close, CLOSE_NO_RENAME, conn, error(), existsTimeLineHistoryFile(), GetLastWalMethodError(), HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, WalWriteMethod::ops, Walfile::pathname, pg_free(), pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetResult, PQgetvalue, PQnfields, PQntuples, PQresultErrorMessage, PQresultStatus, ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, RunIdentifySystem(), snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.

Referenced by LogStreamerMain(), and StreamLog().

AltStyle によって変換されたページ (->オリジナル) /