Skip to content

Commit

Permalink
fix stream share
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 21, 2024
1 parent 36411b2 commit 802a5f8
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/expr/core/src/aggregate/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ pub mod agg_kinds {
| AggKind::Count
| AggKind::Avg
| AggKind::ApproxCountDistinct
| AggKind::ApproxPercentile
| AggKind::VarPop
| AggKind::VarSamp
| AggKind::StddevPop
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1004,4 +1004,5 @@
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t;
expected_outputs:
- logical_plan
- logical_plan
- stream_plan
7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1871,3 +1871,10 @@
└─LogicalAgg { aggs: [approx_percentile($expr1 order_by(t.v1 ASC))] }
└─LogicalProject { exprs: [t.v1::Float64 as $expr1, t.v1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [approx_percentile($expr1 order_by(t.v1 ASC))] }
└─StreamSimpleAgg { aggs: [approx_percentile($expr1 order_by(t.v1 ASC)), count] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl LogicalAgg {
.into());
}

let shared_input = LogicalShare::new(stream_input).to_stream(ctx)?;
let shared_input: PlanRef = StreamShare::new_from_input(stream_input).into();
let (approx_percentile_agg_call, non_approx_percentile_agg_calls, lhs_mapping, rhs_mapping) =
self.extract_approx_percentile();
let approx_percentile_agg =
Expand Down
9 changes: 9 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::RefCell;

use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::PbStreamNode;
Expand Down Expand Up @@ -50,6 +52,13 @@ impl StreamShare {

StreamShare { base, core }
}

pub fn new_from_input(input: PlanRef) -> Self {
let core = generic::Share {
input: RefCell::new(input),
};
Self::new(core)
}
}

impl Distill for StreamShare {
Expand Down

0 comments on commit 802a5f8

Please sign in to comment.