1/*-------------------------------------------------------------------------
4 * AIO - perform AIO using worker processes
6 * IO workers consume IOs from a shared memory submission queue, run
7 * traditional synchronous system calls, and perform the shared completion
8 * handling immediately. Client code submits most requests by pushing IOs
9 * into the submission queue, and waits (if necessary) using condition
10 * variables. Some IOs cannot be performed in another process due to lack of
11 * infrastructure for reopening the file, and must be processed synchronously by
12 * the client code when submitted.
14 * So that the submitter can make just one system call when submitting a batch
15 * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
16 * could be improved by using futexes instead of latches to wake N waiters.
18 * This method of AIO is available in all builds on all operating systems, and
21 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
22 * Portions Copyright (c) 1994, Regents of the University of California
25 * src/backend/storage/aio/method_worker.c
27 *-------------------------------------------------------------------------
51/* How many workers should each worker wake up if needed? */
52 #define IO_WORKER_WAKEUP_FANOUT 2
106 /* Round size up to next power of two so we can make a mask. */
110 sizeof(int) * *queue_size;
172 /* Find the lowest bit position, and clear it. */
187 new_head = (queue->
head + 1) & (queue->
size - 1);
188 if (new_head == queue->
tail)
192 return false;
/* full */
196 queue->
head = new_head;
209 return -1;
/* empty */
254 for (
int i = 0;
i < num_staged_ios; ++
i)
260 * We'll do it synchronously, but only after we've sent as many as
261 * we can to workers, to maximize concurrency.
263 synchronous_ios[nsync++] = staged_ios[
i];
269 /* Choose an idle worker to wake up if we haven't already. */
275 "choosing worker %d",
284 /* Run whatever is left synchronously. */
287 for (
int i = 0;
i < nsync; ++
i)
297 for (
int i = 0;
i < num_staged_ios;
i++)
306 return num_staged_ios;
310 * on_shmem_exit() callback that releases the worker's slot in
327 * Register the worker in shared memory, assign MyIoWorkerId and register a
328 * shutdown callback to release registration.
336 * XXX: This could do with more fine-grained locking. But it's also not
337 * very common for the number of workers to change at the moment...
355 elog(
ERROR,
"couldn't find a free worker slot");
380 owner_pid = owner_proc->
pid;
382 errcontext(
"I/O worker executing I/O on behalf of process %d", owner_pid);
388 sigjmp_buf local_sigjmp_buf;
391 volatile int error_errno = 0;
398 pqsignal(SIGINT,
die);
/* to allow manually triggering worker restart */
401 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
402 * shutdown sequence, similar to checkpointer.
405 /* SIGQUIT handler was already set up by InitPostmasterChild */
411 /* also registers a shutdown callback to unregister */
421 /* see PostgresMain() */
422 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
430 * In the - very unlikely - case that the IO failed in a way that
431 * raises an error we need to mark the IO as failed.
433 * Need to do just enough error recovery so that we can mark the IO as
434 * failed and then exit (postmaster will start a new worker).
438 if (error_ioh != NULL)
440 /* should never fail without setting error_errno */
453 /* We can now handle ereport(ERROR) */
467 * Try to get a job to do.
469 * The lwlock acquisition also provides the necessary memory barrier
470 * to ensure that we don't see outdated data in the handle.
476 * Nothing to do. Mark self idle.
478 * XXX: Invent some kind of back pressure to reduce useless
485 /* Got one. Clear idle flag. */
488 /* See if we can wake up some peers. */
491 for (
int i = 0;
i < nwakeups; ++
i)
500 for (
int i = 0;
i < nlatches; ++
i)
509 errcallback.
arg = ioh;
512 "worker %d processing IO",
516 * Prevent interrupts between pgaio_io_reopen() and
517 * pgaio_io_perform_synchronously() that otherwise could lead to
518 * the FD getting closed in that window.
523 * It's very unlikely, but possible, that reopen fails. E.g. due
524 * to memory allocations failing or file permissions changing or
525 * such. In that case we need to fail the IO.
527 * There's not really a good errno we can report here.
529 error_errno = ENOENT;
533 * To be able to exercise the reopen-fails path, allow injection
534 * points to trigger a failure at this point.
542 * As part of IO completion the buffer will be marked as NOACCESS,
543 * until the buffer is pinned again - which never happens in io
544 * workers. Therefore the next time there is IO for the same
545 * buffer, the memory will be considered inaccessible. To avoid
546 * that, explicitly allow access to the memory before reading data
554 for (
int i = 0;
i < iov_length;
i++)
560 * We don't expect this to ever fail with ERROR or FATAL, no need
561 * to keep error_ioh set to the IO.
562 * pgaio_io_perform_synchronously() contains a critical section to
563 * ensure we don't accidentally fail.
568 errcallback.
arg = NULL;
573 WAIT_EVENT_IO_WORKER_MAIN);
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
int pgaio_io_get_id(PgAioHandle *ioh)
void pgaio_io_prepare_submit(PgAioHandle *ioh)
@ PGAIO_HF_REFERENCES_LOCAL
#define pgaio_debug(elevel, msg,...)
#define pgaio_debug_io(elevel, ioh, msg,...)
#define PGAIO_SUBMIT_BATCH_SIZE
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)
void pgaio_io_reopen(PgAioHandle *ioh)
bool pgaio_io_can_reopen(PgAioHandle *ioh)
void AuxiliaryProcessMainCommon(void)
#define FLEXIBLE_ARRAY_MEMBER
void EmitErrorReport(void)
ErrorContextCallback * error_context_stack
sigjmp_buf * PG_exception_stack
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
#define INJECTION_POINT(name, arg)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
static size_t pgaio_worker_control_shmem_size(void)
static uint32 pgaio_worker_submission_queue_depth(void)
static void pgaio_worker_error_callback(void *arg)
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
#define IO_WORKER_WAKEUP_FANOUT
static size_t pgaio_worker_shmem_size(void)
struct PgAioWorkerSlot PgAioWorkerSlot
struct PgAioWorkerSubmissionQueue PgAioWorkerSubmissionQueue
static size_t pgaio_worker_queue_shmem_size(int *queue_size)
static int io_worker_queue_size
struct PgAioWorkerControl PgAioWorkerControl
static void pgaio_worker_register(void)
static PgAioWorkerControl * io_worker_control
const IoMethodOps pgaio_worker_ops
static void pgaio_worker_die(int code, Datum arg)
static int pgaio_worker_submission_queue_consume(void)
static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
bool pgaio_workers_enabled(void)
static PgAioWorkerSubmissionQueue * io_worker_submission_queue
void IoWorkerMain(const void *startup_data, size_t startup_data_len)
static void pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
static void pgaio_worker_shmem_init(bool first_time)
static int pgaio_worker_choose_idle(void)
#define RESUME_INTERRUPTS()
#define START_CRIT_SECTION()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
#define END_CRIT_SECTION()
BackendType MyBackendType
static int pg_rightmost_one_pos64(uint64 word)
static uint32 pg_nextpower2_32(uint32 num)
#define GetPGProcByNumber(n)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
static void set_ps_display(const char *activity)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
struct ErrorContextCallback * previous
void(* callback)(void *arg)
size_t(* shmem_size)(void)
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
int sqes[FLEXIBLE_ARRAY_MEMBER]
#define WL_EXIT_ON_PM_DEATH
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]