PostgreSQL Source Code: src/backend/storage/aio/aio_callback.c Source File

PostgreSQL Source Code git master
aio_callback.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * aio_callback.c
4 * AIO - Functionality related to callbacks that can be registered on IO
5 * Handles
6 *
7 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/storage/aio/aio_callback.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres.h"
17
18#include "miscadmin.h"
19#include "storage/aio.h"
20#include "storage/aio_internal.h"
21#include "storage/bufmgr.h"
22#include "storage/md.h"
23
24
25/* just to have something to put into aio_handle_cbs */
26 static const PgAioHandleCallbacks aio_invalid_cb = {0};
27
28 typedef struct PgAioHandleCallbacksEntry
29{
30 const PgAioHandleCallbacks *const cb;
31 const char *const name;
32 } PgAioHandleCallbacksEntry;
33
34/*
35 * Callback definition for the callbacks that can be registered on an IO
36 * handle. See PgAioHandleCallbackID's definition for an explanation for why
37 * callbacks are not identified by a pointer.
38 */
39 static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
40#define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback}
41 CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
42
43 CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
44
45 CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
46
47 CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb),
48#undef CALLBACK_ENTRY
49};
50
51
52
53/* --------------------------------------------------------------------------------
54 * Public callback related functions operating on IO Handles
55 * --------------------------------------------------------------------------------
56 */
57
58/*
59 * Register callback for the IO handle.
60 *
61 * Only a limited number (PGAIO_HANDLE_MAX_CALLBACKS) of callbacks can be
62 * registered for each IO.
63 *
64 * Callbacks need to be registered before [indirectly] calling
65 * pgaio_io_start_*(), as the IO may be executed immediately.
66 *
67 * A callback can be passed a small bit of data, e.g. to indicate whether to
68 * zero a buffer if it is invalid.
69 *
70 *
71 * Note that callbacks are executed in critical sections. This is necessary
72 * to be able to execute IO in critical sections (consider e.g. WAL
73 * logging). To perform AIO we first need to acquire a handle, which, if there
74 * are no free handles, requires waiting for IOs to complete and to execute
75 * their completion callbacks.
76 *
77 * Callbacks may be executed in the issuing backend but also in another
78 * backend (because that backend is waiting for the IO) or in IO workers (if
79 * io_method=worker is used).
80 *
81 *
82 * See PgAioHandleCallbackID's definition for an explanation for why
83 * callbacks are not identified by a pointer.
84 */
85void
86 pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id,
87 uint8 cb_data)
88{
89 const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
90
91 Assert(cb_id <= PGAIO_HCB_MAX);
92 if (cb_id >= lengthof(aio_handle_cbs))
93 elog(ERROR, "callback %d is out of range", cb_id);
94 if (aio_handle_cbs[cb_id].cb->complete_shared == NULL &&
95 aio_handle_cbs[cb_id].cb->complete_local == NULL)
96 elog(ERROR, "callback %d does not have a completion callback", cb_id);
97 if (ioh->num_callbacks >= PGAIO_HANDLE_MAX_CALLBACKS)
98 elog(PANIC, "too many callbacks, the max is %d",
99 PGAIO_HANDLE_MAX_CALLBACKS);
100 ioh->callbacks[ioh->num_callbacks] = cb_id;
101 ioh->callbacks_data[ioh->num_callbacks] = cb_data;
102
103 pgaio_debug_io(DEBUG3, ioh,
104 "adding cb #%d, id %d/%s",
105 ioh->num_callbacks + 1,
106 cb_id, ce->name);
107
108 ioh->num_callbacks++;
109}
110
111/*
112 * Associate an array of data with the Handle. This is e.g. useful to the
113 * transport knowledge about which buffers a multi-block IO affects to
114 * completion callbacks.
115 *
116 * Right now this can be done only once for each IO, even though multiple
117 * callbacks can be registered. There aren't any known usecases requiring more
118 * and the required amount of shared memory does add up, so it doesn't seem
119 * worth multiplying memory usage by PGAIO_HANDLE_MAX_CALLBACKS.
120 */
121void
122 pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len)
123{
124 Assert(ioh->state == PGAIO_HS_HANDED_OUT);
125 Assert(ioh->handle_data_len == 0);
126 Assert(len <= PG_IOV_MAX);
127 Assert(len <= io_max_combine_limit);
128
129 for (int i = 0; i < len; i++)
130 pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
131 ioh->handle_data_len = len;
132}
133
134/*
135 * Convenience version of pgaio_io_set_handle_data_64() that converts a 32bit
136 * array to a 64bit array. Without it callers would end up needing to
137 * open-code equivalent code.
138 */
139void
140 pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len)
141{
142 Assert(ioh->state == PGAIO_HS_HANDED_OUT);
143 Assert(ioh->handle_data_len == 0);
144 Assert(len <= PG_IOV_MAX);
145 Assert(len <= io_max_combine_limit);
146
147 for (int i = 0; i < len; i++)
148 pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
149 ioh->handle_data_len = len;
150}
151
152/*
153 * Return data set with pgaio_io_set_handle_data_*().
154 */
155uint64 *
156 pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len)
157{
158 Assert(ioh->handle_data_len > 0);
159
160 *len = ioh->handle_data_len;
161
162 return &pgaio_ctl->handle_data[ioh->iovec_off];
163}
164
165
166
167/* --------------------------------------------------------------------------------
168 * Public IO Result related functions
169 * --------------------------------------------------------------------------------
170 */
171
172void
173 pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel)
174{
175 PgAioHandleCallbackID cb_id = result.id;
176 const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
177
178 Assert(result.status != PGAIO_RS_UNKNOWN);
179 Assert(result.status != PGAIO_RS_OK);
180
181 if (ce->cb->report == NULL)
182 elog(ERROR, "callback %d/%s does not have report callback",
183 result.id, ce->name);
184
185 ce->cb->report(result, target_data, elevel);
186}
187
188
189
190/* --------------------------------------------------------------------------------
191 * Internal callback related functions operating on IO Handles
192 * --------------------------------------------------------------------------------
193 */
194
195/*
196 * Internal function which invokes ->stage for all the registered callbacks.
197 */
198void
199 pgaio_io_call_stage(PgAioHandle *ioh)
200{
201 Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
202 Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
203
204 for (int i = ioh->num_callbacks; i > 0; i--)
205 {
206 PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
207 uint8 cb_data = ioh->callbacks_data[i - 1];
208 const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
209
210 if (!ce->cb->stage)
211 continue;
212
213 pgaio_debug_io(DEBUG3, ioh,
214 "calling cb #%d %d/%s->stage(%u)",
215 i, cb_id, ce->name, cb_data);
216 ce->cb->stage(ioh, cb_data);
217 }
218}
219
220/*
221 * Internal function which invokes ->complete_shared for all the registered
222 * callbacks.
223 */
224void
225 pgaio_io_call_complete_shared(PgAioHandle *ioh)
226{
227 PgAioResult result;
228
229 START_CRIT_SECTION();
230
231 Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
232 Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
233
234 result.status = PGAIO_RS_OK; /* low level IO is always considered OK */
235 result.result = ioh->result;
236 result.id = PGAIO_HCB_INVALID;
237 result.error_data = 0;
238
239 /*
240 * Call callbacks with the last registered (innermost) callback first.
241 * Each callback can modify the result forwarded to the next callback.
242 */
243 for (int i = ioh->num_callbacks; i > 0; i--)
244 {
245 PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
246 uint8 cb_data = ioh->callbacks_data[i - 1];
247 const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
248
249 if (!ce->cb->complete_shared)
250 continue;
251
252 pgaio_debug_io(DEBUG4, ioh,
253 "calling cb #%d, id %d/%s->complete_shared(%u) with distilled result: (status %s, id %u, error_data %d, result %d)",
254 i, cb_id, ce->name,
255 cb_data,
256 pgaio_result_status_string(result.status),
257 result.id, result.error_data, result.result);
258 result = ce->cb->complete_shared(ioh, result, cb_data);
259
260 /* the callback should never transition to unknown */
261 Assert(result.status != PGAIO_RS_UNKNOWN);
262 }
263
264 ioh->distilled_result = result;
265
266 pgaio_debug_io(DEBUG3, ioh,
267 "after shared completion: distilled result: (status %s, id %u, error_data: %d, result %d), raw_result: %d",
268 pgaio_result_status_string(result.status),
269 result.id, result.error_data, result.result,
270 ioh->result);
271
272 END_CRIT_SECTION();
273}
274
275/*
276 * Internal function which invokes ->complete_local for all the registered
277 * callbacks.
278 *
279 * Returns ioh->distilled_result after, possibly, being modified by local
280 * callbacks.
281 *
282 * XXX: It'd be nice to deduplicate with pgaio_io_call_complete_shared().
283 */
284PgAioResult
285 pgaio_io_call_complete_local(PgAioHandle *ioh)
286{
287 PgAioResult result;
288
289 START_CRIT_SECTION();
290
291 Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
292 Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
293
294 /* start with distilled result from shared callback */
295 result = ioh->distilled_result;
296 Assert(result.status != PGAIO_RS_UNKNOWN);
297
298 for (int i = ioh->num_callbacks; i > 0; i--)
299 {
300 PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
301 uint8 cb_data = ioh->callbacks_data[i - 1];
302 const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
303
304 if (!ce->cb->complete_local)
305 continue;
306
307 pgaio_debug_io(DEBUG4, ioh,
308 "calling cb #%d, id %d/%s->complete_local(%u) with distilled result: status %s, id %u, error_data %d, result %d",
309 i, cb_id, ce->name, cb_data,
310 pgaio_result_status_string(result.status),
311 result.id, result.error_data, result.result);
312 result = ce->cb->complete_local(ioh, result, cb_data);
313
314 /* the callback should never transition to unknown */
315 Assert(result.status != PGAIO_RS_UNKNOWN);
316 }
317
318 /*
319 * Note that we don't save the result in ioh->distilled_result, the local
320 * callback's result should not ever matter to other waiters. However, the
321 * local backend does care, so we return the result as modified by local
322 * callbacks, which then can be passed to ioh->report_return->result.
323 */
324 pgaio_debug_io(DEBUG3, ioh,
325 "after local completion: result: (status %s, id %u, error_data %d, result %d), raw_result: %d",
326 pgaio_result_status_string(result.status),
327 result.id, result.error_data, result.result,
328 ioh->result);
329
330 END_CRIT_SECTION();
331
332 return result;
333}
const char * pgaio_result_status_string(PgAioResultStatus rs)
Definition: aio.c:931
PgAioCtl * pgaio_ctl
Definition: aio.c:78
PgAioHandleCallbackID
Definition: aio.h:193
@ PGAIO_HCB_MD_READV
Definition: aio.h:196
@ PGAIO_HCB_LOCAL_BUFFER_READV
Definition: aio.h:200
@ PGAIO_HCB_SHARED_BUFFER_READV
Definition: aio.h:198
@ PGAIO_HCB_INVALID
Definition: aio.h:194
#define PGAIO_HANDLE_MAX_CALLBACKS
Definition: aio.h:267
#define PGAIO_TID_COUNT
Definition: aio.h:123
#define PGAIO_OP_COUNT
Definition: aio.h:107
@ PGAIO_TID_INVALID
Definition: aio.h:119
@ PGAIO_OP_INVALID
Definition: aio.h:90
#define PGAIO_HCB_MAX
Definition: aio.h:203
void pgaio_io_call_stage(PgAioHandle *ioh)
Definition: aio_callback.c:199
static const PgAioHandleCallbacksEntry aio_handle_cbs[]
Definition: aio_callback.c:39
void pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len)
Definition: aio_callback.c:140
PgAioResult pgaio_io_call_complete_local(PgAioHandle *ioh)
Definition: aio_callback.c:285
void pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id, uint8 cb_data)
Definition: aio_callback.c:86
void pgaio_io_call_complete_shared(PgAioHandle *ioh)
Definition: aio_callback.c:225
uint64 * pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len)
Definition: aio_callback.c:156
void pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len)
Definition: aio_callback.c:122
static const PgAioHandleCallbacks aio_invalid_cb
Definition: aio_callback.c:26
struct PgAioHandleCallbacksEntry PgAioHandleCallbacksEntry
#define CALLBACK_ENTRY(id, callback)
void pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel)
Definition: aio_callback.c:173
@ PGAIO_HS_HANDED_OUT
Definition: aio_internal.h:53
#define pgaio_debug_io(elevel, ioh, msg,...)
Definition: aio_internal.h:395
@ PGAIO_RS_OK
Definition: aio_types.h:81
@ PGAIO_RS_UNKNOWN
Definition: aio_types.h:80
int io_max_combine_limit
Definition: bufmgr.c:172
const PgAioHandleCallbacks aio_shared_buffer_readv_cb
Definition: bufmgr.c:7404
const PgAioHandleCallbacks aio_local_buffer_readv_cb
Definition: bufmgr.c:7413
uint8_t uint8
Definition: c.h:536
uint64_t uint64
Definition: c.h:539
uint32_t uint32
Definition: c.h:538
#define lengthof(array)
Definition: c.h:787
#define DEBUG3
Definition: elog.h:28
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define DEBUG4
Definition: elog.h:27
Assert(PointerIsAligned(start, uint64))
i
int i
Definition: isn.c:77
const PgAioHandleCallbacks aio_md_readv_cb
Definition: md.c:159
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
const void size_t len
const void * data
#define PG_IOV_MAX
Definition: pg_iovec.h:47
uint64 * handle_data
Definition: aio_internal.h:249
const PgAioHandleCallbacks *const cb
Definition: aio_callback.c:30
const char *const name
Definition: aio_callback.c:31
PgAioHandleCallbackComplete complete_shared
Definition: aio.h:239
PgAioHandleCallbackStage stage
Definition: aio.h:219
PgAioHandleCallbackReport report
Definition: aio.h:258
PgAioHandleCallbackComplete complete_local
Definition: aio.h:251
uint8 target
Definition: aio_internal.h:108
PgAioResult distilled_result
Definition: aio_internal.h:162
uint8 callbacks[PGAIO_HANDLE_MAX_CALLBACKS]
Definition: aio_internal.h:119
uint8 handle_data_len
Definition: aio_internal.h:128
uint8 op
Definition: aio_internal.h:111
int32 result
Definition: aio_internal.h:134
uint32 iovec_off
Definition: aio_internal.h:170
uint8 callbacks_data[PGAIO_HANDLE_MAX_CALLBACKS]
Definition: aio_internal.h:122
uint8 state
Definition: aio_internal.h:105
uint8 num_callbacks
Definition: aio_internal.h:116
uint32 status
Definition: aio_types.h:108
uint32 error_data
Definition: aio_types.h:111
int32 result
Definition: aio_types.h:113
uint32 id
Definition: aio_types.h:105

AltStyle によって変換されたページ (->オリジナル) /