Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit ee03666

Browse files
Merge pull request #136 from arssher/foreign_copy_from
Foreign copy from for pg_shardman (PGPRO)
2 parents c963933 + d7520bb commit ee03666

File tree

3 files changed

+121
-78
lines changed

3 files changed

+121
-78
lines changed

‎src/include/partition_filter.h‎

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,15 @@ typedef struct
4343
} ResultRelInfoHolder;
4444

4545

46-
/* Forward declaration (for on_new_rri_holder()) */
46+
/* Forward declaration (for on_rri_holder()) */
4747
struct ResultPartsStorage;
4848
typedef struct ResultPartsStorage ResultPartsStorage;
4949

5050
/*
51-
* Callback to be fired at rri_holder creation.
51+
* Callback to be fired at rri_holder creation/destruction.
5252
*/
53-
typedef void (*on_new_rri_holder)(EState *estate,
54-
ResultRelInfoHolder *rri_holder,
55-
const ResultPartsStorage *rps_storage,
56-
void *arg);
53+
typedef void (*on_rri_holder)(ResultRelInfoHolder *rri_holder,
54+
const ResultPartsStorage *rps_storage);
5755

5856
/*
5957
* Cached ResultRelInfos of partitions.
@@ -66,7 +64,7 @@ struct ResultPartsStorage
6664

6765
bool speculative_inserts; /* for ExecOpenIndices() */
6866

69-
on_new_rri_holder on_new_rri_holder_callback;
67+
on_rri_holder on_new_rri_holder_callback;
7068
void *callback_arg;
7169

7270
EState *estate; /* pointer to executor's state */
@@ -116,11 +114,11 @@ void init_result_parts_storage(ResultPartsStorage *parts_storage,
116114
EState *estate,
117115
bool speculative_inserts,
118116
Size table_entry_size,
119-
on_new_rri_holder on_new_rri_holder_cb,
117+
on_rri_holder on_new_rri_holder_cb,
120118
void *on_new_rri_holder_cb_arg);
121119

122120
void fini_result_parts_storage(ResultPartsStorage *parts_storage,
123-
bool close_rels);
121+
bool close_rels, on_rri_holderhook);
124122

125123
ResultRelInfoHolder * scan_result_parts_storage(Oid partid,
126124
ResultPartsStorage *storage);

‎src/partition_filter.c‎

Lines changed: 37 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -68,18 +68,12 @@ CustomScanMethods partition_filter_plan_methods;
6868
CustomExecMethods partition_filter_exec_methods;
6969

7070

71-
static void prepare_rri_for_insert(EState *estate,
72-
ResultRelInfoHolder *rri_holder,
73-
const ResultPartsStorage *rps_storage,
74-
void *arg);
75-
static void prepare_rri_returning_for_insert(EState *estate,
76-
ResultRelInfoHolder *rri_holder,
77-
const ResultPartsStorage *rps_storage,
78-
void *arg);
79-
static void prepare_rri_fdw_for_insert(EState *estate,
80-
ResultRelInfoHolder *rri_holder,
81-
const ResultPartsStorage *rps_storage,
82-
void *arg);
71+
static void prepare_rri_for_insert(ResultRelInfoHolder *rri_holder,
72+
const ResultPartsStorage *rps_storage);
73+
static void prepare_rri_returning_for_insert(ResultRelInfoHolder *rri_holder,
74+
const ResultPartsStorage *rps_storage);
75+
static void prepare_rri_fdw_for_insert(ResultRelInfoHolder *rri_holder,
76+
const ResultPartsStorage *rps_storage);
8377
static Node *fix_returning_list_mutator(Node *node, void *state);
8478

8579
static Index append_rte_to_estate(EState *estate, RangeTblEntry *rte);
@@ -143,7 +137,7 @@ init_result_parts_storage(ResultPartsStorage *parts_storage,
143137
EState *estate,
144138
bool speculative_inserts,
145139
Size table_entry_size,
146-
on_new_rri_holder on_new_rri_holder_cb,
140+
on_rri_holder on_new_rri_holder_cb,
147141
void *on_new_rri_holder_cb_arg)
148142
{
149143
HASHCTL *result_rels_table_config = &parts_storage->result_rels_table_config;
@@ -177,16 +171,21 @@ init_result_parts_storage(ResultPartsStorage *parts_storage,
177171

178172
/* Free ResultPartsStorage (close relations etc) */
179173
void
180-
fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels)
174+
fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels,
175+
on_rri_holder hook)
181176
{
182177
HASH_SEQ_STATUS stat;
183178
ResultRelInfoHolder *rri_holder; /* ResultRelInfo holder */
184179

185-
/* Close partitions and free free conversion-related stuff */
186-
if (close_rels)
180+
hash_seq_init(&stat, parts_storage->result_rels_table);
181+
while ((rri_holder= (ResultRelInfoHolder*) hash_seq_search(&stat)) !=NULL)
187182
{
188-
hash_seq_init(&stat, parts_storage->result_rels_table);
189-
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
183+
/* Call destruction hook, if needed */
184+
if (hook != NULL)
185+
hook(rri_holder, parts_storage);
186+
187+
/* Close partitions and free free conversion-related stuff */
188+
if (close_rels)
190189
{
191190
ExecCloseIndices(rri_holder->result_rel_info);
192191

@@ -202,13 +201,8 @@ fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels)
202201

203202
free_conversion_map(rri_holder->tuple_map);
204203
}
205-
}
206-
207-
/* Else just free conversion-related stuff */
208-
else
209-
{
210-
hash_seq_init(&stat, parts_storage->result_rels_table);
211-
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
204+
/* Else just free conversion-related stuff */
205+
else
212206
{
213207
/* Skip if there's no map */
214208
if (!rri_holder->tuple_map)
@@ -329,10 +323,8 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
329323

330324
/* Call on_new_rri_holder_callback() if needed */
331325
if (parts_storage->on_new_rri_holder_callback)
332-
parts_storage->on_new_rri_holder_callback(parts_storage->estate,
333-
rri_holder,
334-
parts_storage,
335-
parts_storage->callback_arg);
326+
parts_storage->on_new_rri_holder_callback(rri_holder,
327+
parts_storage);
336328

337329
/* Finally append ResultRelInfo to storage->es_alloc_result_rels */
338330
append_rri_to_estate(parts_storage->estate, child_result_rel_info);
@@ -702,7 +694,7 @@ partition_filter_end(CustomScanState *node)
702694
PartitionFilterState *state = (PartitionFilterState *) node;
703695

704696
/* Executor will close rels via estate->es_result_relations */
705-
fini_result_parts_storage(&state->result_parts, false);
697+
fini_result_parts_storage(&state->result_parts, false, NULL);
706698

707699
Assert(list_length(node->custom_ps) == 1);
708700
ExecEndNode((PlanState *) linitial(node->custom_ps));
@@ -793,34 +785,33 @@ pfilter_build_tlist(Relation parent_rel, List *tlist)
793785

794786
/* Main trigger */
795787
static void
796-
prepare_rri_for_insert(EState *estate,
797-
ResultRelInfoHolder *rri_holder,
798-
const ResultPartsStorage *rps_storage,
799-
void *arg)
788+
prepare_rri_for_insert(ResultRelInfoHolder *rri_holder,
789+
const ResultPartsStorage *rps_storage)
800790
{
801-
prepare_rri_returning_for_insert(estate, rri_holder, rps_storage, arg);
802-
prepare_rri_fdw_for_insert(estate, rri_holder, rps_storage, arg);
791+
prepare_rri_returning_for_insert(rri_holder, rps_storage);
792+
prepare_rri_fdw_for_insert(rri_holder, rps_storage);
803793
}
804794

805795
/* Prepare 'RETURNING *' tlist & projection */
806796
static void
807-
prepare_rri_returning_for_insert(EState *estate,
808-
ResultRelInfoHolder *rri_holder,
809-
const ResultPartsStorage *rps_storage,
810-
void *arg)
797+
prepare_rri_returning_for_insert(ResultRelInfoHolder *rri_holder,
798+
const ResultPartsStorage *rps_storage)
811799
{
812800
PartitionFilterState *pfstate;
813801
List *returning_list;
814802
ResultRelInfo *child_rri,
815803
*parent_rri;
816804
Index parent_rt_idx;
817805
TupleTableSlot *result_slot;
806+
EState *estate;
807+
808+
estate = rps_storage->estate;
818809

819810
/* We don't need to do anything ff there's no map */
820811
if (!rri_holder->tuple_map)
821812
return;
822813

823-
pfstate = (PartitionFilterState *) arg;
814+
pfstate = (PartitionFilterState *) rps_storage->callback_arg;
824815
returning_list = pfstate->returning_list;
825816

826817
/* Exit if there's no RETURNING list */
@@ -857,14 +848,15 @@ prepare_rri_returning_for_insert(EState *estate,
857848

858849
/* Prepare FDW access structs */
859850
static void
860-
prepare_rri_fdw_for_insert(EState *estate,
861-
ResultRelInfoHolder *rri_holder,
862-
const ResultPartsStorage *rps_storage,
863-
void *arg)
851+
prepare_rri_fdw_for_insert(ResultRelInfoHolder *rri_holder,
852+
const ResultPartsStorage *rps_storage)
864853
{
865854
ResultRelInfo *rri = rri_holder->result_rel_info;
866855
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
867856
Oid partid;
857+
EState *estate;
858+
859+
estate = rps_storage->estate;
868860

869861
/* Nothing to do if not FDW */
870862
if (fdw_routine == NULL)

‎src/utility_stmt_hooking.c‎

Lines changed: 77 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "access/xact.h"
2323
#include "catalog/namespace.h"
2424
#include "commands/copy.h"
25+
#include "commands/defrem.h"
2526
#include "commands/trigger.h"
2627
#include "commands/tablecmds.h"
2728
#include "foreign/fdwapi.h"
@@ -64,10 +65,10 @@ static uint64 PathmanCopyFrom(CopyState cstate,
6465
List *range_table,
6566
bool old_protocol);
6667

67-
static void prepare_rri_for_copy(EState*estate,
68-
ResultRelInfoHolder*rri_holder,
69-
constResultPartsStorage*rps_storage,
70-
void*arg);
68+
static void prepare_rri_for_copy(ResultRelInfoHolder*rri_holder,
69+
constResultPartsStorage*rps_storage);
70+
staticvoidfinish_rri_copy(ResultRelInfoHolder*rri_holder,
71+
constResultPartsStorage*rps_storage);
7172

7273

7374
/*
@@ -110,12 +111,18 @@ is_pathman_related_copy(Node *parsetree)
110111
/* Analyze options list */
111112
foreach (lc, copy_stmt->options)
112113
{
113-
DefElem *defel = (DefElem *) lfirst(lc);
114-
115-
Assert(IsA(defel, DefElem));
114+
DefElem *defel = lfirst_node(DefElem, lc);
116115

117116
/* We do not support freeze */
118-
if (strcmp(defel->defname, "freeze") == 0)
117+
/*
118+
* It would be great to allow copy.c extract option value and
119+
* check it ready. However, there is no possibility (hooks) to do
120+
* that before messaging 'ok, begin streaming data' to the client,
121+
* which is ugly and confusing: e.g. it would require us to
122+
* actually send something in regression tests before we notice
123+
* the error.
124+
*/
125+
if (strcmp(defel->defname, "freeze") == 0 && defGetBoolean(defel))
119126
elog(ERROR, "freeze is not supported for partitioned tables");
120127
}
121128

@@ -481,7 +488,6 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
481488

482489
uint64 processed = 0;
483490

484-
485491
tupDesc = RelationGetDescr(parent_rel);
486492

487493
parent_result_rel = makeNode(ResultRelInfo);
@@ -499,7 +505,7 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
499505
/* Initialize ResultPartsStorage */
500506
init_result_parts_storage(&parts_storage, estate, false,
501507
ResultPartsStorageStandard,
502-
prepare_rri_for_copy, NULL);
508+
prepare_rri_for_copy, cstate);
503509
parts_storage.saved_rel_info = parent_result_rel;
504510

505511
/* Set up a tuple slot too */
@@ -634,13 +640,22 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
634640
/* Check the constraints of the tuple */
635641
if (child_result_rel->ri_RelationDesc->rd_att->constr)
636642
ExecConstraints(child_result_rel, slot, estate);
643+
if (!child_result_rel->ri_FdwRoutine)
644+
{
645+
/* OK, store the tuple and create index entries for it */
646+
simple_heap_insert(child_result_rel->ri_RelationDesc, tuple);
637647

638-
/* OK, store the tuple and create index entries for it */
639-
simple_heap_insert(child_result_rel->ri_RelationDesc, tuple);
640-
641-
if (child_result_rel->ri_NumIndices > 0)
642-
recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
643-
estate, false, NULL, NIL);
648+
if (child_result_rel->ri_NumIndices > 0)
649+
recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
650+
estate, false, NULL, NIL);
651+
}
652+
#ifdef PG_SHARDMAN
653+
else /* FDW table */
654+
{
655+
child_result_rel->ri_FdwRoutine->ForeignNextCopyFrom(
656+
estate, child_result_rel, cstate);
657+
}
658+
#endif
644659

645660
/* AFTER ROW INSERT Triggers (FIXME: NULL transition) */
646661
ExecARInsertTriggersCompat(estate, child_result_rel, tuple,
@@ -678,7 +693,7 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
678693
ExecResetTupleTable(estate->es_tupleTable, false);
679694

680695
/* Close partitions and destroy hash table */
681-
fini_result_parts_storage(&parts_storage, true);
696+
fini_result_parts_storage(&parts_storage, true, finish_rri_copy);
682697

683698
/* Close parent's indices */
684699
ExecCloseIndices(parent_result_rel);
@@ -689,20 +704,58 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
689704
}
690705

691706
/*
692-
* COPY FROM does not support FDWs, emit ERROR.
707+
* Init COPY FROM, if supported.
693708
*/
694709
static void
695-
prepare_rri_for_copy(EState *estate,
696-
ResultRelInfoHolder *rri_holder,
697-
const ResultPartsStorage *rps_storage,
698-
void *arg)
710+
prepare_rri_for_copy(ResultRelInfoHolder *rri_holder,
711+
const ResultPartsStorage *rps_storage)
699712
{
700-
ResultRelInfo*rri = rri_holder->result_rel_info;
701-
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
713+
ResultRelInfo*rri = rri_holder->result_rel_info;
714+
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
702715

703716
if (fdw_routine != NULL)
717+
{
718+
/*
719+
* If this Postgres has no idea about shardman, behave as usual:
720+
* vanilla Postgres doesn't support COPY FROM to foreign partitions.
721+
* However, shardman patches to core extend FDW API to allow it.
722+
*/
723+
#ifdef PG_SHARDMAN
724+
/* shardman COPY FROM requested? */
725+
if (*find_rendezvous_variable(
726+
"shardman_pathman_copy_from_rendezvous") != NULL &&
727+
FdwCopyFromIsSupported(fdw_routine))
728+
{
729+
CopyState cstate = (CopyState) rps_storage->callback_arg;
730+
ResultRelInfo *parent_rri = rps_storage->saved_rel_info;
731+
EState *estate = rps_storage->estate;
732+
733+
fdw_routine->BeginForeignCopyFrom(estate, rri, cstate, parent_rri);
734+
return;
735+
}
736+
#endif
737+
704738
elog(ERROR, "cannot copy to foreign partition \"%s\"",
705739
get_rel_name(RelationGetRelid(rri->ri_RelationDesc)));
740+
}
741+
}
742+
743+
/*
744+
* Shut down FDWs.
745+
*/
746+
static void
747+
finish_rri_copy(ResultRelInfoHolder *rri_holder,
748+
const ResultPartsStorage *rps_storage)
749+
{
750+
#ifdef PG_SHARDMAN
751+
ResultRelInfo *resultRelInfo = rri_holder->result_rel_info;
752+
753+
if (resultRelInfo->ri_FdwRoutine)
754+
{
755+
resultRelInfo->ri_FdwRoutine->EndForeignCopyFrom(
756+
rps_storage->estate, resultRelInfo);
757+
}
758+
#endif
706759
}
707760

708761
/*

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /