Choose cheapest aggregated path as base for chunk-wise aggregation (#7025)

Currently we use the cheapest path before final aggregation as the base
(i.e. an Append over the individual chunks), but this does not account
for the cost of a Sort that might be required before aggregation. To
account for it, use the child path of the cheapest final aggregated path
instead. Also remove the duplicate handling of parallel paths: the
cheapest path is either parallel or not, so we never need to handle both
together.

This is mostly needed for PR #6879, which brings the unsorted
DecompressChunk paths into consideration; without this change they are
wrongly chosen as the basis for partial aggregation, with no regard for
the cost of the Sort over the Append.
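
To make the cost argument concrete, here is a small standalone C toy (not the TimescaleDB planner code; the struct, the costs, and all names are invented for illustration). It compares the two heuristics: picking the cheapest pre-aggregation path versus picking the path that is cheapest once the Sort required by aggregation is included.

/*
 * Hypothetical sketch of the cost argument above -- not planner code.
 * Each candidate "path" carries its own cost plus the cost of the Sort
 * (if any) that final aggregation would have to add on top of it.
 */
#include <stdio.h>

typedef struct
{
	const char *desc;
	double path_cost; /* cost of producing the rows */
	double sort_cost; /* extra Sort needed before aggregation, 0 if already ordered */
} Candidate;

int
main(void)
{
	Candidate candidates[] = {
		/* unsorted Append over chunks: cheap by itself, but needs a Sort */
		{ "Append (unsorted)", 100.0, 80.0 },
		/* MergeAppend over index scans: costlier to produce, no Sort needed */
		{ "MergeAppend (sorted)", 130.0, 0.0 },
	};
	int n = sizeof(candidates) / sizeof(candidates[0]);

	/* Old heuristic: cheapest path *before* aggregation. */
	int old_pick = 0;
	/* New heuristic: cheapest path *including* the Sort that aggregation
	 * adds, i.e. the child of the cheapest final aggregated path. */
	int new_pick = 0;

	for (int i = 1; i < n; i++)
	{
		if (candidates[i].path_cost < candidates[old_pick].path_cost)
			old_pick = i;
		if (candidates[i].path_cost + candidates[i].sort_cost <
			candidates[new_pick].path_cost + candidates[new_pick].sort_cost)
			new_pick = i;
	}

	printf("old heuristic picks: %s\n", candidates[old_pick].desc);
	printf("new heuristic picks: %s\n", candidates[new_pick].desc);
	return 0;
}

With these made-up costs, the old heuristic picks the unsorted Append even though the Sort it forces makes the overall plan more expensive, which is exactly the mistake the commit message describes.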
akuzm authored Dec 10, 2024
1 parent 81ff88c commit 7b32f8d
Showing 21 changed files with 1,142 additions and 1,381 deletions.
362 changes: 177 additions & 185 deletions tsl/src/chunkwise_agg.c

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions tsl/src/nodes/vector_agg/plan.c
@@ -409,6 +409,10 @@ has_vector_agg_node(Plan *plan, bool *has_normal_agg)
 	{
 		append_plans = castNode(Append, plan)->appendplans;
 	}
+	else if (IsA(plan, MergeAppend))
+	{
+		append_plans = castNode(MergeAppend, plan)->mergeplans;
+	}
 	else if (IsA(plan, CustomScan))
 	{
 		custom = castNode(CustomScan, plan);
@@ -471,6 +475,10 @@ try_insert_vector_agg_node(Plan *plan)
 	{
 		append_plans = castNode(Append, plan)->appendplans;
 	}
+	else if (IsA(plan, MergeAppend))
+	{
+		append_plans = castNode(MergeAppend, plan)->mergeplans;
+	}
 	else if (IsA(plan, CustomScan))
 	{
 		CustomScan *custom = castNode(CustomScan, plan);
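
The two hunks above follow the same pattern: any code that walks a plan tree and descends through Append must also descend through MergeAppend, since both node types carry a list of child plans (in differently named fields). A standalone toy model of that walker, with hypothetical types that only mirror the shape of PostgreSQL's plan nodes, not the real ones:

/*
 * Toy plan-tree walker -- not PostgreSQL code. A walker that misses
 * MergeAppend would fail to find vectorized aggregation underneath it,
 * which is the gap this commit closes.
 */
#include <stdio.h>
#include <stddef.h>

typedef enum { T_SeqScan, T_Append, T_MergeAppend, T_VectorAgg } NodeTag;

typedef struct Plan
{
	NodeTag tag;
	struct Plan **children; /* used by Append and MergeAppend */
	int nchildren;
} Plan;

static int
has_vector_agg(const Plan *plan)
{
	if (plan->tag == T_VectorAgg)
		return 1;

	/* Same else-if chain as in the patch: recurse into the child list
	 * of both multiplexing node types. */
	if (plan->tag == T_Append || plan->tag == T_MergeAppend)
	{
		for (int i = 0; i < plan->nchildren; i++)
			if (has_vector_agg(plan->children[i]))
				return 1;
	}
	return 0;
}

int
main(void)
{
	Plan scan = { T_SeqScan, NULL, 0 };
	Plan vagg = { T_VectorAgg, NULL, 0 };
	Plan *kids[] = { &scan, &vagg };
	Plan merge = { T_MergeAppend, kids, 2 };

	printf("vector agg found: %d\n", has_vector_agg(&merge)); /* prints 1 */
	return 0;
}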
110 changes: 52 additions & 58 deletions tsl/test/expected/cagg_union_view-14.out
@@ -355,31 +355,29 @@ SELECT _timescaledb_functions.cagg_watermark(:boundary_view_id);
 
 -- first UNION child should have no rows because no materialization has happened yet and 2nd child should have 4 rows
 :PREFIX SELECT * FROM boundary_view;
-                                                              QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------
+                                                           QUERY PLAN
+---------------------------------------------------------------------------------------------------------------------------------
  Finalize GroupAggregate (actual rows=4 loops=1)
    Group Key: (time_bucket(10, _hyper_5_5_chunk."time"))
-   ->  Sort (actual rows=4 loops=1)
+   ->  Merge Append (actual rows=4 loops=1)
          Sort Key: (time_bucket(10, _hyper_5_5_chunk."time"))
-         Sort Method: quicksort
-         ->  Append (actual rows=4 loops=1)
-               ->  Partial GroupAggregate (actual rows=1 loops=1)
-                     Group Key: time_bucket(10, _hyper_5_5_chunk."time")
-                     ->  Index Scan Backward using _hyper_5_5_chunk_boundary_test_time_idx on _hyper_5_5_chunk (actual rows=1 loops=1)
-                           Index Cond: ("time" >= '-2147483648'::integer)
-               ->  Partial GroupAggregate (actual rows=1 loops=1)
-                     Group Key: time_bucket(10, _hyper_5_6_chunk."time")
-                     ->  Index Scan Backward using _hyper_5_6_chunk_boundary_test_time_idx on _hyper_5_6_chunk (actual rows=1 loops=1)
-                           Index Cond: ("time" >= '-2147483648'::integer)
-               ->  Partial GroupAggregate (actual rows=1 loops=1)
-                     Group Key: time_bucket(10, _hyper_5_7_chunk."time")
-                     ->  Index Scan Backward using _hyper_5_7_chunk_boundary_test_time_idx on _hyper_5_7_chunk (actual rows=1 loops=1)
-                           Index Cond: ("time" >= '-2147483648'::integer)
-               ->  Partial GroupAggregate (actual rows=1 loops=1)
-                     Group Key: time_bucket(10, _hyper_5_8_chunk."time")
-                     ->  Index Scan Backward using _hyper_5_8_chunk_boundary_test_time_idx on _hyper_5_8_chunk (actual rows=1 loops=1)
-                           Index Cond: ("time" >= '-2147483648'::integer)
-(22 rows)
+         ->  Partial GroupAggregate (actual rows=1 loops=1)
+               Group Key: time_bucket(10, _hyper_5_5_chunk."time")
+               ->  Index Scan Backward using _hyper_5_5_chunk_boundary_test_time_idx on _hyper_5_5_chunk (actual rows=1 loops=1)
+                     Index Cond: ("time" >= '-2147483648'::integer)
+         ->  Partial GroupAggregate (actual rows=1 loops=1)
+               Group Key: time_bucket(10, _hyper_5_6_chunk."time")
+               ->  Index Scan Backward using _hyper_5_6_chunk_boundary_test_time_idx on _hyper_5_6_chunk (actual rows=1 loops=1)
+                     Index Cond: ("time" >= '-2147483648'::integer)
+         ->  Partial GroupAggregate (actual rows=1 loops=1)
+               Group Key: time_bucket(10, _hyper_5_7_chunk."time")
+               ->  Index Scan Backward using _hyper_5_7_chunk_boundary_test_time_idx on _hyper_5_7_chunk (actual rows=1 loops=1)
+                     Index Cond: ("time" >= '-2147483648'::integer)
+         ->  Partial GroupAggregate (actual rows=1 loops=1)
+               Group Key: time_bucket(10, _hyper_5_8_chunk."time")
+               ->  Index Scan Backward using _hyper_5_8_chunk_boundary_test_time_idx on _hyper_5_8_chunk (actual rows=1 loops=1)
+                     Index Cond: ("time" >= '-2147483648'::integer)
+(20 rows)

-- result should have 4 rows
SELECT * FROM boundary_view ORDER BY time_bucket;
@@ -403,26 +401,24 @@ SELECT _timescaledb_functions.cagg_watermark(:boundary_view_id);
 
 -- both sides of the UNION should return 2 rows
 :PREFIX SELECT * FROM boundary_view;
-                                                                 QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------
+                                                              QUERY PLAN
+---------------------------------------------------------------------------------------------------------------------------------------
  Append (actual rows=4 loops=1)
    ->  Index Scan using _hyper_6_9_chunk__materialized_hypertable_6_time_bucket_idx on _hyper_6_9_chunk (actual rows=2 loops=1)
          Index Cond: (time_bucket < 30)
    ->  Finalize GroupAggregate (actual rows=2 loops=1)
          Group Key: (time_bucket(10, _hyper_5_7_chunk."time"))
-         ->  Sort (actual rows=2 loops=1)
+         ->  Merge Append (actual rows=2 loops=1)
                Sort Key: (time_bucket(10, _hyper_5_7_chunk."time"))
-               Sort Method: quicksort
-               ->  Append (actual rows=2 loops=1)
-                     ->  Partial GroupAggregate (actual rows=1 loops=1)
-                           Group Key: time_bucket(10, _hyper_5_7_chunk."time")
-                           ->  Index Scan Backward using _hyper_5_7_chunk_boundary_test_time_idx on _hyper_5_7_chunk (actual rows=1 loops=1)
-                                 Index Cond: ("time" >= 30)
-                     ->  Partial GroupAggregate (actual rows=1 loops=1)
-                           Group Key: time_bucket(10, _hyper_5_8_chunk."time")
-                           ->  Index Scan Backward using _hyper_5_8_chunk_boundary_test_time_idx on _hyper_5_8_chunk (actual rows=1 loops=1)
-                                 Index Cond: ("time" >= 30)
-(17 rows)
+               ->  Partial GroupAggregate (actual rows=1 loops=1)
+                     Group Key: time_bucket(10, _hyper_5_7_chunk."time")
+                     ->  Index Scan Backward using _hyper_5_7_chunk_boundary_test_time_idx on _hyper_5_7_chunk (actual rows=1 loops=1)
+                           Index Cond: ("time" >= 30)
+               ->  Partial GroupAggregate (actual rows=1 loops=1)
+                     Group Key: time_bucket(10, _hyper_5_8_chunk."time")
+                     ->  Index Scan Backward using _hyper_5_8_chunk_boundary_test_time_idx on _hyper_5_8_chunk (actual rows=1 loops=1)
+                           Index Cond: ("time" >= 30)
+(15 rows)

-- result should have 4 rows
SELECT * FROM boundary_view ORDER BY time_bucket;
@@ -592,8 +588,8 @@ ORDER by 1;
 
 -- plan output
 :PREFIX SELECT * FROM mat_m1 ORDER BY 1;
-                                                                  QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------
+                                                               QUERY PLAN
+------------------------------------------------------------------------------------------------------------------------------------------
  Sort (actual rows=3 loops=1)
    Sort Key: _hyper_9_15_chunk.time_bucket
    Sort Method: quicksort
@@ -604,27 +600,25 @@ ORDER by 1;
                Group Key: (time_bucket(5, _hyper_7_11_chunk.a))
                Filter: ((sum(_hyper_7_11_chunk.c) > 50) AND ((avg(_hyper_7_11_chunk.b))::integer > 12))
                Rows Removed by Filter: 1
-               ->  Sort (actual rows=3 loops=1)
+               ->  Merge Append (actual rows=3 loops=1)
                      Sort Key: (time_bucket(5, _hyper_7_11_chunk.a))
-                     Sort Method: quicksort
-                     ->  Append (actual rows=3 loops=1)
-                           ->  Partial GroupAggregate (actual rows=1 loops=1)
-                                 Group Key: time_bucket(5, _hyper_7_11_chunk.a)
-                                 ->  Index Scan Backward using _hyper_7_11_chunk_ht_intdata_a_idx on _hyper_7_11_chunk (actual rows=2 loops=1)
-                                       Index Cond: (a >= 25)
-                                       Filter: ((b < 16) AND (c > 20))
-                           ->  Partial GroupAggregate (actual rows=1 loops=1)
-                                 Group Key: time_bucket(5, _hyper_7_13_chunk.a)
-                                 ->  Index Scan Backward using _hyper_7_13_chunk_ht_intdata_a_idx on _hyper_7_13_chunk (actual rows=3 loops=1)
-                                       Index Cond: (a >= 25)
-                                       Filter: ((b < 16) AND (c > 20))
-                           ->  Partial GroupAggregate (actual rows=1 loops=1)
-                                 Group Key: time_bucket(5, _hyper_7_14_chunk.a)
-                                 ->  Index Scan Backward using _hyper_7_14_chunk_ht_intdata_a_idx on _hyper_7_14_chunk (actual rows=1 loops=1)
-                                       Index Cond: (a >= 25)
-                                       Filter: ((b < 16) AND (c > 20))
-                                       Rows Removed by Filter: 2
-(30 rows)
+                     ->  Partial GroupAggregate (actual rows=1 loops=1)
+                           Group Key: time_bucket(5, _hyper_7_11_chunk.a)
+                           ->  Index Scan Backward using _hyper_7_11_chunk_ht_intdata_a_idx on _hyper_7_11_chunk (actual rows=2 loops=1)
+                                 Index Cond: (a >= 25)
+                                 Filter: ((b < 16) AND (c > 20))
+                     ->  Partial GroupAggregate (actual rows=1 loops=1)
+                           Group Key: time_bucket(5, _hyper_7_13_chunk.a)
+                           ->  Index Scan Backward using _hyper_7_13_chunk_ht_intdata_a_idx on _hyper_7_13_chunk (actual rows=3 loops=1)
+                                 Index Cond: (a >= 25)
+                                 Filter: ((b < 16) AND (c > 20))
+                     ->  Partial GroupAggregate (actual rows=1 loops=1)
+                           Group Key: time_bucket(5, _hyper_7_14_chunk.a)
+                           ->  Index Scan Backward using _hyper_7_14_chunk_ht_intdata_a_idx on _hyper_7_14_chunk (actual rows=1 loops=1)
+                                 Index Cond: (a >= 25)
+                                 Filter: ((b < 16) AND (c > 20))
+                                 Rows Removed by Filter: 2
+(28 rows)

-- Test caggs with different time types
CREATE TABLE smallint_table (time smallint, value int);
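
All three expected-output diffs above replace a Sort over an Append with a Merge Append of already-sorted per-chunk streams. A minimal standalone sketch of why that avoids the Sort (a toy two-way merge, not the PostgreSQL executor): since each child stream is already ordered, repeatedly emitting the smaller head yields globally ordered output in one pass, so the Finalize GroupAggregate no longer needs the Sort the old plans paid for.

/*
 * Toy merge of two sorted per-chunk streams -- illustration only.
 * Each array stands in for a chunk's partial aggregate read in order.
 */
#include <stdio.h>

int
main(void)
{
	int chunk_a[] = { 0, 20, 40 };
	int chunk_b[] = { 10, 30, 50 };
	int i = 0, j = 0;

	/* Merge step: always emit the smaller head -- output stays sorted. */
	while (i < 3 || j < 3)
	{
		if (j >= 3 || (i < 3 && chunk_a[i] <= chunk_b[j]))
			printf("%d ", chunk_a[i++]);
		else
			printf("%d ", chunk_b[j++]);
	}
	printf("\n"); /* prints: 0 10 20 30 40 50 */
	return 0;
}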