PostgreSQL Source Code git master
Data Structures | Macros | Typedefs | Functions | Variables
applyparallelworker.c File Reference
#include "postgres.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
Include dependency graph for applyparallelworker.c:

Go to the source code of this file.

Data Structures

 

Macros

#define  PG_LOGICAL_APPLY_SHM_MAGIC   0x787ca067
 
#define  PARALLEL_APPLY_KEY_SHARED   1
 
#define  PARALLEL_APPLY_KEY_MQ   2
 
 
#define  DSM_QUEUE_SIZE   (16 * 1024 * 1024)
 
#define  DSM_ERROR_QUEUE_SIZE   (16 * 1024)
 
#define  SIZE_STATS_MESSAGE   (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
 
#define  PARALLEL_APPLY_LOCK_STREAM   0
 
#define  PARALLEL_APPLY_LOCK_XACT   1
 
#define  SHM_SEND_RETRY_INTERVAL_MS   1000
 
 

Typedefs

 

Functions

 
 
 
static bool  pa_can_start (void)
 
static bool  pa_setup_dsm (ParallelApplyWorkerInfo *winfo)
 
 
 
 
static void  pa_free_worker (ParallelApplyWorkerInfo *winfo)
 
void  pa_detach_all_error_mq (void)
 
 
 
static void  ProcessParallelApplyInterrupts (void)
 
 
static void  pa_shutdown (int code, Datum arg)
 
void  ParallelApplyWorkerMain (Datum main_arg)
 
 
 
 
bool  pa_send_data (ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
 
void  pa_switch_to_partial_serialize (ParallelApplyWorkerInfo *winfo, bool stream_locked)
 
 
 
 
 
static void  pa_savepoint_name (Oid suboid, TransactionId xid, char *spname, Size szsp)
 
void  pa_start_subtrans (TransactionId current_xid, TransactionId top_xid)
 
void  pa_reset_subtrans (void)
 
 
 
void  pa_lock_stream (TransactionId xid, LOCKMODE lockmode)
 
void  pa_unlock_stream (TransactionId xid, LOCKMODE lockmode)
 
 
 
 
 

Variables

static HTABParallelApplyTxnHash = NULL
 
 
 
volatile sig_atomic_t  ParallelApplyMessagePending = false
 
 
static Listsubxactlist = NIL
 

Macro Definition Documentation

DSM_ERROR_QUEUE_SIZE

#define DSM_ERROR_QUEUE_SIZE   (16 * 1024)

Definition at line 195 of file applyparallelworker.c.

DSM_QUEUE_SIZE

#define DSM_QUEUE_SIZE   (16 * 1024 * 1024)

Definition at line 187 of file applyparallelworker.c.

PARALLEL_APPLY_KEY_ERROR_QUEUE

#define PARALLEL_APPLY_KEY_ERROR_QUEUE   3

Definition at line 184 of file applyparallelworker.c.

PARALLEL_APPLY_KEY_MQ

#define PARALLEL_APPLY_KEY_MQ   2

Definition at line 183 of file applyparallelworker.c.

PARALLEL_APPLY_KEY_SHARED

#define PARALLEL_APPLY_KEY_SHARED   1

Definition at line 182 of file applyparallelworker.c.

PARALLEL_APPLY_LOCK_STREAM

#define PARALLEL_APPLY_LOCK_STREAM   0

Definition at line 209 of file applyparallelworker.c.

PARALLEL_APPLY_LOCK_XACT

#define PARALLEL_APPLY_LOCK_XACT   1

Definition at line 210 of file applyparallelworker.c.

PG_LOGICAL_APPLY_SHM_MAGIC

#define PG_LOGICAL_APPLY_SHM_MAGIC   0x787ca067

Definition at line 175 of file applyparallelworker.c.

SHM_SEND_RETRY_INTERVAL_MS

#define SHM_SEND_RETRY_INTERVAL_MS   1000

SHM_SEND_TIMEOUT_MS

#define SHM_SEND_TIMEOUT_MS   (10000 - SHM_SEND_RETRY_INTERVAL_MS)

SIZE_STATS_MESSAGE

#define SIZE_STATS_MESSAGE   (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

Definition at line 203 of file applyparallelworker.c.

Typedef Documentation

ParallelApplyWorkerEntry

Function Documentation

HandleParallelApplyMessageInterrupt()

void HandleParallelApplyMessageInterrupt ( void  )

Definition at line 997 of file applyparallelworker.c.

998{
999 InterruptPending = true;
1002}
volatile sig_atomic_t ParallelApplyMessagePending
volatile sig_atomic_t InterruptPending
Definition: globals.c:32
struct Latch * MyLatch
Definition: globals.c:63
void SetLatch(Latch *latch)
Definition: latch.c:290

References InterruptPending, MyLatch, ParallelApplyMessagePending, and SetLatch().

Referenced by procsignal_sigusr1_handler().

LogicalParallelApplyLoop()

static void LogicalParallelApplyLoop ( shm_mq_handlemqh )
static

Definition at line 735 of file applyparallelworker.c.

736{
737 shm_mq_result shmq_res;
738 ErrorContextCallback errcallback;
740
741 /*
742 * Init the ApplyMessageContext which we clean up after each replication
743 * protocol message.
744 */
746 "ApplyMessageContext",
748
749 /*
750 * Push apply error context callback. Fields will be filled while applying
751 * a change.
752 */
753 errcallback.callback = apply_error_callback;
754 errcallback.previous = error_context_stack;
755 error_context_stack = &errcallback;
756
757 for (;;)
758 {
759 void *data;
760 Size len;
761
763
764 /* Ensure we are reading the data into our memory context. */
766
767 shmq_res = shm_mq_receive(mqh, &len, &data, true);
768
769 if (shmq_res == SHM_MQ_SUCCESS)
770 {
772 int c;
773
774 if (len == 0)
775 elog(ERROR, "invalid message length");
776
778
779 /*
780 * The first byte of messages sent from leader apply worker to
781 * parallel apply workers can only be PqReplMsg_WALData.
782 */
783 c = pq_getmsgbyte(&s);
784 if (c != PqReplMsg_WALData)
785 elog(ERROR, "unexpected message \"%c\"", c);
786
787 /*
788 * Ignore statistics fields that have been updated by the leader
789 * apply worker.
790 *
791 * XXX We can avoid sending the statistics fields from the leader
792 * apply worker but for that, it needs to rebuild the entire
793 * message by removing these fields which could be more work than
794 * simply ignoring these fields in the parallel apply worker.
795 */
797
798 apply_dispatch(&s);
799 }
800 else if (shmq_res == SHM_MQ_WOULD_BLOCK)
801 {
802 /* Replay the changes from the file, if any. */
804 {
805 int rc;
806
807 /* Wait for more work. */
808 rc = WaitLatch(MyLatch,
810 1000L,
811 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
812
813 if (rc & WL_LATCH_SET)
815 }
816 }
817 else
818 {
819 Assert(shmq_res == SHM_MQ_DETACHED);
820
822 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
823 errmsg("lost connection to the logical replication apply worker")));
824 }
825
827 MemoryContextSwitchTo(oldcxt);
828 }
829
830 /* Pop the error context stack. */
831 error_context_stack = errcallback.previous;
832
833 MemoryContextSwitchTo(oldcxt);
834}
static void ProcessParallelApplyInterrupts(void)
#define SIZE_STATS_MESSAGE
static bool pa_process_spooled_messages_if_required(void)
MemoryContext ApplyMessageContext
Definition: worker.c:471
void apply_dispatch(StringInfo s)
Definition: worker.c:3747
MemoryContext ApplyContext
Definition: worker.c:472
void apply_error_callback(void *arg)
Definition: worker.c:6118
size_t Size
Definition: c.h:610
ErrorContextCallback * error_context_stack
Definition: elog.c:95
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
Assert(PointerIsAligned(start, uint64))
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const void size_t len
const void * data
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
c
char * c
Definition: preproc-cursor.c:31
#define PqReplMsg_WALData
Definition: protocol.h:77
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:572
shm_mq_result
Definition: shm_mq.h:37
@ SHM_MQ_SUCCESS
Definition: shm_mq.h:38
@ SHM_MQ_WOULD_BLOCK
Definition: shm_mq.h:39
@ SHM_MQ_DETACHED
Definition: shm_mq.h:40
static void initReadOnlyStringInfo(StringInfo str, char *data, int len)
Definition: stringinfo.h:157
struct ErrorContextCallback * previous
Definition: elog.h:297
void(* callback)(void *arg)
Definition: elog.h:298
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, apply_dispatch(), apply_error_callback(), ApplyContext, ApplyMessageContext, Assert(), ErrorContextCallback::callback, CurrentMemoryContext, StringInfoData::cursor, data, elog, ereport, errcode(), errmsg(), ERROR, error_context_stack, initReadOnlyStringInfo(), len, MemoryContextReset(), MemoryContextSwitchTo(), MyLatch, pa_process_spooled_messages_if_required(), pq_getmsgbyte(), PqReplMsg_WALData, ErrorContextCallback::previous, ProcessParallelApplyInterrupts(), ResetLatch(), SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SIZE_STATS_MESSAGE, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by ParallelApplyWorkerMain().

pa_allocate_worker()

void pa_allocate_worker ( TransactionId  xid )

Definition at line 471 of file applyparallelworker.c.

472{
473 bool found;
474 ParallelApplyWorkerInfo *winfo = NULL;
476
477 if (!pa_can_start())
478 return;
479
481 if (!winfo)
482 return;
483
484 /* First time through, initialize parallel apply worker state hashtable. */
486 {
487 HASHCTL ctl;
488
489 MemSet(&ctl, 0, sizeof(ctl));
490 ctl.keysize = sizeof(TransactionId);
491 ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
492 ctl.hcxt = ApplyContext;
493
494 ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
495 16, &ctl,
497 }
498
499 /* Create an entry for the requested transaction. */
500 entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
501 if (found)
502 elog(ERROR, "hash table corrupted");
503
504 /* Update the transaction information in shared memory. */
505 SpinLockAcquire(&winfo->shared->mutex);
507 winfo->shared->xid = xid;
508 SpinLockRelease(&winfo->shared->mutex);
509
510 winfo->in_use = true;
511 winfo->serialize_changes = false;
512 entry->winfo = winfo;
513}
struct ParallelApplyWorkerEntry ParallelApplyWorkerEntry
static bool pa_can_start(void)
static HTAB * ParallelApplyTxnHash
static ParallelApplyWorkerInfo * pa_launch_parallel_worker(void)
#define MemSet(start, val, len)
Definition: c.h:1019
uint32 TransactionId
Definition: c.h:657
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:952
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:358
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
tree ctl
Definition: radixtree.h:1838
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
Definition: hsearch.h:66
ParallelApplyWorkerInfo * winfo
ParallelApplyWorkerShared * shared
ParallelTransState xact_state
@ PARALLEL_TRANS_UNKNOWN

References ApplyContext, ctl, elog, ERROR, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), ParallelApplyWorkerInfo::in_use, MemSet, ParallelApplyWorkerShared::mutex, pa_can_start(), pa_launch_parallel_worker(), PARALLEL_TRANS_UNKNOWN, ParallelApplyTxnHash, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, SpinLockAcquire, SpinLockRelease, ParallelApplyWorkerEntry::winfo, ParallelApplyWorkerShared::xact_state, and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_start().

pa_can_start()

static bool pa_can_start ( void  )
static

Definition at line 265 of file applyparallelworker.c.

266{
267 /* Only leader apply workers can start parallel apply workers. */
269 return false;
270
271 /*
272 * It is good to check for any change in the subscription parameter to
273 * avoid the case where for a very long time the change doesn't get
274 * reflected. This can happen when there is a constant flow of streaming
275 * transactions that are handled by parallel apply workers.
276 *
277 * It is better to do it before the below checks so that the latest values
278 * of subscription can be used for the checks.
279 */
281
282 /*
283 * Don't start a new parallel apply worker if the subscription is not
284 * using parallel streaming mode, or if the publisher does not support
285 * parallel apply.
286 */
288 return false;
289
290 /*
291 * Don't start a new parallel worker if user has set skiplsn as it's
292 * possible that they want to skip the streaming transaction. For
293 * streaming transactions, we need to serialize the transaction to a file
294 * so that we can get the last LSN of the transaction to judge whether to
295 * skip before starting to apply the change.
296 *
297 * One might think that we could allow parallelism if the first lsn of the
298 * transaction is greater than skiplsn, but we don't send it with the
299 * STREAM START message, and it doesn't seem worth sending the extra eight
300 * bytes with the STREAM START to enable parallelism for this case.
301 */
303 return false;
304
305 /*
306 * For streaming transactions that are being applied using a parallel
307 * apply worker, we cannot decide whether to apply the change for a
308 * relation that is not in the READY state (see
309 * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
310 * time. So, we don't start the new parallel apply worker in this case.
311 */
312 if (!AllTablesyncsReady())
313 return false;
314
315 return true;
316}
void maybe_reread_subscription(void)
Definition: worker.c:5007
Subscription * MySubscription
Definition: worker.c:479
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
XLogRecPtr skiplsn
bool AllTablesyncsReady(void)
Definition: tablesync.c:1770
static bool am_leader_apply_worker(void)
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References AllTablesyncsReady(), am_leader_apply_worker(), maybe_reread_subscription(), MyLogicalRepWorker, MySubscription, LogicalRepWorker::parallel_apply, Subscription::skiplsn, and XLogRecPtrIsInvalid.

Referenced by pa_allocate_worker().

pa_decr_and_wait_stream_block()

void pa_decr_and_wait_stream_block ( void  )

Definition at line 1599 of file applyparallelworker.c.

1600{
1602
1603 /*
1604 * It is only possible to not have any pending stream chunks when we are
1605 * applying spooled messages.
1606 */
1608 {
1610 return;
1611
1612 elog(ERROR, "invalid pending streaming chunk 0");
1613 }
1614
1616 {
1619 }
1620}
void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
void pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
static bool pa_has_spooled_message_pending()
ParallelApplyWorkerShared * MyParallelShared
static uint32 pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
Definition: atomics.h:437
static uint32 pg_atomic_read_u32(volatile pg_atomic_uint32 *ptr)
Definition: atomics.h:237
#define AccessShareLock
Definition: lockdefs.h:36
pg_atomic_uint32 pending_stream_count
static bool am_parallel_apply_worker(void)

References AccessShareLock, am_parallel_apply_worker(), Assert(), elog, ERROR, MyParallelShared, pa_has_spooled_message_pending(), pa_lock_stream(), pa_unlock_stream(), ParallelApplyWorkerShared::pending_stream_count, pg_atomic_read_u32(), pg_atomic_sub_fetch_u32(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), and apply_handle_stream_stop().

pa_detach_all_error_mq()

void pa_detach_all_error_mq ( void  )

Definition at line 623 of file applyparallelworker.c.

624{
625 ListCell *lc;
626
627 foreach(lc, ParallelApplyWorkerPool)
628 {
630
631 if (winfo->error_mq_handle)
632 {
634 winfo->error_mq_handle = NULL;
635 }
636 }
637}
static List * ParallelApplyWorkerPool
#define lfirst(lc)
Definition: pg_list.h:172
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:843
shm_mq_handle * error_mq_handle
Definition: pg_list.h:46

References ParallelApplyWorkerInfo::error_mq_handle, lfirst, ParallelApplyWorkerPool, and shm_mq_detach().

Referenced by logicalrep_worker_detach().

pa_find_worker()

ParallelApplyWorkerInfo * pa_find_worker ( TransactionId  xid )

Definition at line 519 of file applyparallelworker.c.

520{
521 bool found;
523
524 if (!TransactionIdIsValid(xid))
525 return NULL;
526
528 return NULL;
529
530 /* Return the cached parallel apply worker if valid. */
532 return stream_apply_worker;
533
534 /* Find an entry for the requested transaction. */
535 entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
536 if (found)
537 {
538 /* The worker must not have exited. */
539 Assert(entry->winfo->in_use);
540 return entry->winfo;
541 }
542
543 return NULL;
544}
static ParallelApplyWorkerInfo * stream_apply_worker
@ HASH_FIND
Definition: hsearch.h:113
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), HASH_FIND, hash_search(), ParallelApplyWorkerInfo::in_use, ParallelApplyTxnHash, stream_apply_worker, TransactionIdIsValid, and ParallelApplyWorkerEntry::winfo.

Referenced by get_transaction_apply_action().

pa_free_worker()

static void pa_free_worker ( ParallelApplyWorkerInfowinfo )
static

Definition at line 557 of file applyparallelworker.c.

558{
560 Assert(winfo->in_use);
562
564 elog(ERROR, "hash table corrupted");
565
566 /*
567 * Stop the worker if there are enough workers in the pool.
568 *
569 * XXX Additionally, we also stop the worker if the leader apply worker
570 * serialize part of the transaction data due to a send timeout. This is
571 * because the message could be partially written to the queue and there
572 * is no way to clean the queue other than resending the message until it
573 * succeeds. Instead of trying to send the data which anyway would have
574 * been serialized and then letting the parallel apply worker deal with
575 * the spurious message, we stop the worker.
576 */
577 if (winfo->serialize_changes ||
580 {
582 pa_free_worker_info(winfo);
583
584 return;
585 }
586
587 winfo->in_use = false;
588 winfo->serialize_changes = false;
589}
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared)
@ HASH_REMOVE
Definition: hsearch.h:115
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
Definition: launcher.c:657
int max_parallel_apply_workers_per_subscription
Definition: launcher.c:54
static int list_length(const List *l)
Definition: pg_list.h:152
@ PARALLEL_TRANS_FINISHED

References am_parallel_apply_worker(), Assert(), elog, ERROR, HASH_REMOVE, hash_search(), ParallelApplyWorkerInfo::in_use, list_length(), logicalrep_pa_worker_stop(), max_parallel_apply_workers_per_subscription, pa_free_worker_info(), pa_get_xact_state(), PARALLEL_TRANS_FINISHED, ParallelApplyTxnHash, ParallelApplyWorkerPool, ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, and ParallelApplyWorkerShared::xid.

Referenced by pa_xact_finish().

pa_free_worker_info()

static void pa_free_worker_info ( ParallelApplyWorkerInfowinfo )
static

Definition at line 596 of file applyparallelworker.c.

597{
598 Assert(winfo);
599
600 if (winfo->mq_handle)
601 shm_mq_detach(winfo->mq_handle);
602
603 if (winfo->error_mq_handle)
605
606 /* Unlink the files with serialized changes. */
607 if (winfo->serialize_changes)
609
610 if (winfo->dsm_seg)
611 dsm_detach(winfo->dsm_seg);
612
613 /* Remove from the worker pool. */
615
616 pfree(winfo);
617}
void stream_cleanup_files(Oid subid, TransactionId xid)
Definition: worker.c:5350
void dsm_detach(dsm_segment *seg)
Definition: dsm.c:803
List * list_delete_ptr(List *list, void *datum)
Definition: list.c:872
void pfree(void *pointer)
Definition: mcxt.c:1594
shm_mq_handle * mq_handle

References Assert(), dsm_detach(), ParallelApplyWorkerInfo::dsm_seg, ParallelApplyWorkerInfo::error_mq_handle, list_delete_ptr(), ParallelApplyWorkerInfo::mq_handle, MyLogicalRepWorker, ParallelApplyWorkerPool, pfree(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, shm_mq_detach(), stream_cleanup_files(), LogicalRepWorker::subid, and ParallelApplyWorkerShared::xid.

Referenced by pa_free_worker(), and pa_launch_parallel_worker().

pa_get_fileset_state()

static PartialFileSetState pa_get_fileset_state ( void  )
static

Definition at line 1526 of file applyparallelworker.c.

1527{
1528 PartialFileSetState fileset_state;
1529
1531
1533 fileset_state = MyParallelShared->fileset_state;
1535
1536 return fileset_state;
1537}
PartialFileSetState fileset_state
PartialFileSetState

References am_parallel_apply_worker(), Assert(), ParallelApplyWorkerShared::fileset_state, ParallelApplyWorkerShared::mutex, MyParallelShared, SpinLockAcquire, and SpinLockRelease.

Referenced by pa_has_spooled_message_pending(), and pa_process_spooled_messages_if_required().

pa_get_xact_state()

static ParallelTransState pa_get_xact_state ( ParallelApplyWorkerSharedwshared )
static

Definition at line 1327 of file applyparallelworker.c.

1328{
1329 ParallelTransState xact_state;
1330
1331 SpinLockAcquire(&wshared->mutex);
1332 xact_state = wshared->xact_state;
1333 SpinLockRelease(&wshared->mutex);
1334
1335 return xact_state;
1336}
ParallelTransState

References ParallelApplyWorkerShared::mutex, SpinLockAcquire, SpinLockRelease, and ParallelApplyWorkerShared::xact_state.

Referenced by pa_free_worker(), pa_wait_for_xact_finish(), and pa_wait_for_xact_state().

pa_has_spooled_message_pending()

static bool pa_has_spooled_message_pending ( )
static

Definition at line 643 of file applyparallelworker.c.

644{
645 PartialFileSetState fileset_state;
646
647 fileset_state = pa_get_fileset_state();
648
649 return (fileset_state != FS_EMPTY);
650}
static PartialFileSetState pa_get_fileset_state(void)
@ FS_EMPTY

References FS_EMPTY, and pa_get_fileset_state().

Referenced by pa_decr_and_wait_stream_block().

pa_launch_parallel_worker()

static ParallelApplyWorkerInfo * pa_launch_parallel_worker ( void  )
static

Definition at line 404 of file applyparallelworker.c.

405{
406 MemoryContext oldcontext;
407 bool launched;
409 ListCell *lc;
410
411 /* Try to get an available parallel apply worker from the worker pool. */
412 foreach(lc, ParallelApplyWorkerPool)
413 {
414 winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
415
416 if (!winfo->in_use)
417 return winfo;
418 }
419
420 /*
421 * Start a new parallel apply worker.
422 *
423 * The worker info can be used for the lifetime of the worker process, so
424 * create it in a permanent context.
425 */
427
429
430 /* Setup shared memory. */
431 if (!pa_setup_dsm(winfo))
432 {
433 MemoryContextSwitchTo(oldcontext);
434 pfree(winfo);
435 return NULL;
436 }
437
445 false);
446
447 if (launched)
448 {
450 }
451 else
452 {
453 pa_free_worker_info(winfo);
454 winfo = NULL;
455 }
456
457 MemoryContextSwitchTo(oldcontext);
458
459 return winfo;
460}
static bool pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
dsm_handle dsm_segment_handle(dsm_segment *seg)
Definition: dsm.c:1123
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition: launcher.c:317
List * lappend(List *list, void *datum)
Definition: list.c:339
void * palloc0(Size size)
Definition: mcxt.c:1395
#define InvalidOid
Definition: postgres_ext.h:37
@ WORKERTYPE_PARALLEL_APPLY

References ApplyContext, LogicalRepWorker::dbid, ParallelApplyWorkerInfo::dsm_seg, dsm_segment_handle(), ParallelApplyWorkerInfo::in_use, InvalidOid, lappend(), lfirst, logicalrep_worker_launch(), MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, Subscription::name, Subscription::oid, pa_free_worker_info(), pa_setup_dsm(), palloc0(), ParallelApplyWorkerPool, pfree(), LogicalRepWorker::userid, and WORKERTYPE_PARALLEL_APPLY.

Referenced by pa_allocate_worker().

pa_lock_stream()

void pa_lock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

Definition at line 1548 of file applyparallelworker.c.

1549{
1551 PARALLEL_APPLY_LOCK_STREAM, lockmode);
1552}
#define PARALLEL_APPLY_LOCK_STREAM
void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
Definition: lmgr.c:1209

References LockApplyTransactionForSession(), MyLogicalRepWorker, PARALLEL_APPLY_LOCK_STREAM, and LogicalRepWorker::subid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_stop(), pa_decr_and_wait_stream_block(), pa_process_spooled_messages_if_required(), and pa_switch_to_partial_serialize().

pa_lock_transaction()

void pa_lock_transaction ( TransactionId  xid,
LOCKMODE  lockmode 
)

Definition at line 1581 of file applyparallelworker.c.

1582{
1584 PARALLEL_APPLY_LOCK_XACT, lockmode);
1585}
#define PARALLEL_APPLY_LOCK_XACT

References LockApplyTransactionForSession(), MyLogicalRepWorker, PARALLEL_APPLY_LOCK_XACT, and LogicalRepWorker::subid.

Referenced by apply_handle_stream_start(), and pa_wait_for_xact_finish().

pa_process_spooled_messages_if_required()

static bool pa_process_spooled_messages_if_required ( void  )
static

Definition at line 659 of file applyparallelworker.c.

660{
661 PartialFileSetState fileset_state;
662
663 fileset_state = pa_get_fileset_state();
664
665 if (fileset_state == FS_EMPTY)
666 return false;
667
668 /*
669 * If the leader apply worker is busy serializing the partial changes then
670 * acquire the stream lock now and wait for the leader worker to finish
671 * serializing the changes. Otherwise, the parallel apply worker won't get
672 * a chance to receive a STREAM_STOP (and acquire the stream lock) until
673 * the leader had serialized all changes which can lead to undetected
674 * deadlock.
675 *
676 * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
677 * worker has finished serializing the changes.
678 */
679 if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
680 {
683
684 fileset_state = pa_get_fileset_state();
685 }
686
687 /*
688 * We cannot read the file immediately after the leader has serialized all
689 * changes to the file because there may still be messages in the memory
690 * queue. We will apply all spooled messages the next time we call this
691 * function and that will ensure there are no messages left in the memory
692 * queue.
693 */
694 if (fileset_state == FS_SERIALIZE_DONE)
695 {
697 }
698 else if (fileset_state == FS_READY)
699 {
704 }
705
706 return true;
707}
void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, PartialFileSetState fileset_state)
void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn)
Definition: worker.c:2238
@ FS_SERIALIZE_DONE
@ FS_READY
@ FS_SERIALIZE_IN_PROGRESS
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AccessShareLock, apply_spooled_messages(), ParallelApplyWorkerShared::fileset, FS_EMPTY, FS_READY, FS_SERIALIZE_DONE, FS_SERIALIZE_IN_PROGRESS, InvalidXLogRecPtr, MyParallelShared, pa_get_fileset_state(), pa_lock_stream(), pa_set_fileset_state(), pa_unlock_stream(), and ParallelApplyWorkerShared::xid.

Referenced by LogicalParallelApplyLoop().

pa_reset_subtrans()

void pa_reset_subtrans ( void  )

Definition at line 1410 of file applyparallelworker.c.

1411{
1412 /*
1413 * We don't need to free this explicitly as the allocated memory will be
1414 * freed at the transaction end.
1415 */
1416 subxactlist = NIL;
1417}
static List * subxactlist
#define NIL
Definition: pg_list.h:68

References NIL, and subxactlist.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), and pa_stream_abort().

pa_savepoint_name()

static void pa_savepoint_name ( Oid  suboid,
TransactionId  xid,
char *  spname,
Size  szsp 
)
static

Definition at line 1356 of file applyparallelworker.c.

1357{
1358 snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
1359}
#define snprintf
Definition: port.h:239

References snprintf.

Referenced by pa_start_subtrans(), and pa_stream_abort().

pa_send_data()

bool pa_send_data ( ParallelApplyWorkerInfowinfo,
Size  nbytes,
const void *  data 
)

Definition at line 1154 of file applyparallelworker.c.

1155{
1156 int rc;
1157 shm_mq_result result;
1158 TimestampTz startTime = 0;
1159
1161 Assert(!winfo->serialize_changes);
1162
1163 /*
1164 * We don't try to send data to parallel worker for 'immediate' mode. This
1165 * is primarily used for testing purposes.
1166 */
1168 return false;
1169
1170/*
1171 * This timeout is a bit arbitrary but testing revealed that it is sufficient
1172 * to send the message unless the parallel apply worker is waiting on some
1173 * lock or there is a serious resource crunch. See the comments atop this file
1174 * to know why we are using a non-blocking way to send the message.
1175 */
1176#define SHM_SEND_RETRY_INTERVAL_MS 1000
1177#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1178
1179 for (;;)
1180 {
1181 result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1182
1183 if (result == SHM_MQ_SUCCESS)
1184 return true;
1185 else if (result == SHM_MQ_DETACHED)
1186 ereport(ERROR,
1187 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1188 errmsg("could not send data to shared-memory queue")));
1189
1190 Assert(result == SHM_MQ_WOULD_BLOCK);
1191
1192 /* Wait before retrying. */
1193 rc = WaitLatch(MyLatch,
1196 WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1197
1198 if (rc & WL_LATCH_SET)
1199 {
1202 }
1203
1204 if (startTime == 0)
1205 startTime = GetCurrentTimestamp();
1206 else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1208 return false;
1209 }
1210}
#define SHM_SEND_TIMEOUT_MS
#define SHM_SEND_RETRY_INTERVAL_MS
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
#define unlikely(x)
Definition: c.h:402
int64 TimestampTz
Definition: timestamp.h:39
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
int debug_logical_replication_streaming
Definition: reorderbuffer.c:229
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:34
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait, bool force_flush)
Definition: shm_mq.c:329
bool IsTransactionState(void)
Definition: xact.c:387

References Assert(), CHECK_FOR_INTERRUPTS, data, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, ereport, errcode(), errmsg(), ERROR, GetCurrentTimestamp(), IsTransactionState(), ParallelApplyWorkerInfo::mq_handle, MyLatch, ResetLatch(), ParallelApplyWorkerInfo::serialize_changes, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, SHM_SEND_RETRY_INTERVAL_MS, SHM_SEND_TIMEOUT_MS, TimestampDifferenceExceeds(), unlikely, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

pa_set_fileset_state()

void pa_set_fileset_state ( ParallelApplyWorkerSharedwshared,
PartialFileSetState  fileset_state 
)

Definition at line 1506 of file applyparallelworker.c.

1508{
1509 SpinLockAcquire(&wshared->mutex);
1510 wshared->fileset_state = fileset_state;
1511
1512 if (fileset_state == FS_SERIALIZE_DONE)
1513 {
1517 }
1518
1519 SpinLockRelease(&wshared->mutex);
1520}
FileSet * stream_fileset

References am_leader_apply_worker(), Assert(), ParallelApplyWorkerShared::fileset, ParallelApplyWorkerShared::fileset_state, FS_SERIALIZE_DONE, ParallelApplyWorkerShared::mutex, MyLogicalRepWorker, SpinLockAcquire, SpinLockRelease, and LogicalRepWorker::stream_fileset.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_process_spooled_messages_if_required(), and pa_switch_to_partial_serialize().

pa_set_stream_apply_worker()

void pa_set_stream_apply_worker ( ParallelApplyWorkerInfowinfo )

Definition at line 1342 of file applyparallelworker.c.

1343{
1344 stream_apply_worker = winfo;
1345}

References stream_apply_worker.

Referenced by apply_handle_stream_start(), and apply_handle_stream_stop().

pa_set_xact_state()

void pa_set_xact_state ( ParallelApplyWorkerSharedwshared,
ParallelTransState  xact_state 
)

Definition at line 1315 of file applyparallelworker.c.

1317{
1318 SpinLockAcquire(&wshared->mutex);
1319 wshared->xact_state = xact_state;
1320 SpinLockRelease(&wshared->mutex);
1321}

References ParallelApplyWorkerShared::mutex, SpinLockAcquire, SpinLockRelease, and ParallelApplyWorkerShared::xact_state.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), and pa_stream_abort().

pa_setup_dsm()

static bool pa_setup_dsm ( ParallelApplyWorkerInfowinfo )
static

Definition at line 327 of file applyparallelworker.c.

328{
330 Size segsize;
331 dsm_segment *seg;
332 shm_toc *toc;
334 shm_mq *mq;
335 Size queue_size = DSM_QUEUE_SIZE;
336 Size error_queue_size = DSM_ERROR_QUEUE_SIZE;
337
338 /*
339 * Estimate how much shared memory we need.
340 *
341 * Because the TOC machinery may choose to insert padding of oddly-sized
342 * requests, we must estimate each chunk separately.
343 *
344 * We need one key to register the location of the header, and two other
345 * keys to track the locations of the message queue and the error message
346 * queue.
347 */
350 shm_toc_estimate_chunk(&e, queue_size);
351 shm_toc_estimate_chunk(&e, error_queue_size);
352
354 segsize = shm_toc_estimate(&e);
355
356 /* Create the shared memory segment and establish a table of contents. */
357 seg = dsm_create(shm_toc_estimate(&e), 0);
358 if (!seg)
359 return false;
360
362 segsize);
363
364 /* Set up the header region. */
365 shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
366 SpinLockInit(&shared->mutex);
367
371 shared->fileset_state = FS_EMPTY;
372
374
375 /* Set up message queue for the worker. */
376 mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
379
380 /* Attach the queue. */
381 winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
382
383 /* Set up error queue for the worker. */
384 mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
385 error_queue_size);
388
389 /* Attach the queue. */
390 winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
391
392 /* Return results to caller. */
393 winfo->dsm_seg = seg;
394 winfo->shared = shared;
395
396 return true;
397}
#define DSM_ERROR_QUEUE_SIZE
#define DSM_QUEUE_SIZE
#define PARALLEL_APPLY_KEY_SHARED
#define PARALLEL_APPLY_KEY_ERROR_QUEUE
#define PARALLEL_APPLY_KEY_MQ
#define PG_LOGICAL_APPLY_SHM_MAGIC
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
Definition: atomics.h:219
void * dsm_segment_address(dsm_segment *seg)
Definition: dsm.c:1095
dsm_segment * dsm_create(Size size, int flags)
Definition: dsm.c:516
e
e
Definition: preproc-init.c:82
void shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:224
shm_mq * shm_mq_create(void *address, Size size)
Definition: shm_mq.c:177
void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
Definition: shm_mq.c:206
shm_mq_handle * shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
Definition: shm_mq.c:290
void * shm_toc_allocate(shm_toc *toc, Size nbytes)
Definition: shm_toc.c:88
Size shm_toc_estimate(shm_toc_estimator *e)
Definition: shm_toc.c:263
shm_toc * shm_toc_create(uint64 magic, void *address, Size nbytes)
Definition: shm_toc.c:40
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
Definition: shm_toc.c:171
#define shm_toc_estimate_chunk(e, sz)
Definition: shm_toc.h:51
#define shm_toc_initialize_estimator(e)
Definition: shm_toc.h:49
#define shm_toc_estimate_keys(e, cnt)
Definition: shm_toc.h:53
#define SpinLockInit(lock)
Definition: spin.h:57
PGPROC * MyProc
Definition: proc.c:66
Definition: dsm.c:67
Definition: shm_mq.c:72
Definition: shm_toc.c:27

References dsm_create(), DSM_ERROR_QUEUE_SIZE, DSM_QUEUE_SIZE, ParallelApplyWorkerInfo::dsm_seg, dsm_segment_address(), ParallelApplyWorkerInfo::error_mq_handle, ParallelApplyWorkerShared::fileset_state, FS_EMPTY, InvalidXLogRecPtr, ParallelApplyWorkerShared::last_commit_end, ParallelApplyWorkerInfo::mq_handle, ParallelApplyWorkerShared::mutex, MyProc, PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PARALLEL_TRANS_UNKNOWN, ParallelApplyWorkerShared::pending_stream_count, pg_atomic_init_u32(), PG_LOGICAL_APPLY_SHM_MAGIC, ParallelApplyWorkerInfo::shared, shm_mq_attach(), shm_mq_create(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_allocate(), shm_toc_create(), shm_toc_estimate(), shm_toc_estimate_chunk, shm_toc_estimate_keys, shm_toc_initialize_estimator, shm_toc_insert(), SpinLockInit, and ParallelApplyWorkerShared::xact_state.

Referenced by pa_launch_parallel_worker().

pa_shutdown()

static void pa_shutdown ( int  code,
Datum  arg 
)
static

Definition at line 845 of file applyparallelworker.c.

846{
850
852}
void * arg
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:322
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_PARALLEL_APPLY_MESSAGE
Definition: procsignal.h:38

References arg, DatumGetPointer(), dsm_detach(), INVALID_PROC_NUMBER, LogicalRepWorker::leader_pid, MyLogicalRepWorker, PROCSIG_PARALLEL_APPLY_MESSAGE, and SendProcSignal().

Referenced by ParallelApplyWorkerMain().

pa_start_subtrans()

void pa_start_subtrans ( TransactionId  current_xid,
TransactionId  top_xid 
)

Definition at line 1370 of file applyparallelworker.c.

1371{
1372 if (current_xid != top_xid &&
1373 !list_member_xid(subxactlist, current_xid))
1374 {
1375 MemoryContext oldctx;
1376 char spname[NAMEDATALEN];
1377
1378 pa_savepoint_name(MySubscription->oid, current_xid,
1379 spname, sizeof(spname));
1380
1381 elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1382
1383 /* We must be in transaction block to define the SAVEPOINT. */
1384 if (!IsTransactionBlock())
1385 {
1386 if (!IsTransactionState())
1388
1391 }
1392
1393 DefineSavepoint(spname);
1394
1395 /*
1396 * CommitTransactionCommand is needed to start a subtransaction after
1397 * issuing a SAVEPOINT inside a transaction block (see
1398 * StartSubTransaction()).
1399 */
1401
1403 subxactlist = lappend_xid(subxactlist, current_xid);
1404 MemoryContextSwitchTo(oldctx);
1405 }
1406}
static void pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
#define DEBUG1
Definition: elog.h:30
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:393
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:742
MemoryContext TopTransactionContext
Definition: mcxt.c:171
#define NAMEDATALEN
void DefineSavepoint(const char *name)
Definition: xact.c:4385
void StartTransactionCommand(void)
Definition: xact.c:3071
bool IsTransactionBlock(void)
Definition: xact.c:4983
void BeginTransactionBlock(void)
Definition: xact.c:3936
void CommitTransactionCommand(void)
Definition: xact.c:3169

References BeginTransactionBlock(), CommitTransactionCommand(), DEBUG1, DefineSavepoint(), elog, IsTransactionBlock(), IsTransactionState(), lappend_xid(), list_member_xid(), MemoryContextSwitchTo(), MySubscription, NAMEDATALEN, Subscription::oid, pa_savepoint_name(), StartTransactionCommand(), subxactlist, and TopTransactionContext.

Referenced by handle_streamed_transaction().

pa_stream_abort()

void pa_stream_abort ( LogicalRepStreamAbortDataabort_data )

Definition at line 1424 of file applyparallelworker.c.

1425{
1426 TransactionId xid = abort_data->xid;
1427 TransactionId subxid = abort_data->subxid;
1428
1429 /*
1430 * Update origin state so we can restart streaming from correct position
1431 * in case of crash.
1432 */
1435
1436 /*
1437 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1438 * just free the subxactlist.
1439 */
1440 if (subxid == xid)
1441 {
1443
1444 /*
1445 * Release the lock as we might be processing an empty streaming
1446 * transaction in which case the lock won't be released during
1447 * transaction rollback.
1448 *
1449 * Note that it's ok to release the transaction lock before aborting
1450 * the transaction because even if the parallel apply worker dies due
1451 * to crash or some other reason, such a transaction would still be
1452 * considered aborted.
1453 */
1455
1457
1458 if (IsTransactionBlock())
1459 {
1460 EndTransactionBlock(false);
1462 }
1463
1465
1467 }
1468 else
1469 {
1470 /* OK, so it's a subxact. Rollback to the savepoint. */
1471 int i;
1472 char spname[NAMEDATALEN];
1473
1474 pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1475
1476 elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1477
1478 /*
1479 * Search the subxactlist, determine the offset tracked for the
1480 * subxact, and truncate the list.
1481 *
1482 * Note that for an empty sub-transaction we won't find the subxid
1483 * here.
1484 */
1485 for (i = list_length(subxactlist) - 1; i >= 0; i--)
1486 {
1488
1489 if (xid_tmp == subxid)
1490 {
1491 RollbackToSavepoint(spname);
1494 break;
1495 }
1496 }
1497 }
1498}
void pa_set_xact_state(ParallelApplyWorkerShared *wshared, ParallelTransState xact_state)
void pa_reset_subtrans(void)
void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_IDLE
Definition: backend_status.h:28
i
int i
Definition: isn.c:77
List * list_truncate(List *list, int new_size)
Definition: list.c:631
#define AccessExclusiveLock
Definition: lockdefs.h:43
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:165
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:164
static ListCell * list_nth_cell(const List *list, int n)
Definition: pg_list.h:277
#define lfirst_xid(lc)
Definition: pg_list.h:175
TransactionId subxid
Definition: logicalproto.h:189
void RollbackToSavepoint(const char *name)
Definition: xact.c:4579
bool EndTransactionBlock(bool chain)
Definition: xact.c:4056
void AbortCurrentTransaction(void)
Definition: xact.c:3463

References LogicalRepStreamAbortData::abort_lsn, LogicalRepStreamAbortData::abort_time, AbortCurrentTransaction(), AccessExclusiveLock, CommitTransactionCommand(), DEBUG1, elog, EndTransactionBlock(), i, IsTransactionBlock(), lfirst_xid, list_length(), list_nth_cell(), list_truncate(), MyParallelShared, MySubscription, NAMEDATALEN, Subscription::oid, pa_reset_subtrans(), pa_savepoint_name(), pa_set_xact_state(), pa_unlock_transaction(), PARALLEL_TRANS_FINISHED, pgstat_report_activity(), replorigin_session_origin_lsn, replorigin_session_origin_timestamp, RollbackToSavepoint(), STATE_IDLE, subxactlist, LogicalRepStreamAbortData::subxid, and LogicalRepStreamAbortData::xid.

Referenced by apply_handle_stream_abort().

pa_switch_to_partial_serialize()

void pa_switch_to_partial_serialize ( ParallelApplyWorkerInfowinfo,
bool  stream_locked 
)

Definition at line 1219 of file applyparallelworker.c.

1221{
1222 ereport(LOG,
1223 (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1224 winfo->shared->xid)));
1225
1226 /*
1227 * The parallel apply worker could be stuck for some reason (say waiting
1228 * on some lock by other backend), so stop trying to send data directly to
1229 * it and start serializing data to the file instead.
1230 */
1231 winfo->serialize_changes = true;
1232
1233 /* Initialize the stream fileset. */
1234 stream_start_internal(winfo->shared->xid, true);
1235
1236 /*
1237 * Acquires the stream lock if not already to make sure that the parallel
1238 * apply worker will wait for the leader to release the stream lock until
1239 * the end of the transaction.
1240 */
1241 if (!stream_locked)
1243
1245}
void stream_start_internal(TransactionId xid, bool first_segment)
Definition: worker.c:1666
#define LOG
Definition: elog.h:31

References AccessExclusiveLock, ereport, errmsg(), FS_SERIALIZE_IN_PROGRESS, LOG, pa_lock_stream(), pa_set_fileset_state(), ParallelApplyWorkerInfo::serialize_changes, ParallelApplyWorkerInfo::shared, stream_start_internal(), and ParallelApplyWorkerShared::xid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), apply_handle_stream_stop(), and handle_streamed_transaction().

pa_unlock_stream()

void pa_unlock_stream ( TransactionId  xid,
LOCKMODE  lockmode 
)

Definition at line 1555 of file applyparallelworker.c.

1556{
1558 PARALLEL_APPLY_LOCK_STREAM, lockmode);
1559}
void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid, LOCKMODE lockmode)
Definition: lmgr.c:1227

References MyLogicalRepWorker, PARALLEL_APPLY_LOCK_STREAM, LogicalRepWorker::subid, and UnlockApplyTransactionForSession().

Referenced by apply_handle_stream_abort(), apply_handle_stream_start(), pa_decr_and_wait_stream_block(), pa_process_spooled_messages_if_required(), and pa_xact_finish().

pa_unlock_transaction()

void pa_unlock_transaction ( TransactionId  xid,
LOCKMODE  lockmode 
)

Definition at line 1588 of file applyparallelworker.c.

References MyLogicalRepWorker, PARALLEL_APPLY_LOCK_XACT, LogicalRepWorker::subid, and UnlockApplyTransactionForSession().

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), pa_stream_abort(), and pa_wait_for_xact_finish().

pa_wait_for_xact_finish()

static void pa_wait_for_xact_finish ( ParallelApplyWorkerInfowinfo )
static

Definition at line 1282 of file applyparallelworker.c.

1283{
1284 /*
1285 * Wait until the parallel apply worker set the state to
1286 * PARALLEL_TRANS_STARTED which means it has acquired the transaction
1287 * lock. This is to prevent leader apply worker from acquiring the
1288 * transaction lock earlier than the parallel apply worker.
1289 */
1291
1292 /*
1293 * Wait for the transaction lock to be released. This is required to
1294 * detect deadlock among leader and parallel apply workers. Refer to the
1295 * comments atop this file.
1296 */
1299
1300 /*
1301 * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
1302 * apply worker failed while applying changes causing the lock to be
1303 * released.
1304 */
1306 ereport(ERROR,
1307 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1308 errmsg("lost connection to the logical replication parallel apply worker")));
1309}
void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
static void pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, ParallelTransState xact_state)
@ PARALLEL_TRANS_STARTED

References AccessShareLock, ereport, errcode(), errmsg(), ERROR, pa_get_xact_state(), pa_lock_transaction(), pa_unlock_transaction(), pa_wait_for_xact_state(), PARALLEL_TRANS_FINISHED, PARALLEL_TRANS_STARTED, ParallelApplyWorkerInfo::shared, and ParallelApplyWorkerShared::xid.

Referenced by pa_xact_finish().

pa_wait_for_xact_state()

static void pa_wait_for_xact_state ( ParallelApplyWorkerInfowinfo,
ParallelTransState  xact_state 
)
static

Definition at line 1252 of file applyparallelworker.c.

1254{
1255 for (;;)
1256 {
1257 /*
1258 * Stop if the transaction state has reached or exceeded the given
1259 * xact_state.
1260 */
1261 if (pa_get_xact_state(winfo->shared) >= xact_state)
1262 break;
1263
1264 /* Wait to be signalled. */
1265 (void) WaitLatch(MyLatch,
1267 10L,
1268 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
1269
1270 /* Reset the latch so we don't spin. */
1272
1273 /* An interrupt may have occurred while we were waiting. */
1275 }
1276}

References CHECK_FOR_INTERRUPTS, MyLatch, pa_get_xact_state(), ResetLatch(), ParallelApplyWorkerInfo::shared, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by pa_wait_for_xact_finish().

pa_xact_finish()

void pa_xact_finish ( ParallelApplyWorkerInfowinfo,
XLogRecPtr  remote_lsn 
)

Definition at line 1626 of file applyparallelworker.c.

1627{
1629
1630 /*
1631 * Unlock the shared object lock so that parallel apply worker can
1632 * continue to receive and apply changes.
1633 */
1635
1636 /*
1637 * Wait for that worker to finish. This is necessary to maintain commit
1638 * order which avoids failures due to transaction dependencies and
1639 * deadlocks.
1640 */
1642
1643 if (!XLogRecPtrIsInvalid(remote_lsn))
1644 store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1645
1646 pa_free_worker(winfo);
1647}
static void pa_free_worker(ParallelApplyWorkerInfo *winfo)
static void pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
Definition: worker.c:3911

References AccessExclusiveLock, am_leader_apply_worker(), Assert(), ParallelApplyWorkerShared::last_commit_end, pa_free_worker(), pa_unlock_stream(), pa_wait_for_xact_finish(), ParallelApplyWorkerInfo::shared, store_flush_position(), ParallelApplyWorkerShared::xid, and XLogRecPtrIsInvalid.

Referenced by apply_handle_stream_abort(), apply_handle_stream_commit(), and apply_handle_stream_prepare().

ParallelApplyWorkerMain()

void ParallelApplyWorkerMain ( Datum  main_arg )

Definition at line 858 of file applyparallelworker.c.

859{
861 dsm_handle handle;
862 dsm_segment *seg;
863 shm_toc *toc;
864 shm_mq *mq;
865 shm_mq_handle *mqh;
866 shm_mq_handle *error_mqh;
867 RepOriginId originid;
868 int worker_slot = DatumGetInt32(main_arg);
869 char originname[NAMEDATALEN];
870
872
873 /*
874 * Setup signal handling.
875 *
876 * Note: We intentionally used SIGUSR2 to trigger a graceful shutdown
877 * initiated by the leader apply worker. This helps to differentiate it
878 * from the case where we abort the current transaction and exit on
879 * receiving SIGTERM.
880 */
882 pqsignal(SIGTERM, die);
885
886 /*
887 * Attach to the dynamic shared memory segment for the parallel apply, and
888 * find its table of contents.
889 *
890 * Like parallel query, we don't need resource owner by this time. See
891 * ParallelWorkerMain.
892 */
893 memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
894 seg = dsm_attach(handle);
895 if (!seg)
897 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
898 errmsg("could not map dynamic shared memory segment")));
899
901 if (!toc)
903 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
904 errmsg("invalid magic number in dynamic shared memory segment")));
905
906 /* Look up the shared information. */
907 shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
908 MyParallelShared = shared;
909
910 /*
911 * Attach to the message queue.
912 */
913 mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
915 mqh = shm_mq_attach(mq, seg, NULL);
916
917 /*
918 * Primary initialization is complete. Now, we can attach to our slot.
919 * This is to ensure that the leader apply worker does not write data to
920 * the uninitialized memory queue.
921 */
922 logicalrep_worker_attach(worker_slot);
923
924 /*
925 * Register the shutdown callback after we are attached to the worker
926 * slot. This is to ensure that MyLogicalRepWorker remains valid when this
927 * callback is invoked.
928 */
930
935
936 /*
937 * Attach to the error queue.
938 */
941 error_mqh = shm_mq_attach(mq, seg, NULL);
942
943 pq_redirect_to_shm_mq(seg, error_mqh);
946
949
951
953
954 /* Setup replication origin tracking. */
957 originname, sizeof(originname));
958 originid = replorigin_by_name(originname, false);
959
960 /*
961 * The parallel apply worker doesn't need to monopolize this replication
962 * origin which was already acquired by its leader process.
963 */
965 replorigin_session_origin = originid;
967
968 /*
969 * Setup callback for syscache so that we know when something changes in
970 * the subscription relation state.
971 */
972 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
974 (Datum) 0);
975
977
979
980 /*
981 * The parallel apply worker must not get here because the parallel apply
982 * worker will only stop when it receives a SIGTERM or SIGUSR2 from the
983 * leader, or SIGINT from itself, or when there is an error. None of these
984 * cases will allow the code to reach here.
985 */
986 Assert(false);
987}
static void pa_shutdown(int code, Datum arg)
static void LogicalParallelApplyLoop(shm_mq_handle *mqh)
bool InitializingApplyWorker
Definition: worker.c:499
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:641
void set_apply_error_context_origin(char *originname)
Definition: worker.c:6260
void InitializeLogRepWorker(void)
Definition: worker.c:5705
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:927
dsm_segment * dsm_attach(dsm_handle h)
Definition: dsm.c:665
uint32 dsm_handle
Definition: dsm_impl.h:55
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:104
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1812
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void logicalrep_worker_attach(int slot)
Definition: launcher.c:731
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
RepOriginId replorigin_session_origin
Definition: origin.c:163
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1120
#define die(msg)
Definition: pg_test_fsync.c:100
#define pqsignal
Definition: port.h:531
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:332
uint64_t Datum
Definition: postgres.h:70
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:212
BackgroundWorker * MyBgworkerEntry
Definition: postmaster.c:200
void pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
Definition: pqmq.c:82
void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
Definition: pqmq.c:53
void * shm_toc_lookup(shm_toc *toc, uint64 key, bool noError)
Definition: shm_toc.c:232
shm_toc * shm_toc_attach(uint64 magic, void *address)
Definition: shm_toc.c:64
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
TimestampTz last_recv_time
TimestampTz reply_time
TimestampTz last_send_time
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:280
#define SIGHUP
Definition: win32_port.h:158
#define SIGUSR2
Definition: win32_port.h:171
uint16 RepOriginId
Definition: xlogdefs.h:68

References Assert(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), BackgroundWorker::bgw_extra, CacheRegisterSyscacheCallback(), CommitTransactionCommand(), DatumGetInt32(), die, dsm_attach(), dsm_segment_address(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::generation, InitializeLogRepWorker(), InitializingApplyWorker, INVALID_PROC_NUMBER, invalidate_syncing_table_states(), InvalidOid, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::leader_pid, LogicalParallelApplyLoop(), logicalrep_worker_attach(), ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, ParallelApplyWorkerShared::mutex, MyBgworkerEntry, MyLogicalRepWorker, MyParallelShared, MyProc, MySubscription, NAMEDATALEN, Subscription::oid, pa_shutdown(), PARALLEL_APPLY_KEY_ERROR_QUEUE, PARALLEL_APPLY_KEY_MQ, PARALLEL_APPLY_KEY_SHARED, PG_LOGICAL_APPLY_SHM_MAGIC, PointerGetDatum(), pq_redirect_to_shm_mq(), pq_set_parallel_leader(), pqsignal, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_session_origin, replorigin_session_setup(), LogicalRepWorker::reply_time, set_apply_error_context_origin(), shm_mq_attach(), shm_mq_set_receiver(), shm_mq_set_sender(), shm_toc_attach(), shm_toc_lookup(), SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGUSR2, SpinLockAcquire, SpinLockRelease, and StartTransactionCommand().

ProcessParallelApplyInterrupts()

static void ProcessParallelApplyInterrupts ( void  )
static

Definition at line 713 of file applyparallelworker.c.

714{
716
718 {
719 ereport(LOG,
720 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
722
723 proc_exit(0);
724 }
725
727 {
728 ConfigReloadPending = false;
730 }
731}
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void proc_exit(int code)
Definition: ipc.c:104

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, ereport, errmsg(), LOG, MySubscription, Subscription::name, PGC_SIGHUP, proc_exit(), ProcessConfigFile(), and ShutdownRequestPending.

Referenced by LogicalParallelApplyLoop().

ProcessParallelApplyMessage()

static void ProcessParallelApplyMessage ( StringInfo  msg )
static

Definition at line 1009 of file applyparallelworker.c.

1010{
1011 char msgtype;
1012
1013 msgtype = pq_getmsgbyte(msg);
1014
1015 switch (msgtype)
1016 {
1018 {
1019 ErrorData edata;
1020
1021 /* Parse ErrorResponse. */
1022 pq_parse_errornotice(msg, &edata);
1023
1024 /*
1025 * If desired, add a context line to show that this is a
1026 * message propagated from a parallel apply worker. Otherwise,
1027 * it can sometimes be confusing to understand what actually
1028 * happened.
1029 */
1030 if (edata.context)
1031 edata.context = psprintf("%s\n%s", edata.context,
1032 _("logical replication parallel apply worker"));
1033 else
1034 edata.context = pstrdup(_("logical replication parallel apply worker"));
1035
1036 /*
1037 * Context beyond that should use the error context callbacks
1038 * that were in effect in LogicalRepApplyLoop().
1039 */
1041
1042 /*
1043 * The actual error must have been reported by the parallel
1044 * apply worker.
1045 */
1046 ereport(ERROR,
1047 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1048 errmsg("logical replication parallel apply worker exited due to error"),
1049 errcontext("%s", edata.context)));
1050 }
1051
1052 /*
1053 * Don't need to do anything about NoticeResponse and
1054 * NotificationResponse as the logical replication worker doesn't
1055 * need to send messages to the client.
1056 */
1059 break;
1060
1061 default:
1062 elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1063 msgtype, msg->len);
1064 }
1065}
ErrorContextCallback * apply_error_context_stack
Definition: worker.c:469
_
#define _(x)
Definition: elog.c:91
#define errcontext
Definition: elog.h:198
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
Definition: pqmq.c:222
#define PqMsg_NotificationResponse
Definition: protocol.h:41
#define PqMsg_ErrorResponse
Definition: protocol.h:44
#define PqMsg_NoticeResponse
Definition: protocol.h:49
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
Definition: elog.h:420
char * context
Definition: elog.h:436

References _, apply_error_context_stack, ErrorData::context, elog, ereport, errcode(), errcontext, errmsg(), ERROR, error_context_stack, StringInfoData::len, pq_getmsgbyte(), pq_parse_errornotice(), PqMsg_ErrorResponse, PqMsg_NoticeResponse, PqMsg_NotificationResponse, psprintf(), and pstrdup().

Referenced by ProcessParallelApplyMessages().

ProcessParallelApplyMessages()

void ProcessParallelApplyMessages ( void  )

Definition at line 1071 of file applyparallelworker.c.

1072{
1073 ListCell *lc;
1074 MemoryContext oldcontext;
1075
1076 static MemoryContext hpam_context = NULL;
1077
1078 /*
1079 * This is invoked from ProcessInterrupts(), and since some of the
1080 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1081 * for recursive calls if more signals are received while this runs. It's
1082 * unclear that recursive entry would be safe, and it doesn't seem useful
1083 * even if it is safe, so let's block interrupts until done.
1084 */
1086
1087 /*
1088 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1089 * don't want to risk leaking data into long-lived contexts, so let's do
1090 * our work here in a private context that we can reset on each use.
1091 */
1092 if (!hpam_context) /* first time through? */
1094 "ProcessParallelApplyMessages",
1096 else
1097 MemoryContextReset(hpam_context);
1098
1099 oldcontext = MemoryContextSwitchTo(hpam_context);
1100
1102
1103 foreach(lc, ParallelApplyWorkerPool)
1104 {
1105 shm_mq_result res;
1106 Size nbytes;
1107 void *data;
1109
1110 /*
1111 * The leader will detach from the error queue and set it to NULL
1112 * before preparing to stop all parallel apply workers, so we don't
1113 * need to handle error messages anymore. See
1114 * logicalrep_worker_detach.
1115 */
1116 if (!winfo->error_mq_handle)
1117 continue;
1118
1119 res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1120
1121 if (res == SHM_MQ_WOULD_BLOCK)
1122 continue;
1123 else if (res == SHM_MQ_SUCCESS)
1124 {
1125 StringInfoData msg;
1126
1127 initStringInfo(&msg);
1128 appendBinaryStringInfo(&msg, data, nbytes);
1130 pfree(msg.data);
1131 }
1132 else
1133 ereport(ERROR,
1134 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1135 errmsg("lost connection to the logical replication parallel apply worker")));
1136 }
1137
1138 MemoryContextSwitchTo(oldcontext);
1139
1140 /* Might as well clear the context on our way out */
1141 MemoryContextReset(hpam_context);
1142
1144}
static void ProcessParallelApplyMessage(StringInfo msg)
MemoryContext TopMemoryContext
Definition: mcxt.c:166
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:135
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:281
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
char * data
Definition: stringinfo.h:48

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, appendBinaryStringInfo(), StringInfoData::data, data, ereport, errcode(), errmsg(), ERROR, ParallelApplyWorkerInfo::error_mq_handle, HOLD_INTERRUPTS, initStringInfo(), lfirst, MemoryContextReset(), MemoryContextSwitchTo(), ParallelApplyMessagePending, ParallelApplyWorkerPool, pfree(), ProcessParallelApplyMessage(), RESUME_INTERRUPTS, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, and TopMemoryContext.

Referenced by ProcessInterrupts().

Variable Documentation

MyParallelShared

ParallelApplyWorkerShared* MyParallelShared = NULL

Definition at line 239 of file applyparallelworker.c.

Referenced by apply_handle_stream_commit(), apply_handle_stream_prepare(), apply_handle_stream_start(), pa_decr_and_wait_stream_block(), pa_get_fileset_state(), pa_process_spooled_messages_if_required(), pa_stream_abort(), and ParallelApplyWorkerMain().

ParallelApplyMessagePending

volatile sig_atomic_t ParallelApplyMessagePending = false

Definition at line 245 of file applyparallelworker.c.

Referenced by HandleParallelApplyMessageInterrupt(), ProcessInterrupts(), and ProcessParallelApplyMessages().

ParallelApplyTxnHash

HTAB* ParallelApplyTxnHash = NULL
static

Definition at line 225 of file applyparallelworker.c.

Referenced by pa_allocate_worker(), pa_find_worker(), and pa_free_worker().

ParallelApplyWorkerPool

List* ParallelApplyWorkerPool = NIL
static

Definition at line 234 of file applyparallelworker.c.

Referenced by pa_detach_all_error_mq(), pa_free_worker(), pa_free_worker_info(), pa_launch_parallel_worker(), and ProcessParallelApplyMessages().

stream_apply_worker

ParallelApplyWorkerInfo* stream_apply_worker = NULL
static

Definition at line 252 of file applyparallelworker.c.

Referenced by pa_find_worker(), and pa_set_stream_apply_worker().

subxactlist

List* subxactlist = NIL
static

Definition at line 255 of file applyparallelworker.c.

Referenced by pa_reset_subtrans(), pa_start_subtrans(), and pa_stream_abort().

AltStyle によって変換されたページ (->オリジナル) /