From 44e8ff8be51bcf597c58ca4c184b6924715610c1 Mon Sep 17 00:00:00 2001
From: gayyappan
Date: Wed, 18 Dec 2024 13:57:58 -0500
Subject: [PATCH 1/4] Refresh cagg uses min value for dimension when
 start_time is NULL

When the refresh_continuous_aggregate window's start is NULL, use the
min value of the data in the hypertable to determine the beginning of
the range, instead of the minimum value of the partition column's type.
We do this only if enable_tiered_reads is set to false.
---
 src/guc.h                         |   2 +-
 src/hypertable.c                  |  61 +++++++++++++++++
 src/hypertable.h                  |   2 +
 src/ts_catalog/continuous_agg.c   |  16 +++++
 src/ts_catalog/continuous_agg.h   |   3 +
 tsl/src/continuous_aggs/refresh.c | 107 ++++++++++++++++++++++++++++--
 6 files changed, 184 insertions(+), 7 deletions(-)

diff --git a/src/guc.h b/src/guc.h
index 54a341b7a3b..7691be19850 100644
--- a/src/guc.h
+++ b/src/guc.h
@@ -44,7 +44,7 @@ extern bool ts_guc_enable_cagg_reorder_groupby;
 extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations;
 extern bool ts_guc_enable_now_constify;
 extern bool ts_guc_enable_foreign_key_propagation;
-extern bool ts_guc_enable_osm_reads;
+extern TSDLLEXPORT bool ts_guc_enable_osm_reads;
 #if PG16_GE
 extern TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown;
 #endif
diff --git a/src/hypertable.c b/src/hypertable.c
index ac3c2c5e765..f91255ebcf0 100644
--- a/src/hypertable.c
+++ b/src/hypertable.c
@@ -2402,6 +2402,67 @@ ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index,
 	return max_value;
 }
 
+/*
+ * Get the min value of an open dimension.
+ */
+int64
+ts_hypertable_get_open_dim_min_value(const Hypertable *ht, int dimension_index, bool *isnull)
+{
+	StringInfo command;
+	const Dimension *dim;
+	int res;
+	bool min_isnull;
+	Datum mindat;
+	Oid timetype;
+
+	dim = hyperspace_get_open_dimension(ht->space, dimension_index);
+
+	if (NULL == dim)
+		elog(ERROR, "invalid open dimension index %d", dimension_index);
+
+	timetype = ts_dimension_get_partition_type(dim);
+
+	/*
+	 * Query for the minimum value of the open dimension.
+	 * Any way to optimize this better?
+	 */
+	command = makeStringInfo();
+	appendStringInfo(command,
+					 "SELECT pg_catalog.min(%s) FROM %s.%s",
+					 quote_identifier(NameStr(dim->fd.column_name)),
+					 quote_identifier(NameStr(ht->fd.schema_name)),
+					 quote_identifier(NameStr(ht->fd.table_name)));
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "could not connect to SPI");
+
+	res = SPI_execute(command->data, true /* read_only */, 0 /*count*/);
+
+	if (res < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INTERNAL_ERROR),
+				 (errmsg("could not find the minimum time value for hypertable \"%s\"",
+						 get_rel_name(ht->main_table_relid)))));
+
+	Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == timetype,
+		   "partition types for result (%d) and dimension (%d) do not match",
+		   SPI_gettypeid(SPI_tuptable->tupdesc, 1),
+		   ts_dimension_get_partition_type(dim));
+	mindat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &min_isnull);
+
+	if (isnull)
+		*isnull = min_isnull;
+
+	int64 min_value =
+		min_isnull ?
ts_time_get_min(timetype) : ts_time_value_to_internal(mindat, timetype); + + res = SPI_finish(); + if (res != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res)); + + return min_value; +} + bool ts_hypertable_has_compression_table(const Hypertable *ht) { diff --git a/src/hypertable.h b/src/hypertable.h index 9b961b801e3..4d36591dc1c 100644 --- a/src/hypertable.h +++ b/src/hypertable.h @@ -146,6 +146,8 @@ extern TSDLLEXPORT bool ts_hypertable_set_compress_interval(Hypertable *ht, int64 compress_interval); extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index, bool *isnull); +extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_min_value(const Hypertable *ht, + int dimension_index, bool *isnull); extern TSDLLEXPORT bool ts_hypertable_has_compression_table(const Hypertable *ht); extern TSDLLEXPORT void ts_hypertable_formdata_fill(FormData_hypertable *fd, const TupleInfo *ti); diff --git a/src/ts_catalog/continuous_agg.c b/src/ts_catalog/continuous_agg.c index a786b4bf44a..e59d90d3336 100644 --- a/src/ts_catalog/continuous_agg.c +++ b/src/ts_catalog/continuous_agg.c @@ -1587,6 +1587,22 @@ ts_compute_circumscribed_bucketed_refresh_window_variable(int64 *start, int64 *e *end = ts_time_value_to_internal(end_new, TIMESTAMPOID); } +int64 +ts_compute_circumscribed_bucketed_refresh_window_start_variable( + int64 start, const ContinuousAggsBucketFunction *bf) +{ + Datum start_old, start_new; + + /* + * It's OK to use TIMESTAMPOID here. + * See the comment in ts_compute_inscribed_bucketed_refresh_window_variable() + */ + start_old = ts_internal_to_time_value(start, TIMESTAMPOID); + start_new = generic_time_bucket(bf, start_old); + + return ts_time_value_to_internal(start_new, TIMESTAMPOID); +} + /* * Calculates the beginning of the next bucket. 
 *
 
diff --git a/src/ts_catalog/continuous_agg.h b/src/ts_catalog/continuous_agg.h
index 268c42488d3..6a343370573 100644
--- a/src/ts_catalog/continuous_agg.h
+++ b/src/ts_catalog/continuous_agg.h
@@ -215,3 +215,6 @@ extern TSDLLEXPORT Query *ts_continuous_agg_get_query(ContinuousAgg *cagg);
 
 extern TSDLLEXPORT int64
 ts_continuous_agg_fixed_bucket_width(const ContinuousAggsBucketFunction *bucket_function);
+
+extern TSDLLEXPORT int64 ts_compute_circumscribed_bucketed_refresh_window_start_variable(
+	int64 start, const ContinuousAggsBucketFunction *bf);
diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c
index d5a9783e003..79378c42f06 100644
--- a/tsl/src/continuous_aggs/refresh.c
+++ b/tsl/src/continuous_aggs/refresh.c
@@ -53,6 +53,10 @@ static InternalTimeRange
 compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
 											  const InternalTimeRange *const refresh_window,
 											  const ContinuousAggsBucketFunction *bucket_function);
+static int64
+compute_circumscribed_refresh_window_start(const ContinuousAgg *cagg,
+										   const InternalTimeRange *const refresh_window,
+										   const ContinuousAggsBucketFunction *bucket_function);
 static void continuous_agg_refresh_init(CaggRefreshState *refresh, const ContinuousAgg *cagg,
 										const InternalTimeRange *refresh_window);
 static void continuous_agg_refresh_execute(const CaggRefreshState *refresh,
@@ -372,6 +376,57 @@ compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg,
 	return result;
 }
 
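+/*
+ * Compute the circumscribed bucket-aligned start of a refresh window.
+ *
+ * Start-only variant of compute_circumscribed_bucketed_refresh_window() above:
+ * aligns refresh_window->start to the start of the bucket that contains it,
+ * for both fixed- and variable-width bucket functions.
+ */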
+static int64
+compute_circumscribed_refresh_window_start(const ContinuousAgg *cagg,
+										   const InternalTimeRange *const refresh_window,
+										   const ContinuousAggsBucketFunction *bucket_function)
+{
+	Assert(refresh_window != NULL);
+	Assert(bucket_function != NULL);
+
+	if (bucket_function->bucket_fixed_interval == false)
+	{
+		return ts_compute_circumscribed_bucketed_refresh_window_start_variable(refresh_window->start,
+																				bucket_function);
+	}
+
+	/* Interval is fixed */
+	int64 bucket_width = ts_continuous_agg_fixed_bucket_width(bucket_function);
+	Assert(bucket_width > 0);
+
+	InternalTimeRange result = *refresh_window;
+	InternalTimeRange largest_bucketed_window =
+		get_largest_bucketed_window(refresh_window->type, bucket_width);
+
+	/* Get offset and origin for bucket function */
+	NullableDatum offset = INIT_NULL_DATUM;
+	NullableDatum origin = INIT_NULL_DATUM;
+	fill_bucket_offset_origin(cagg, refresh_window, &offset, &origin);
+
+	/* Defining both offset and origin for one function is not supported */
+	Assert(offset.isnull == true || origin.isnull == true);
+
+	if (refresh_window->start <= largest_bucketed_window.start)
+	{
+		result.start = largest_bucketed_window.start;
+	}
+	else
+	{
+		/* To align with the bucket that includes the start of the refresh window,
+		 * we just need to get the start of that bucket. */
+		result.start = ts_time_bucket_by_type_extended(bucket_width,
+													   refresh_window->start,
+													   refresh_window->type,
+													   offset,
+													   origin);
+	}
+
+	return result.start;
+}
+
 /*
  * Initialize the refresh state for a continuous aggregate.
  *
@@ -794,26 +849,61 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
 	 * still take a long time and it is probably best for consistency to always
 	 * prevent transaction blocks.
 	 */
 	PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME);
 
-	/* No bucketing when open ended */
-	if (!(start_isnull && end_isnull))
+	/*
+	 * No bucketing when open ended.
+	 * Special case: if OSM reads are disabled, use a bucketed start so that the
+	 * refresh window only reflects data that is visible to the cagg.
+	 */
+	if (!(start_isnull && end_isnull) || (start_isnull && !ts_guc_enable_osm_reads))
 	{
+		int64 bucket_mints = 0;
+		bool min_isnull = true;
+
+		if (start_isnull && !ts_guc_enable_osm_reads)
+		{
+			/* Set the refresh window start to min(time) of the raw hypertable,
+			 * since tiered data is not visible */
+			int64 mints = 0;
+			InternalTimeRange tw = *refresh_window_arg;
+			const Hypertable *raw_ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id);
+
+			mints = ts_hypertable_get_open_dim_min_value(raw_ht, 0, &min_isnull);
+			tw.start = mints;
+			bucket_mints =
+				compute_circumscribed_refresh_window_start(cagg, &tw, cagg->bucket_function);
+		}
+
 		if (cagg->bucket_function->bucket_fixed_interval == false)
 		{
-			refresh_window = *refresh_window_arg;
+			if (!min_isnull)
+				refresh_window.start = bucket_mints;
+
 			ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start,
 																  &refresh_window.end,
 																  cagg->bucket_function);
 		}
 		else
 		{
+			InternalTimeRange refresh_window_arg_copy = *refresh_window_arg;
+
+			if (!min_isnull)
+				refresh_window_arg_copy.start = bucket_mints;
+
 			int64 bucket_width = ts_continuous_agg_fixed_bucket_width(cagg->bucket_function);
 			Assert(bucket_width > 0);
 			refresh_window =
-				compute_inscribed_bucketed_refresh_window(cagg, refresh_window_arg, bucket_width);
+				compute_inscribed_bucketed_refresh_window(cagg,
+														  &refresh_window_arg_copy,
+														  bucket_width);
 		}
 	}
 
 	if (refresh_window.start >= refresh_window.end)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),

From 7c4f736ebc13928addf872c6a3f446467ed9fb5d Mon Sep 17 00:00:00 2001
From: gayyappan
Date: Tue, 14 Jan 2025 09:38:00 -0500
Subject: [PATCH 2/4] Get min value from dimension slice information instead
 of reading the whole table

---
 src/hypertable.c | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/src/hypertable.c b/src/hypertable.c
index f91255ebcf0..5e87564cd6e 100644
--- a/src/hypertable.c
+++ b/src/hypertable.c
@@ -2403,7 +2403,8 @@ ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index,
 }
 
 /*
- * Get the min value of an open dimension.
+ * Get the min value of an open dimension for the hypertable, based on the dimension slice info.
+ * Note: only takes non-tiered chunks into account.
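+ * The result comes from chunk metadata (dimension_slice.range_start), so it is
+ * the range start of the oldest non-tiered chunk, not necessarily an actual row value.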
  */
 int64
 ts_hypertable_get_open_dim_min_value(const Hypertable *ht, int dimension_index, bool *isnull)
@@ -2415,6 +2416,14 @@ ts_hypertable_get_open_dim_min_value(const Hypertable *ht, int dimension_index,
 	Datum mindat;
 	Oid timetype;
 
+	const char *query_str = "SELECT min(dimsl.range_start) FROM _timescaledb_catalog.chunk srcch \
+		INNER JOIN _timescaledb_catalog.hypertable ht ON ht.id = srcch.hypertable_id \
+		INNER JOIN _timescaledb_catalog.chunk_constraint chcons ON srcch.id = chcons.chunk_id \
+		INNER JOIN _timescaledb_catalog.dimension dim ON srcch.hypertable_id = dim.hypertable_id \
+		INNER JOIN _timescaledb_catalog.dimension_slice dimsl ON dim.id = dimsl.dimension_id \
+		AND chcons.dimension_slice_id = dimsl.id \
+		AND dim.id = %d AND ht.id = %d AND srcch.osm_chunk = false";
+
 	dim = hyperspace_get_open_dimension(ht->space, dimension_index);
 
 	if (NULL == dim)
@@ -2424,14 +2433,9 @@ ts_hypertable_get_open_dim_min_value(const Hypertable *ht, int dimension_index,
 
 	/*
 	 * Query for the minimum value of the open dimension.
-	 * Any way to optimize this better?
 	 */
 	command = makeStringInfo();
-	appendStringInfo(command,
-					 "SELECT pg_catalog.min(%s) FROM %s.%s",
-					 quote_identifier(NameStr(dim->fd.column_name)),
-					 quote_identifier(NameStr(ht->fd.schema_name)),
-					 quote_identifier(NameStr(ht->fd.table_name)));
+	appendStringInfo(command, query_str, dim->fd.id, ht->fd.id);
 
 	if (SPI_connect() != SPI_OK_CONNECT)
 		elog(ERROR, "could not connect to SPI");
@@ -2444,17 +2448,13 @@ ts_hypertable_get_open_dim_min_value(const Hypertable *ht, int dimension_index,
 						 (errmsg("could not find the minimum time value for hypertable \"%s\"",
 								 get_rel_name(ht->main_table_relid)))));
 
-	Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == timetype,
-		   "partition types for result (%d) and dimension (%d) do not match",
-		   SPI_gettypeid(SPI_tuptable->tupdesc, 1),
-		   ts_dimension_get_partition_type(dim));
 	mindat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &min_isnull);
 
 	if (isnull)
 		*isnull = min_isnull;
 
-	int64 min_value =
-		min_isnull ? ts_time_get_min(timetype) : ts_time_value_to_internal(mindat, timetype);
+	/* We fetch the int64 range value from the dimension slice catalog, so read it back as int64 */
+	int64 min_value = min_isnull ? ts_time_get_min(timetype) : DatumGetInt64(mindat);
 
 	res = SPI_finish();
 	if (res != SPI_OK_FINISH)

From 88e6f7b0748c2a0133cdcab5e07412f1617f8e9a Mon Sep 17 00:00:00 2001
From: gayyappan
Date: Thu, 19 Dec 2024 10:33:26 -0500
Subject: [PATCH 3/4] Test case for cagg with enable_tiered_reads

---
 tsl/test/expected/cagg_osm.out | 180 +++++++++++++++++++++++++++++++++
 tsl/test/sql/CMakeLists.txt    |   1 +
 tsl/test/sql/cagg_osm.sql      | 113 +++++++++++++++++++++
 3 files changed, 294 insertions(+)
 create mode 100644 tsl/test/expected/cagg_osm.out
 create mode 100644 tsl/test/sql/cagg_osm.sql

diff --git a/tsl/test/expected/cagg_osm.out b/tsl/test/expected/cagg_osm.out
new file mode 100644
index 00000000000..649502a845a
--- /dev/null
+++ b/tsl/test/expected/cagg_osm.out
@@ -0,0 +1,180 @@
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+-- These tests work for PG14 or greater.
+-- Remember to coordinate any changes to functionality with the Cloud
+-- Native Storage team. Tests for the following API:
+-- * cagg refresh with GUC enable_tiered_reads
+\set EXPLAIN 'EXPLAIN (COSTS OFF)'
+--SETUP for OSM chunk --
+--need superuser access to create foreign data server
+\c :TEST_DBNAME :ROLE_SUPERUSER
+CREATE DATABASE postgres_fdw_db;
+GRANT ALL PRIVILEGES ON DATABASE postgres_fdw_db TO :ROLE_4;
+\c postgres_fdw_db :ROLE_4
+CREATE TABLE fdw_table( timec timestamptz NOT NULL , acq_id bigint, value bigint);
+INSERT INTO fdw_table VALUES( '2020-01-01 01:00', 100, 1000);
+--create foreign server and user mappings as superuser
+\c :TEST_DBNAME :ROLE_SUPERUSER
+SELECT current_setting('port') as "PORTNO" \gset
+CREATE EXTENSION postgres_fdw;
+CREATE SERVER s3_server FOREIGN DATA WRAPPER postgres_fdw
+OPTIONS ( host 'localhost', dbname 'postgres_fdw_db', port :'PORTNO');
+GRANT USAGE ON FOREIGN SERVER s3_server TO :ROLE_4;
+CREATE USER MAPPING FOR :ROLE_4 SERVER s3_server
+OPTIONS ( user :'ROLE_4' , password :'ROLE_4_PASS');
+ALTER USER MAPPING FOR :ROLE_4 SERVER s3_server
+OPTIONS (ADD password_required 'false');
+\c :TEST_DBNAME :ROLE_4;
+-- this is a stand-in for the OSM table
+CREATE FOREIGN TABLE child_fdw_table
+(timec timestamptz NOT NULL, acq_id bigint, value bigint)
+ SERVER s3_server OPTIONS ( schema_name 'public', table_name 'fdw_table');
+--now attach foreign table as a chunk of the hypertable.
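+-- the foreign table row ('2020-01-01') predates all native chunks, so it stands in for tiered data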
+CREATE TABLE ht_try(timec timestamptz NOT NULL, acq_id bigint, value bigint);
+SELECT create_hypertable('ht_try', 'timec', chunk_time_interval => interval '1 day');
+ create_hypertable
+---------------------
+ (1,public,ht_try,t)
+(1 row)
+
+INSERT INTO ht_try VALUES ('2022-05-05 01:00', 222, 222);
+SELECT * FROM child_fdw_table;
+            timec             | acq_id | value
+------------------------------+--------+-------
+ Wed Jan 01 01:00:00 2020 PST |    100 |  1000
+(1 row)
+
+SELECT _timescaledb_functions.attach_osm_table_chunk('ht_try', 'child_fdw_table');
+ attach_osm_table_chunk
+------------------------
+ t
+(1 row)
+
+-- must also update the range since the created chunk contains data
+SELECT _timescaledb_functions.hypertable_osm_range_update('ht_try', '2020-01-01'::timestamptz, '2020-01-02');
+ hypertable_osm_range_update
+-----------------------------
+ f
+(1 row)
+
+set timescaledb.enable_tiered_reads = true;
+SELECT * from ht_try ORDER BY 1;
+            timec             | acq_id | value
+------------------------------+--------+-------
+ Wed Jan 01 01:00:00 2020 PST |    100 |  1000
+ Thu May 05 01:00:00 2022 PDT |    222 |   222
+(2 rows)
+
+-- IMPORTANT: we test with tiered reads disabled.
+-- This set of tests verifies that refresh_continuous_aggregate(cagg, NULL, ...) does not
+-- generate bad invalidation logs, i.e. invalidations are for ranges that are strictly
+-- visible to the refresh.
+-- We are able to handle this only for the special case: refresh with start_time = NULL.
+set timescaledb.enable_tiered_reads = false;
+SELECT * from ht_try ORDER BY 1;
+            timec             | acq_id | value
+------------------------------+--------+-------
+ Thu May 05 01:00:00 2022 PDT |    222 |   222
+(1 row)
+
+-- TEST: cagg creation
+CREATE MATERIALIZED VIEW cagg_ht_osm WITH (timescaledb.continuous, timescaledb.materialized_only = true)
+AS
+SELECT time_bucket('7 days'::interval, timec), count(*)
+FROM ht_try
+GROUP BY 1;
+NOTICE:  refreshing continuous aggregate "cagg_ht_osm"
+SELECT * FROM cagg_ht_osm ORDER BY 1;
+         time_bucket          | count
+------------------------------+-------
+ Sun May 01 17:00:00 2022 PDT |     1
+(1 row)
+
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+ materialization_id | lowest_modified_value | greatest_modified_value
+--------------------+-----------------------+-------------------------
+                  2 |  -9223372036854775808 |        1651449599999999
+                  2 |      1652054400000000 |     9223372036854775807
+(2 rows)
+
+-- Now refresh everything. It should still not change the invalidation log for the
+-- tiered portion because tiered reads are off.
+CALL refresh_continuous_aggregate('cagg_ht_osm', NULL, '2023-04-01');
+SELECT * FROM cagg_ht_osm ORDER BY 1;
+         time_bucket          | count
+------------------------------+-------
+ Sun May 01 17:00:00 2022 PDT |     1
+(1 row)
+
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+ materialization_id | lowest_modified_value | greatest_modified_value
+--------------------+-----------------------+-------------------------
+                  2 |  -9223372036854775808 |        1651449599999999
+                  2 |      1679875200000000 |     9223372036854775807
+(2 rows)
+
+DROP MATERIALIZED VIEW cagg_ht_osm;
+NOTICE:  drop cascades to table _timescaledb_internal._hyper_2_3_chunk
+-- TEST: Repeat cagg refresh test but without initially materializing any data
+CREATE MATERIALIZED VIEW cagg_ht_osm WITH (timescaledb.continuous, timescaledb.materialized_only = true)
+AS
+SELECT time_bucket('7 days'::interval, timec), count(*)
+FROM ht_try
+GROUP BY 1 WITH NO DATA;
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+ materialization_id | lowest_modified_value | greatest_modified_value
+--------------------+-----------------------+-------------------------
+                  3 |  -9223372036854775808 |     9223372036854775807
+(1 row)
+
+CALL refresh_continuous_aggregate('cagg_ht_osm', NULL, '2023-04-01');
+SELECT * FROM cagg_ht_osm ORDER BY 1;
+         time_bucket          | count
+------------------------------+-------
+ Sun May 01 17:00:00 2022 PDT |     1
+(1 row)
+
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+ materialization_id | lowest_modified_value | greatest_modified_value
+--------------------+-----------------------+-------------------------
+                  3 |  -9223372036854775808 |        1651449599999999
+                  3 |      1679875200000000 |     9223372036854775807
+(2 rows)
+
+DROP MATERIALIZED VIEW cagg_ht_osm;
+NOTICE:  drop cascades to table _timescaledb_internal._hyper_3_4_chunk
+-- TEST: cagg refresh test for variable size bucket
+CREATE MATERIALIZED VIEW cagg_ht_osm_var WITH (timescaledb.continuous, timescaledb.materialized_only = true)
+AS
+SELECT time_bucket('1 month'::interval, timec), count(*)
+FROM ht_try
+GROUP BY 1 WITH NO DATA;
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+ materialization_id | lowest_modified_value | greatest_modified_value
+--------------------+-----------------------+-------------------------
+                  4 |  -9223372036854775808 |     9223372036854775807
+(1 row)
+
+CALL refresh_continuous_aggregate('cagg_ht_osm_var', NULL, '2023-04-01');
+SELECT * FROM cagg_ht_osm_var ORDER BY 1;
+         time_bucket          | count
+------------------------------+-------
+ Sat Apr 30 17:00:00 2022 PDT |     1
+(1 row)
+
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+ materialization_id | lowest_modified_value | greatest_modified_value
+--------------------+-----------------------+-------------------------
+                  4 |  -9223372036854775808 |        1651363199999999
+                  4 |      1680307200000000 |     9223372036854775807
+(2 rows)
+
+DROP MATERIALIZED VIEW cagg_ht_osm_var;
+NOTICE:  drop cascades to table _timescaledb_internal._hyper_4_5_chunk
diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt
index 5b2451fb0d7..63536167944 100644
--- a/tsl/test/sql/CMakeLists.txt
+++ b/tsl/test/sql/CMakeLists.txt
@@ -8,6 +8,7 @@ set(TEST_FILES
     bgw_policy.sql
     bgw_security.sql
     cagg_deprecated_bucket_ng.sql
     cagg_errors.sql
     cagg_invalidation.sql
+    cagg_osm.sql
     cagg_policy.sql
diff --git a/tsl/test/sql/cagg_osm.sql b/tsl/test/sql/cagg_osm.sql
new file mode 100644
index 00000000000..9a8b8cd8605
--- /dev/null
+++ b/tsl/test/sql/cagg_osm.sql
@@ -0,0 +1,113 @@
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+
+-- These tests work for PG14 or greater.
+-- Remember to coordinate any changes to functionality with the Cloud
+-- Native Storage team. Tests for the following API:
+-- * cagg refresh with GUC enable_tiered_reads
+
+\set EXPLAIN 'EXPLAIN (COSTS OFF)'
+
+--SETUP for OSM chunk --
+--need superuser access to create foreign data server
+\c :TEST_DBNAME :ROLE_SUPERUSER
+CREATE DATABASE postgres_fdw_db;
+GRANT ALL PRIVILEGES ON DATABASE postgres_fdw_db TO :ROLE_4;
+
+\c postgres_fdw_db :ROLE_4
+CREATE TABLE fdw_table( timec timestamptz NOT NULL , acq_id bigint, value bigint);
+INSERT INTO fdw_table VALUES( '2020-01-01 01:00', 100, 1000);
+
+--create foreign server and user mappings as superuser
+\c :TEST_DBNAME :ROLE_SUPERUSER
+
+SELECT current_setting('port') as "PORTNO" \gset
+
+CREATE EXTENSION postgres_fdw;
+CREATE SERVER s3_server FOREIGN DATA WRAPPER postgres_fdw
+OPTIONS ( host 'localhost', dbname 'postgres_fdw_db', port :'PORTNO');
+GRANT USAGE ON FOREIGN SERVER s3_server TO :ROLE_4;
+
+CREATE USER MAPPING FOR :ROLE_4 SERVER s3_server
+OPTIONS ( user :'ROLE_4' , password :'ROLE_4_PASS');
+
+ALTER USER MAPPING FOR :ROLE_4 SERVER s3_server
+OPTIONS (ADD password_required 'false');
+
+\c :TEST_DBNAME :ROLE_4;
+-- this is a stand-in for the OSM table
+CREATE FOREIGN TABLE child_fdw_table
+(timec timestamptz NOT NULL, acq_id bigint, value bigint)
+ SERVER s3_server OPTIONS ( schema_name 'public', table_name 'fdw_table');
+
+--now attach foreign table as a chunk of the hypertable.
+CREATE TABLE ht_try(timec timestamptz NOT NULL, acq_id bigint, value bigint);
+SELECT create_hypertable('ht_try', 'timec', chunk_time_interval => interval '1 day');
+INSERT INTO ht_try VALUES ('2022-05-05 01:00', 222, 222);
+
+SELECT * FROM child_fdw_table;
+SELECT _timescaledb_functions.attach_osm_table_chunk('ht_try', 'child_fdw_table');
+-- must also update the range since the created chunk contains data
+SELECT _timescaledb_functions.hypertable_osm_range_update('ht_try', '2020-01-01'::timestamptz, '2020-01-02');
+
+set timescaledb.enable_tiered_reads = true;
+SELECT * from ht_try ORDER BY 1;
+-- IMPORTANT: we test with tiered reads disabled.
+-- This set of tests verifies that refresh_continuous_aggregate(cagg, NULL, ...) does not
+-- generate bad invalidation logs, i.e. invalidations are for ranges that are strictly
+-- visible to the refresh.
+-- We are able to handle this only for the special case: refresh with start_time = NULL.
+set timescaledb.enable_tiered_reads = false;
+SELECT * from ht_try ORDER BY 1;
+
+-- TEST: cagg creation
+CREATE MATERIALIZED VIEW cagg_ht_osm WITH (timescaledb.continuous, timescaledb.materialized_only = true)
+AS
+SELECT time_bucket('7 days'::interval, timec), count(*)
+FROM ht_try
+GROUP BY 1;
+
+SELECT * FROM cagg_ht_osm ORDER BY 1;
+
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+
+-- Now refresh everything. It should still not change the invalidation log for the
+-- tiered portion because tiered reads are off.
+CALL refresh_continuous_aggregate('cagg_ht_osm', NULL, '2023-04-01');
+SELECT * FROM cagg_ht_osm ORDER BY 1;
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+
+DROP MATERIALIZED VIEW cagg_ht_osm;
+-- TEST: Repeat cagg refresh test but without initially materializing any data
+CREATE MATERIALIZED VIEW cagg_ht_osm WITH (timescaledb.continuous, timescaledb.materialized_only = true)
+AS
+SELECT time_bucket('7 days'::interval, timec), count(*)
+FROM ht_try
+GROUP BY 1 WITH NO DATA;
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+CALL refresh_continuous_aggregate('cagg_ht_osm', NULL, '2023-04-01');
+SELECT * FROM cagg_ht_osm ORDER BY 1;
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+
+DROP MATERIALIZED VIEW cagg_ht_osm;
+
+-- TEST: cagg refresh test for variable size bucket
+CREATE MATERIALIZED VIEW cagg_ht_osm_var WITH (timescaledb.continuous, timescaledb.materialized_only = true)
+AS
+SELECT time_bucket('1 month'::interval, timec), count(*)
+FROM ht_try
+GROUP BY 1 WITH NO DATA;
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+CALL refresh_continuous_aggregate('cagg_ht_osm_var', NULL, '2023-04-01');
+SELECT * FROM cagg_ht_osm_var ORDER BY 1;
+SELECT * FROM
+_timescaledb_catalog.continuous_aggs_materialization_invalidation_log ORDER BY 1;
+
+DROP MATERIALIZED VIEW cagg_ht_osm_var;

From b465b78bf21cbc18f6a661ade7fe2e39a80ded1e Mon Sep 17 00:00:00 2001
From: gayyappan
Date: Tue, 14 Jan 2025 12:27:21 -0500
Subject: [PATCH 4/4] PR file

---
 .unreleased/pr_7546 | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 .unreleased/pr_7546

diff --git a/.unreleased/pr_7546 b/.unreleased/pr_7546
new file mode 100644
index 00000000000..a6df0ea5b73
--- /dev/null
+++ b/.unreleased/pr_7546
@@ -0,0 +1 @@
+Implements: #7546 Continuous aggregate refresh uses min time based on chunk metadata when start_offset is NULL and
`enable_tiered_reads` GUC is false.