Skip to content

Commit

Permalink
add commment
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Oct 24, 2023
1 parent ccd7dfe commit 6b35e9c
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
6 changes: 6 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,12 @@
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t (x int, y int);
select first_value(distinct x order by x asc) from t;
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t (x int, y int);
select last_value(x order by y desc nulls last) from t;
Expand Down
13 changes: 13 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,19 @@
└─StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t (x int, y int);
select first_value(distinct x order by x asc) from t;
batch_plan: |-
BatchSimpleAgg { aggs: [first_value(distinct t.x order_by(t.x ASC))] }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.x], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [first_value(distinct t.x order_by(t.x ASC))] }
└─StreamSimpleAgg { aggs: [first_value(distinct t.x order_by(t.x ASC)), count] }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t (x int, y int);
select last_value(x order by y desc nulls last) from t;
Expand Down
38 changes: 18 additions & 20 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt;
use std::{fmt, vec};

use fixedbitset::FixedBitSet;
use itertools::{Either, Itertools};
Expand Down Expand Up @@ -348,8 +348,8 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
let in_dist_key = self.input.distribution().dist_column_indices().to_vec();

let gen_materialized_input_state = |sort_keys: Vec<(OrderType, usize)>,
include_keys: Vec<usize>,
distinct_key: Option<usize>|
extra_keys: Vec<usize>,
include_keys: Vec<usize>|
-> MaterializedInputState {
let (mut table_builder, mut included_upstream_indices, mut column_mapping) =
self.create_table_builder(me.ctx(), window_col_idx);
Expand All @@ -376,17 +376,8 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
for (order_type, idx) in sort_keys {
add_column(idx, Some(order_type), true, &mut table_builder);
}
if let Some(distinct_key) = distinct_key {
add_column(
distinct_key,
Some(OrderType::ascending()),
true,
&mut table_builder,
);
} else {
for &idx in &in_pks {
add_column(idx, Some(OrderType::ascending()), true, &mut table_builder);
}
for idx in extra_keys {
add_column(idx, Some(OrderType::ascending()), true, &mut table_builder);
}
for idx in include_keys {
add_column(idx, None, true, &mut table_builder);
Expand Down Expand Up @@ -468,6 +459,17 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
_ => unreachable!(),
}
};

// columns to ensure each row unique
let extra_keys = if agg_call.distinct {
// if distinct, use distinct keys as extra keys
let distinct_key = agg_call.inputs[0].index;
vec![distinct_key]
} else {
// if not distinct, use primary keys as extra keys
in_pks.clone()
};

// other columns that should be contained in state table
let include_keys = match agg_call.agg_kind {
AggKind::FirstValue
Expand All @@ -480,12 +482,8 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
}
_ => vec![],
};
let state = if agg_call.distinct {
let distinct_key = agg_call.inputs[0].index;
gen_materialized_input_state(sort_keys, include_keys, Some(distinct_key))
} else {
gen_materialized_input_state(sort_keys, include_keys, None)
};

let state = gen_materialized_input_state(sort_keys, extra_keys, include_keys);
AggCallState::MaterializedInput(Box::new(state))
}
agg_kinds::rewritten!() => {
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/aggregation/minput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,15 @@ impl MaterializedInputState {
};

if agg_call.distinct {
// If distinct, we need to materialize input with the distinct keys
// As we only support single-column distinct for now, we use the
// `agg_call.args.val_indices()[0]` as the distinct key.
if !order_col_indices.contains(&agg_call.args.val_indices()[0]) {
order_col_indices.push(agg_call.args.val_indices()[0]);
order_types.push(OrderType::ascending());
}
} else {
// If not distinct, we need to materialize input with the primary keys
let pk_len = pk_indices.len();
order_col_indices.extend(pk_indices.iter());
order_types.extend(itertools::repeat_n(OrderType::ascending(), pk_len));
Expand Down

0 comments on commit 6b35e9c

Please sign in to comment.