1/*-------------------------------------------------------------------------
4 * Use the frontend/backend protocol for communication over a shm_mq
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
9 * src/backend/libpq/pqmq.c
11 *-------------------------------------------------------------------------
49 * Arrange to redirect frontend/backend protocol messages to a shared-memory
63 * When the DSM that contains our shm_mq goes away, we need to stop sending
78 * Arrange to SendProcSignal() to the parallel leader each time we transmit
79 * message data via the shm_mq.
112 /* There's never anything pending. */
117 * Transmit a libpq protocol message to the shared memory message queue
118 * selected via pq_mq_handle. We don't include a length word, because the
119 * receiver will know the length of the message from shm_mq_receive().
128 * If we're sending a message, and we have to wait because the queue is
129 * full, and then we get interrupted, and that interrupt results in trying
130 * to send another message, we respond by detaching the queue. There's no
131 * way to return to the original context, but even if there were, just
132 * queueing the message would amount to indefinitely postponing the
133 * response to the interrupt. So we do this instead.
147 * If the message queue is already gone, just ignore the message. This
148 * doesn't necessarily indicate a problem; for example, DEBUG messages can
149 * be generated late in the shutdown sequence, after all DSMs have already
157 iov[0].
data = &msgtype;
165 * Immediately notify the receiver by passing force_flush as true so
166 * that the shared memory value is updated before we send the parallel
167 * message signal right after this.
191 WAIT_EVENT_MESSAGE_QUEUE_PUT_MESSAGE);
208 * While the shm_mq machinery does support sending a message in
209 * non-blocking mode, there's currently no way to try sending beginning to
210 * send the message that doesn't also commit us to completing the
211 * transmission. This could be improved in the future, but for now we
218 * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
219 * structure with the results.
224 /* Initialize edata with reasonable defaults. */
229 /* Loop over fields and extract each one. */
245 /* ignore, trusting we'll get a nonlocalized version */
248 if (strcmp(
value,
"DEBUG") == 0)
251 * We can't reconstruct the exact DEBUG level, but
252 * presumably it was >= client_min_messages, so select
253 * DEBUG1 to ensure we'll pass it on to the client.
257 else if (strcmp(
value,
"LOG") == 0)
260 * It can't be LOG_SERVER_ONLY, or the worker wouldn't
261 * have sent it to us; so LOG is the correct value.
265 else if (strcmp(
value,
"INFO") == 0)
267 else if (strcmp(
value,
"NOTICE") == 0)
269 else if (strcmp(
value,
"WARNING") == 0)
271 else if (strcmp(
value,
"ERROR") == 0)
273 else if (strcmp(
value,
"FATAL") == 0)
275 else if (strcmp(
value,
"PANIC") == 0)
281 if (strlen(
value) != 5)
332 elog(
ERROR,
"unrecognized error field code: %d", (
int) code);
bool IsLogicalParallelApplyWorker(void)
#define MemSet(start, val, len)
void on_dsm_detach(dsm_segment *seg, on_dsm_detach_callback function, Datum arg)
#define MAKE_SQLSTATE(ch1, ch2, ch3, ch4, ch5)
ProtocolVersion FrontendProtocol
Assert(PointerIsAligned(start, uint64))
#define IsParallelWorker()
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
#define CHECK_FOR_INTERRUPTS()
int32 pg_strtoint32(const char *s)
CommandDest whereToSendOutput
#define PG_DIAG_INTERNAL_QUERY
#define PG_DIAG_SCHEMA_NAME
#define PG_DIAG_CONSTRAINT_NAME
#define PG_DIAG_DATATYPE_NAME
#define PG_DIAG_SOURCE_LINE
#define PG_DIAG_STATEMENT_POSITION
#define PG_DIAG_SOURCE_FILE
#define PG_DIAG_MESSAGE_HINT
#define PG_DIAG_SEVERITY_NONLOCALIZED
#define PG_DIAG_TABLE_NAME
#define PG_DIAG_MESSAGE_PRIMARY
#define PG_DIAG_COLUMN_NAME
#define PG_DIAG_MESSAGE_DETAIL
#define PG_DIAG_SOURCE_FUNCTION
#define PG_DIAG_INTERNAL_POSITION
const PQcommMethods * PqCommMethods
#define PG_PROTOCOL_LATEST
static int mq_flush(void)
static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
static const PQcommMethods PqCommMqMethods
static ProcNumber pq_mq_parallel_leader_proc_number
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
static int mq_putmessage(char msgtype, const char *s, size_t len)
static bool mq_is_send_pending(void)
static pid_t pq_mq_parallel_leader_pid
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
static shm_mq_handle * pq_mq_handle
static void mq_putmessage_noblock(char msgtype, const char *s, size_t len)
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
static void mq_comm_reset(void)
static int mq_flush_if_writable(void)
#define INVALID_PROC_NUMBER
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
@ PROCSIG_PARALLEL_MESSAGE
@ PROCSIG_PARALLEL_APPLY_MESSAGE
void shm_mq_detach(shm_mq_handle *mqh)
shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait, bool force_flush)
struct MemoryContextData * assoc_context
#define WL_EXIT_ON_PM_DEATH