fix(stream): let Row Merge executor handle extra row count column in input (#17978)
kwannoel authored Aug 11, 2024
1 parent 21a46ab commit a8b2811
Showing 2 changed files with 105 additions and 4 deletions.
@@ -0,0 +1,93 @@
# Single phase approx percentile
statement ok
create table t(p_col double, grp_col int);

statement ok
insert into t select a, 1 from generate_series(-1000, 1000) t(a);

statement ok
flush;

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
sum(p_col),
percentile_cont(0.5) within group (order by p_col) as p50,
count(*),
percentile_cont(0.99) within group (order by p_col) as p99
from t;
----
-980 0 0 2001 980

statement ok
create materialized view m1 as
select
approx_percentile(0.01, 0.01) within group (order by p_col) as p01,
sum(p_col),
approx_percentile(0.5, 0.01) within group (order by p_col) as p50,
count(*),
approx_percentile(0.99, 0.01) within group (order by p_col) as p99
from t;

query I
select * from m1;
----
-982.5779489474152 0 0 2001 982.5779489474152

# Test state encode / decode
onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s

query I
select * from m1;
----
-982.5779489474152 0 0 2001 982.5779489474152

# Test state encode / decode
onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s

query I
select * from m1;
----
-982.5779489474152 0 0 2001 982.5779489474152

# Test 0<x<1 values
statement ok
insert into t select 0.001, 1 from generate_series(1, 500);

statement ok
insert into t select 0.0001, 1 from generate_series(1, 501);

statement ok
flush;

query I
select * from m1;
----
-963.1209598593477 0.5501000000000007 0.00009999833511933609 3002 963.1209598593477

query I
select
percentile_cont(0.01) within group (order by p_col) as p01,
round(sum(p_col) * 100),
percentile_cont(0.5) within group (order by p_col) as p50,
count(*),
percentile_cont(0.99) within group (order by p_col) as p99
from t;
----
-969.99 55 0.0001 3002 969.9899999999998

statement ok
drop materialized view m1;

statement ok
drop table t;
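
# Aside: assuming approx_percentile's second argument (0.01) is a relative
# error bound, the approximate results above should lie within 1% of the
# exact percentile_cont values, which the numbers bear out. A quick
# illustrative check in Rust (not part of this commit):
#
#     // Illustrative check that the approximate percentiles above sit
#     // within the 1% relative error bound of the exact values (an
#     // assumption about approx_percentile's second argument, not code
#     // from this commit).
#     fn within_relative_error(approx: f64, exact: f64, rel_err: f64) -> bool {
#         (approx - exact).abs() <= rel_err * exact.abs()
#     }
#
#     fn main() {
#         // p01 and p99 from the exact query vs. the materialized view m1.
#         assert!(within_relative_error(-982.5779489474152, -980.0, 0.01));
#         assert!(within_relative_error(982.5779489474152, 980.0, 0.01));
#     }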
16 changes: 12 additions & 4 deletions src/stream/src/executor/row_merge.rs
@@ -161,15 +161,23 @@ impl RowMergeExecutor {
         for (i, (op, lhs_row)) in lhs_chunk.rows().enumerate() {
             ops.push(op);
             for (j, d) in lhs_row.iter().enumerate() {
-                let out_index = lhs_mapping.map(j);
-                merged_rows[i][out_index] = d.to_owned_datum();
+                // NOTE(kwannoel): Unnecessary columns will not have a mapping,
+                // for instance the extra row count column.
+                // Those can be skipped here.
+                if let Some(out_index) = lhs_mapping.try_map(j) {
+                    merged_rows[i][out_index] = d.to_owned_datum();
+                }
             }
         }

         for (i, (_, rhs_row)) in rhs_chunk.rows().enumerate() {
             for (j, d) in rhs_row.iter().enumerate() {
-                let out_index = rhs_mapping.map(j);
-                merged_rows[i][out_index] = d.to_owned_datum();
+                // NOTE(kwannoel): Unnecessary columns will not have a mapping,
+                // for instance the extra row count column.
+                // Those can be skipped here.
+                if let Some(out_index) = rhs_mapping.try_map(j) {
+                    merged_rows[i][out_index] = d.to_owned_datum();
+                }
             }
         }
         let mut builder = DataChunkBuilder::new(data_types.to_vec(), cardinality);
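
The gist of the fix: `map` assumes every input column has an output slot and panics otherwise, while `try_map` returns an `Option` so unmapped columns, such as the extra row count column, are simply skipped. A minimal, self-contained sketch of that contract (the `IndexMapping` type below is illustrative, not RisingWave's actual `ColIndexMapping`):

    // Illustrative stand-in for an executor-side column index mapping.
    struct IndexMapping {
        // `targets[i]` is the output position of input column `i`,
        // or `None` if the column has no output slot.
        targets: Vec<Option<usize>>,
    }

    impl IndexMapping {
        // Returns `None` for unmapped columns, letting callers skip them.
        fn try_map(&self, index: usize) -> Option<usize> {
            self.targets.get(index).copied().flatten()
        }

        // The pre-fix behavior: panics when the column has no mapping.
        fn map(&self, index: usize) -> usize {
            self.try_map(index).expect("input column has no output mapping")
        }
    }

    fn main() {
        // Input has 3 columns; column 2 (say, an extra row count) is unmapped.
        let mapping = IndexMapping { targets: vec![Some(0), Some(1), None] };
        assert_eq!(mapping.try_map(1), Some(1));
        assert_eq!(mapping.try_map(2), None); // skipped instead of panicking
        let _ = mapping.map(0); // fine
        // mapping.map(2) would panic, which is what the fix avoids.
    }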
