Skip to content

Commit

Permalink
fix agg key merge schema
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 22, 2024
1 parent 25b4433 commit 85b8e3e
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 42 deletions.
9 changes: 8 additions & 1 deletion src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,13 @@
expected_outputs:
- batch_plan
- stream_plan
- name: test duplicate agg
sql: |
CREATE TABLE t (v1 int);
SELECT sum(v1) as x, count(v1) as y, sum(v1) as z, count(v1) as w from t;
expected_outputs:
- logical_plan
- stream_plan
- name: test simple approx_percentile alone
sql: |
CREATE TABLE t (v1 int);
Expand All @@ -1017,7 +1024,7 @@
- name: test simple approx_percentile with other simple aggs (sum, count)
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1), count(v1) from t;
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1) as s2, count(v1) from t;
expected_outputs:
- logical_plan
- stream_plan
77 changes: 47 additions & 30 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1863,6 +1863,22 @@
└─StreamHashAgg { group_key: [t.a, t.b], aggs: [sum(t.c), sum(t.d), count(t.d), max(t.e), count] }
└─StreamExchange { dist: HashShard(t.a, t.b) }
└─StreamTableScan { table: t, columns: [t.a, t.b, t.c, t.d, t.e, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: test duplicate agg
sql: |
CREATE TABLE t (v1 int);
SELECT sum(v1) as x, count(v1) as y, sum(v1) as z, count(v1) as w from t;
logical_plan: |-
LogicalProject { exprs: [sum(t.v1), count(t.v1), sum(t.v1), count(t.v1)] }
└─LogicalAgg { aggs: [sum(t.v1), count(t.v1)] }
└─LogicalProject { exprs: [t.v1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [x, y, z, w], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(t.v1)), sum0(count(t.v1)), sum(sum(t.v1)), sum0(count(t.v1))] }
└─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count(t.v1)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count(t.v1)] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: test simple approx_percentile alone
sql: |
CREATE TABLE t (v1 int);
Expand All @@ -1888,39 +1904,40 @@
└─LogicalProject { exprs: [t.v1::Float64 as $expr1, t.v1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile, sum], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [approx_percentile($expr10011):Float64, sum(t.v1):Int64] }
├─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] }
│ └─StreamExchange { dist: Single }
│ └─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamGlobalApproxPercentile
└─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Decimal, relative_error: 0.01:Decimal }
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
StreamMaterialize { columns: [approx_percentile, sum], stream_key: [approx_percentile], pk_columns: [approx_percentile], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [approx_percentile:Float64, sum(sum(t.v1)):Int64] }
├─StreamGlobalApproxPercentile
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Decimal, relative_error: 0.01:Decimal }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamSimpleAgg { aggs: [sum(sum(t.v1)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(t.v1)] }
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: test simple approx_percentile with other simple aggs (sum, count)
sql: |
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1), count(v1) from t;
SELECT sum(v1) as s1, approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1), sum(v1) as s2, count(v1) from t;
logical_plan: |-
LogicalProject { exprs: [approx_percentile($expr1), sum(t.v1), count(t.v1)] }
└─LogicalAgg { aggs: [approx_percentile($expr1), sum(t.v1), count(t.v1)] }
└─LogicalProject { exprs: [t.v1::Float64 as $expr1, t.v1] }
LogicalProject { exprs: [sum(t.v1), approx_percentile($expr1), sum(t.v1), count(t.v1)] }
└─LogicalAgg { aggs: [sum(t.v1), approx_percentile($expr1), count(t.v1)] }
└─LogicalProject { exprs: [t.v1, t.v1::Float64 as $expr1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile, sum, count], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamKeyedMerge { output: [approx_percentile($expr10011):Float64, sum(t.v1):Int64, count(t.v1):Int64] }
├─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count(t.v1)), count] }
│ └─StreamExchange { dist: Single }
│ └─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count(t.v1)] }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamGlobalApproxPercentile
└─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Decimal, relative_error: 0.01:Decimal }
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
StreamMaterialize { columns: [s1, approx_percentile, s2, count], stream_key: [s2], pk_columns: [s2], pk_conflict: NoCheck }
└─StreamProject { exprs: [sum(sum(t.v1)), approx_percentile, sum(sum(t.v1)), sum0(count(t.v1))] }
└─StreamKeyedMerge { output: [sum(sum(t.v1)):Int64, approx_percentile:Float64, sum0(count(t.v1)):Int64] }
├─StreamGlobalApproxPercentile
│ └─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Decimal, relative_error: 0.01:Decimal }
│ └─StreamShare { id: 2 }
│ └─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamSimpleAgg { aggs: [sum(sum(t.v1)), sum0(count(t.v1)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum(t.v1), count(t.v1)] }
└─StreamShare { id: 2 }
└─StreamProject { exprs: [t.v1, t.v1::Float64 as $expr1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
7 changes: 2 additions & 5 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ impl LogicalAgg {
fn gen_stateless_two_phase_streaming_agg_plan(&self, stream_input: PlanRef) -> Result<PlanRef> {
debug_assert!(self.group_key().is_empty());
let mut core = self.core.clone();
let schema = self.base.schema().clone();
println!("agg schema: {:?}", schema);

// First, handle approx percentile.
let has_approx_percentile = self
Expand Down Expand Up @@ -114,12 +112,11 @@ impl LogicalAgg {
));
if let Some((approx_percentile_agg, lhs_mapping, rhs_mapping)) = approx_percentile_info {
let keyed_merge = StreamKeyedMerge::new(
global_agg.into(),
approx_percentile_agg,
global_agg.into(),
lhs_mapping,
rhs_mapping,
schema,
);
)?;
Ok(keyed_merge.into())
} else {
Ok(global_agg.into())
Expand Down
31 changes: 25 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use fixedbitset::FixedBitSet;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
Expand All @@ -29,6 +32,7 @@ use crate::optimizer::plan_node::{
};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;
use crate::error::Result;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamKeyedMerge {
Expand All @@ -47,9 +51,24 @@ impl StreamKeyedMerge {
rhs_input: PlanRef,
lhs_mapping: ColIndexMapping,
rhs_mapping: ColIndexMapping,
schema: Schema,
) -> Self {
println!("keyed merge schema: {:?}", schema);
) -> Result<Self> {
assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size());
let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size());
let mut o2i_lhs = lhs_mapping.inverse().ok_or_else(|| anyhow!("lhs_mapping should be invertible"))?;
let mut o2i_rhs = rhs_mapping.inverse().ok_or_else(|| anyhow!("rhs_mapping should be invertible"))?;
for output_idx in 0..lhs_mapping.target_size() {
if let Some(lhs_idx) = o2i_lhs.try_map(output_idx) {
schema_fields.push(lhs_input.schema().fields()[lhs_idx].clone());
} else if let Some(rhs_idx) = o2i_rhs.try_map(output_idx) {
println!("rhs schema: {:?}", rhs_input.schema().fields());
schema_fields.push(rhs_input.schema().fields()[rhs_idx].clone());
} else {
bail!("output index {} not found in either lhs or rhs mapping", output_idx);
}
}
let schema = Schema::new(schema_fields);
let watermark_columns = FixedBitSet::with_capacity(schema.fields.len());

// FIXME: schema is wrong.
let base = PlanBase::new_stream(
lhs_input.ctx(),
Expand All @@ -59,16 +78,16 @@ impl StreamKeyedMerge {
lhs_input.distribution().clone(),
lhs_input.append_only(),
lhs_input.emit_on_window_close(),
lhs_input.watermark_columns().clone(),
watermark_columns,
lhs_input.columns_monotonicity().clone(),
);
Self {
Ok(Self {
base,
lhs_input,
rhs_input,
lhs_mapping,
rhs_mapping,
}
})
}
}

Expand Down

0 comments on commit 85b8e3e

Please sign in to comment.