diff --git a/Makefile b/Makefile index 30d5967a..ff9d7af8 100644 --- a/Makefile +++ b/Makefile @@ -2,11 +2,13 @@ EXTENSION = aqo EXTVERSION = 1.2 -PGFILEDESC = "AQO - adaptive query optimization" -MODULES = aqo +PGFILEDESC = "AQO - Adaptive Query Optimization" +MODULE_big = aqo OBJS = aqo.o auto_tuning.o cardinality_estimation.o cardinality_hooks.o \ hash.o machine_learning.o path_utils.o postprocessing.o preprocessing.o \ -selectivity_cache.o storage.o utils.o $(WIN32RES) +selectivity_cache.o storage.o utils.o ignorance.o $(WIN32RES) + +TAP_TESTS = 1 REGRESS = aqo_disabled \ aqo_controlled \ @@ -14,13 +16,17 @@ REGRESS = aqo_disabled \ aqo_forced \ aqo_learn \ schema \ - aqo_CVE-2020-14350 + aqo_fdw \ + aqo_CVE-2020-14350 \ + gucs +fdw_srcdir = $(top_srcdir)/contrib/postgres_fdw +PG_CPPFLAGS += -I$(libpq_srcdir) -I$(fdw_srcdir) EXTRA_REGRESS_OPTS=--temp-config=$(top_srcdir)/$(subdir)/conf.add +EXTRA_INSTALL = contrib/postgres_fdw DATA = aqo--1.0.sql aqo--1.0--1.1.sql aqo--1.1--1.2.sql aqo--1.2.sql -MODULE_big = aqo ifdef USE_PGXS PG_CONFIG ?= pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/README.md b/README.md index 1b5284dd..45ea1072 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,11 @@ complicated queries. ## Installation The module works with PostgreSQL 9.6 and above. +To avoid compatibility issues, the following branches in the git-repository are allocated: +* `stable9_6`. +* `stable11` - for PG v10 and v11. +* `stable12` - for PG v12. +* the `master` branch of the AQO repository correctly works with PGv13 and the PostgreSQL `master` branch. The module contains a patch and an extension. Patch has to be applied to the sources of PostgresSQL. Patch affects header files, that is why PostgreSQL @@ -19,7 +24,7 @@ installed with `make install`. 
``` cd postgresql-9.6 # enter postgresql source directory -git clone https://github.com/tigvarts/aqo.git contrib/aqo # clone aqo into contrib +git clone https://github.com/postgrespro/aqo.git contrib/aqo # clone aqo into contrib patch -p1 --no-backup-if-mismatch < contrib/aqo/aqo_pg.patch # patch postgresql make clean && make && make install # recompile postgresql cd contrib/aqo # enter aqo directory @@ -28,7 +33,7 @@ make check # check whether it works ``` Tag `version` at the patch name corresponds to suitable PostgreSQL release. -For PostgreSQL 10 use aqo_pg10.patch; for PostgreSQL 11 use aqo_pg11.patch and so on. +For PostgreSQL 9.6 use the 'aqo_pg9_6.patch' file; for PostgreSQL 10 use aqo_pg10.patch; for PostgreSQL 11 use aqo_pg11.patch and so on. Also, you can see git tags at the master branch for more accurate definition of suitable PostgreSQL version. @@ -50,7 +55,7 @@ of per-database. The typical case is follows: you have complicated query, which executes too long. `EXPLAIN ANALYZE` shows, that the possible reason is bad cardinality -estimnation. +estimation. Example: ``` @@ -127,16 +132,16 @@ When the plan stops changing, you can often observe performance improvement: (23 rows) ``` -The settings system in AQO works with normalized queries, i. e. queries with -removed constants. For example, the normalized version of +The settings system in AQO works with normalised queries, i. e. queries with +removed constants. For example, the normalised version of `SELECT * FROM tbl WHERE a < 25 AND b = 'str';` is `SELECT * FROM tbl WHERE a < CONST and b = CONST;` -So the queries have equal normalization if and only if they differ only +So the queries have equal normalisation if and only if they differ only in their constants. -Each normalized query has its own hash. The correspondence between normalized +Each normalised query has its own hash. 
The correspondence between normalised query hash and query text is stored in aqo_query_texts table: ``` SELECT * FROM aqo_query_texts; @@ -174,6 +179,10 @@ if the data tends to change significantly), you can do `UPDATE SET aqo_learn=false WHERE query_hash = ;` before commit. +The extension includes two GUCs to display the executed cardinality predictions for a query. +The `aqo.show_details = 'on'` (default - off) allows you to see the aqo cardinality prediction results for each node of a query plan and an AQO summary. +The `aqo.show_hash = 'on'` (default - off) prints the hash signature for each plan node and for the overall query. It is system-specific information and should be used for situational analysis. + The more detailed reference of AQO settings mechanism is available further. ## Advanced tuning diff --git a/aqo.c b/aqo.c index 4f0eac87..b35bc2fc 100644 --- a/aqo.c +++ b/aqo.c @@ -2,23 +2,43 @@ * aqo.c * Adaptive query optimization extension * - * Copyright (c) 2016-2020, Postgres Professional + * Copyright (c) 2016-2021, Postgres Professional * * IDENTIFICATION * aqo/aqo.c */ #include "aqo.h" +#include "ignorance.h" + +#include "access/relation.h" +#include "access/table.h" +#include "catalog/pg_extension.h" +#include "commands/extension.h" PG_MODULE_MAGIC; void _PG_init(void); +#define AQO_MODULE_MAGIC (1234) /* Strategy of determining feature space for new queries. */ int aqo_mode; bool force_collect_stat; +/* + * Show special info in EXPLAIN mode. + * + * aqo_show_hash - show query class (hash) and a feature space value (hash) + * of each plan node. This is an instance-dependent value and can't be used + * in regression and TAP tests. + * + * aqo_show_details - show AQO settings for this class and prediction + * for each plan node. 
+ */ +bool aqo_show_hash; +bool aqo_show_details; + /* GUC variables */ static const struct config_enum_entry format_options[] = { {"intelligent", AQO_MODE_INTELLIGENT, false}, @@ -76,12 +96,14 @@ post_parse_analyze_hook_type prev_post_parse_analyze_hook; planner_hook_type prev_planner_hook; ExecutorStart_hook_type prev_ExecutorStart_hook; ExecutorEnd_hook_type prev_ExecutorEnd_hook; +set_baserel_rows_estimate_hook_type prev_set_foreign_rows_estimate_hook; set_baserel_rows_estimate_hook_type prev_set_baserel_rows_estimate_hook; get_parameterized_baserel_size_hook_type prev_get_parameterized_baserel_size_hook; set_joinrel_size_estimates_hook_type prev_set_joinrel_size_estimates_hook; get_parameterized_joinrel_size_hook_type prev_get_parameterized_joinrel_size_hook; copy_generic_path_info_hook_type prev_copy_generic_path_info_hook; ExplainOnePlan_hook_type prev_ExplainOnePlan_hook; +ExplainOneNode_hook_type prev_ExplainOneNode_hook; /***************************************************************************** * @@ -102,7 +124,8 @@ _PG_init(void) 0, NULL, NULL, - NULL); + NULL + ); DefineCustomBoolVariable( "aqo.force_collect_stat", @@ -115,7 +138,46 @@ _PG_init(void) NULL, NULL, NULL - ); + ); + + DefineCustomBoolVariable( + "aqo.show_hash", + "Show query and node hash on explain.", + "Hash value depend on each instance and is not good to enable it in regression or TAP tests.", + &aqo_show_hash, + false, + PGC_USERSET, + 0, + NULL, + NULL, + NULL + ); + + DefineCustomBoolVariable( + "aqo.show_details", + "Show AQO state on a query.", + NULL, + &aqo_show_details, + false, + PGC_USERSET, + 0, + NULL, + NULL, + NULL + ); + + DefineCustomBoolVariable( + "aqo.log_ignorance", + "Log in a special table all feature spaces for which the AQO prediction was not successful.", + NULL, + &aqo_log_ignorance, + false, + PGC_SUSET, + 0, + NULL, + set_ignorance, + NULL + ); prev_planner_hook = planner_hook; planner_hook = aqo_planner; @@ -126,6 +188,7 @@ _PG_init(void) 
prev_ExecutorEnd_hook = ExecutorEnd_hook; ExecutorEnd_hook = aqo_ExecutorEnd; prev_set_baserel_rows_estimate_hook = set_baserel_rows_estimate_hook; + set_foreign_rows_estimate_hook = aqo_set_baserel_rows_estimate; set_baserel_rows_estimate_hook = aqo_set_baserel_rows_estimate; prev_get_parameterized_baserel_size_hook = get_parameterized_baserel_size_hook; get_parameterized_baserel_size_hook = aqo_get_parameterized_baserel_size; @@ -137,6 +200,8 @@ _PG_init(void) copy_generic_path_info_hook = aqo_copy_generic_path_info; prev_ExplainOnePlan_hook = ExplainOnePlan_hook; ExplainOnePlan_hook = print_into_explain; + prev_ExplainOneNode_hook = ExplainOneNode_hook; + ExplainOneNode_hook = print_node_explain; parampathinfo_postinit_hook = ppi_hook; init_deactivated_queries_storage(); @@ -158,3 +223,63 @@ invalidate_deactivated_queries_cache(PG_FUNCTION_ARGS) init_deactivated_queries_storage(); PG_RETURN_POINTER(NULL); } + +/* + * Return AQO schema's Oid or InvalidOid if that's not possible. + */ +Oid +get_aqo_schema(void) +{ + Oid result; + Relation rel; + SysScanDesc scandesc; + HeapTuple tuple; + ScanKeyData entry[1]; + Oid ext_oid; + + /* Outside a transaction it is impossible to fetch the aqo extension's schema */ + if (!IsTransactionState()) + return InvalidOid; + + ext_oid = get_extension_oid("aqo", true); + if (ext_oid == InvalidOid) + return InvalidOid; /* exit if the aqo extension does not exist */ + + ScanKeyInit(&entry[0], +#if PG_VERSION_NUM>= 120000 + Anum_pg_extension_oid, +#else + ObjectIdAttributeNumber, +#endif + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(ext_oid)); + + rel = relation_open(ExtensionRelationId, AccessShareLock); + scandesc = systable_beginscan(rel, ExtensionOidIndexId, true, + NULL, 1, entry); + tuple = systable_getnext(scandesc); + + /* We assume that there can be at most one matching tuple */ + if (HeapTupleIsValid(tuple)) + result = ((Form_pg_extension) GETSTRUCT(tuple))->extnamespace; + else + result = InvalidOid; + + systable_endscan(scandesc); + relation_close(rel, 
AccessShareLock); + return result; +} + +/* + * Init userlock + */ +void +init_lock_tag(LOCKTAG *tag, uint32 key1, uint32 key2) +{ + tag->locktag_field1 = AQO_MODULE_MAGIC; + tag->locktag_field2 = key1; + tag->locktag_field3 = key2; + tag->locktag_field4 = 0; + tag->locktag_type = LOCKTAG_USERLOCK; + tag->locktag_lockmethodid = USER_LOCKMETHOD; +} diff --git a/aqo.h b/aqo.h index 080d076b..1b37a3a7 100644 --- a/aqo.h +++ b/aqo.h @@ -105,7 +105,7 @@ * Module storage.c is responsible for storage query settings and models * (i. e. all information which is used in extension). * - * Copyright (c) 2016-2020, Postgres Professional + * Copyright (c) 2016-2021, Postgres Professional * * IDENTIFICATION * aqo/aqo.h @@ -174,6 +174,8 @@ typedef enum extern int aqo_mode; extern bool force_collect_stat; +extern bool aqo_show_hash; +extern bool aqo_show_details; /* * It is mostly needed for auto tuning of query. with auto tuning mode aqo @@ -253,6 +255,8 @@ extern post_parse_analyze_hook_type prev_post_parse_analyze_hook; extern planner_hook_type prev_planner_hook; extern ExecutorStart_hook_type prev_ExecutorStart_hook; extern ExecutorEnd_hook_type prev_ExecutorEnd_hook; +extern set_baserel_rows_estimate_hook_type + prev_set_foreign_rows_estimate_hook; extern set_baserel_rows_estimate_hook_type prev_set_baserel_rows_estimate_hook; extern get_parameterized_baserel_size_hook_type @@ -263,6 +267,7 @@ extern get_parameterized_joinrel_size_hook_type prev_get_parameterized_joinrel_size_hook; extern copy_generic_path_info_hook_type prev_copy_generic_path_info_hook; extern ExplainOnePlan_hook_type prev_ExplainOnePlan_hook; +extern ExplainOneNode_hook_type prev_ExplainOneNode_hook; extern void ppi_hook(ParamPathInfo *ppi); @@ -277,40 +282,42 @@ int get_clause_hash(Expr *clause, int nargs, int *args_hash, int *eclass_hash); /* Storage interaction */ -bool find_query(int query_hash, - Datum *search_values, - bool *search_nulls); -bool add_query(int query_hash, bool learn_aqo, bool use_aqo, - 
int fspace_hash, bool auto_tuning); -bool update_query(int query_hash, bool learn_aqo, bool use_aqo, - int fspace_hash, bool auto_tuning); -bool add_query_text(int query_hash, const char *query_text); -bool load_fss(int fss_hash, int ncols, - double **matrix, double *targets, int *rows); -extern bool update_fss(int fss_hash, int nrows, int ncols, +extern bool find_query(int qhash, Datum *search_values, bool *search_nulls); +extern bool update_query(int qhash, int fhash, + bool learn_aqo, bool use_aqo, bool auto_tuning); +extern bool add_query_text(int query_hash, const char *query_text); +extern bool load_fss(int fhash, int fss_hash, + int ncols, double **matrix, double *targets, int *rows); +extern bool update_fss(int fhash, int fss_hash, int nrows, int ncols, double **matrix, double *targets); QueryStat *get_aqo_stat(int query_hash); void update_aqo_stat(int query_hash, QueryStat * stat); +extern bool my_index_insert(Relation indexRelation, Datum *values, bool *isnull, + ItemPointer heap_t_ctid, Relation heapRelation, + IndexUniqueCheck checkUnique); void init_deactivated_queries_storage(void); void fini_deactivated_queries_storage(void); bool query_is_deactivated(int query_hash); void add_deactivated_query(int query_hash); /* Query preprocessing hooks */ -void get_query_text(ParseState *pstate, Query *query); -PlannedStmt *call_default_planner(Query *parse, - const char *query_string, - int cursorOptions, - ParamListInfo boundParams); -PlannedStmt *aqo_planner(Query *parse, - const char *query_string, - int cursorOptions, - ParamListInfo boundParams); -void print_into_explain(PlannedStmt *plannedstmt, IntoClause *into, - ExplainState *es, const char *queryString, - ParamListInfo params, const instr_time *planduration, - QueryEnvironment *queryEnv); -void disable_aqo_for_query(void); +extern void get_query_text(ParseState *pstate, Query *query); +extern PlannedStmt *call_default_planner(Query *parse, + const char *query_string, + int cursorOptions, + 
ParamListInfo boundParams); +extern PlannedStmt *aqo_planner(Query *parse, + const char *query_string, + int cursorOptions, + ParamListInfo boundParams); +extern void print_into_explain(PlannedStmt *plannedstmt, IntoClause *into, + ExplainState *es, const char *queryString, + ParamListInfo params, + const instr_time *planduration, + QueryEnvironment *queryEnv); +extern void print_node_explain(ExplainState *es, PlanState *ps, Plan *plan, + double rows); +extern void disable_aqo_for_query(void); /* Cardinality estimation hooks */ extern void aqo_set_baserel_rows_estimate(PlannerInfo *root, RelOptInfo *rel); @@ -356,7 +363,7 @@ extern int OkNNr_learn(int matrix_rows, int matrix_cols, double *features, double target); /* Automatic query tuning */ -void automatical_query_tuning(int query_hash, QueryStat * stat); +extern void automatical_query_tuning(int query_hash, QueryStat * stat); /* Utilities */ int int_cmp(const void *a, const void *b); @@ -368,11 +375,13 @@ QueryStat *palloc_query_stat(void); void pfree_query_stat(QueryStat *stat); /* Selectivity cache for parametrized baserels */ -void cache_selectivity(int clause_hash, - int relid, - int global_relid, - double selectivity); -double *selectivity_cache_find_global_relid(int clause_hash, int global_relid); -void selectivity_cache_clear(void); +extern void cache_selectivity(int clause_hash, int relid, int global_relid, + double selectivity); +extern double *selectivity_cache_find_global_relid(int clause_hash, + int global_relid); +extern void selectivity_cache_clear(void); + +extern Oid get_aqo_schema(void); +extern void init_lock_tag(LOCKTAG *tag, uint32 key1, uint32 key2); #endif diff --git a/aqo_pg13.patch b/aqo_pg13.patch index b933ca49..1c30cadc 100644 --- a/aqo_pg13.patch +++ b/aqo_pg13.patch @@ -1,5 +1,5 @@ diff --git a/contrib/Makefile b/contrib/Makefile -index 7a4866e338..47a18b9698 100644 +index 1846d415b6..95519ac11d 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -7,6 +7,7 @@ include 
$(top_builddir)/src/Makefile.global @@ -11,7 +11,7 @@ index 7a4866e338..47a18b9698 100644 auto_explain \ bloom \ diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c -index 43f9b01e83..707211308c 100644 +index 0ad49612d2..7c0b82bde7 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -24,6 +24,7 @@ @@ -22,17 +22,20 @@ index 43f9b01e83..707211308c 100644 #include "parser/parsetree.h" #include "rewrite/rewriteHandler.h" #include "storage/bufmgr.h" -@@ -46,6 +47,9 @@ ExplainOneQuery_hook_type ExplainOneQuery_hook = NULL; +@@ -46,6 +47,12 @@ ExplainOneQuery_hook_type ExplainOneQuery_hook = NULL; /* Hook for plugins to get control in explain_get_index_name() */ explain_get_index_name_hook_type explain_get_index_name_hook = NULL; +/* Hook for plugins to get control in ExplainOnePlan() */ +ExplainOnePlan_hook_type ExplainOnePlan_hook = NULL; ++ ++/* Hook for plugins to get control in ExplainNode() */ ++ExplainOneNode_hook_type ExplainOneNode_hook = NULL; + /* OR-able flags for ExplainXMLTag() */ #define X_OPENING 0 -@@ -638,6 +642,10 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, +@@ -638,6 +645,10 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, ExplainPropertyFloat("Execution Time", "ms", 1000.0 * totaltime, 3, es); @@ -43,50 +46,21 @@ index 43f9b01e83..707211308c 100644 ExplainCloseGroup("Query", NULL, true, es); } -@@ -1579,6 +1587,38 @@ ExplainNode(PlanState *planstate, List *ancestors, +@@ -1582,6 +1593,9 @@ ExplainNode(PlanState *planstate, List *ancestors, appendStringInfo(es->str, " (actual rows=%.0f loops=%.0f)", rows, nloops); + -+#ifdef AQO_EXPLAIN -+ if (es->verbose && plan && planstate->instrument) -+ { -+ int wrkrs = 1; -+ double error = -1.; -+ -+ if (planstate->worker_instrument && IsParallelTuplesProcessing(plan)) -+ { -+ int i; -+ for (i = 0; i < planstate->worker_instrument->num_workers; i++) -+ { -+ Instrumentation *instrument = 
&planstate->worker_instrument->instrument[i]; -+ if (instrument->nloops <= 0) -+ continue; -+ wrkrs++; -+ } -+ } -+ -+ if (plan->predicted_cardinality> 0.) -+ { -+ error = 100. * (plan->predicted_cardinality - (rows*wrkrs)) -+ / plan->predicted_cardinality; -+ appendStringInfo(es->str, -+ " (AQO: cardinality=%.0lf, error=%.0lf%%, fsspace_hash=%d)", -+ plan->predicted_cardinality, error, plan->fss_hash); -+ } -+ else -+ appendStringInfo(es->str, " (AQO not used, fsspace_hash=%d)", -+ plan->fss_hash); -+ } -+#endif ++ if (ExplainOneNode_hook) ++ ExplainOneNode_hook(es, planstate, plan, rows); } else { diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c -index 530aac68a7..1d94feadb9 100644 +index 256ab54003..cfdc0247ec 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c -@@ -126,6 +126,12 @@ CopyPlanFields(const Plan *from, Plan *newnode) +@@ -127,6 +127,12 @@ CopyPlanFields(const Plan *from, Plan *newnode) COPY_NODE_FIELD(lefttree); COPY_NODE_FIELD(righttree); COPY_NODE_FIELD(initPlan); @@ -100,21 +74,22 @@ index 530aac68a7..1d94feadb9 100644 COPY_BITMAPSET_FIELD(allParam); } diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c -index f1dfdc1a4a..359cafa531 100644 +index ef7e8281cc..93d24b905a 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c -@@ -97,6 +97,10 @@ +@@ -97,6 +97,11 @@ #include "utils/spccache.h" #include "utils/tuplesort.h" +set_baserel_rows_estimate_hook_type set_baserel_rows_estimate_hook = NULL; ++set_foreign_rows_estimate_hook_type set_foreign_rows_estimate_hook = NULL; +get_parameterized_baserel_size_hook_type get_parameterized_baserel_size_hook = NULL; +get_parameterized_joinrel_size_hook_type get_parameterized_joinrel_size_hook = NULL; +set_joinrel_size_estimates_hook_type set_joinrel_size_estimates_hook = NULL; #define LOG2(x) (log(x) / 0.693147180559945) -@@ -185,7 +189,6 @@ static Cost append_nonpartial_cost(List 
*subpaths, int numpaths, +@@ -178,7 +183,6 @@ static Cost append_nonpartial_cost(List *subpaths, int numpaths, static void set_rel_width(PlannerInfo *root, RelOptInfo *rel); static double relation_byte_size(double tuples, int width); static double page_size(double tuples, int width); @@ -122,7 +97,7 @@ index f1dfdc1a4a..359cafa531 100644 /* -@@ -266,7 +269,7 @@ cost_seqscan(Path *path, PlannerInfo *root, +@@ -256,7 +260,7 @@ cost_seqscan(Path *path, PlannerInfo *root, /* Adjust costing for parallelism, if used. */ if (path->parallel_workers> 0) { @@ -131,7 +106,7 @@ index f1dfdc1a4a..359cafa531 100644 /* The CPU cost is divided among all the workers. */ cpu_run_cost /= parallel_divisor; -@@ -745,7 +748,7 @@ cost_index(IndexPath *path, PlannerInfo *root, double loop_count, +@@ -735,7 +739,7 @@ cost_index(IndexPath *path, PlannerInfo *root, double loop_count, /* Adjust costing for parallelism, if used. */ if (path->path.parallel_workers> 0) { @@ -140,7 +115,7 @@ index f1dfdc1a4a..359cafa531 100644 path->path.rows = clamp_row_est(path->path.rows / parallel_divisor); -@@ -1026,7 +1029,7 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel, +@@ -1016,7 +1020,7 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel, /* Adjust costing for parallelism, if used. */ if (path->parallel_workers> 0) { @@ -149,7 +124,7 @@ index f1dfdc1a4a..359cafa531 100644 /* The CPU cost is divided among all the workers. */ cpu_run_cost /= parallel_divisor; -@@ -2129,7 +2132,7 @@ cost_append(AppendPath *apath) +@@ -2119,7 +2123,7 @@ cost_append(AppendPath *apath) else /* parallel-aware */ { int i = 0; @@ -158,7 +133,7 @@ index f1dfdc1a4a..359cafa531 100644 /* Parallel-aware Append never produces ordered output. 
*/ Assert(apath->path.pathkeys == NIL); -@@ -2163,7 +2166,7 @@ cost_append(AppendPath *apath) +@@ -2153,7 +2157,7 @@ cost_append(AppendPath *apath) { double subpath_parallel_divisor; @@ -167,7 +142,7 @@ index f1dfdc1a4a..359cafa531 100644 apath->path.rows += subpath->rows * (subpath_parallel_divisor / parallel_divisor); apath->path.total_cost += subpath->total_cost; -@@ -2761,7 +2764,7 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path, +@@ -2752,7 +2756,7 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path, /* For partial paths, scale row estimate. */ if (path->path.parallel_workers> 0) { @@ -176,7 +151,7 @@ index f1dfdc1a4a..359cafa531 100644 path->path.rows = clamp_row_est(path->path.rows / parallel_divisor); -@@ -3207,7 +3210,7 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path, +@@ -3200,7 +3204,7 @@ final_cost_mergejoin(PlannerInfo *root, MergePath *path, /* For partial paths, scale row estimate. */ if (path->jpath.path.parallel_workers> 0) { @@ -185,7 +160,7 @@ index f1dfdc1a4a..359cafa531 100644 path->jpath.path.rows = clamp_row_est(path->jpath.path.rows / parallel_divisor); -@@ -3541,7 +3544,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, +@@ -3534,7 +3538,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace, * number, so we need to undo the division. */ if (parallel_hash) @@ -194,7 +169,7 @@ index f1dfdc1a4a..359cafa531 100644 /* * Get hash table size that executor would use for inner relation. -@@ -3638,7 +3641,7 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path, +@@ -3631,7 +3635,7 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path, /* For partial paths, scale row estimate. 
*/ if (path->jpath.path.parallel_workers> 0) { @@ -203,10 +178,19 @@ index f1dfdc1a4a..359cafa531 100644 path->jpath.path.rows = clamp_row_est(path->jpath.path.rows / parallel_divisor); -@@ -4633,6 +4636,49 @@ approx_tuple_count(PlannerInfo *root, JoinPath *path, List *quals) +@@ -4626,6 +4630,58 @@ approx_tuple_count(PlannerInfo *root, JoinPath *path, List *quals) } ++void ++set_foreign_rows_estimate(PlannerInfo *root, RelOptInfo *rel) ++{ ++ if (set_foreign_rows_estimate_hook) ++ (*set_foreign_rows_estimate_hook) (root, rel); ++ else ++ rel->rows = 1000; /* entirely bogus default estimate */ ++} ++ +/* + * set_baserel_rows_estimate + * Set the rows estimate for the given base relation. @@ -253,7 +237,7 @@ index f1dfdc1a4a..359cafa531 100644 /* * set_baserel_size_estimates * Set the size estimates for the given base relation. -@@ -4649,19 +4695,10 @@ approx_tuple_count(PlannerInfo *root, JoinPath *path, List *quals) +@@ -4642,19 +4698,10 @@ approx_tuple_count(PlannerInfo *root, JoinPath *path, List *quals) void set_baserel_size_estimates(PlannerInfo *root, RelOptInfo *rel) { @@ -274,7 +258,7 @@ index f1dfdc1a4a..359cafa531 100644 cost_qual_eval(&rel->baserestrictcost, rel->baserestrictinfo, root); -@@ -4672,13 +4709,33 @@ set_baserel_size_estimates(PlannerInfo *root, RelOptInfo *rel) +@@ -4665,13 +4712,33 @@ set_baserel_size_estimates(PlannerInfo *root, RelOptInfo *rel) * get_parameterized_baserel_size * Make a size estimate for a parameterized scan of a base relation. * @@ -310,7 +294,7 @@ index f1dfdc1a4a..359cafa531 100644 { List *allclauses; double nrows; -@@ -4707,6 +4764,36 @@ get_parameterized_baserel_size(PlannerInfo *root, RelOptInfo *rel, +@@ -4700,6 +4767,36 @@ get_parameterized_baserel_size(PlannerInfo *root, RelOptInfo *rel, * set_joinrel_size_estimates * Set the size estimates for the given join relation. 
* @@ -347,7 +331,7 @@ index f1dfdc1a4a..359cafa531 100644 * The rel's targetlist must have been constructed already, and a * restriction clause list that matches the given component rels must * be provided. -@@ -4726,11 +4813,11 @@ get_parameterized_baserel_size(PlannerInfo *root, RelOptInfo *rel, +@@ -4719,11 +4816,11 @@ get_parameterized_baserel_size(PlannerInfo *root, RelOptInfo *rel, * build_joinrel_tlist, and baserestrictcost is not used for join rels. */ void @@ -364,7 +348,7 @@ index f1dfdc1a4a..359cafa531 100644 { rel->rows = calc_joinrel_size_estimate(root, rel, -@@ -4746,6 +4833,35 @@ set_joinrel_size_estimates(PlannerInfo *root, RelOptInfo *rel, +@@ -4739,6 +4836,35 @@ set_joinrel_size_estimates(PlannerInfo *root, RelOptInfo *rel, * get_parameterized_joinrel_size * Make a size estimate for a parameterized scan of a join relation. * @@ -400,7 +384,7 @@ index f1dfdc1a4a..359cafa531 100644 * 'rel' is the joinrel under consideration. * 'outer_path', 'inner_path' are (probably also parameterized) Paths that * produce the relations being joined. -@@ -4758,11 +4874,11 @@ set_joinrel_size_estimates(PlannerInfo *root, RelOptInfo *rel, +@@ -4751,11 +4877,11 @@ set_joinrel_size_estimates(PlannerInfo *root, RelOptInfo *rel, * set_joinrel_size_estimates must have been applied already. 
*/ double @@ -417,7 +401,16 @@ index f1dfdc1a4a..359cafa531 100644 { double nrows; -@@ -5760,14 +5876,25 @@ page_size(double tuples, int width) +@@ -5424,7 +5550,7 @@ set_foreign_size_estimates(PlannerInfo *root, RelOptInfo *rel) + /* Should only be applied to base relations */ + Assert(rel->relid> 0); + +- rel->rows = 1000; /* entirely bogus default estimate */ ++ set_foreign_rows_estimate(root, rel); + + cost_qual_eval(&rel->baserestrictcost, rel->baserestrictinfo, root); + +@@ -5706,14 +5832,25 @@ page_size(double tuples, int width) return ceil(relation_byte_size(tuples, width) / BLCKSZ); } @@ -446,7 +439,7 @@ index f1dfdc1a4a..359cafa531 100644 /* * Early experience with parallel query suggests that when there is only -@@ -5784,7 +5911,7 @@ get_parallel_divisor(Path *path) +@@ -5730,7 +5867,7 @@ get_parallel_divisor(Path *path) { double leader_contribution; @@ -456,7 +449,7 @@ index f1dfdc1a4a..359cafa531 100644 parallel_divisor += leader_contribution; } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c -index 40abe6f9f6..9edd6daeff 100644 +index 84f2d186d9..a35d8ec9ee 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -70,6 +70,8 @@ @@ -486,7 +479,7 @@ index 40abe6f9f6..9edd6daeff 100644 return plan; } -@@ -1257,7 +1259,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) +@@ -1258,7 +1260,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) plan->first_partial_plan = best_path->first_partial_path; plan->part_prune_info = partpruneinfo; @@ -495,7 +488,7 @@ index 40abe6f9f6..9edd6daeff 100644 /* * If prepare_sort_from_pathkeys added sort columns, but we were told to -@@ -1303,7 +1305,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, +@@ -1304,7 +1306,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, * prepare_sort_from_pathkeys on it before we do so on the individual 
* child plans, to make cross-checking the sort info easier. */ @@ -504,7 +497,7 @@ index 40abe6f9f6..9edd6daeff 100644 plan->targetlist = tlist; plan->qual = NIL; plan->lefttree = NULL; -@@ -1456,7 +1458,7 @@ create_group_result_plan(PlannerInfo *root, GroupResultPath *best_path) +@@ -1458,7 +1460,7 @@ create_group_result_plan(PlannerInfo *root, GroupResultPath *best_path) plan = make_result(tlist, (Node *) quals, NULL); @@ -513,7 +506,7 @@ index 40abe6f9f6..9edd6daeff 100644 return plan; } -@@ -1481,7 +1483,7 @@ create_project_set_plan(PlannerInfo *root, ProjectSetPath *best_path) +@@ -1483,7 +1485,7 @@ create_project_set_plan(PlannerInfo *root, ProjectSetPath *best_path) plan = make_project_set(tlist, subplan); @@ -522,7 +515,7 @@ index 40abe6f9f6..9edd6daeff 100644 return plan; } -@@ -1509,7 +1511,7 @@ create_material_plan(PlannerInfo *root, MaterialPath *best_path, int flags) +@@ -1511,7 +1513,7 @@ create_material_plan(PlannerInfo *root, MaterialPath *best_path, int flags) plan = make_material(subplan); @@ -531,7 +524,7 @@ index 40abe6f9f6..9edd6daeff 100644 return plan; } -@@ -1709,7 +1711,7 @@ create_unique_plan(PlannerInfo *root, UniquePath *best_path, int flags) +@@ -1711,7 +1713,7 @@ create_unique_plan(PlannerInfo *root, UniquePath *best_path, int flags) } /* Copy cost data from Path to Plan */ @@ -866,10 +859,10 @@ index 40abe6f9f6..9edd6daeff 100644 /* diff --git a/src/backend/optimizer/util/relnode.c b/src/backend/optimizer/util/relnode.c -index 76245c1ff3..cac6adf35e 100644 +index a203e6f1ff..a335ede976 100644 --- a/src/backend/optimizer/util/relnode.c +++ b/src/backend/optimizer/util/relnode.c -@@ -1261,6 +1261,7 @@ find_childrel_parents(PlannerInfo *root, RelOptInfo *rel) +@@ -1264,6 +1264,7 @@ find_childrel_parents(PlannerInfo *root, RelOptInfo *rel) } @@ -877,7 +870,7 @@ index 76245c1ff3..cac6adf35e 100644 /* * get_baserel_parampathinfo * Get the ParamPathInfo for a parameterized path for a base relation, -@@ -1329,6 +1330,10 @@ 
get_baserel_parampathinfo(PlannerInfo *root, RelOptInfo *baserel, +@@ -1332,6 +1333,10 @@ get_baserel_parampathinfo(PlannerInfo *root, RelOptInfo *baserel, ppi->ppi_req_outer = required_outer; ppi->ppi_rows = rows; ppi->ppi_clauses = pclauses; @@ -888,7 +881,7 @@ index 76245c1ff3..cac6adf35e 100644 baserel->ppilist = lappend(baserel->ppilist, ppi); return ppi; -@@ -1554,6 +1559,10 @@ get_joinrel_parampathinfo(PlannerInfo *root, RelOptInfo *joinrel, +@@ -1557,6 +1562,10 @@ get_joinrel_parampathinfo(PlannerInfo *root, RelOptInfo *joinrel, ppi->ppi_req_outer = required_outer; ppi->ppi_rows = rows; ppi->ppi_clauses = NIL; @@ -900,10 +893,10 @@ index 76245c1ff3..cac6adf35e 100644 return ppi; diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h -index ba661d32a6..3c2595d639 100644 +index ba661d32a6..74e4f7592c 100644 --- a/src/include/commands/explain.h +++ b/src/include/commands/explain.h -@@ -75,6 +75,12 @@ extern PGDLLIMPORT ExplainOneQuery_hook_type ExplainOneQuery_hook; +@@ -75,6 +75,19 @@ extern PGDLLIMPORT ExplainOneQuery_hook_type ExplainOneQuery_hook; typedef const char *(*explain_get_index_name_hook_type) (Oid indexId); extern PGDLLIMPORT explain_get_index_name_hook_type explain_get_index_name_hook; @@ -913,14 +906,21 @@ index ba661d32a6..3c2595d639 100644 + ParamListInfo params, const instr_time *planduration, + QueryEnvironment *queryEnv); +extern PGDLLIMPORT ExplainOnePlan_hook_type ExplainOnePlan_hook; ++ ++/* Explain a node info */ ++typedef void (*ExplainOneNode_hook_type) (ExplainState *es, ++ PlanState *ps, ++ Plan *plan, ++ double rows); ++extern PGDLLIMPORT ExplainOneNode_hook_type ExplainOneNode_hook; extern void ExplainQuery(ParseState *pstate, ExplainStmt *stmt, ParamListInfo params, DestReceiver *dest); diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h -index 8f62d61702..cfcd2c249d 100644 +index 10f0a149e9..fecf543f44 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h -@@ 
-734,6 +734,10 @@ typedef struct RelOptInfo +@@ -738,6 +738,10 @@ typedef struct RelOptInfo Relids top_parent_relids; /* Relids of topmost parents (if "other" * rel) */ @@ -931,7 +931,7 @@ index 8f62d61702..cfcd2c249d 100644 /* used for partitioned relations: */ PartitionScheme part_scheme; /* Partitioning scheme */ int nparts; /* Number of partitions; -1 if not yet set; in -@@ -1101,6 +1105,10 @@ typedef struct ParamPathInfo +@@ -1104,6 +1108,10 @@ typedef struct ParamPathInfo Relids ppi_req_outer; /* rels supplying parameters used by path */ double ppi_rows; /* estimated number of result tuples */ List *ppi_clauses; /* join clauses available from outer rels */ @@ -943,10 +943,10 @@ index 8f62d61702..cfcd2c249d 100644 diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h -index 7e6b10f86b..148720a566 100644 +index 83e01074ed..5f1de775ca 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h -@@ -140,6 +140,19 @@ typedef struct Plan +@@ -146,6 +146,19 @@ typedef struct Plan List *initPlan; /* Init Plan nodes (un-correlated expr * subselects) */ @@ -967,18 +967,22 @@ index 7e6b10f86b..148720a566 100644 * Information for management of parameter-change-driven rescanning * diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h -index 6141654e47..0915da8618 100644 +index 6141654e47..3288548af6 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h -@@ -39,6 +39,33 @@ typedef enum +@@ -39,6 +39,37 @@ typedef enum } ConstraintExclusionType; +/* Hook for plugins to get control of cardinality estimation */ +typedef void (*set_baserel_rows_estimate_hook_type) (PlannerInfo *root, + RelOptInfo *rel); ++typedef void (*set_foreign_rows_estimate_hook_type) (PlannerInfo *root, ++ RelOptInfo *rel); +extern PGDLLIMPORT set_baserel_rows_estimate_hook_type + set_baserel_rows_estimate_hook; ++extern PGDLLIMPORT set_foreign_rows_estimate_hook_type ++ set_foreign_rows_estimate_hook; +typedef double 
(*get_parameterized_baserel_size_hook_type) (PlannerInfo *root, + RelOptInfo *rel, + List *param_clauses); @@ -1004,10 +1008,11 @@ index 6141654e47..0915da8618 100644 /* * prototypes for costsize.c * routines to compute costs and sizes -@@ -175,10 +202,21 @@ extern void compute_semi_anti_join_factors(PlannerInfo *root, +@@ -175,10 +206,22 @@ extern void compute_semi_anti_join_factors(PlannerInfo *root, SpecialJoinInfo *sjinfo, List *restrictlist, SemiAntiJoinFactors *semifactors); ++extern void set_foreign_rows_estimate(PlannerInfo *root, RelOptInfo *rel); +extern void set_baserel_rows_estimate(PlannerInfo *root, RelOptInfo *rel); +extern void set_baserel_rows_estimate_standard(PlannerInfo *root, RelOptInfo *rel); extern void set_baserel_size_estimates(PlannerInfo *root, RelOptInfo *rel); @@ -1026,7 +1031,7 @@ index 6141654e47..0915da8618 100644 extern double get_parameterized_joinrel_size(PlannerInfo *root, RelOptInfo *rel, Path *outer_path, -@@ -190,6 +228,11 @@ extern void set_joinrel_size_estimates(PlannerInfo *root, RelOptInfo *rel, +@@ -190,6 +233,11 @@ extern void set_joinrel_size_estimates(PlannerInfo *root, RelOptInfo *rel, RelOptInfo *inner_rel, SpecialJoinInfo *sjinfo, List *restrictlist); @@ -1038,7 +1043,7 @@ index 6141654e47..0915da8618 100644 extern void set_subquery_size_estimates(PlannerInfo *root, RelOptInfo *rel); extern void set_function_size_estimates(PlannerInfo *root, RelOptInfo *rel); extern void set_values_size_estimates(PlannerInfo *root, RelOptInfo *rel); -@@ -202,5 +245,7 @@ extern void set_foreign_size_estimates(PlannerInfo *root, RelOptInfo *rel); +@@ -202,5 +250,7 @@ extern void set_foreign_size_estimates(PlannerInfo *root, RelOptInfo *rel); extern PathTarget *set_pathtarget_cost_width(PlannerInfo *root, PathTarget *target); extern double compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel, Path *bitmapqual, int loop_count, Cost *cost, double *tuple); @@ -1047,7 +1052,7 @@ index 6141654e47..0915da8618 100644 #endif /* COST_H 
*/ diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h -index 715a24ad29..7311ba92f4 100644 +index 3bd7072ae8..21bbaba11c 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -18,6 +18,10 @@ @@ -1062,7 +1067,7 @@ index 715a24ad29..7311ba92f4 100644 * prototypes for pathnode.c */ diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h -index 81c4a7e560..59daf7fb81 100644 +index f3cefe67b8..6d77f6e871 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -24,6 +24,12 @@ extern double cursor_tuple_fraction; diff --git a/auto_tuning.c b/auto_tuning.c index a19f42d0..b82b415b 100644 --- a/auto_tuning.c +++ b/auto_tuning.c @@ -8,7 +8,7 @@ * ******************************************************************************* * - * Copyright (c) 2016-2020, Postgres Professional + * Copyright (c) 2016-2021, Postgres Professional * * IDENTIFICATION * aqo/auto_tuning.c @@ -187,8 +187,11 @@ automatical_query_tuning(int query_hash, QueryStat * stat) } if (num_iterations <= auto_tuning_max_iterations || p_use> 0.5) - update_query(query_hash, query_context.learn_aqo, query_context.use_aqo, - query_context.fspace_hash, true); + update_query(query_hash, + query_context.fspace_hash, + query_context.learn_aqo, + query_context.use_aqo, + true); else - update_query(query_hash, false, false, query_context.fspace_hash, false); + update_query(query_hash, query_context.fspace_hash, false, false, false); } diff --git a/cardinality_estimation.c b/cardinality_estimation.c index 89ddf1ee..3b4dda09 100644 --- a/cardinality_estimation.c +++ b/cardinality_estimation.c @@ -8,7 +8,7 @@ * ******************************************************************************* * - * Copyright (c) 2016-2020, Postgres Professional + * Copyright (c) 2016-2021, Postgres Professional * * IDENTIFICATION * aqo/cardinality_estimation.c @@ -40,7 +40,8 @@ predict_for_relation(List *restrict_clauses, 
List *selectivities, for (i = 0; i < aqo_K; ++i) matrix[i] = palloc0(sizeof(**matrix) * nfeatures); - if (load_fss(*fss_hash, nfeatures, matrix, targets, &rows)) + if (load_fss(query_context.fspace_hash, *fss_hash, nfeatures, matrix, + targets, &rows)) result = OkNNr_predict(rows, nfeatures, matrix, targets, features); else { diff --git a/cardinality_hooks.c b/cardinality_hooks.c index 76f54d68..dd631161 100644 --- a/cardinality_hooks.c +++ b/cardinality_hooks.c @@ -18,7 +18,7 @@ * ******************************************************************************* * - * Copyright (c) 2016-2020, Postgres Professional + * Copyright (c) 2016-2021, Postgres Professional * * IDENTIFICATION * aqo/cardinality_hooks.c @@ -156,7 +156,8 @@ aqo_set_baserel_rows_estimate(PlannerInfo *root, RelOptInfo *rel) relids = list_make1_int(relid); restrict_clauses = list_copy(rel->baserestrictinfo); - predicted = predict_for_relation(restrict_clauses, selectivities, relids, &fss); + predicted = predict_for_relation(restrict_clauses, selectivities, + relids, &fss); rel->fss_hash = fss; if (predicted>= 0) @@ -208,12 +209,16 @@ aqo_get_parameterized_baserel_size(PlannerInfo *root, if (query_context.use_aqo || query_context.learn_aqo) { + MemoryContext mcxt; + allclauses = list_concat(list_copy(param_clauses), list_copy(rel->baserestrictinfo)); selectivities = get_selectivities(root, allclauses, rel->relid, JOIN_INNER, NULL); relid = planner_rt_fetch(rel->relid, root)->relid; get_eclasses(allclauses, &nargs, &args_hash, &eclass_hash); + + mcxt = MemoryContextSwitchTo(CacheMemoryContext); forboth(l, allclauses, l2, selectivities) { current_hash = get_clause_hash( @@ -222,6 +227,8 @@ aqo_get_parameterized_baserel_size(PlannerInfo *root, cache_selectivity(current_hash, rel->relid, relid, *((double *) lfirst(l2))); } + + MemoryContextSwitchTo(mcxt); pfree(args_hash); pfree(eclass_hash); } diff --git a/conf.add b/conf.add index 21843d00..3556e4d6 100644 --- a/conf.add +++ b/conf.add @@ -1 +1,3 @@ 
-shared_preload_libraries = 'aqo' +autovacuum = off +shared_preload_libraries = 'postgres_fdw, aqo' +max_parallel_workers = 0 # switch off parallel workers because of unsteadiness diff --git a/expected/aqo_fdw.out b/expected/aqo_fdw.out new file mode 100644 index 00000000..23cd2f3f --- /dev/null +++ b/expected/aqo_fdw.out @@ -0,0 +1,149 @@ +-- Tests on cardinality estimation of FDW-queries: +-- simple ForeignScan. +-- JOIN push-down (check push of baserestrictinfo and joininfo) +-- Aggregate push-down +-- Push-down of groupings with HAVING clause. +CREATE EXTENSION aqo; +CREATE EXTENSION postgres_fdw; +SET aqo.mode = 'learn'; +SET aqo.show_details = 'true'; -- show AQO info for each node and entire query. +SET aqo.show_hash = 'false'; -- a hash value is system-depended. Ignore it. +DO $d$ + BEGIN + EXECUTE $$CREATE SERVER loopback FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; + END; +$d$; +CREATE USER MAPPING FOR PUBLIC SERVER loopback; +CREATE TABLE local (x int); +CREATE FOREIGN TABLE frgn(x int) SERVER loopback OPTIONS (table_name 'local'); +INSERT INTO frgn (x) VALUES (1); +ANALYZE local; +-- Trivial foreign scan.s +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT x FROM frgn; + QUERY PLAN +---------------------------------------------- + Foreign Scan on frgn (actual rows=1 loops=1) + AQO not used + Using aqo: true + AQO mode: LEARN + JOINS: 0 +(5 rows) + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT x FROM frgn; + QUERY PLAN +---------------------------------------------- + Foreign Scan on frgn (actual rows=1 loops=1) + AQO: rows=1, error=0% + Using aqo: true + AQO mode: LEARN + JOINS: 0 +(5 rows) + +-- Push down base filters. Use verbose mode to see filters. 
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF, VERBOSE)) +SELECT x FROM frgn WHERE x < 10; +ERROR: syntax error at or near ")" +LINE 1: ...LAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF, VERBOSE)) + ^ +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF, VERBOSE) +SELECT x FROM frgn WHERE x < 10; + QUERY PLAN +----------------------------------------------------------- + Foreign Scan on public.frgn (actual rows=1 loops=1) + AQO not used + Output: x + Remote SQL: SELECT x FROM public.local WHERE ((x < 10)) + Using aqo: true + AQO mode: LEARN + JOINS: 0 +(7 rows) + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT x FROM frgn WHERE x < -10; -- AQO ignores constants + QUERY PLAN +---------------------------------------------- + Foreign Scan on frgn (actual rows=0 loops=1) + AQO: rows=1, error=100% + Using aqo: true + AQO mode: LEARN + JOINS: 0 +(5 rows) + +-- Trivial JOIN push-down. +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM frgn AS a, frgn AS b WHERE a.x=b.x; + QUERY PLAN +------------------------------------------------------------ + Merge Join (actual rows=1 loops=1) + AQO not used + Merge Cond: (a.x = b.x) + -> Sort (actual rows=1 loops=1) + AQO not used + Sort Key: a.x + Sort Method: quicksort Memory: 25kB + -> Foreign Scan on frgn a (actual rows=1 loops=1) + AQO not used + -> Sort (actual rows=1 loops=1) + AQO not used + Sort Key: b.x + Sort Method: quicksort Memory: 25kB + -> Foreign Scan on frgn b (actual rows=1 loops=1) + AQO not used + Using aqo: true + AQO mode: LEARN + JOINS: 0 +(18 rows) + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF, VERBOSE) +SELECT * FROM frgn AS a, frgn AS b WHERE a.x=b.x; + QUERY PLAN +-------------------------------------------------------------------------------------------------------- + Foreign Scan (actual rows=1 loops=1) + AQO: rows=1, error=0% + Output: a.x, b.x + Relations: (public.frgn a) INNER JOIN (public.frgn b) + Remote SQL: SELECT r1.x, r2.x FROM (public.local 
r1 INNER JOIN public.local r2 ON (((r1.x = r2.x)))) + Using aqo: true + AQO mode: LEARN + JOINS: 0 +(8 rows) + +-- TODO: Non-mergejoinable join condition. +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM frgn AS a, frgn AS b WHERE a.xpredicted_cardinality < 0.) + { + char nodestr[1024]; + char *qplan = nodeToString(plan); + + memset(nodestr, 0, 1024); + strncpy(nodestr, qplan, 1023); + pfree(qplan); + + /* + * AQO failed to predict cardinality for this node. + */ + values[0] = Int32GetDatum(qhash); + values[1] = Int32GetDatum(fhash); + values[2] = Int32GetDatum(fss_hash); + values[3] = Int32GetDatum(nodeTag(plan)); + values[4] = CStringGetTextDatum(nodestr); + tuple = heap_form_tuple(tupDesc, values, isnull); + + simple_heap_insert(hrel, tuple); + my_index_insert(irel, values, isnull, &(tuple->t_self), + hrel, UNIQUE_CHECK_YES); + } + else + { + /* AQO works as expected. */ + } + } + else if (!TransactionIdIsValid(snap.xmin) && + !TransactionIdIsValid(snap.xmax)) + { + /* + * AQO made prediction for this node. Delete it from the ignorance + * table. + */ + tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + Assert(shouldFree != true); + simple_heap_delete(hrel, &(tuple->t_self)); + } + else + { + /* + * The data exists. We can't do anything for now. 
+ */ + } + + ExecDropSingleTupleTableSlot(slot); + index_endscan(scan); + index_close(irel, RowExclusiveLock); + table_close(hrel, RowExclusiveLock); + + CommandCounterIncrement(); + LockRelease(&tag, ExclusiveLock, false); +} diff --git a/ignorance.h b/ignorance.h new file mode 100644 index 00000000..bceb855b --- /dev/null +++ b/ignorance.h @@ -0,0 +1,12 @@ +#ifndef IGNORANCE_H +#define IGNORANCE_H + +#include "postgres.h" + +extern bool aqo_log_ignorance; + +extern void set_ignorance(bool newval, void *extra); +extern bool create_ignorance_table(bool fail_ok); +extern void update_ignorance(int qhash, int fhash, int fss_hash, Plan *plan); + +#endif /* IGNORANCE_H */ diff --git a/machine_learning.c b/machine_learning.c index 7b4612cd..9ebbae6a 100644 --- a/machine_learning.c +++ b/machine_learning.c @@ -12,7 +12,7 @@ * ******************************************************************************* * - * Copyright (c) 2016-2020, Postgres Professional + * Copyright (c) 2016-2021, Postgres Professional * * IDENTIFICATION * aqo/machine_learning.c diff --git a/path_utils.c b/path_utils.c index 6e809818..f91d8be8 100644 --- a/path_utils.c +++ b/path_utils.c @@ -5,7 +5,7 @@ * ******************************************************************************* * - * Copyright (c) 2016-2020, Postgres Professional + * Copyright (c) 2016-2021, Postgres Professional * * IDENTIFICATION * aqo/path_utils.c @@ -79,7 +79,7 @@ get_path_clauses(Path *path, PlannerInfo *root, List **selectivities) List *inner_sel = NIL; List *outer; List *outer_sel = NIL; - List *cur; + List *cur = NIL; List *cur_sel = NIL; Assert(selectivities != NULL); @@ -113,6 +113,7 @@ get_path_clauses(Path *path, PlannerInfo *root, List **selectivities) selectivities); break; case T_GatherPath: + case T_GatherMergePath: return get_path_clauses(((GatherPath *) path)->subpath, root, selectivities); break; @@ -160,6 +161,32 @@ get_path_clauses(Path *path, PlannerInfo *root, List **selectivities) return 
get_path_clauses(((LimitPath *) path)->subpath, root, selectivities); break; + case T_SubqueryScanPath: + return get_path_clauses(((SubqueryScanPath *) path)->subpath, root, + selectivities); + break; + case T_AppendPath: + { + ListCell *lc; + + foreach (lc, ((AppendPath *) path)->subpaths) + { + Path *subpath = lfirst(lc); + + cur = list_concat(cur, list_copy( + get_path_clauses(subpath, root, selectivities))); + cur_sel = list_concat(cur_sel, *selectivities); + } + cur = list_concat(cur, list_copy(path->parent->baserestrictinfo)); + *selectivities = list_concat(cur_sel, + get_selectivities(root, + path->parent->baserestrictinfo, + 0, JOIN_INNER, NULL)); + return cur; + } + break; + case T_ForeignPath: + /* The same as in the default case */ default: cur = list_concat(list_copy(path->parent->baserestrictinfo), path->param_info ? diff --git a/postprocessing.c b/postprocessing.c index db38b2d4..74072fdb 100644 --- a/postprocessing.c +++ b/postprocessing.c @@ -9,7 +9,7 @@ * ******************************************************************************* * - * Copyright (c) 2016-2020, Postgres Professional + * Copyright (c) 2016-2021, Postgres Professional * * IDENTIFICATION * aqo/postprocessing.c @@ -17,10 +17,14 @@ */ #include "aqo.h" +#include "ignorance.h" + #include "access/parallel.h" #include "optimizer/optimizer.h" +#include "postgres_fdw.h" #include "utils/queryenvironment.h" + typedef struct { List *clauselist; @@ -40,29 +44,30 @@ static char *PlanStateInfo = "PlanStateInfo"; /* Query execution statistics collecting utilities */ -static void atomic_fss_learn_step(int fss_hash, int ncols, - double **matrix, double *targets, - double *features, double target); +static void atomic_fss_learn_step(int fhash, int fss_hash, int ncols, + double **matrix, double *targets, + double *features, double target); static void learn_sample(List *clauselist, - List *selectivities, - List *relidslist, - double true_cardinality, - double predicted_cardinality); + List 
*selectivities, + List *relidslist, + double true_cardinality, + Plan *plan); static List *restore_selectivities(List *clauselist, - List *relidslist, - JoinType join_type, - bool was_parametrized); + List *relidslist, + JoinType join_type, + bool was_parametrized); static void update_query_stat_row(double *et, int *et_size, - double *pt, int *pt_size, - double *ce, int *ce_size, - double planning_time, - double execution_time, - double cardinality_error, - int64 *n_exec); -static void StoreToQueryContext(QueryDesc *queryDesc); + double *pt, int *pt_size, + double *ce, int *ce_size, + double planning_time, + double execution_time, + double cardinality_error, + int64 *n_exec); +static void StoreToQueryEnv(QueryDesc *queryDesc); static void StorePlanInternals(QueryDesc *queryDesc); -static bool ExtractFromQueryContext(QueryDesc *queryDesc); -static void RemoveFromQueryContext(QueryDesc *queryDesc); +static bool ExtractFromQueryEnv(QueryDesc *queryDesc); +static void RemoveFromQueryEnv(QueryDesc *queryDesc); + /* * This is the critical section: only one runner is allowed to be inside this @@ -70,17 +75,23 @@ static void RemoveFromQueryContext(QueryDesc *queryDesc); * matrix and targets are just preallocated memory for computations. 
*/ static void -atomic_fss_learn_step(int fss_hash, int ncols, - double **matrix, double *targets, - double *features, double target) +atomic_fss_learn_step(int fhash, int fss_hash, int ncols, + double **matrix, double *targets, + double *features, double target) { - int nrows; + LOCKTAG tag; + int nrows; - if (!load_fss(fss_hash, ncols, matrix, targets, &nrows)) + init_lock_tag(&tag, (uint32) fhash, (uint32) fss_hash); + LockAcquire(&tag, ExclusiveLock, false, false); + + if (!load_fss(fhash, fss_hash, ncols, matrix, targets, &nrows)) nrows = 0; nrows = OkNNr_learn(nrows, ncols, matrix, targets, features, target); - update_fss(fss_hash, nrows, ncols, matrix, targets); + update_fss(fhash, fss_hash, nrows, ncols, matrix, targets); + + LockRelease(&tag, ExclusiveLock, false); } /* @@ -89,36 +100,38 @@ atomic_fss_learn_step(int fss_hash, int ncols, */ static void learn_sample(List *clauselist, List *selectivities, List *relidslist, - double true_cardinality, double predicted_cardinality) + double true_cardinality, Plan *plan) { - int fss_hash; - int nfeatures; - double *matrix[aqo_K]; - double targets[aqo_K]; - double *features; - double target; - int i; + int fhash = query_context.fspace_hash; + int fss_hash; + int nfeatures; + double *matrix[aqo_K]; + double targets[aqo_K]; + double *features; + double target; + int i; -/* - * Suppress the optimization for debug purposes. - if (fabs(log(predicted_cardinality) - log(true_cardinality)) < - object_selection_prediction_threshold) - { - return; - } -*/ target = log(true_cardinality); - fss_hash = get_fss_for_object(clauselist, selectivities, relidslist, - &nfeatures, &features); + &nfeatures, &features); + + if (aqo_log_ignorance /* && load_fss(fhash, fss_hash, 0, NULL, NULL, NULL) */) + { + /* + * If ignorance logging is enabled and the feature space was existed in + * the ML knowledge base, log this issue. 
+ */ + update_ignorance(query_context.query_hash, fhash, fss_hash, plan); + } if (nfeatures> 0) for (i = 0; i < aqo_K; ++i) matrix[i] = palloc(sizeof(double) * nfeatures); - /* Here should be critical section */ - atomic_fss_learn_step(fss_hash, nfeatures, matrix, targets, features, target); - /* Here should be the end of critical section */ + /* Critical section */ + atomic_fss_learn_step(fhash, fss_hash, + nfeatures, matrix, targets, features, target); + /* End of critical section */ if (nfeatures> 0) for (i = 0; i < aqo_K; ++i) @@ -137,14 +150,14 @@ restore_selectivities(List *clauselist, JoinType join_type, bool was_parametrized) { - List *lst = NIL; - ListCell *l; + List *lst = NIL; + ListCell *l; int i = 0; bool parametrized_sel; int nargs; - int *args_hash; - int *eclass_hash; - double *cur_sel; + int *args_hash; + int *eclass_hash; + double *cur_sel; int cur_hash; int cur_relid; @@ -252,7 +265,9 @@ learnOnPlanState(PlanState *p, void *context) ctx->relidslist = list_copy(p->plan->path_relids); if (p->instrument && (p->righttree != NULL || p->lefttree == NULL || - p->plan->path_clauses != NIL)) + p->plan->path_clauses != NIL || + IsA(p, ForeignScanState) || + IsA(p, AppendState) || IsA(p, MergeAppendState))) { double learn_rows = 0.; double predicted = 0.; @@ -287,7 +302,7 @@ learnOnPlanState(PlanState *p, void *context) } else /* This node does not required to sum tuples of each worker - * to calculate produced rows. */ + * to calculate produced rows. */ learn_rows = p->instrument->ntuples / p->instrument->nloops; if (p->plan->predicted_cardinality> 0.) 
@@ -329,13 +344,14 @@ learnOnPlanState(PlanState *p, void *context) if (ctx->learn) learn_sample(SubplanCtx.clauselist, SubplanCtx.selectivities, - p->plan->path_relids, learn_rows, predicted); + p->plan->path_relids, learn_rows, + p->plan); } } ctx->clauselist = list_concat(ctx->clauselist, SubplanCtx.clauselist); ctx->selectivities = list_concat(ctx->selectivities, - SubplanCtx.selectivities); + SubplanCtx.selectivities); return false; } @@ -411,7 +427,7 @@ aqo_ExecutorStart(QueryDesc *queryDesc, int eflags) queryDesc->instrument_options |= INSTRUMENT_ROWS; /* Save all query-related parameters into the query context. */ - StoreToQueryContext(queryDesc); + StoreToQueryEnv(queryDesc); } if (prev_ExecutorStart_hook) @@ -437,11 +453,12 @@ aqo_ExecutorEnd(QueryDesc *queryDesc) QueryStat *stat = NULL; instr_time endtime; EphemeralNamedRelation enr = get_ENR(queryDesc->queryEnv, PlanStateInfo); + LOCKTAG tag; cardinality_sum_errors = 0.; cardinality_num_objects = 0; - if (!ExtractFromQueryContext(queryDesc)) + if (!ExtractFromQueryEnv(queryDesc)) /* AQO keep all query-related preferences at the query context. * It is needed to prevent from possible recursive changes, at * preprocessing stage of subqueries. @@ -471,6 +488,11 @@ aqo_ExecutorEnd(QueryDesc *queryDesc) list_free(ctx.selectivities); } + /* Prevent concurrent updates. 
*/ + init_lock_tag(&tag, (uint32) query_context.query_hash, + (uint32) query_context.fspace_hash); + LockAcquire(&tag, ExclusiveLock, false, false); + if (query_context.collect_stat) { INSTR_TIME_SET_CURRENT(endtime); @@ -487,26 +509,26 @@ aqo_ExecutorEnd(QueryDesc *queryDesc) { if (query_context.use_aqo) update_query_stat_row(stat->execution_time_with_aqo, - &stat->execution_time_with_aqo_size, - stat->planning_time_with_aqo, - &stat->planning_time_with_aqo_size, - stat->cardinality_error_with_aqo, - &stat->cardinality_error_with_aqo_size, - query_context.query_planning_time, - totaltime - query_context.query_planning_time, - cardinality_error, - &stat->executions_with_aqo); + &stat->execution_time_with_aqo_size, + stat->planning_time_with_aqo, + &stat->planning_time_with_aqo_size, + stat->cardinality_error_with_aqo, + &stat->cardinality_error_with_aqo_size, + query_context.query_planning_time, + totaltime - query_context.query_planning_time, + cardinality_error, + &stat->executions_with_aqo); else update_query_stat_row(stat->execution_time_without_aqo, - &stat->execution_time_without_aqo_size, - stat->planning_time_without_aqo, - &stat->planning_time_without_aqo_size, - stat->cardinality_error_without_aqo, - &stat->cardinality_error_without_aqo_size, - query_context.query_planning_time, - totaltime - query_context.query_planning_time, - cardinality_error, - &stat->executions_without_aqo); + &stat->execution_time_without_aqo_size, + stat->planning_time_without_aqo, + &stat->planning_time_without_aqo_size, + stat->cardinality_error_without_aqo, + &stat->cardinality_error_without_aqo_size, + query_context.query_planning_time, + totaltime - query_context.query_planning_time, + cardinality_error, + &stat->executions_without_aqo); } } selectivity_cache_clear(); @@ -522,7 +544,9 @@ aqo_ExecutorEnd(QueryDesc *queryDesc) update_aqo_stat(query_context.fspace_hash, stat); pfree_query_stat(stat); } - RemoveFromQueryContext(queryDesc); + + LockRelease(&tag, ExclusiveLock, 
false); + RemoveFromQueryEnv(queryDesc); end: if (prev_ExecutorEnd_hook) @@ -542,7 +566,7 @@ aqo_ExecutorEnd(QueryDesc *queryDesc) void aqo_copy_generic_path_info(PlannerInfo *root, Plan *dest, Path *src) { - bool is_join_path; + bool is_join_path; if (prev_copy_generic_path_info_hook) prev_copy_generic_path_info_hook(root, dest, src); @@ -558,7 +582,7 @@ aqo_copy_generic_path_info(PlannerInfo *root, Plan *dest, Path *src) * path_parallel_workers, and was_parameterized. */ Assert(dest->path_clauses && dest->path_jointype && - dest->path_relids && dest->path_parallel_workers); + dest->path_relids && dest->path_parallel_workers); return; } @@ -567,6 +591,47 @@ aqo_copy_generic_path_info(PlannerInfo *root, Plan *dest, Path *src) dest->path_clauses = ((JoinPath *) src)->joinrestrictinfo; dest->path_jointype = ((JoinPath *) src)->jointype; } + else if (src->type == T_ForeignPath) + { + ForeignPath *fpath = (ForeignPath *) src; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) fpath->path.parent->fdw_private; + + /* + * Pushed down foreign join keeps clauses in special fdw_private + * structure. + * I'm not sure what fpinfo structure keeps clauses for sufficient time. + * So, copy clauses. 
+ */ + + dest->path_clauses = list_concat(list_copy(fpinfo->joinclauses), + list_copy(fpinfo->remote_conds)); + dest->path_clauses = list_concat(dest->path_clauses, + list_copy(fpinfo->local_conds)); + + dest->path_jointype = ((JoinPath *) src)->jointype; + + dest->path_relids = get_list_of_relids(root, fpinfo->lower_subquery_rels); + + if (fpinfo->outerrel) + { + dest->path_clauses = list_concat(dest->path_clauses, + list_copy(fpinfo->outerrel->baserestrictinfo)); + dest->path_clauses = list_concat(dest->path_clauses, + list_copy(fpinfo->outerrel->joininfo)); + dest->path_relids = list_concat(dest->path_relids, + get_list_of_relids(root, fpinfo->outerrel->relids)); + } + + if (fpinfo->innerrel) + { + dest->path_clauses = list_concat(dest->path_clauses, + list_copy(fpinfo->innerrel->baserestrictinfo)); + dest->path_clauses = list_concat(dest->path_clauses, + list_copy(fpinfo->innerrel->joininfo)); + dest->path_relids = list_concat(dest->path_relids, + get_list_of_relids(root, fpinfo->innerrel->relids)); + } + } else { dest->path_clauses = list_concat( @@ -575,7 +640,8 @@ aqo_copy_generic_path_info(PlannerInfo *root, Plan *dest, Path *src) dest->path_jointype = JOIN_INNER; } - dest->path_relids = get_list_of_relids(root, src->parent->relids); + dest->path_relids = list_concat(dest->path_relids, + get_list_of_relids(root, src->parent->relids)); dest->path_parallel_workers = src->parallel_workers; dest->was_parametrized = (src->param_info != NULL); @@ -599,7 +665,7 @@ aqo_copy_generic_path_info(PlannerInfo *root, Plan *dest, Path *src) * top-level query. */ static void -StoreToQueryContext(QueryDesc *queryDesc) +StoreToQueryEnv(QueryDesc *queryDesc) { EphemeralNamedRelation enr; int qcsize = sizeof(QueryContextData); @@ -667,7 +733,7 @@ StorePlanInternals(QueryDesc *queryDesc) * Restore AQO data, related to the query. 
*/ static bool -ExtractFromQueryContext(QueryDesc *queryDesc) +ExtractFromQueryEnv(QueryDesc *queryDesc) { EphemeralNamedRelation enr; @@ -690,7 +756,7 @@ ExtractFromQueryContext(QueryDesc *queryDesc) } static void -RemoveFromQueryContext(QueryDesc *queryDesc) +RemoveFromQueryEnv(QueryDesc *queryDesc) { EphemeralNamedRelation enr = get_ENR(queryDesc->queryEnv, AQOPrivateData); unregister_ENR(queryDesc->queryEnv, AQOPrivateData); @@ -704,59 +770,106 @@ RemoveFromQueryContext(QueryDesc *queryDesc) pfree(enr); } +void +print_node_explain(ExplainState *es, PlanState *ps, Plan *plan, double rows) +{ + int wrkrs = 1; + double error = -1.; + + if (!aqo_show_details || !plan || !ps->instrument) + return; + + if (ps->worker_instrument && IsParallelTuplesProcessing(plan)) + { + int i; + + for (i = 0; i < ps->worker_instrument->num_workers; i++) + { + Instrumentation *instrument = &ps->worker_instrument->instrument[i]; + + if (instrument->nloops <= 0) + continue; + + wrkrs++; + } + } + + appendStringInfoChar(es->str, '\n'); + Assert(es->format == EXPLAIN_FORMAT_TEXT); + if (es->str->len == 0 || es->str->data[es->str->len - 1] == '\n') + appendStringInfoSpaces(es->str, es->indent * 2); + + if (plan->predicted_cardinality> 0.) + { + error = 100. * (plan->predicted_cardinality - (rows*wrkrs)) + / plan->predicted_cardinality; + appendStringInfo(es->str, + "AQO: rows=%.0lf, error=%.0lf%%", + plan->predicted_cardinality, error); + } + else + appendStringInfo(es->str, "AQO not used"); + + if (aqo_show_hash) + appendStringInfo(es->str, ", fss=%d", plan->fss_hash); + + if (prev_ExplainOneNode_hook) + prev_ExplainOneNode_hook(es, ps, plan, rows); +} + /* * Prints if the plan was constructed with AQO. 
*/ -void print_into_explain(PlannedStmt *plannedstmt, IntoClause *into, - ExplainState *es, const char *queryString, - ParamListInfo params, const instr_time *planduration, - QueryEnvironment *queryEnv) +void +print_into_explain(PlannedStmt *plannedstmt, IntoClause *into, + ExplainState *es, const char *queryString, + ParamListInfo params, const instr_time *planduration, + QueryEnvironment *queryEnv) { if (prev_ExplainOnePlan_hook) prev_ExplainOnePlan_hook(plannedstmt, into, es, queryString, - params, planduration, queryEnv); + params, planduration, queryEnv); + + if (!aqo_show_details) + return; -#ifdef AQO_EXPLAIN /* Report to user about aqo state only in verbose mode */ - if (es->verbose) - { - ExplainPropertyBool("Using aqo", query_context.use_aqo, es); + ExplainPropertyBool("Using aqo", query_context.use_aqo, es); - switch (aqo_mode) - { - case AQO_MODE_INTELLIGENT: - ExplainPropertyText("AQO mode", "INTELLIGENT", es); - break; - case AQO_MODE_FORCED: - ExplainPropertyText("AQO mode", "FORCED", es); - break; - case AQO_MODE_CONTROLLED: - ExplainPropertyText("AQO mode", "CONTROLLED", es); - break; - case AQO_MODE_LEARN: - ExplainPropertyText("AQO mode", "LEARN", es); - break; - case AQO_MODE_FROZEN: - ExplainPropertyText("AQO mode", "FROZEN", es); - break; - case AQO_MODE_DISABLED: - ExplainPropertyText("AQO mode", "DISABLED", es); - break; - default: - elog(ERROR, "Bad AQO state"); - break; - } + switch (aqo_mode) + { + case AQO_MODE_INTELLIGENT: + ExplainPropertyText("AQO mode", "INTELLIGENT", es); + break; + case AQO_MODE_FORCED: + ExplainPropertyText("AQO mode", "FORCED", es); + break; + case AQO_MODE_CONTROLLED: + ExplainPropertyText("AQO mode", "CONTROLLED", es); + break; + case AQO_MODE_LEARN: + ExplainPropertyText("AQO mode", "LEARN", es); + break; + case AQO_MODE_FROZEN: + ExplainPropertyText("AQO mode", "FROZEN", es); + break; + case AQO_MODE_DISABLED: + ExplainPropertyText("AQO mode", "DISABLED", es); + break; + default: + elog(ERROR, "Bad AQO 
state"); + break; + } - /* - * Query hash provides an user the conveniently use of the AQO - * auxiliary functions. - */ - if (aqo_mode != AQO_MODE_DISABLED || force_collect_stat) - { + /* + * Query class provides an user the conveniently use of the AQO + * auxiliary functions. + */ + if (aqo_mode != AQO_MODE_DISABLED || force_collect_stat) + { + if (aqo_show_hash) ExplainPropertyInteger("Query hash", NULL, - query_context.query_hash, es); - ExplainPropertyInteger("JOINS", NULL, njoins, es); - } + query_context.query_hash, es); + ExplainPropertyInteger("JOINS", NULL, njoins, es); } -#endif } diff --git a/preprocessing.c b/preprocessing.c index 79097a92..3ef0ac20 100644 --- a/preprocessing.c +++ b/preprocessing.c @@ -49,7 +49,7 @@ * ******************************************************************************* * - * Copyright (c) 2016-2020, Postgres Professional + * Copyright (c) 2016-2021, Postgres Professional * * IDENTIFICATION * aqo/preprocessing.c @@ -124,6 +124,7 @@ aqo_planner(Query *parse, bool query_is_stored; Datum query_params[5]; bool query_nulls[5] = {false, false, false, false, false}; + LOCKTAG tag; selectivity_cache_clear(); @@ -134,6 +135,8 @@ aqo_planner(Query *parse, */ if ((parse->commandType != CMD_SELECT && parse->commandType != CMD_INSERT && parse->commandType != CMD_UPDATE && parse->commandType != CMD_DELETE) || + strstr(application_name, "postgres_fdw") != NULL || /* Prevent distributed deadlocks */ + strstr(application_name, "pgfdw:") != NULL || /* caused by fdw */ get_extension_oid("aqo", true) == InvalidOid || creating_extension || IsParallelWorker() || @@ -161,6 +164,13 @@ aqo_planner(Query *parse, boundParams); } + /* + * find-add query and query text must be atomic operation to prevent + * concurrent insertions. 
+ */ + init_lock_tag(&tag, (uint32) query_context.query_hash, (uint32) 0); + LockAcquire(&tag, ExclusiveLock, false, false); + query_is_stored = find_query(query_context.query_hash, &query_params[0], &query_nulls[0]); @@ -215,9 +225,18 @@ aqo_planner(Query *parse, if (query_context.adding_query || force_collect_stat) { - add_query(query_context.query_hash, query_context.learn_aqo, - query_context.use_aqo, query_context.fspace_hash, - query_context.auto_tuning); + /* + * Add query into the AQO knowledge base. To process an error with + * concurrent addition from another backend we will try to restart + * preprocessing routine. + */ + update_query(query_context.query_hash, + query_context.fspace_hash, + query_context.learn_aqo, + query_context.use_aqo, + query_context.auto_tuning); + + add_query_text(query_context.query_hash, query_text); } } @@ -271,6 +290,8 @@ aqo_planner(Query *parse, } } + LockRelease(&tag, ExclusiveLock, false); + /* * This mode is possible here, because force collect statistics uses AQO * machinery. 
@@ -317,6 +338,22 @@ isQueryUsingSystemRelation(Query *query) return isQueryUsingSystemRelation_walker((Node *) query, NULL); } +static bool +IsAQORelation(Relation rel) +{ + char *relname = NameStr(rel->rd_rel->relname); + + if (strcmp(relname, "aqo_data") == 0 || + strcmp(relname, "aqo_query_texts") == 0 || + strcmp(relname, "aqo_query_stat") == 0 || + strcmp(relname, "aqo_queries") == 0 || + strcmp(relname, "aqo_ignorance") == 0 + ) + return true; + + return false; +} + bool isQueryUsingSystemRelation_walker(Node *node, void *context) { @@ -336,9 +373,10 @@ isQueryUsingSystemRelation_walker(Node *node, void *context) { Relation rel = table_open(rte->relid, AccessShareLock); bool is_catalog = IsCatalogRelation(rel); + bool is_aqo_rel = IsAQORelation(rel); table_close(rel, AccessShareLock); - if (is_catalog) + if (is_catalog || is_aqo_rel) return true; } } diff --git a/selectivity_cache.c b/selectivity_cache.c index 455d13b1..12ecd699 100644 --- a/selectivity_cache.c +++ b/selectivity_cache.c @@ -9,7 +9,7 @@ * ******************************************************************************* * - * Copyright (c) 2016-2020, Postgres Professional + * Copyright (c) 2016-2021, Postgres Professional * * IDENTIFICATION * aqo/selectivity_cache.c diff --git a/sql/aqo_fdw.sql b/sql/aqo_fdw.sql new file mode 100644 index 00000000..38e31ea1 --- /dev/null +++ b/sql/aqo_fdw.sql @@ -0,0 +1,58 @@ +-- Tests on cardinality estimation of FDW-queries: +-- simple ForeignScan. +-- JOIN push-down (check push of baserestrictinfo and joininfo) +-- Aggregate push-down +-- Push-down of groupings with HAVING clause. + +CREATE EXTENSION aqo; +CREATE EXTENSION postgres_fdw; +SET aqo.mode = 'learn'; +SET aqo.show_details = 'true'; -- show AQO info for each node and entire query. +SET aqo.show_hash = 'false'; -- a hash value is system-depended. Ignore it. 
+
+-- Register a postgres_fdw server named 'loopback' that connects back to the
+-- current database on the current port. Dynamic SQL is used because both
+-- option values are only known at run time.
+DO $d$
+    BEGIN
+        EXECUTE $$CREATE SERVER loopback FOREIGN DATA WRAPPER postgres_fdw
+            OPTIONS (dbname '$$||current_database()||$$',
+                     port '$$||current_setting('port')||$$'
+            )$$;
+    END;
+$d$;
+
+CREATE USER MAPPING FOR PUBLIC SERVER loopback;
+
+-- 'frgn' exposes the ordinary table 'local' through the loopback server; the
+-- single test row is inserted through the foreign table.
+CREATE TABLE local (x int);
+CREATE FOREIGN TABLE frgn(x int) SERVER loopback OPTIONS (table_name 'local');
+INSERT INTO frgn (x) VALUES (1);
+ANALYZE local;
+
+-- Trivial foreign scan. The statement is run twice in a row.
+-- NOTE(review): presumably the second run shows what AQO learned from the
+-- first one - confirm against the expected output.
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT x FROM frgn;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT x FROM frgn;
+
+-- Push down base filters. Use verbose mode to see filters.
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF, VERBOSE)
+SELECT x FROM frgn WHERE x < 10;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF, VERBOSE)
+SELECT x FROM frgn WHERE x < 10;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT x FROM frgn WHERE x < -10; -- AQO ignores constants
+
+-- Trivial JOIN push-down (same self-join, second time in verbose mode).
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM frgn AS a, frgn AS b WHERE a.x=b.x;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF, VERBOSE)
+SELECT * FROM frgn AS a, frgn AS b WHERE a.x=b.x;
+
+-- TODO: Non-mergejoinable join condition.
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM frgn AS a, frgn AS b WHERE a.xheapRelation->rd_att, - &TTSOpsBufferHeapTuple); - find_ok = index_getnext_slot(query_index_scan, ForwardScanDirection, slot); + index_rescan(scan, &key, 1, NULL, 0); + slot = MakeSingleTupleTableSlot(hrel->rd_att, &TTSOpsBufferHeapTuple); + find_ok = index_getnext_slot(scan, ForwardScanDirection, slot); if (find_ok) { tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); Assert(shouldFree != true); - heap_deform_tuple(tuple, aqo_queries_heap->rd_att, - search_values, search_nulls); + heap_deform_tuple(tuple, hrel->rd_att, search_values, search_nulls); } ExecDropSingleTupleTableSlot(slot); - index_endscan(query_index_scan); - index_close(query_index_rel, lockmode); - table_close(aqo_queries_heap, lockmode); + index_endscan(scan); + index_close(irel, AccessShareLock); + table_close(hrel, AccessShareLock); return find_ok; } /* - * Creates entry for new query in aqo_queries table with given fields. - * Returns false if the operation failed, true otherwise. + * Update query status in intelligent mode. + * + * Do it gently: to prevent possible deadlocks, revert this update if any + * concurrent transaction is doing it. + * + * Such logic is possible, because this update is performed by AQO itself. It is + * not break any learning logic besides possible additional learning iterations. 
*/ bool -add_query(int query_hash, bool learn_aqo, bool use_aqo, - int fspace_hash, bool auto_tuning) -{ - RangeVar *aqo_queries_table_rv; - Relation aqo_queries_heap; - HeapTuple tuple; - - LOCKMODE lockmode = RowExclusiveLock; - - Datum values[5]; - bool nulls[5] = {false, false, false, false, false}; - - Relation query_index_rel; - Oid query_index_rel_oid; - - values[0] = Int32GetDatum(query_hash); - values[1] = BoolGetDatum(learn_aqo); - values[2] = BoolGetDatum(use_aqo); - values[3] = Int32GetDatum(fspace_hash); - values[4] = BoolGetDatum(auto_tuning); - - query_index_rel_oid = RelnameGetRelid("aqo_queries_query_hash_idx"); - if (!OidIsValid(query_index_rel_oid)) - { - disable_aqo_for_query(); - return false; - } - query_index_rel = index_open(query_index_rel_oid, lockmode); - - aqo_queries_table_rv = makeRangeVar("public", "aqo_queries", -1); - aqo_queries_heap = table_openrv(aqo_queries_table_rv, lockmode); - - tuple = heap_form_tuple(RelationGetDescr(aqo_queries_heap), - values, nulls); - PG_TRY(); - { - simple_heap_insert(aqo_queries_heap, tuple); - my_index_insert(query_index_rel, - values, nulls, - &(tuple->t_self), - aqo_queries_heap, - UNIQUE_CHECK_YES); - } - PG_CATCH(); - { - /* - * Main goal is to catch deadlock errors during the index insertion. 
- */ - CommandCounterIncrement(); - simple_heap_delete(aqo_queries_heap, &(tuple->t_self)); - PG_RE_THROW(); - } - PG_END_TRY(); - - index_close(query_index_rel, lockmode); - table_close(aqo_queries_heap, lockmode); - - CommandCounterIncrement(); - - return true; -} - -bool -update_query(int query_hash, bool learn_aqo, bool use_aqo, - int fspace_hash, bool auto_tuning) +update_query(int qhash, int fhash, + bool learn_aqo, bool use_aqo, bool auto_tuning) { - RangeVar *aqo_queries_table_rv; - Relation aqo_queries_heap; + RangeVar *rv; + Relation hrel; + Relation irel; + TupleTableSlot *slot; HeapTuple tuple, nw_tuple; - - TupleTableSlot *slot; - bool shouldFree; - bool find_ok = false; - bool update_indexes; - - LOCKMODE lockmode = RowExclusiveLock; - - Relation query_index_rel; - Oid query_index_rel_oid; - IndexScanDesc query_index_scan; - ScanKeyData key; - Datum values[5]; bool isnull[5] = { false, false, false, false, false }; bool replace[5] = { false, true, true, true, true }; + bool shouldFree; + bool result = true; + bool update_indexes; + Oid reloid; + IndexScanDesc scan; + ScanKeyData key; + SnapshotData snap; - query_index_rel_oid = RelnameGetRelid("aqo_queries_query_hash_idx"); - if (!OidIsValid(query_index_rel_oid)) + reloid = RelnameGetRelid("aqo_queries_query_hash_idx"); + if (!OidIsValid(reloid)) { disable_aqo_for_query(); return false; } - aqo_queries_table_rv = makeRangeVar("public", "aqo_queries", -1); - aqo_queries_heap = table_openrv(aqo_queries_table_rv, lockmode); - - query_index_rel = index_open(query_index_rel_oid, lockmode); - query_index_scan = index_beginscan(aqo_queries_heap, - query_index_rel, - SnapshotSelf, - 1, - 0); - - ScanKeyInit(&key, - 1, - BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(query_hash)); - - index_rescan(query_index_scan, &key, 1, NULL, 0); - slot = MakeSingleTupleTableSlot(query_index_scan->heapRelation->rd_att, - &TTSOpsBufferHeapTuple); - find_ok = index_getnext_slot(query_index_scan, ForwardScanDirection, 
slot); - Assert(find_ok); - tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); - Assert(shouldFree != true); - - heap_deform_tuple(tuple, aqo_queries_heap->rd_att, - values, isnull); + rv = makeRangeVar("public", "aqo_queries", -1); + hrel = table_openrv(rv, RowExclusiveLock); + irel = index_open(reloid, RowExclusiveLock); + + /* + * Start an index scan. Use dirty snapshot to check concurrent updates that + * can be made before, but still not visible. + */ + InitDirtySnapshot(snap); + scan = index_beginscan(hrel, irel, &snap, 1, 0); + ScanKeyInit(&key, 1, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(qhash)); + index_rescan(scan, &key, 1, NULL, 0); + slot = MakeSingleTupleTableSlot(hrel->rd_att, &TTSOpsBufferHeapTuple); + + values[0] = Int32GetDatum(qhash); values[1] = BoolGetDatum(learn_aqo); values[2] = BoolGetDatum(use_aqo); - values[3] = Int32GetDatum(fspace_hash); + values[3] = Int32GetDatum(fhash); values[4] = BoolGetDatum(auto_tuning); - nw_tuple = heap_modify_tuple(tuple, aqo_queries_heap->rd_att, - values, isnull, replace); - if (my_simple_heap_update(aqo_queries_heap, &(nw_tuple->t_self), nw_tuple, - &update_indexes)) + if (!index_getnext_slot(scan, ForwardScanDirection, slot)) + { + /* New tuple for the ML knowledge base */ + tuple = heap_form_tuple(RelationGetDescr(hrel), values, isnull); + simple_heap_insert(hrel, tuple); + my_index_insert(irel, values, isnull, &(tuple->t_self), + hrel, UNIQUE_CHECK_YES); + } + else if (!TransactionIdIsValid(snap.xmin) && + !TransactionIdIsValid(snap.xmax)) { - if (update_indexes) - my_index_insert(query_index_rel, values, isnull, - &(nw_tuple->t_self), - aqo_queries_heap, UNIQUE_CHECK_YES); + /* + * Update existed data. No one concurrent transaction doesn't update this + * right now. 
+ */ + tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + Assert(shouldFree != true); + nw_tuple = heap_modify_tuple(tuple, hrel->rd_att, values, isnull, replace); + + if (my_simple_heap_update(hrel, &(nw_tuple->t_self), nw_tuple, + &update_indexes)) + { + if (update_indexes) + my_index_insert(irel, values, isnull, + &(nw_tuple->t_self), + hrel, UNIQUE_CHECK_YES); + result = true; + } + else + { + /* + * Ooops, somebody concurrently updated the tuple. It is possible + * only in the case of changes made by third-party code. + */ + elog(ERROR, "AQO feature space data for signature (%d, %d) concurrently" + " updated by a stranger backend.", + qhash, fhash); + result = false; + } } else { /* - * Ooops, somebody concurrently updated the tuple. We have to merge - * our changes somehow, but now we just discard ours. We don't believe - * in high probability of simultaneously finishing of two long, - * complex, and important queries, so we don't loss important data. + * Concurrent update was made. To prevent deadlocks refuse to update. */ + result = false; } ExecDropSingleTupleTableSlot(slot); - index_endscan(query_index_scan); - index_close(query_index_rel, lockmode); - table_close(aqo_queries_heap, lockmode); + index_endscan(scan); + index_close(irel, RowExclusiveLock); + table_close(hrel, RowExclusiveLock); CommandCounterIncrement(); - - return true; + return result; } /* @@ -281,64 +215,39 @@ update_query(int query_hash, bool learn_aqo, bool use_aqo, * Returns false if the operation failed, true otherwise. 
*/ bool -add_query_text(int query_hash, const char *query_text) +add_query_text(int qhash, const char *query_text) { - RangeVar *aqo_query_texts_table_rv; - Relation aqo_query_texts_heap; + RangeVar *rv; + Relation hrel; + Relation irel; HeapTuple tuple; - - LOCKMODE lockmode = RowExclusiveLock; - Datum values[2]; bool isnull[2] = {false, false}; + Oid reloid; - Relation query_index_rel; - Oid query_index_rel_oid; - - values[0] = Int32GetDatum(query_hash); + values[0] = Int32GetDatum(qhash); values[1] = CStringGetTextDatum(query_text); - query_index_rel_oid = RelnameGetRelid("aqo_query_texts_query_hash_idx"); - if (!OidIsValid(query_index_rel_oid)) + reloid = RelnameGetRelid("aqo_query_texts_query_hash_idx"); + if (!OidIsValid(reloid)) { disable_aqo_for_query(); return false; } - query_index_rel = index_open(query_index_rel_oid, lockmode); - - aqo_query_texts_table_rv = makeRangeVar("public", - "aqo_query_texts", - -1); - aqo_query_texts_heap = table_openrv(aqo_query_texts_table_rv, - lockmode); - tuple = heap_form_tuple(RelationGetDescr(aqo_query_texts_heap), - values, isnull); + rv = makeRangeVar("public", "aqo_query_texts", -1); + hrel = table_openrv(rv, RowExclusiveLock); + irel = index_open(reloid, RowExclusiveLock); + tuple = heap_form_tuple(RelationGetDescr(hrel), values, isnull); - PG_TRY(); - { - simple_heap_insert(aqo_query_texts_heap, tuple); - my_index_insert(query_index_rel, - values, isnull, - &(tuple->t_self), - aqo_query_texts_heap, - UNIQUE_CHECK_YES); - } - PG_CATCH(); - { - CommandCounterIncrement(); - simple_heap_delete(aqo_query_texts_heap, &(tuple->t_self)); - index_close(query_index_rel, lockmode); - table_close(aqo_query_texts_heap, lockmode); - PG_RE_THROW(); - } - PG_END_TRY(); + simple_heap_insert(hrel, tuple); + my_index_insert(irel, values, isnull, &(tuple->t_self), hrel, + UNIQUE_CHECK_YES); - index_close(query_index_rel, lockmode); - table_close(aqo_query_texts_heap, lockmode); + index_close(irel, RowExclusiveLock); + 
table_close(hrel, RowExclusiveLock); CommandCounterIncrement(); - return true; } @@ -357,67 +266,52 @@ add_query_text(int query_hash, const char *query_text) * objects in the given feature space */ bool -load_fss(int fss_hash, int ncols, double **matrix, double *targets, int *rows) +load_fss(int fhash, int fss_hash, + int ncols, double **matrix, double *targets, int *rows) { - RangeVar *aqo_data_table_rv; - Relation aqo_data_heap; + RangeVar *rv; + Relation hrel; + Relation irel; HeapTuple tuple; TupleTableSlot *slot; bool shouldFree; bool find_ok = false; - - Relation data_index_rel; - Oid data_index_rel_oid; - IndexScanDesc data_index_scan; + Oid reloid; + IndexScanDesc scan; ScanKeyData key[2]; - - LOCKMODE lockmode = AccessShareLock; - Datum values[5]; bool isnull[5]; - bool success = true; - data_index_rel_oid = RelnameGetRelid("aqo_fss_access_idx"); - if (!OidIsValid(data_index_rel_oid)) + reloid = RelnameGetRelid("aqo_fss_access_idx"); + if (!OidIsValid(reloid)) { disable_aqo_for_query(); return false; } - aqo_data_table_rv = makeRangeVar("public", "aqo_data", -1); - aqo_data_heap = table_openrv(aqo_data_table_rv, lockmode); + rv = makeRangeVar("public", "aqo_data", -1); + hrel = table_openrv(rv, AccessShareLock); + irel = index_open(reloid, AccessShareLock); + scan = index_beginscan(hrel, irel, SnapshotSelf, 2, 0); - data_index_rel = index_open(data_index_rel_oid, lockmode); - data_index_scan = index_beginscan(aqo_data_heap, - data_index_rel, - SnapshotSelf, - 2, - 0); + ScanKeyInit(&key[0], 1, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(fhash)); + ScanKeyInit(&key[1], 2, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(fss_hash)); + index_rescan(scan, key, 2, NULL, 0); - ScanKeyInit(&key[0], - 1, - BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(query_context.fspace_hash)); + slot = MakeSingleTupleTableSlot(hrel->rd_att, &TTSOpsBufferHeapTuple); + find_ok = index_getnext_slot(scan, ForwardScanDirection, slot); - ScanKeyInit(&key[1], - 2, - 
BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(fss_hash)); - - index_rescan(data_index_scan, key, 2, NULL, 0); - - slot = MakeSingleTupleTableSlot(data_index_scan->heapRelation->rd_att, - &TTSOpsBufferHeapTuple); - find_ok = index_getnext_slot(data_index_scan, ForwardScanDirection, slot); - - if (find_ok) + if (matrix == NULL && targets == NULL && rows == NULL) + { + /* Just check availability */ + success = find_ok; + } + else if (find_ok) { tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); Assert(shouldFree != true); - heap_deform_tuple(tuple, aqo_data_heap->rd_att, values, isnull); + heap_deform_tuple(tuple, hrel->rd_att, values, isnull); if (DatumGetInt32(values[2]) == ncols) { @@ -430,21 +324,17 @@ load_fss(int fss_hash, int ncols, double **matrix, double *targets, int *rows) deform_vector(values[4], targets, rows); } else - { - elog(WARNING, "unexpected number of features for hash (%d, %d):\ + elog(ERROR, "unexpected number of features for hash (%d, %d):\ expected %d features, obtained %d", - query_context.fspace_hash, - fss_hash, ncols, DatumGetInt32(values[2])); - success = false; - } + fhash, fss_hash, ncols, DatumGetInt32(values[2])); } else success = false; ExecDropSingleTupleTableSlot(slot); - index_endscan(data_index_scan); - index_close(data_index_rel, lockmode); - table_close(aqo_data_heap, lockmode); + index_endscan(scan); + index_close(irel, AccessShareLock); + table_close(hrel, AccessShareLock); return success; } @@ -453,76 +343,64 @@ load_fss(int fss_hash, int ncols, double **matrix, double *targets, int *rows) * Updates the specified line in the specified feature subspace. * Returns false if the operation failed, true otherwise. 
* - * 'fss_hash' specifies the feature subspace - * 'nrows' x 'ncols' is the shape of 'matrix' - * 'targets' is vector of size 'nrows' + * 'fss_hash' specifies the feature subspace 'nrows' x 'ncols' is the shape + * of 'matrix' 'targets' is vector of size 'nrows' + * + * Necessary to prevent waiting for another transaction to commit in index + * insertion or heap update. + * + * Caller guaranteed that no one AQO process insert or update this data row. */ bool -update_fss(int fss_hash, int nrows, int ncols, double **matrix, double *targets) +update_fss(int fhash, int fsshash, int nrows, int ncols, + double **matrix, double *targets) { - RangeVar *aqo_data_table_rv; - Relation aqo_data_heap; - TupleDesc tuple_desc; + RangeVar *rv; + Relation hrel; + Relation irel; + SnapshotData snap; + TupleTableSlot *slot; + TupleDesc tupDesc; HeapTuple tuple, nw_tuple; - - TupleTableSlot *slot; + Datum values[5]; + bool isnull[5] = { false, false, false, false, false }; + bool replace[5] = { false, false, false, true, true }; bool shouldFree; bool find_ok = false; bool update_indexes; - - LOCKMODE lockmode = RowExclusiveLock; - - Relation data_index_rel; - Oid data_index_rel_oid; - IndexScanDesc data_index_scan; + Oid reloid; + IndexScanDesc scan; ScanKeyData key[2]; + bool result = true; - Datum values[5]; - bool isnull[5] = { false, false, false, false, false }; - bool replace[5] = { false, false, false, true, true }; - - data_index_rel_oid = RelnameGetRelid("aqo_fss_access_idx"); - if (!OidIsValid(data_index_rel_oid)) + reloid = RelnameGetRelid("aqo_fss_access_idx"); + if (!OidIsValid(reloid)) { disable_aqo_for_query(); return false; } - aqo_data_table_rv = makeRangeVar("public", "aqo_data", -1); - aqo_data_heap = table_openrv(aqo_data_table_rv, lockmode); + rv = makeRangeVar("public", "aqo_data", -1); + hrel = table_openrv(rv, RowExclusiveLock); + irel = index_open(reloid, RowExclusiveLock); + tupDesc = RelationGetDescr(hrel); - tuple_desc = RelationGetDescr(aqo_data_heap); + 
InitDirtySnapshot(snap); + scan = index_beginscan(hrel, irel, &snap, 2, 0); - data_index_rel = index_open(data_index_rel_oid, lockmode); - data_index_scan = index_beginscan(aqo_data_heap, - data_index_rel, - SnapshotSelf, - 2, - 0); + ScanKeyInit(&key[0], 1, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(fhash)); + ScanKeyInit(&key[1], 2, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(fsshash)); - ScanKeyInit(&key[0], - 1, - BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(query_context.fspace_hash)); + index_rescan(scan, key, 2, NULL, 0); - ScanKeyInit(&key[1], - 2, - BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(fss_hash)); - - index_rescan(data_index_scan, key, 2, NULL, 0); - - slot = MakeSingleTupleTableSlot(data_index_scan->heapRelation->rd_att, - &TTSOpsBufferHeapTuple); - find_ok = index_getnext_slot(data_index_scan, ForwardScanDirection, slot); + slot = MakeSingleTupleTableSlot(tupDesc, &TTSOpsBufferHeapTuple); + find_ok = index_getnext_slot(scan, ForwardScanDirection, slot); if (!find_ok) { - values[0] = Int32GetDatum(query_context.fspace_hash); - values[1] = Int32GetDatum(fss_hash); + values[0] = Int32GetDatum(fhash); + values[1] = Int32GetDatum(fsshash); values[2] = Int32GetDatum(ncols); if (ncols> 0) @@ -531,26 +409,22 @@ update_fss(int fss_hash, int nrows, int ncols, double **matrix, double *targets) isnull[3] = true; values[4] = PointerGetDatum(form_vector(targets, nrows)); - tuple = heap_form_tuple(tuple_desc, values, isnull); - PG_TRY(); - { - simple_heap_insert(aqo_data_heap, tuple); - my_index_insert(data_index_rel, values, isnull, &(tuple->t_self), - aqo_data_heap, UNIQUE_CHECK_YES); - } - PG_CATCH(); - { - CommandCounterIncrement(); - simple_heap_delete(aqo_data_heap, &(tuple->t_self)); - PG_RE_THROW(); - } - PG_END_TRY(); + tuple = heap_form_tuple(tupDesc, values, isnull); + + /* + * Don't use PG_TRY() section because of dirty snapshot and caller atomic + * prerequisities guarantees to us that no one concurrent insertion can + * exists. 
+ */ + simple_heap_insert(hrel, tuple); + my_index_insert(irel, values, isnull, &(tuple->t_self), + hrel, UNIQUE_CHECK_YES); } - else + else if (!TransactionIdIsValid(snap.xmin) && !TransactionIdIsValid(snap.xmax)) { tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); Assert(shouldFree != true); - heap_deform_tuple(tuple, aqo_data_heap->rd_att, values, isnull); + heap_deform_tuple(tuple, hrel->rd_att, values, isnull); if (ncols> 0) values[3] = PointerGetDatum(form_matrix(matrix, nrows, ncols)); @@ -558,36 +432,44 @@ update_fss(int fss_hash, int nrows, int ncols, double **matrix, double *targets) isnull[3] = true; values[4] = PointerGetDatum(form_vector(targets, nrows)); - nw_tuple = heap_modify_tuple(tuple, tuple_desc, + nw_tuple = heap_modify_tuple(tuple, tupDesc, values, isnull, replace); - if (my_simple_heap_update(aqo_data_heap, &(nw_tuple->t_self), nw_tuple, + if (my_simple_heap_update(hrel, &(nw_tuple->t_self), nw_tuple, &update_indexes)) { if (update_indexes) - my_index_insert(data_index_rel, values, isnull, + my_index_insert(irel, values, isnull, &(nw_tuple->t_self), - aqo_data_heap, UNIQUE_CHECK_YES); + hrel, UNIQUE_CHECK_YES); + result = true; } else { /* - * Ooops, somebody concurrently updated the tuple. We have to - * merge our changes somehow, but now we just discard ours. We - * don't believe in high probability of simultaneously finishing - * of two long, complex, and important queries, so we don't loss - * important data. + * Ooops, somebody concurrently updated the tuple. It is possible + * only in the case of changes made by third-party code. */ + elog(ERROR, "AQO data piece (%d %d) concurrently updated" + " by a stranger backend.", + fhash, fsshash); + result = false; } } + else + { + /* + * Concurrent update was made. To prevent deadlocks refuse to update. 
+ */ + result = false; + } ExecDropSingleTupleTableSlot(slot); - index_endscan(data_index_scan); - index_close(data_index_rel, lockmode); - table_close(aqo_data_heap, lockmode); + index_endscan(scan); + index_close(irel, RowExclusiveLock); + table_close(hrel, RowExclusiveLock); CommandCounterIncrement(); - - return true; + return result; } /* @@ -597,62 +479,43 @@ update_fss(int fss_hash, int nrows, int ncols, double **matrix, double *targets) * is not found. */ QueryStat * -get_aqo_stat(int query_hash) +get_aqo_stat(int qhash) { - RangeVar *aqo_stat_table_rv; - Relation aqo_stat_heap; - HeapTuple tuple; - LOCKMODE heap_lock = AccessShareLock; - - Relation stat_index_rel; - Oid stat_index_rel_oid; - IndexScanDesc stat_index_scan; + RangeVar *rv; + Relation hrel; + Relation irel; + TupleTableSlot *slot; + Oid reloid; + IndexScanDesc scan; ScanKeyData key; - LOCKMODE index_lock = AccessShareLock; - - Datum values[9]; - bool nulls[9]; - QueryStat *stat = palloc_query_stat(); - - TupleTableSlot *slot; bool shouldFree; - bool find_ok = false; - stat_index_rel_oid = RelnameGetRelid("aqo_query_stat_idx"); - if (!OidIsValid(stat_index_rel_oid)) + reloid = RelnameGetRelid("aqo_query_stat_idx"); + if (!OidIsValid(reloid)) { disable_aqo_for_query(); return NULL; } - aqo_stat_table_rv = makeRangeVar("public", "aqo_query_stat", -1); - aqo_stat_heap = table_openrv(aqo_stat_table_rv, heap_lock); - - stat_index_rel = index_open(stat_index_rel_oid, index_lock); - stat_index_scan = index_beginscan(aqo_stat_heap, - stat_index_rel, - SnapshotSelf, - 1, - 0); - - ScanKeyInit(&key, - 1, - BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(query_hash)); + rv = makeRangeVar("public", "aqo_query_stat", -1); + hrel = table_openrv(rv, AccessShareLock); + irel = index_open(reloid, AccessShareLock); - index_rescan(stat_index_scan, &key, 1, NULL, 0); + scan = index_beginscan(hrel, irel, SnapshotSelf, 1, 0); + ScanKeyInit(&key, 1, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(qhash)); + 
index_rescan(scan, &key, 1, NULL, 0); + slot = MakeSingleTupleTableSlot(hrel->rd_att, &TTSOpsBufferHeapTuple); - slot = MakeSingleTupleTableSlot(stat_index_scan->heapRelation->rd_att, - &TTSOpsBufferHeapTuple); - find_ok = index_getnext_slot(stat_index_scan, ForwardScanDirection, slot); - - if (find_ok) + if (index_getnext_slot(scan, ForwardScanDirection, slot)) { + HeapTuple tuple; + Datum values[9]; + bool nulls[9]; + tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); Assert(shouldFree != true); - heap_deform_tuple(tuple, aqo_stat_heap->rd_att, values, nulls); + heap_deform_tuple(tuple, hrel->rd_att, values, nulls); DeformVectorSz(values[1], stat->execution_time_with_aqo); DeformVectorSz(values[2], stat->execution_time_without_aqo); @@ -666,10 +529,9 @@ get_aqo_stat(int query_hash) } ExecDropSingleTupleTableSlot(slot); - index_endscan(stat_index_scan); - index_close(stat_index_rel, index_lock); - table_close(aqo_stat_heap, heap_lock); - + index_endscan(scan); + index_close(irel, AccessShareLock); + table_close(hrel, AccessShareLock); return stat; } @@ -678,26 +540,16 @@ get_aqo_stat(int query_hash) * Executes disable_aqo_for_query if aqo_query_stat is not found. 
*/ void -update_aqo_stat(int query_hash, QueryStat *stat) +update_aqo_stat(int qhash, QueryStat *stat) { - RangeVar *aqo_stat_table_rv; - Relation aqo_stat_heap; + RangeVar *rv; + Relation hrel; + Relation irel; + SnapshotData snap; + TupleTableSlot *slot; + TupleDesc tupDesc; HeapTuple tuple, nw_tuple; - TupleDesc tuple_desc; - - TupleTableSlot *slot; - bool shouldFree; - bool find_ok = false; - bool update_indexes; - - LOCKMODE lockmode = RowExclusiveLock; - - Relation stat_index_rel; - Oid stat_index_rel_oid; - IndexScanDesc stat_index_scan; - ScanKeyData key; - Datum values[9]; bool isnull[9] = { false, false, false, false, false, false, @@ -705,37 +557,29 @@ update_aqo_stat(int query_hash, QueryStat *stat) bool replace[9] = { false, true, true, true, true, true, true, true, true }; + bool shouldFree; + bool update_indexes; + Oid reloid; + IndexScanDesc scan; + ScanKeyData key; - stat_index_rel_oid = RelnameGetRelid("aqo_query_stat_idx"); - if (!OidIsValid(stat_index_rel_oid)) + reloid = RelnameGetRelid("aqo_query_stat_idx"); + if (!OidIsValid(reloid)) { disable_aqo_for_query(); return; } - aqo_stat_table_rv = makeRangeVar("public", "aqo_query_stat", -1); - aqo_stat_heap = table_openrv(aqo_stat_table_rv, lockmode); - - tuple_desc = RelationGetDescr(aqo_stat_heap); - - stat_index_rel = index_open(stat_index_rel_oid, lockmode); - stat_index_scan = index_beginscan(aqo_stat_heap, - stat_index_rel, - SnapshotSelf, - 1, - 0); + rv = makeRangeVar("public", "aqo_query_stat", -1); + hrel = table_openrv(rv, RowExclusiveLock); + irel = index_open(reloid, RowExclusiveLock); + tupDesc = RelationGetDescr(hrel); - ScanKeyInit(&key, - 1, - BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(query_hash)); - - index_rescan(stat_index_scan, &key, 1, NULL, 0); - - slot = MakeSingleTupleTableSlot(stat_index_scan->heapRelation->rd_att, - &TTSOpsBufferHeapTuple); - find_ok = index_getnext_slot(stat_index_scan, ForwardScanDirection, slot); + InitDirtySnapshot(snap); + scan = 
index_beginscan(hrel, irel, &snap, 1, 0); + ScanKeyInit(&key, 1, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(qhash)); + index_rescan(scan, &key, 1, NULL, 0); + slot = MakeSingleTupleTableSlot(hrel->rd_att, &TTSOpsBufferHeapTuple); /*values[0] will be initialized later */ values[1] = PointerGetDatum(FormVectorSz(stat->execution_time_with_aqo)); @@ -748,57 +592,53 @@ update_aqo_stat(int query_hash, QueryStat *stat) values[7] = Int64GetDatum(stat->executions_with_aqo); values[8] = Int64GetDatum(stat->executions_without_aqo); - if (!find_ok) + if (!index_getnext_slot(scan, ForwardScanDirection, slot)) { - values[0] = Int32GetDatum(query_hash); - tuple = heap_form_tuple(tuple_desc, values, isnull); - PG_TRY(); - { - simple_heap_insert(aqo_stat_heap, tuple); - my_index_insert(stat_index_rel, values, isnull, &(tuple->t_self), - aqo_stat_heap, UNIQUE_CHECK_YES); - } - PG_CATCH(); - { - CommandCounterIncrement(); - simple_heap_delete(aqo_stat_heap, &(tuple->t_self)); - PG_RE_THROW(); - } - PG_END_TRY(); + /* Such signature (hash) doesn't yet exist in the ML knowledge base. */ + values[0] = Int32GetDatum(qhash); + tuple = heap_form_tuple(tupDesc, values, isnull); + simple_heap_insert(hrel, tuple); + my_index_insert(irel, values, isnull, &(tuple->t_self), + hrel, UNIQUE_CHECK_YES); } - else + else if (!TransactionIdIsValid(snap.xmin) && !TransactionIdIsValid(snap.xmax)) { + /* Need to update ML data row and no one backend concurrently doing it. 
*/ tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); Assert(shouldFree != true); - values[0] = heap_getattr(tuple, 1, - RelationGetDescr(aqo_stat_heap), &isnull[0]); - nw_tuple = heap_modify_tuple(tuple, tuple_desc, - values, isnull, replace); - if (my_simple_heap_update(aqo_stat_heap, &(nw_tuple->t_self), nw_tuple, + values[0] = heap_getattr(tuple, 1, tupDesc, &isnull[0]); + nw_tuple = heap_modify_tuple(tuple, tupDesc, values, isnull, replace); + if (my_simple_heap_update(hrel, &(nw_tuple->t_self), nw_tuple, &update_indexes)) { /* NOTE: insert index tuple iff heap update succeeded! */ if (update_indexes) - my_index_insert(stat_index_rel, values, isnull, + my_index_insert(irel, values, isnull, &(nw_tuple->t_self), - aqo_stat_heap, UNIQUE_CHECK_YES); + hrel, UNIQUE_CHECK_YES); } else { /* - * Ooops, somebody concurrently updated the tuple. We have to - * merge our changes somehow, but now we just discard ours. We - * don't believe in high probability of simultaneously finishing - * of two long, complex, and important queries, so we don't loss - * important data. + * Ooops, somebody concurrently updated the tuple. It is possible + * only in the case of changes made by third-party code. */ + elog(ERROR, "AQO statistic data for query signature %d concurrently" + " updated by a stranger backend.", + qhash); } } + else + { + /* + * Concurrent update was made. To prevent deadlocks refuse to update. 
+ */ + } ExecDropSingleTupleTableSlot(slot); - index_endscan(stat_index_scan); - index_close(stat_index_rel, lockmode); - table_close(aqo_stat_heap, lockmode); + index_endscan(scan); + index_close(irel, RowExclusiveLock); + table_close(hrel, RowExclusiveLock); CommandCounterIncrement(); } @@ -951,7 +791,7 @@ my_simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup, /* Provides correct insert in both PostgreQL 9.6.X and 10.X.X */ -static bool +bool my_index_insert(Relation indexRelation, Datum *values, bool *isnull, ItemPointer heap_t_ctid, @@ -965,10 +805,14 @@ my_index_insert(Relation indexRelation, #if PG_VERSION_NUM < 100000 return index_insert(indexRelation, values, isnull, heap_t_ctid, heapRelation, checkUnique); -#else +#elif PG_VERSION_NUM < 140000 return index_insert(indexRelation, values, isnull, heap_t_ctid, heapRelation, checkUnique, BuildIndexInfo(indexRelation)); +#else + return index_insert(indexRelation, values, isnull, heap_t_ctid, + heapRelation, checkUnique, false, + BuildIndexInfo(indexRelation)); #endif } diff --git a/t/001_pgbench.pl b/t/001_pgbench.pl new file mode 100644 index 00000000..fcb7f3fd --- /dev/null +++ b/t/001_pgbench.pl @@ -0,0 +1,48 @@ +use strict; +use warnings; +use TestLib; +use Test::More tests => 6; +use PostgresNode; + +my $node = get_new_node('aqotest'); +$node->init; +$node->append_conf('postgresql.conf', qq{ + shared_preload_libraries = 'aqo' + aqo.mode = 'intelligent' + aqo.log_ignorance = 'on' + }); + +#my $result1; + +$node->start(); + +# Check conflicts of accessing to the ML knowledge base +# intelligent mode +$node->safe_psql('postgres', "CREATE EXTENSION aqo"); +$node->safe_psql('postgres', "ALTER SYSTEM SET aqo.mode = 'intelligent'"); +$node->command_ok([ 'pgbench', '-i', '-s', '1' ], 'init pgbench tables'); +$node->command_ok([ 'pgbench', '-t', "1000", '-c', "20", '-j', "20" ], + 'pgbench in intelligent mode'); + +$node->safe_psql('postgres', "ALTER SYSTEM SET aqo.mode = 'controlled'"); 
+# ALTER SYSTEM only records the new value in the server's configuration file;
+# it is not picked up until the configuration is reloaded. Reload before each
+# pgbench run so that the run really uses the mode named in its description.
+$node->reload;
+$node->command_ok([ 'pgbench', '-t', "1000", '-c', "20", '-j', "20" ],
+					'pgbench in controlled mode');
+
+$node->safe_psql('postgres', "ALTER SYSTEM SET aqo.mode = 'disabled'");
+$node->reload;
+$node->command_ok([ 'pgbench', '-t', "1000", '-c', "20", '-j', "20" ],
+					'pgbench in disabled mode');
+
+# Recreate the extension before the remaining modes are exercised.
+$node->safe_psql('postgres', "DROP EXTENSION aqo");
+$node->safe_psql('postgres', "CREATE EXTENSION aqo");
+
+$node->safe_psql('postgres', "ALTER SYSTEM SET aqo.mode = 'learn'");
+$node->reload;
+$node->command_ok([ 'pgbench', '-t', "1000", '-c', "20", '-j', "20" ],
+					'pgbench in learn mode');
+
+$node->safe_psql('postgres', "ALTER SYSTEM SET aqo.mode = 'frozen'");
+$node->reload;
+$node->command_ok([ 'pgbench', '-t', "1000", '-c', "20", '-j', "20" ],
+					'pgbench in frozen mode');
+
+$node->safe_psql('postgres', "DROP EXTENSION aqo");
+
+$node->stop();
diff --git a/utils.c b/utils.c
index 1ae45abe..62e6d122 100644
--- a/utils.c
+++ b/utils.c
@@ -5,7 +5,7 @@
  *
  *******************************************************************************
  *
- * Copyright (c) 2016-2020, Postgres Professional
+ * Copyright (c) 2016-2021, Postgres Professional
  *
  * IDENTIFICATION
  *	  aqo/utils.c

AltStyle によって変換されたページ (->オリジナル) /