This release is a patch release. We recommend that you upgrade at the next available opportunity.

**Bugfixes**
* #4454 Keep locks after reading job status
* #4658 Fix error when querying a compressed hypertable with compress_segmentby on an enum column
* #4671 Fix a possible error while flushing the COPY data
* #4675 Fix bad TupleTableSlot drop
* #4676 Fix a deadlock when decompressing chunks and performing SELECTs
* #4685 Fix chunk exclusion for space partitions in SELECT FOR UPDATE queries
* #4694 Change parameter names of cagg_migrate procedure
* #4698 Do not use row-by-row fetcher for parameterized plans
* #4711 Remove support for procedures as custom checks
* #4712 Fix assertion failure in constify_now
* #4713 Fix Continuous Aggregate migration policies
* #4720 Fix chunk exclusion for prepared statements and dst changes
* #4726 Fix gapfill function signature
* #4737 Fix join on time column of compressed chunk
* #4738 Fix error when waiting for remote COPY to finish
* #4739 Fix continuous aggregate migrate check constraint
* #4760 Fix segfault when INNER JOINing hypertables
* #4767 Fix permission issues on index creation for CAggs

**Thanks**
* @boxhock and @cocowalla for reporting a segfault when JOINing hypertables
* @carobme for reporting constraint error during continuous aggregate migration
* @choisnetm, @dustinsorensen, @jayadevanm and @joeyberkovitz for reporting a problem with JOINs on compressed hypertables
* @daniel-k for reporting a background worker crash
* @justinpryzby for reporting an error when compressing very wide tables
* @maxtwardowski for reporting problems with chunk exclusion and space partitions
* @yuezhihan for reporting GROUP BY error when having compress_segmentby on an enum column
1 parent 3dfeb72 · commit 6507848 · 7 changed files with 371 additions and 364 deletions.
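As a quick illustration of #4694, the user-facing cagg_migrate procedure now takes the parameters cagg, override, and drop_old (see the new signature in the diff below). A minimal usage sketch, assuming a hypothetical continuous aggregate named conditions_summary_daily and that the procedure is installed on the current search_path:

    -- Hypothetical example: migrate the continuous aggregate "conditions_summary_daily"
    -- using the parameter names shown in the new signature below.
    CALL cagg_migrate(
        cagg     => 'conditions_summary_daily',
        override => TRUE,  -- passed into the 'OVERRIDE CAGG' plan step created below
        drop_old => TRUE   -- passed into the 'DROP OLD CAGG' plan step created below
    );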
@@ -0,0 +1,174 @@
DROP PROCEDURE IF EXISTS @extschema@.cagg_migrate (REGCLASS, BOOLEAN, BOOLEAN);
DROP PROCEDURE IF EXISTS _timescaledb_internal.cagg_migrate_create_plan (_timescaledb_catalog.continuous_agg, TEXT, BOOLEAN, BOOLEAN);

CREATE PROCEDURE _timescaledb_internal.cagg_migrate_create_plan (
    _cagg_data _timescaledb_catalog.continuous_agg,
    _cagg_name_new TEXT,
    _override BOOLEAN DEFAULT FALSE,
    _drop_old BOOLEAN DEFAULT FALSE
)
LANGUAGE plpgsql AS
$BODY$
DECLARE
    _sql TEXT;
    _matht RECORD;
    _time_interval INTERVAL;
    _integer_interval BIGINT;
    _watermark TEXT;
    _policies JSONB;
    _bucket_column_name TEXT;
    _bucket_column_type TEXT;
    _interval_type TEXT;
    _interval_value TEXT;
BEGIN
    IF _timescaledb_internal.cagg_migrate_plan_exists(_cagg_data.mat_hypertable_id) IS TRUE THEN
        RAISE EXCEPTION 'plan already exists for materialized hypertable %', _cagg_data.mat_hypertable_id;
    END IF;

    INSERT INTO
        _timescaledb_catalog.continuous_agg_migrate_plan (mat_hypertable_id)
    VALUES
        (_cagg_data.mat_hypertable_id);

    SELECT schema_name, table_name
    INTO _matht
    FROM _timescaledb_catalog.hypertable
    WHERE id = _cagg_data.mat_hypertable_id;

    SELECT time_interval, integer_interval, column_name, column_type
    INTO _time_interval, _integer_interval, _bucket_column_name, _bucket_column_type
    FROM timescaledb_information.dimensions
    WHERE hypertable_schema = _matht.schema_name
      AND hypertable_name = _matht.table_name
      AND dimension_type = 'Time';

    IF _integer_interval IS NOT NULL THEN
        _interval_value := _integer_interval::TEXT;
        _interval_type  := _bucket_column_type;
        IF _bucket_column_type = 'bigint' THEN
            _watermark := COALESCE(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)::bigint, '-9223372036854775808'::bigint)::TEXT;
        ELSIF _bucket_column_type = 'integer' THEN
            _watermark := COALESCE(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)::integer, '-2147483648'::integer)::TEXT;
        ELSE
            _watermark := COALESCE(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)::smallint, '-32768'::smallint)::TEXT;
        END IF;
    ELSE
        _interval_value := _time_interval::TEXT;
        _interval_type  := 'interval';
        _watermark := COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)), '-infinity'::timestamptz)::TEXT;
    END IF;

    -- get all scheduled policies except the refresh
    SELECT jsonb_build_object('policies', array_agg(id))
    INTO _policies
    FROM _timescaledb_config.bgw_job
    WHERE hypertable_id = _cagg_data.mat_hypertable_id
      AND proc_name IS DISTINCT FROM 'policy_refresh_continuous_aggregate'
      AND scheduled IS TRUE
      AND id >= 1000;

    INSERT INTO
        _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
    VALUES
        (_cagg_data.mat_hypertable_id, 'SAVE WATERMARK', jsonb_build_object('watermark', _watermark)),
        (_cagg_data.mat_hypertable_id, 'CREATE NEW CAGG', jsonb_build_object('cagg_name_new', _cagg_name_new)),
        (_cagg_data.mat_hypertable_id, 'DISABLE POLICIES', _policies),
        (_cagg_data.mat_hypertable_id, 'REFRESH NEW CAGG', jsonb_build_object('cagg_name_new', _cagg_name_new, 'window_start', _watermark, 'window_start_type', _bucket_column_type));

    -- mark this step as finished right away because it does not require any extra work
    UPDATE _timescaledb_catalog.continuous_agg_migrate_plan_step
    SET status = 'FINISHED', start_ts = now(), end_ts = clock_timestamp()
    WHERE type = 'SAVE WATERMARK';

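    -- The dynamic statement below scans the old continuous aggregate user view
    -- (user_view_schema.user_view_name), takes the min and max of the bucket column
    -- for rows older than the saved watermark, and inserts one 'COPY DATA' step per
    -- dimension interval (_interval_value/_interval_type, looked up above from
    -- timescaledb_information.dimensions) into
    -- _timescaledb_catalog.continuous_agg_migrate_plan_step; each step gets its own
    -- start_ts/end_ts window for the later plan execution.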
    _sql := format (
    $$
    WITH boundaries AS (
        SELECT min(%1$I), max(%1$I), %1$L AS bucket_column_name, %2$L AS bucket_column_type, %3$L AS cagg_name_new
        FROM %4$I.%5$I
        WHERE %1$I < CAST(%6$L AS %2$s)
    )
    INSERT INTO
        _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
    SELECT
        %7$L,
        'COPY DATA',
        jsonb_build_object (
            'start_ts', start::text,
            'end_ts', (start + CAST(%8$L AS %9$s))::text,
            'bucket_column_name', bucket_column_name,
            'bucket_column_type', bucket_column_type,
            'cagg_name_new', cagg_name_new
        )
    FROM boundaries,
         LATERAL generate_series(min, max, CAST(%8$L AS %9$s)) AS start;
    $$,
    _bucket_column_name, _bucket_column_type, _cagg_name_new, _cagg_data.user_view_schema,
    _cagg_data.user_view_name, _watermark, _cagg_data.mat_hypertable_id, _interval_value, _interval_type
    );

    EXECUTE _sql;

    -- get all scheduled policies
    SELECT jsonb_build_object('policies', array_agg(id))
    INTO _policies
    FROM _timescaledb_config.bgw_job
    WHERE hypertable_id = _cagg_data.mat_hypertable_id
      AND scheduled IS TRUE
      AND id >= 1000;

    INSERT INTO
        _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
    VALUES
        (_cagg_data.mat_hypertable_id, 'OVERRIDE CAGG', jsonb_build_object('cagg_name_new', _cagg_name_new, 'override', _override, 'drop_old', _drop_old)),
        (_cagg_data.mat_hypertable_id, 'DROP OLD CAGG', jsonb_build_object('cagg_name_new', _cagg_name_new, 'override', _override, 'drop_old', _drop_old)),
        (_cagg_data.mat_hypertable_id, 'COPY POLICIES', _policies || jsonb_build_object('cagg_name_new', _cagg_name_new)),
        (_cagg_data.mat_hypertable_id, 'ENABLE POLICIES', NULL);
END;
$BODY$ SET search_path TO pg_catalog, pg_temp;

CREATE PROCEDURE @extschema@.cagg_migrate (
    cagg REGCLASS,
    override BOOLEAN DEFAULT FALSE,
    drop_old BOOLEAN DEFAULT FALSE
)
LANGUAGE plpgsql AS
$BODY$
DECLARE
    _cagg_schema TEXT;
    _cagg_name TEXT;
    _cagg_name_new TEXT;
    _cagg_data _timescaledb_catalog.continuous_agg;
BEGIN
    SELECT nspname, relname
    INTO _cagg_schema, _cagg_name
    FROM pg_catalog.pg_class
    JOIN pg_catalog.pg_namespace ON pg_namespace.oid OPERATOR(pg_catalog.=) pg_class.relnamespace
    WHERE pg_class.oid OPERATOR(pg_catalog.=) cagg::pg_catalog.oid;

    -- the maximum size of an identifier in Postgres is 63 characters, so we need to leave room for the '_new' suffix
    _cagg_name_new := pg_catalog.format('%s_new', pg_catalog.substr(_cagg_name, 1, 59));

    -- pre-validate the migration and get some variables
    _cagg_data := _timescaledb_internal.cagg_migrate_pre_validation(_cagg_schema, _cagg_name, _cagg_name_new);

    -- create the new migration plan
    CALL _timescaledb_internal.cagg_migrate_create_plan(_cagg_data, _cagg_name_new, override, drop_old);
    COMMIT;

    -- execute the migration plan
    CALL _timescaledb_internal.cagg_migrate_execute_plan(_cagg_data);

    -- finish the migration plan
    UPDATE _timescaledb_catalog.continuous_agg_migrate_plan
    SET end_ts = pg_catalog.clock_timestamp()
    WHERE mat_hypertable_id OPERATOR(pg_catalog.=) _cagg_data.mat_hypertable_id;
END;
$BODY$;

-- Issue #4727
ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan_step
    DROP CONSTRAINT IF EXISTS continuous_agg_migrate_plan_step_check2;

ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan_step
    ADD CONSTRAINT continuous_agg_migrate_plan_step_check2
    CHECK (type IN ('CREATE NEW CAGG', 'DISABLE POLICIES', 'COPY POLICIES', 'ENABLE POLICIES', 'SAVE WATERMARK', 'REFRESH NEW CAGG', 'COPY DATA', 'OVERRIDE CAGG', 'DROP OLD CAGG'));
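
For reference, a minimal sketch of how the recorded plan could be inspected while or after a migration runs; it only uses the columns written by the procedures above (type, status, config, start_ts, end_ts), though the exact catalog layout may differ between versions:

    -- Sketch: show the recorded steps of a continuous aggregate migration plan.
    SELECT step.type,
           step.status,
           step.start_ts,
           step.end_ts,
           step.config
    FROM _timescaledb_catalog.continuous_agg_migrate_plan_step AS step
    ORDER BY step.start_ts NULLS LAST;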