diff --git a/src/guc.h b/src/guc.h index 34ebc0ef2d6..d5bfecc2f5f 100644 --- a/src/guc.h +++ b/src/guc.h @@ -44,7 +44,7 @@ extern bool ts_guc_enable_cagg_reorder_groupby; extern TSDLLEXPORT int ts_guc_cagg_max_individual_materializations; extern bool ts_guc_enable_now_constify; extern bool ts_guc_enable_foreign_key_propagation; -extern bool ts_guc_enable_osm_reads; +extern TSDLLEXPORT bool ts_guc_enable_osm_reads; #if PG16_GE extern TSDLLEXPORT bool ts_guc_enable_cagg_sort_pushdown; #endif diff --git a/src/hypertable.c b/src/hypertable.c index ac3c2c5e765..f91255ebcf0 100644 --- a/src/hypertable.c +++ b/src/hypertable.c @@ -2402,6 +2402,67 @@ ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index, return max_value; } +/* + * Get the min value of an open dimension. + */ +int64 +ts_hypertable_get_open_dim_min_value(const Hypertable *ht, int dimension_index, bool *isnull) +{ + StringInfo command; + const Dimension *dim; + int res; + bool min_isnull; + Datum mindat; + Oid timetype; + + dim = hyperspace_get_open_dimension(ht->space, dimension_index); + + if (NULL == dim) + elog(ERROR, "invalid open dimension index %d", dimension_index); + + timetype = ts_dimension_get_partition_type(dim); + + /* + * Query for the minimum value of the open (time) dimension across the + * hypertable. + * NOTE(review): this performs a full min() scan over the hypertable; + * consider using chunk range metadata instead if this becomes a hot path. 
+ */ + command = makeStringInfo(); + appendStringInfo(command, + "SELECT pg_catalog.min(%s) FROM %s.%s", + quote_identifier(NameStr(dim->fd.column_name)), + quote_identifier(NameStr(ht->fd.schema_name)), + quote_identifier(NameStr(ht->fd.table_name))); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "could not connect to SPI"); + + res = SPI_execute(command->data, true /* read_only */, 0 /*count*/); + + if (res < 0) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + (errmsg("could not find the minimum time value for hypertable \"%s\"", + get_rel_name(ht->main_table_relid))))); + + Ensure(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == timetype, + "partition types for result (%d) and dimension (%d) do not match", + SPI_gettypeid(SPI_tuptable->tupdesc, 1), + ts_dimension_get_partition_type(dim)); + mindat = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &min_isnull); + + if (isnull) + *isnull = min_isnull; + + int64 min_value = + min_isnull ? ts_time_get_min(timetype) : ts_time_value_to_internal(mindat, timetype); + + res = SPI_finish(); + if (res != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(res)); + + return min_value; +} + bool ts_hypertable_has_compression_table(const Hypertable *ht) { diff --git a/src/hypertable.h b/src/hypertable.h index 9b961b801e3..4d36591dc1c 100644 --- a/src/hypertable.h +++ b/src/hypertable.h @@ -146,6 +146,8 @@ extern TSDLLEXPORT bool ts_hypertable_set_compress_interval(Hypertable *ht, int64 compress_interval); extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_max_value(const Hypertable *ht, int dimension_index, bool *isnull); +extern TSDLLEXPORT int64 ts_hypertable_get_open_dim_min_value(const Hypertable *ht, + int dimension_index, bool *isnull); extern TSDLLEXPORT bool ts_hypertable_has_compression_table(const Hypertable *ht); extern TSDLLEXPORT void ts_hypertable_formdata_fill(FormData_hypertable *fd, const TupleInfo *ti); diff --git a/src/ts_catalog/continuous_agg.c 
b/src/ts_catalog/continuous_agg.c index a786b4bf44a..e59d90d3336 100644 --- a/src/ts_catalog/continuous_agg.c +++ b/src/ts_catalog/continuous_agg.c @@ -1587,6 +1587,22 @@ ts_compute_circumscribed_bucketed_refresh_window_variable(int64 *start, int64 *e *end = ts_time_value_to_internal(end_new, TIMESTAMPOID); } +int64 +ts_compute_circumscribed_bucketed_refresh_window_start_variable( + int64 start, const ContinuousAggsBucketFunction *bf) +{ + Datum start_old, start_new; + + /* + * It's OK to use TIMESTAMPOID here. + * See the comment in ts_compute_inscribed_bucketed_refresh_window_variable() + */ + start_old = ts_internal_to_time_value(start, TIMESTAMPOID); + start_new = generic_time_bucket(bf, start_old); + + return ts_time_value_to_internal(start_new, TIMESTAMPOID); +} + /* * Calculates the beginning of the next bucket. * diff --git a/src/ts_catalog/continuous_agg.h b/src/ts_catalog/continuous_agg.h index 268c42488d3..6a343370573 100644 --- a/src/ts_catalog/continuous_agg.h +++ b/src/ts_catalog/continuous_agg.h @@ -215,3 +215,6 @@ extern TSDLLEXPORT Query *ts_continuous_agg_get_query(ContinuousAgg *cagg); extern TSDLLEXPORT int64 ts_continuous_agg_fixed_bucket_width(const ContinuousAggsBucketFunction *bucket_function); + +extern TSDLLEXPORT int64 ts_compute_circumscribed_bucketed_refresh_window_start_variable( + int64 start, const ContinuousAggsBucketFunction *bf); diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index b6097dbc8c5..2b999eae5ed 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -372,6 +372,56 @@ compute_circumscribed_bucketed_refresh_window(const ContinuousAgg *cagg, return result; } +static int64 +compute_circumscribed_refresh_window_start(const ContinuousAgg *cagg, const InternalTimeRange *const refresh_window, + const ContinuousAggsBucketFunction *bucket_function) +{ + Assert(refresh_window != NULL); + Assert(bucket_function != NULL); + + if 
(bucket_function->bucket_fixed_interval == false) + { + InternalTimeRange result = *refresh_window; + result.start = + ts_compute_circumscribed_bucketed_refresh_window_start_variable(refresh_window->start, + bucket_function); + return result.start; + } + + /* Interval is fixed */ + int64 bucket_width = ts_continuous_agg_fixed_bucket_width(bucket_function); + Assert(bucket_width > 0); + + InternalTimeRange result = *refresh_window; + InternalTimeRange largest_bucketed_window = + get_largest_bucketed_window(refresh_window->type, bucket_width); + + /* Get offset and origin for bucket function */ + NullableDatum offset = INIT_NULL_DATUM; + NullableDatum origin = INIT_NULL_DATUM; + fill_bucket_offset_origin(cagg, refresh_window, &offset, &origin); + + /* Defined offset and origin in one function is not supported */ + Assert(offset.isnull == true || origin.isnull == true); + + if (refresh_window->start <= largest_bucketed_window.start) + { + result.start = largest_bucketed_window.start; + } + else + { + /* For alignment with a bucket, which includes the start of the refresh window, we just + * need to get start of the bucket. */ + result.start = ts_time_bucket_by_type_extended(bucket_width, + refresh_window->start, + refresh_window->type, + offset, + origin); + } + + return result.start; +} + /* * Initialize the refresh state for a continuous aggregate. * @@ -790,26 +840,66 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, * still take a long time and it is probably best for consistency to always * prevent transaction blocks. 
*/ PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME); + elog(NOTICE, + "HERE refrech window INPUT IS (%ld %ld) (start_is_null %d)", + refresh_window.start, + refresh_window.end, + start_isnull); /* No bucketing when open ended */ - if (!(start_isnull && end_isnull)) + if (!(start_isnull && end_isnull) || (start_isnull && !ts_guc_enable_osm_reads)) { + int64 mints = 0, bucket_mints = 0; + bool min_isnull = true; + if (start_isnull && ts_guc_enable_osm_reads == false) + { + /*set refresh window start to min(ts) of raw hypertable as tiered + * data is not visible */ + InternalTimeRange tw = *refresh_window_arg; + const Hypertable *raw_ht = cagg_get_hypertable_or_fail(cagg->data.raw_hypertable_id); + mints = ts_hypertable_get_open_dim_min_value(raw_ht, 0, &min_isnull); + tw.start = mints; + bucket_mints = compute_circumscribed_refresh_window_start(cagg, &tw, cagg->bucket_function); + elog(NOTICE, "got mints=%ld bucketmints=%ld ", mints, bucket_mints); + } if (cagg->bucket_function->bucket_fixed_interval == false) { - refresh_window = *refresh_window_arg; + elog(NOTICE, + "NOT FIXED INTERVAL original (%ld %ld)", + refresh_window.start, + refresh_window.end); + if (!min_isnull) + refresh_window.start = bucket_mints; + ts_compute_inscribed_bucketed_refresh_window_variable(&refresh_window.start, &refresh_window.end, cagg->bucket_function); + elog(NOTICE, + "NOT FIXED INTERVAL adjusted (%ld %ld)", + refresh_window.start, + refresh_window.end); } else { + InternalTimeRange refresh_window_arg_copy = *refresh_window_arg; + if (!min_isnull) + refresh_window_arg_copy.start = bucket_mints; int64 bucket_width = ts_continuous_agg_fixed_bucket_width(cagg->bucket_function); Assert(bucket_width > 0); refresh_window = - compute_inscribed_bucketed_refresh_window(cagg, refresh_window_arg, bucket_width); + compute_inscribed_bucketed_refresh_window(cagg, + (const InternalTimeRange + *) (&refresh_window_arg_copy), + bucket_width); + elog(NOTICE, + "FIXED INTERVAL orig(%ld %ld) adjusted 
(%ld %ld)", + refresh_window_arg_copy.start, + refresh_window_arg_copy.end, + refresh_window.start, + refresh_window.end); } } - + elog(NOTICE, "HERE refrech window (%ld %ld)", refresh_window.start, refresh_window.end); if (refresh_window.start >= refresh_window.end) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -859,6 +949,11 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, (IS_TIMESTAMP_TYPE(refresh_window.type) && invalidation_threshold == ts_time_get_min(refresh_window.type))) { + elog(NOTICE, + "upto date (start=%ld end=%ld) threshold=%ld", + refresh_window.start, + refresh_window.end, + invalidation_threshold); emit_up_to_date_notice(cagg, callctx); /* Restore search_path */ diff --git a/tsl/test/expected/cagg_osm.out b/tsl/test/expected/cagg_osm.out new file mode 100644 index 00000000000..ef0fa38bf13 --- /dev/null +++ b/tsl/test/expected/cagg_osm.out @@ -0,0 +1,99 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- These tests work for PG14 or greater +-- Remember to coordinate any changes to functionality with the Cloud +-- Native Storage team. 
Tests for the following API: +-- * cagg refresh with GUC enable_tiered_reads +\set EXPLAIN 'EXPLAIN (COSTS OFF)' +--SETUP for OSM chunk -- +--need superuser access to create foreign data server +\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE DATABASE postgres_fdw_db; +GRANT ALL PRIVILEGES ON DATABASE postgres_fdw_db TO :ROLE_4; +\c postgres_fdw_db :ROLE_4 +CREATE TABLE fdw_table( timec timestamptz NOT NULL , acq_id bigint, value bigint); +INSERT INTO fdw_table VALUES( '2020-01-01 01:00', 100, 1000); +--create foreign server and user mappings as superuser +\c :TEST_DBNAME :ROLE_SUPERUSER +SELECT current_setting('port') as "PORTNO" \gset +CREATE EXTENSION postgres_fdw; +CREATE SERVER s3_server FOREIGN DATA WRAPPER postgres_fdw +OPTIONS ( host 'localhost', dbname 'postgres_fdw_db', port :'PORTNO'); +GRANT USAGE ON FOREIGN SERVER s3_server TO :ROLE_4; +CREATE USER MAPPING FOR :ROLE_4 SERVER s3_server +OPTIONS ( user :'ROLE_4' , password :'ROLE_4_PASS'); +ALTER USER MAPPING FOR :ROLE_4 SERVER s3_server +OPTIONS (ADD password_required 'false'); +\c :TEST_DBNAME :ROLE_4; +-- this is a stand-in for the OSM table +CREATE FOREIGN TABLE child_fdw_table +(timec timestamptz NOT NULL, acq_id bigint, value bigint) + SERVER s3_server OPTIONS ( schema_name 'public', table_name 'fdw_table'); +--now attach foreign table as a chunk of the hypertable. 
+CREATE TABLE ht_try(timec timestamptz NOT NULL, acq_id bigint, value bigint); +SELECT create_hypertable('ht_try', 'timec', chunk_time_interval => interval '1 day'); + create_hypertable +--------------------- + (1,public,ht_try,t) +(1 row) + +INSERT INTO ht_try VALUES ('2022-05-05 01:00', 222, 222); +SELECT * FROM child_fdw_table; + timec | acq_id | value +------------------------------+--------+------- + Wed Jan 01 01:00:00 2020 PST | 100 | 1000 +(1 row) + +SELECT _timescaledb_functions.attach_osm_table_chunk('ht_try', 'child_fdw_table'); + attach_osm_table_chunk +------------------------ + t +(1 row) + +-- must also update the range since the created chunk contains data +SELECT _timescaledb_functions.hypertable_osm_range_update('ht_try', '2020-01-01'::timestamptz, '2020-01-02'); + hypertable_osm_range_update +----------------------------- + f +(1 row) + +set timescaledb.enable_tiered_reads = true; +SELECT * from ht_try ORDER BY 1; + timec | acq_id | value +------------------------------+--------+------- + Wed Jan 01 01:00:00 2020 PST | 100 | 1000 + Thu May 05 01:00:00 2022 PDT | 222 | 222 +(2 rows) + +--disable tiered reads -- +--set timescaledb.enable_tiered_reads = false; +SELECT * from ht_try ORDER BY 1; + timec | acq_id | value +------------------------------+--------+------- + Wed Jan 01 01:00:00 2020 PST | 100 | 1000 + Thu May 05 01:00:00 2022 PDT | 222 | 222 +(2 rows) + +--TEST cagg creation +CREATE MATERIALIZED VIEW cagg_ht_osm WITH (timescaledb.continuous, timescaledb.materialized_only = true) +AS +SELECT time_bucket('7 days'::interval, timec), count(*) +FROM ht_try +GROUP BY 1; +NOTICE: HERE refrech window INPUT IS (-210866803200000000 9223372036854775807) (start_is_null 1) +NOTICE: HERE refrech window (-210866803200000000 9223372036854775807) +NOTICE: refreshing continuous aggregate "cagg_ht_osm" +SELECT * FROM cagg_ht_osm ORDER BY 1; + time_bucket | count +------------------------------+------- + Sun Dec 29 16:00:00 2019 PST | 1 + Sun May 01 17:00:00 
2022 PDT | 1 +(2 rows) + +SELECT * FROM +_timescaledb_catalog.continuous_aggs_hypertable_invalidation_log ORDER BY 1; + hypertable_id | lowest_modified_value | greatest_modified_value +---------------+-----------------------+------------------------- +(0 rows) + diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 2b3a007d7a2..10d6f9f4b90 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -8,6 +8,7 @@ set(TEST_FILES bgw_policy.sql bgw_security.sql cagg_deprecated_bucket_ng.sql + cagg_osm.sql cagg_errors.sql cagg_invalidation.sql cagg_policy.sql diff --git a/tsl/test/sql/cagg_osm.sql b/tsl/test/sql/cagg_osm.sql new file mode 100644 index 00000000000..06d4f1ed55c --- /dev/null +++ b/tsl/test/sql/cagg_osm.sql @@ -0,0 +1,70 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +-- These tests work for PG14 or greater +-- Remember to coordinate any changes to functionality with the Cloud +-- Native Storage team. 
Tests for the following API: +-- * cagg refresh with GUC enable_tiered_reads + +\set EXPLAIN 'EXPLAIN (COSTS OFF)' + +--SETUP for OSM chunk -- +--need superuser access to create foreign data server +\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE DATABASE postgres_fdw_db; +GRANT ALL PRIVILEGES ON DATABASE postgres_fdw_db TO :ROLE_4; + +\c postgres_fdw_db :ROLE_4 +CREATE TABLE fdw_table( timec timestamptz NOT NULL , acq_id bigint, value bigint); +INSERT INTO fdw_table VALUES( '2020-01-01 01:00', 100, 1000); + +--create foreign server and user mappings as superuser +\c :TEST_DBNAME :ROLE_SUPERUSER + +SELECT current_setting('port') as "PORTNO" \gset + +CREATE EXTENSION postgres_fdw; +CREATE SERVER s3_server FOREIGN DATA WRAPPER postgres_fdw +OPTIONS ( host 'localhost', dbname 'postgres_fdw_db', port :'PORTNO'); +GRANT USAGE ON FOREIGN SERVER s3_server TO :ROLE_4; + +CREATE USER MAPPING FOR :ROLE_4 SERVER s3_server +OPTIONS ( user :'ROLE_4' , password :'ROLE_4_PASS'); + +ALTER USER MAPPING FOR :ROLE_4 SERVER s3_server +OPTIONS (ADD password_required 'false'); + +\c :TEST_DBNAME :ROLE_4; +-- this is a stand-in for the OSM table +CREATE FOREIGN TABLE child_fdw_table +(timec timestamptz NOT NULL, acq_id bigint, value bigint) + SERVER s3_server OPTIONS ( schema_name 'public', table_name 'fdw_table'); + +--now attach foreign table as a chunk of the hypertable. 
+CREATE TABLE ht_try(timec timestamptz NOT NULL, acq_id bigint, value bigint); +SELECT create_hypertable('ht_try', 'timec', chunk_time_interval => interval '1 day'); +INSERT INTO ht_try VALUES ('2022-05-05 01:00', 222, 222); + +SELECT * FROM child_fdw_table; +SELECT _timescaledb_functions.attach_osm_table_chunk('ht_try', 'child_fdw_table'); +-- must also update the range since the created chunk contains data +SELECT _timescaledb_functions.hypertable_osm_range_update('ht_try', '2020-01-01'::timestamptz, '2020-01-02'); + +set timescaledb.enable_tiered_reads = true; +SELECT * from ht_try ORDER BY 1; +--disable tiered reads -- +--set timescaledb.enable_tiered_reads = false; +SELECT * from ht_try ORDER BY 1; + +--TEST cagg creation +CREATE MATERIALIZED VIEW cagg_ht_osm WITH (timescaledb.continuous, timescaledb.materialized_only = true) +AS +SELECT time_bucket('7 days'::interval, timec), count(*) +FROM ht_try +GROUP BY 1; + +SELECT * FROM cagg_ht_osm ORDER BY 1; + +SELECT * FROM +_timescaledb_catalog.continuous_aggs_hypertable_invalidation_log ORDER BY 1;