PostgreSQL Source Code (git master): src/backend/storage/aio/method_worker.c
/*-------------------------------------------------------------------------
 *
 * method_worker.c
 *    AIO - perform AIO using worker processes
 *
 * IO workers consume IOs from a shared memory submission queue, run
 * traditional synchronous system calls, and perform the shared completion
 * handling immediately. Client code submits most requests by pushing IOs
 * into the submission queue, and waits (if necessary) using condition
 * variables. Some IOs cannot be performed in another process due to lack of
 * infrastructure for reopening the file, and must be processed synchronously
 * by the client code when submitted.
 *
 * So that the submitter can make just one system call when submitting a batch
 * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
 * could be improved by using futexes instead of latches to wake N waiters.
 *
 * This method of AIO is available in all builds on all operating systems, and
 * is the default.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/aio/method_worker.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/aio_subsys.h"
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/injection_point.h"
#include "utils/memdebug.h"
#include "utils/ps_status.h"
#include "utils/wait_event.h"


/* How many workers should each worker wake up if needed? */
#define IO_WORKER_WAKEUP_FANOUT 2

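/*
 * Fixed-size ring buffer in shared memory holding the ids of IO handles that
 * have been submitted but not yet picked up by a worker. The size is rounded
 * up to a power of two so indices can wrap using "& (size - 1)". Protected by
 * AioWorkerSubmissionQueueLock.
 */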
typedef struct PgAioWorkerSubmissionQueue
{
    uint32      size;
    uint32      mask;
    uint32      head;
    uint32      tail;
    int         sqes[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerSubmissionQueue;

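/*
 * Per-worker entry in shared memory: the worker's process latch (used to wake
 * it up) and whether the slot is currently occupied by a running worker.
 */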
typedef struct PgAioWorkerSlot
{
    Latch      *latch;
    bool        in_use;
} PgAioWorkerSlot;

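/*
 * Shared state for the worker pool: a bitmask of currently idle workers plus
 * one slot per possible worker (MAX_IO_WORKERS). Also protected by
 * AioWorkerSubmissionQueueLock.
 */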
typedef struct PgAioWorkerControl
{
    uint64      idle_worker_mask;
    PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} PgAioWorkerControl;


static size_t pgaio_worker_shmem_size(void);
static void pgaio_worker_shmem_init(bool first_time);

static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);


const IoMethodOps pgaio_worker_ops = {
    .shmem_size = pgaio_worker_shmem_size,
    .shmem_init = pgaio_worker_shmem_init,

    .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
    .submit = pgaio_worker_submit,
};


/* GUCs */
int         io_workers = 3;


static int  io_worker_queue_size = 64;
static int  MyIoWorkerId;
static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
static PgAioWorkerControl *io_worker_control;

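/*
 * Shared memory size of the submission queue; also reports the effective
 * (power-of-two) queue size back through *queue_size.
 */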
static size_t
pgaio_worker_queue_shmem_size(int *queue_size)
{
    /* Round size up to next power of two so we can make a mask. */
    *queue_size = pg_nextpower2_32(io_worker_queue_size);

    return offsetof(PgAioWorkerSubmissionQueue, sqes) +
        sizeof(int) * *queue_size;
}

static size_t
pgaio_worker_control_shmem_size(void)
{
    return offsetof(PgAioWorkerControl, workers) +
        sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
}

static size_t
pgaio_worker_shmem_size(void)
{
    size_t      sz;
    int         queue_size;

    sz = pgaio_worker_queue_shmem_size(&queue_size);
    sz = add_size(sz, pgaio_worker_control_shmem_size());

    return sz;
}

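/*
 * Create (or attach to) the submission queue and worker control structures in
 * shared memory, initializing them when first created.
 */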
static void
pgaio_worker_shmem_init(bool first_time)
{
    bool        found;
    int         queue_size;

    io_worker_submission_queue =
        ShmemInitStruct("AioWorkerSubmissionQueue",
                        pgaio_worker_queue_shmem_size(&queue_size),
                        &found);
    if (!found)
    {
        io_worker_submission_queue->size = queue_size;
        io_worker_submission_queue->head = 0;
        io_worker_submission_queue->tail = 0;
    }

    io_worker_control =
        ShmemInitStruct("AioWorkerControl",
                        pgaio_worker_control_shmem_size(),
                        &found);
    if (!found)
    {
        io_worker_control->idle_worker_mask = 0;
        for (int i = 0; i < MAX_IO_WORKERS; ++i)
        {
            io_worker_control->workers[i].latch = NULL;
            io_worker_control->workers[i].in_use = false;
        }
    }
}

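/*
 * Pick the lowest-numbered idle worker and clear its bit in the idle mask, or
 * return -1 if no worker is idle. Caller must hold
 * AioWorkerSubmissionQueueLock.
 */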
static int
pgaio_worker_choose_idle(void)
{
    int         worker;

    if (io_worker_control->idle_worker_mask == 0)
        return -1;

    /* Find the lowest bit position, and clear it. */
    worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
    io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
    Assert(io_worker_control->workers[worker].in_use);

    return worker;
}

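/*
 * Add one IO to the submission queue, returning false if the queue is full.
 * Caller must hold AioWorkerSubmissionQueueLock.
 */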
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
    PgAioWorkerSubmissionQueue *queue;
    uint32      new_head;

    queue = io_worker_submission_queue;
    new_head = (queue->head + 1) & (queue->size - 1);
    if (new_head == queue->tail)
    {
        pgaio_debug(DEBUG3, "io queue is full, at %u elements",
                    io_worker_submission_queue->size);
        return false;           /* full */
    }

    queue->sqes[queue->head] = pgaio_io_get_id(ioh);
    queue->head = new_head;

    return true;
}

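/*
 * Remove and return the oldest queued IO id, or -1 if the queue is empty.
 * Caller must hold AioWorkerSubmissionQueueLock.
 */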
static int
pgaio_worker_submission_queue_consume(void)
{
    PgAioWorkerSubmissionQueue *queue;
    int         result;

    queue = io_worker_submission_queue;
    if (queue->tail == queue->head)
        return -1;              /* empty */

    result = queue->sqes[queue->tail];
    queue->tail = (queue->tail + 1) & (queue->size - 1);

    return result;
}

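/*
 * Number of IOs currently waiting in the submission queue, accounting for
 * wraparound: e.g. with size 64, head 2 and tail 60 there are 6 queued
 * entries. Caller must hold AioWorkerSubmissionQueueLock.
 */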
static uint32
pgaio_worker_submission_queue_depth(void)
{
    uint32      head;
    uint32      tail;

    head = io_worker_submission_queue->head;
    tail = io_worker_submission_queue->tail;

    if (tail > head)
        head += io_worker_submission_queue->size;

    Assert(head >= tail);

    return head - tail;
}

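/*
 * An IO must be executed synchronously by the submitting process, rather than
 * handed to a worker, if we are not running under the postmaster, if the
 * handle is flagged PGAIO_HF_REFERENCES_LOCAL, or if the IO's target cannot
 * be reopened from another process.
 */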
static bool
pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
{
    return
        !IsUnderPostmaster
        || ioh->flags & PGAIO_HF_REFERENCES_LOCAL
        || !pgaio_io_can_reopen(ioh);
}

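/*
 * Enqueue as many of the staged IOs as fit in the submission queue, waking at
 * most one idle worker (further wakeups fan out from worker to worker). Any
 * IOs that do not fit are executed synchronously after the lock is released.
 */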
static void
pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
{
    PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
    int         nsync = 0;
    Latch      *wakeup = NULL;
    int         worker;

    Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);

    LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
    for (int i = 0; i < num_staged_ios; ++i)
    {
        Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
        if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
        {
            /*
             * We'll do it synchronously, but only after we've sent as many as
             * we can to workers, to maximize concurrency.
             */
            synchronous_ios[nsync++] = staged_ios[i];
            continue;
        }

        if (wakeup == NULL)
        {
            /* Choose an idle worker to wake up if we haven't already. */
            worker = pgaio_worker_choose_idle();
            if (worker >= 0)
                wakeup = io_worker_control->workers[worker].latch;

            pgaio_debug_io(DEBUG4, staged_ios[i],
                           "choosing worker %d",
                           worker);
        }
    }
    LWLockRelease(AioWorkerSubmissionQueueLock);

    if (wakeup)
        SetLatch(wakeup);

    /* Run whatever is left synchronously. */
    if (nsync > 0)
    {
        for (int i = 0; i < nsync; ++i)
        {
            pgaio_io_perform_synchronously(synchronous_ios[i]);
        }
    }
}

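/*
 * IoMethodOps->submit callback: call pgaio_io_prepare_submit() on each staged
 * IO, hand the whole batch to pgaio_worker_submit_internal(), and return the
 * number of IOs submitted.
 */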
static int
pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
    for (int i = 0; i < num_staged_ios; i++)
    {
        PgAioHandle *ioh = staged_ios[i];

        pgaio_io_prepare_submit(ioh);
    }

    pgaio_worker_submit_internal(num_staged_ios, staged_ios);

    return num_staged_ios;
}

/*
 * on_shmem_exit() callback that releases the worker's slot in
 * io_worker_control.
 */
static void
pgaio_worker_die(int code, Datum arg)
{
    LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
    Assert(io_worker_control->workers[MyIoWorkerId].in_use);
    Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);

    io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
    io_worker_control->workers[MyIoWorkerId].in_use = false;
    io_worker_control->workers[MyIoWorkerId].latch = NULL;
    LWLockRelease(AioWorkerSubmissionQueueLock);
}

/*
 * Register the worker in shared memory, assign MyIoWorkerId and register a
 * shutdown callback to release registration.
 */
static void
pgaio_worker_register(void)
{
    MyIoWorkerId = -1;

    /*
     * XXX: This could do with more fine-grained locking. But it's also not
     * very common for the number of workers to change at the moment...
     */
    LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);

    for (int i = 0; i < MAX_IO_WORKERS; ++i)
    {
        if (!io_worker_control->workers[i].in_use)
        {
            Assert(io_worker_control->workers[i].latch == NULL);
            io_worker_control->workers[i].in_use = true;
            MyIoWorkerId = i;
            break;
        }
        else
            Assert(io_worker_control->workers[i].latch != NULL);
    }

    if (MyIoWorkerId == -1)
        elog(ERROR, "couldn't find a free worker slot");

    io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
    io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
    LWLockRelease(AioWorkerSubmissionQueueLock);

    on_shmem_exit(pgaio_worker_die, 0);
}

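/*
 * Error context callback for IO workers, identifying the backend on whose
 * behalf the worker is executing the IO.
 */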
static void
pgaio_worker_error_callback(void *arg)
{
    ProcNumber  owner;
    PGPROC     *owner_proc;
    int32       owner_pid;
    PgAioHandle *ioh = arg;

    if (!ioh)
        return;

    Assert(ioh->owner_procno != MyProcNumber);
    Assert(MyBackendType == B_IO_WORKER);

    owner = ioh->owner_procno;
    owner_proc = GetPGProcByNumber(owner);
    owner_pid = owner_proc->pid;

    errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
}

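/*
 * Main entry point for IO worker processes. Registers the worker in shared
 * memory, then loops: consume IOs from the submission queue and execute them
 * synchronously, waking further idle workers while the queue is non-empty,
 * until a shutdown request arrives.
 */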
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
    sigjmp_buf  local_sigjmp_buf;
    PgAioHandle *volatile error_ioh = NULL;
    ErrorContextCallback errcallback = {0};
    volatile int error_errno = 0;
    char        cmd[128];

    MyBackendType = B_IO_WORKER;
    AuxiliaryProcessMainCommon();

    pqsignal(SIGHUP, SignalHandlerForConfigReload);
    pqsignal(SIGINT, die);      /* to allow manually triggering worker restart */

    /*
     * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
     * shutdown sequence, similar to checkpointer.
     */
    pqsignal(SIGTERM, SIG_IGN);
    /* SIGQUIT handler was already set up by InitPostmasterChild */
    pqsignal(SIGALRM, SIG_IGN);
    pqsignal(SIGPIPE, SIG_IGN);
    pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);

    /* also registers a shutdown callback to unregister */
    pgaio_worker_register();

    sprintf(cmd, "%d", MyIoWorkerId);
    set_ps_display(cmd);

    errcallback.callback = pgaio_worker_error_callback;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* see PostgresMain() */
    if (sigsetjmp(local_sigjmp_buf, 1) != 0)
    {
        error_context_stack = NULL;
        HOLD_INTERRUPTS();

        EmitErrorReport();

        /*
         * In the - very unlikely - case that the IO failed in a way that
         * raises an error we need to mark the IO as failed.
         *
         * Need to do just enough error recovery so that we can mark the IO as
         * failed and then exit (postmaster will start a new worker).
         */
        LWLockReleaseAll();

        if (error_ioh != NULL)
        {
            /* should never fail without setting error_errno */
            Assert(error_errno != 0);

            errno = error_errno;

            START_CRIT_SECTION();
            pgaio_io_process_completion(error_ioh, -error_errno);
            END_CRIT_SECTION();
        }

        proc_exit(1);
    }

    /* We can now handle ereport(ERROR) */
    PG_exception_stack = &local_sigjmp_buf;

    sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);

    while (!ShutdownRequestPending)
    {
        uint32      io_index;
        Latch      *latches[IO_WORKER_WAKEUP_FANOUT];
        int         nlatches = 0;
        int         nwakeups = 0;
        int         worker;

        /*
         * Try to get a job to do.
         *
         * The lwlock acquisition also provides the necessary memory barrier
         * to ensure that we don't see outdated data in the handle.
         */
        LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
        if ((io_index = pgaio_worker_submission_queue_consume()) == -1)
        {
            /*
             * Nothing to do. Mark self idle.
             *
             * XXX: Invent some kind of back pressure to reduce useless
             * wakeups?
             */
            io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
        }
        else
        {
            /* Got one. Clear idle flag. */
            io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);

            /* See if we can wake up some peers. */
            nwakeups = Min(pgaio_worker_submission_queue_depth(),
                           IO_WORKER_WAKEUP_FANOUT);
            for (int i = 0; i < nwakeups; ++i)
            {
                if ((worker = pgaio_worker_choose_idle()) < 0)
                    break;
                latches[nlatches++] = io_worker_control->workers[worker].latch;
            }
        }
        LWLockRelease(AioWorkerSubmissionQueueLock);

        for (int i = 0; i < nlatches; ++i)
            SetLatch(latches[i]);

        if (io_index != -1)
        {
            PgAioHandle *ioh = NULL;

            ioh = &pgaio_ctl->io_handles[io_index];
            error_ioh = ioh;
            errcallback.arg = ioh;

            pgaio_debug_io(DEBUG4, ioh,
                           "worker %d processing IO",
                           MyIoWorkerId);

            /*
             * Prevent interrupts between pgaio_io_reopen() and
             * pgaio_io_perform_synchronously() that otherwise could lead to
             * the FD getting closed in that window.
             */
            HOLD_INTERRUPTS();

            /*
             * It's very unlikely, but possible, that reopen fails. E.g. due
             * to memory allocations failing or file permissions changing or
             * such. In that case we need to fail the IO.
             *
             * There's not really a good errno we can report here.
             */
            error_errno = ENOENT;
            pgaio_io_reopen(ioh);

            /*
             * To be able to exercise the reopen-fails path, allow injection
             * points to trigger a failure at this point.
             */
            INJECTION_POINT("aio-worker-after-reopen", ioh);

            error_errno = 0;
            error_ioh = NULL;

            /*
             * As part of IO completion the buffer will be marked as NOACCESS,
             * until the buffer is pinned again - which never happens in io
             * workers. Therefore the next time there is IO for the same
             * buffer, the memory will be considered inaccessible. To avoid
             * that, explicitly allow access to the memory before reading data
             * into it.
             */
#ifdef USE_VALGRIND
            {
                struct iovec *iov;
                uint16      iov_length = pgaio_io_get_iovec_length(ioh, &iov);

                for (int i = 0; i < iov_length; i++)
                    VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
            }
#endif

            /*
             * We don't expect this to ever fail with ERROR or FATAL, no need
             * to keep error_ioh set to the IO.
             * pgaio_io_perform_synchronously() contains a critical section to
             * ensure we don't accidentally fail.
             */
            pgaio_io_perform_synchronously(ioh);

            RESUME_INTERRUPTS();
            errcallback.arg = NULL;
        }
        else
        {
            WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
                      WAIT_EVENT_IO_WORKER_MAIN);
            ResetLatch(MyLatch);
        }

        CHECK_FOR_INTERRUPTS();

        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
        }
    }

    error_context_stack = errcallback.previous;
    proc_exit(0);
}

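/*
 * Report whether AIO is configured to use worker processes, i.e. whether
 * io_method is set to "worker".
 */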
bool
pgaio_workers_enabled(void)
{
    return io_method == IOMETHOD_WORKER;
}