/*-------------------------------------------------------------------------
 *
 * xlogprefetcher.c
 *		Prefetching support for recovery.
 *
 * Portions Copyright (c) 2022-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *		src/backend/access/transam/xlogprefetcher.c
 *
 * This module provides a drop-in replacement for an XLogReader that tries to
 * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
 * accessed in the near future are not already in the buffer pool, it
 * initiates I/Os that might complete before the caller eventually needs the
 * data.  When referenced blocks are found in the buffer pool already, the
 * buffer is recorded in the decoded record so that XLogReadBufferForRedo()
 * can try to avoid a second buffer mapping table lookup.
 *
 * Currently, only the main fork is considered for prefetching, and
 * prefetching is only effective on systems where PrefetchBuffer() does
 * something useful (mainly Linux).
 *
 *-------------------------------------------------------------------------
 */
#include "utils/fmgrprotos.h"
/*
 * Every time we process this much WAL, we'll update the values in
 * pg_stat_recovery_prefetch.
 */
#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
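/*
 * A minimal sketch (not this file's verbatim code) of how such a
 * distance-based refresh can be driven; next_stats_shm_lsn is the
 * XLogPrefetcher member holding the next threshold, while the helper name
 * itself is hypothetical.
 */
static inline void
XLogPrefetcherMaybeComputeStats(XLogPrefetcher *prefetcher, XLogRecPtr lsn)
{
    if (lsn >= prefetcher->next_stats_shm_lsn)
    {
        XLogPrefetcherComputeStats(prefetcher);
        prefetcher->next_stats_shm_lsn = lsn + XLOGPREFETCHER_STATS_DISTANCE;
    }
}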
/*
 * To detect repeated access to the same block and skip useless extra system
 * calls, we remember a small window of recently prefetched blocks.
 */
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
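/*
 * A sketch of the window check this enables; recent_rlocator[] and
 * recent_block[] are the XLogPrefetcher members involved, while the helper
 * itself is hypothetical and placed here only for exposition.
 */
static inline bool
XLogPrefetcherRecentlyPrefetched(XLogPrefetcher *prefetcher,
                                 RelFileLocator rlocator, BlockNumber blockno)
{
    for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; i++)
    {
        if (prefetcher->recent_block[i] == blockno &&
            RelFileLocatorEquals(prefetcher->recent_rlocator[i], rlocator))
            return true;        /* already prefetched; skip the syscall */
    }
    return false;
}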
/*
 * When maintenance_io_concurrency is not saturated, we're prepared to look
 * ahead up to N times that number of block references.
 */
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
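/*
 * For illustration, under those limits the queue allocation looks something
 * like this (the exact call site is elided from this excerpt):
 *
 *     prefetcher->streaming_read =
 *         lrq_alloc(maintenance_io_concurrency * XLOGPREFETCHER_DISTANCE_MULTIPLIER,
 *                   maintenance_io_concurrency,
 *                   (uintptr_t) prefetcher,
 *                   XLogPrefetcherNextBlock);
 */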
/* Define to log internal debugging messages. */
/* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
#ifdef USE_PREFETCH
#define RecoveryPrefetchEnabled() \
    (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
     maintenance_io_concurrency > 0)
#else
#define RecoveryPrefetchEnabled() false
#endif
/*
 * Enum used to report whether an IO should be started.
 */
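/*
 * The three outcomes referenced throughout this file; the names appear in
 * XLogPrefetcherNextBlock()'s contract below, though the declaration order
 * here is an assumption.
 */
typedef enum
{
    LRQ_NEXT_NO_IO,
    LRQ_NEXT_IO,
    LRQ_NEXT_AGAIN
} LsnReadQueueNextStatus;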
/*
 * Type of callback that can decide which block to prefetch next.  For now
 * there is only one, but the indirection leaves room for other strategies.
 */
/*
 * A simple circular queue of LSNs, used to control the number of
 * (potentially) inflight IOs.  This stands in for a later more general IO
 * control mechanism, which is why it has the apparently unnecessary
 * indirection through a function pointer.
 */
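/*
 * A sketch of the queue's shape under the comment above: a ring of LSNs with
 * counters for inflight and completed IOs, plus the callback indirection.
 * Only the member names referenced elsewhere in this excerpt are certain;
 * the rest is illustrative.
 */
typedef struct LsnReadQueue
{
    LsnReadQueueNextFun next;   /* decides whether to start another IO */
    uintptr_t   lrq_private;    /* passed through to 'next' */
    uint32      max_inflight;   /* cap on concurrently inflight IOs */
    uint32      inflight;       /* IOs started but not yet complete */
    uint32      completed;      /* entries whose IO is considered done */
    uint32      head;           /* ring position to insert at */
    uint32      tail;           /* oldest ring entry */
    uint32      size;           /* ring size: max_distance + 1 */
    struct
    {
        bool        io;         /* did this entry start an IO? */
        XLogRecPtr  lsn;        /* complete once this LSN is replayed */
    }           queue[FLEXIBLE_ARRAY_MEMBER];
} LsnReadQueue;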
/*
 * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
 * blocks that will soon be referenced, to try to avoid IO stalls.
 */
    /* WAL reader and current reading state. */

    /* When to publish stats. */

    /* Book-keeping to avoid accessing blocks that don't exist yet. */

    /* Book-keeping to avoid repeat prefetches. */

    /* Book-keeping to disable prefetching temporarily. */

    /* IO depth manager. */
/*
 * A temporary filter used to track block ranges that haven't been created
 * yet, whole relations that haven't been created yet, and whole relations
 * that (we assume) have already been dropped, or will be created by bulk WAL
 * operators.
 */
/*
 * Counters exposed in shared memory for pg_stat_recovery_prefetch.
 */
static LsnReadQueue *
lrq_alloc(uint32 max_distance, uint32 max_inflight, uintptr_t lrq_private,
          LsnReadQueueNextFun next)
{
    Assert(max_distance >= max_inflight);

    size = max_distance + 1;    /* full ring buffer has a gap */
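/*
 * Why the extra slot: with one ring entry always left unused, an empty queue
 * (head == tail) stays distinguishable from a full one.  Hypothetical
 * predicates, for illustration only:
 *
 *     empty: lrq->head == lrq->tail
 *     full:  (lrq->head + 1) % lrq->size == lrq->tail
 */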
    /* Try to start as many IOs as we can within our limits. */
    /*
     * We know that LSNs before 'lsn' have been replayed, so we can now assume
     * that any IOs that were started before then have finished.
     */
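/*
 * The body of the completion routine is elided above; this is a sketch of
 * the whole thing, assuming the ring fields from the LsnReadQueue sketch
 * earlier, not this file's verbatim code.
 */
static void
lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
{
    /* Retire every queued entry whose LSN has now been replayed. */
    while (lrq->tail != lrq->head &&
           lrq->queue[lrq->tail].lsn <= lsn)
    {
        if (lrq->queue[lrq->tail].io)
            lrq->inflight--;    /* its IO must have finished by now */
        else
            lrq->completed--;   /* a no-IO entry, counted as completed */
        if (++lrq->tail == lrq->size)
            lrq->tail = 0;      /* wrap around the ring */
    }

    /* Capacity may have been freed up; try to look further ahead. */
    lrq_prefetch(lrq);
}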
/*
 * Reset all counters to zero.
 */
/*
 * Called when any GUC is changed that affects prefetching.
 */
/*
 * Increment a counter in shared memory.  This is equivalent to (*counter)++
 * on a plain uint64, without any memory barrier or locking, except on
 * platforms where readers can't read a uint64 without possibly observing a
 * torn value.
 */
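/*
 * A sketch matching the description above; assuming only the startup process
 * writes these counters, a plain read-modify-write through the atomic
 * accessors (which only prevent torn reads) is sufficient.
 */
static inline void
XLogPrefetchIncrement(pg_atomic_uint64 *counter)
{
    Assert(AmStartupProcess() || !IsUnderPostmaster);
    pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
}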
/*
 * Create a prefetcher that is ready to begin prefetching blocks referenced by
 * WAL records.
 */
    prefetcher->reader = reader;

    /* First usage will cause streaming_read to be allocated. */
/*
 * Destroy a prefetcher and release all resources.
 */
/*
 * Provide access to the reader.
 */
    return prefetcher->reader;
/*
 * Update the statistics visible in the pg_stat_recovery_prefetch view.
 */
    /* How far ahead of replay are we now? */

    /* How many IOs are currently in flight and completed? */

    /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
/*
 * A callback that examines the next block reference in the WAL, and possibly
 * starts an IO so that a later read will be fast.
 *
 * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
 *
 * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
 * that isn't in the buffer pool, and the kernel has been asked to start
 * reading it to make a future read system call faster.  An LSN is written to
 * *lsn, and the I/O will be considered to have completed once that LSN is
 * replayed.
 *
 * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
 * that it was already in the buffer pool, or we decided for various reasons
 * not to prefetch.
 */
    /*
     * We keep track of the record and block we're up to between calls with
     * prefetcher->record and prefetcher->next_block_id.
     */
        /* Try to read a new future record, if we don't already have one. */
        if (prefetcher->record == NULL)
            /*
             * If there are already records or an error queued up that could
             * be replayed, we don't want to block here.  Otherwise, it's OK
             * to block waiting for more data: presumably the caller has
             * nothing else to do.
             */
            /* Readahead is disabled until we replay past a certain point. */
            if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
                /*
                 * We can't read any more, due to an error or lack of data in
                 * nonblocking mode.  Don't try to read ahead again until
                 * we've replayed everything already decoded.
                 */
            /*
             * If prefetching is disabled, we don't need to analyze the record
             * or issue any prefetches.  We just need to cause one record to
             * be decoded.
             */
            /* We have a new record to process. */
            prefetcher->record = record;
        /* Continue to process from last call, or last loop. */
        record = prefetcher->record;
        /*
         * Check for operations that require us to filter out block ranges, or
         * pause readahead completely.
         */
        if (replaying_lsn < record->lsn)

            if (rmid == RM_XLOG_ID)
                /*
                 * These records might change the TLI.  Avoid potential
                 * bugs if we were to allow "read TLI" and "replay TLI" to
                 * differ without more analysis.
                 */
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                elog(XLOGPREFETCHER_DEBUG_LEVEL,
                     "suppressing all readahead until %X/%08X is replayed due to possible TLI change",

                /* Fall through so we move past this record. */
            else if (rmid == RM_DBASE_ID)

                /*
                 * When databases are created with the file-copy strategy,
                 * there are no WAL records to tell us about the creation of
                 * individual relations.
                 */
                    /*
                     * Don't try to prefetch anything in this database until
                     * it has been created, or we might confuse the blocks of
                     * different generations, if a database OID or
                     * relfilenumber is reused.  It's also more efficient than
                     * discovering that relations don't exist on disk yet with
                     * ENOENT errors.
                     */
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                elog(XLOGPREFETCHER_DEBUG_LEVEL,
                     "suppressing prefetch in database %u until %X/%08X is replayed due to raw file copy",
            else if (rmid == RM_SMGR_ID)
                    /*
                     * Don't prefetch anything for this whole relation
                     * until it has been created.  Otherwise we might
                     * confuse the blocks of different generations, if a
                     * relfilenumber is reused.  This also avoids the need
                     * to discover the problem via extra syscalls that
                     * report ENOENT.
                     */
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                elog(XLOGPREFETCHER_DEBUG_LEVEL,
                     "suppressing prefetch in relation %u/%u/%u until %X/%08X is replayed, which creates the relation",
                /*
                 * Don't consider prefetching anything in the truncated
                 * range until the truncation has been performed.
                 */
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                elog(XLOGPREFETCHER_DEBUG_LEVEL,
                     "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, which truncates the relation",
        /* Scan the block references, starting where we left off last time. */
            /*
             * Record the LSN of this record.  When it's replayed,
             * LsnReadQueue will consider any IOs submitted for earlier LSNs
             * to have completed.
             */
            /* We don't try to prefetch anything but the main fork for now. */
            /*
             * If there is a full page image attached, we won't be reading the
             * page, so don't bother trying to prefetch.
             */
            /* There is no point in reading a page that will be zeroed. */

            /* Should we skip prefetching this block due to a filter? */

            /* There is no point in repeatedly prefetching the same block. */
                /*
                 * XXX If we also remembered where it was, we could set
                 * recent_buffer so that recovery could skip smgropen()
                 * and a buffer table lookup.
                 */
            /*
             * We could try to have a fast path for repeated references to the
             * same relation (with some scheme to handle invalidations
             * safely), but for now we'll call smgropen() every time.
             */
            /*
             * If the relation file doesn't exist on disk, for example because
             * we're replaying after a crash and the file will be created and
             * then unlinked by WAL that hasn't been replayed yet, suppress
             * further prefetching in the relation until this record is
             * replayed.
             */
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                elog(XLOGPREFETCHER_DEBUG_LEVEL,
                     "suppressing all prefetch in relation %u/%u/%u until %X/%08X is replayed, because the relation does not exist on disk",
            /*
             * If the relation isn't big enough to contain the referenced
             * block yet, suppress prefetching of this block and higher until
             * this record is replayed.
             */
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                elog(XLOGPREFETCHER_DEBUG_LEVEL,
                     "suppressing prefetch in relation %u/%u/%u from block %u until %X/%08X is replayed, because the relation is too small",
            /* Try to initiate prefetching. */

            /* Cache hit, nothing to do. */

            /* Cache miss, I/O (presumably) started. */
                /*
                 * This shouldn't be possible, because we already determined
                 * that the relation exists on disk and is big enough.
                 * Something is wrong with the cache invalidation for
                 * smgrexists(), smgrnblocks(), or the file was unlinked or
                 * truncated beneath our feet?
                 */
                elog(ERROR,
                     "could not prefetch relation %u/%u/%u block %u",
        /*
         * Several callsites need to be able to read exactly one record
         * without any internal readahead.  Examples: xlog.c reading
         * checkpoint records with emode set to PANIC, which might otherwise
         * cause XLogPageRead() to panic on some future page, and xlog.c
         * determining where to start writing WAL next, which depends on the
         * contents of the reader's internal buffer after reading one record.
         * Therefore, don't even think about prefetching until the first
         * record after XLogPrefetcherBeginRead() has been consumed.
         */
        /* Advance to the next record. */
        prefetcher->record = NULL;
/*
 * Expose statistics about recovery prefetching.
 */
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
/*
 * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
 * has been replayed.
 */
        /*
         * Don't allow any prefetching of this block or higher until replayed.
         */
        /*
         * We were already filtering this rlocator.  Extend the filter's
         * lifetime to cover this WAL record, but leave the lower of the block
         * numbers there because we don't want to have to track individual
         * blocks.
         */
/*
 * Have we replayed any records that caused us to begin filtering a block
 * range?  That means that relations should have been created, extended or
 * dropped as required, so we can stop filtering out accesses to a given
 * relfilenumber.
 */
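/*
 * A sketch of the loop that implements this, using the filter_queue and
 * filter_table members and the XLogPrefetcherFilter fields named elsewhere
 * in this excerpt; the 'link' member name is an assumption.
 */
while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
{
    XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
                                                      link,
                                                      &prefetcher->filter_queue);

    if (filter->filter_until_replayed >= replaying_lsn)
        break;                  /* trigger record not replayed yet */

    dlist_delete(&filter->link);
    hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
}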
/*
 * Check if a given block should be skipped due to a filter.
 */
    /*
     * Test for empty queue first, because we expect it to be empty most of
     * the time and we can avoid the hash table lookup in that case.
     */
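    /* A sketch of that fast path (filter_queue as named elsewhere here). */
    if (likely(dlist_is_empty(&prefetcher->filter_queue)))
        return false;           /* no filters installed */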
        /* See if the block range is filtered. */
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
            elog(XLOGPREFETCHER_DEBUG_LEVEL,
                 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (blocks >= %u filtered)",
        /* See if the whole database is filtered. */
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
            elog(XLOGPREFETCHER_DEBUG_LEVEL,
                 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%08X is replayed (whole database)",
/*
 * A wrapper for XLogBeginRead() that also resets the prefetcher.
 */
    /* This will forget about any in-flight IO. */

    /* Book-keeping to avoid readahead on first read. */

    /* This will forget about any queued up records in the decoder. */
/*
 * A wrapper for XLogReadRecord() that provides the same interface, but also
 * tries to initiate I/O for blocks referenced in future WAL records.
 */
    /*
     * See if it's time to reset the prefetching machinery, because a relevant
     * GUC was changed.
     */
                      (uintptr_t) prefetcher,
    /*
     * Release last returned record, if there is one, as it's now been
     * replayed.
     */
    /*
     * Can we drop any filters yet?  If we were waiting for a relation to be
     * created or extended, it is now OK to access blocks in the covered
     * range.
     */
    /*
     * All IO initiated by earlier WAL is now completed.  This might trigger
     * further prefetching.
     */
        /*
         * If there's nothing queued yet, then start prefetching to cause at
         * least one record to be queued.
         */
        /* Read the next record. */
    /*
     * The record we just got is the "current" one, for the benefit of the
     * XLogRecXXX() macros.
     */
    /*
     * If maintenance_io_concurrency is set very low, we might have started
     * prefetching some but not all of the blocks referenced in the record
     * we're about to return.  Forget about the rest of the blocks in this
     * record by dropping the prefetcher's reference to it.
     */
    if (record == prefetcher->record)
        prefetcher->record = NULL;
    /*
     * See if it's time to compute some statistics, because enough WAL has
     * been processed.
     */
        GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
    /* Reconfigure prefetching, because a setting it depends on changed. */
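/*
 * A sketch of the assign hook that comment belongs to, together with the
 * reconfigure counter bump it triggers; assuming only the startup process
 * needs to react, by invalidating its current LsnReadQueue.
 */
void
assign_recovery_prefetch(int new_value, void *extra)
{
    recovery_prefetch = new_value;
    if (AmStartupProcess())
        XLogPrefetchReconfigure();
}

void
XLogPrefetchReconfigure(void)
{
    XLogPrefetchReconfigureCount++;
}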