Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(optimizer): use getters to access plan base, with runtime convention checking #12980

Merged
merged 13 commits into from
Oct 24, 2023
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pretty-xmlish = "0.1.13"
pretty_assertions = "1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8"
readonly = "0.2"
risingwave_batch = { workspace = true }
risingwave_common = { workspace = true }
risingwave_common_service = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::binder::{Binder, BoundQuery, BoundSetExpr};
use crate::catalog::{check_valid_column_name, CatalogError};
use crate::handler::privilege::resolve_query_privileges;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::Explain;
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor};
use crate::planner::Planner;
Expand Down Expand Up @@ -175,7 +176,7 @@ It only indicates the physical clustering of the data, which may improve the per

let (plan, table) =
gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?;
let context = plan.plan_base().ctx.clone();
let context = plan.plan_base().ctx().clone();
let mut graph = build_graph(plan);
graph.parallelism = session
.config()
Expand Down
11 changes: 7 additions & 4 deletions src/frontend/src/optimizer/plan_node/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::generic::GenericPlanRef;
use super::generic::PhysicalPlanRef;
use crate::optimizer::property::Order;

/// A subtrait of [`GenericPlanRef`] for batch plans.
/// A subtrait of [`PhysicalPlanRef`] for batch plans.
///
/// Due to the lack of refactoring, all plan nodes currently implement this trait
/// through [`super::PlanBase`]. One may still use this trait as a bound for
/// expecting a batch plan, in contrast to [`GenericPlanRef`].
pub trait BatchPlanRef: GenericPlanRef {
/// accessing a batch plan, in contrast to [`GenericPlanRef`] or
/// [`PhysicalPlanRef`].
///
/// [`GenericPlanRef`]: super::generic::GenericPlanRef
pub trait BatchPlanRef: PhysicalPlanRef {
fn order(&self) -> &Order;
}
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct BatchDelete {
impl BatchDelete {
pub fn new(logical: generic::Delete<PlanRef>) -> Self {
assert_eq!(logical.input.distribution(), &Distribution::Single);
let base: PlanBase = PlanBase::new_batch_from_logical(
let base: PlanBase = PlanBase::new_batch_with_core(
&logical,
logical.input.distribution().clone(),
Order::any(),
Expand Down
14 changes: 8 additions & 6 deletions src/frontend/src/optimizer/plan_node/batch_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode};

use super::batch::BatchPlanRef;
use super::generic::{GenericPlanRef, PhysicalPlanRef};
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
use crate::optimizer::plan_node::ToLocalBatch;
Expand All @@ -43,12 +45,12 @@ impl Distill for BatchExchange {
fn distill<'a>(&self) -> XmlNode<'a> {
let input_schema = self.input.schema();
let order = OrderDisplay {
order: &self.base.order,
order: self.base.order(),
input_schema,
}
.distill();
let dist = Pretty::display(&DistributionDisplay {
distribution: &self.base.dist,
distribution: self.base.distribution(),
input_schema,
});
childless_record("BatchExchange", vec![("order", order), ("dist", dist)])
Expand All @@ -75,18 +77,18 @@ impl ToDistributedBatch for BatchExchange {
/// The serialization of Batch Exchange is default cuz it will be rewritten in scheduler.
impl ToBatchPb for BatchExchange {
fn to_batch_prost_body(&self) -> NodeBody {
if self.base.order.is_any() {
if self.base.order().is_any() {
NodeBody::Exchange(ExchangeNode {
sources: vec![],
input_schema: self.base.schema.to_prost(),
input_schema: self.base.schema().to_prost(),
})
} else {
NodeBody::MergeSortExchange(MergeSortExchangeNode {
exchange: Some(ExchangeNode {
sources: vec![],
input_schema: self.base.schema.to_prost(),
input_schema: self.base.schema().to_prost(),
}),
column_orders: self.base.order.to_protobuf(),
column_orders: self.base.order().to_protobuf(),
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl BatchExpand {
| Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard,
Distribution::Broadcast => unreachable!(),
};
let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any());
let base = PlanBase::new_batch_with_core(&logical, dist, Order::any());
BatchExpand { base, logical }
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct BatchFilter {
impl BatchFilter {
pub fn new(logical: generic::Filter<PlanRef>) -> Self {
// TODO: derive from input
let base = PlanBase::new_batch_from_logical(
let base = PlanBase::new_batch_with_core(
&logical,
logical.input.distribution().clone(),
logical.input.order().clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct BatchGroupTopN {
impl BatchGroupTopN {
pub fn new(logical: generic::TopN<PlanRef>) -> Self {
assert!(!logical.group_key.is_empty());
let base = PlanBase::new_batch_from_logical(
let base = PlanBase::new_batch_with_core(
&logical,
logical.input.distribution().clone(),
Order::any(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl BatchHashAgg {
let dist = logical
.i2o_col_mapping()
.rewrite_provided_distribution(input_dist);
let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any());
let base = PlanBase::new_batch_with_core(&logical, dist, Order::any());
BatchHashAgg { base, logical }
}

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl BatchHashJoin {
logical.right.distribution(),
&logical,
);
let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any());
let base = PlanBase::new_batch_with_core(&logical, dist, Order::any());

Self {
base,
Expand Down Expand Up @@ -99,7 +99,7 @@ impl BatchHashJoin {

impl Distill for BatchHashJoin {
fn distill<'a>(&self) -> XmlNode<'a> {
let verbose = self.base.ctx.is_explain_verbose();
let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.logical.join_type)));

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl BatchHopWindow {
let distribution = logical
.i2o_col_mapping()
.rewrite_provided_distribution(logical.input.distribution());
let base = PlanBase::new_batch_from_logical(
let base = PlanBase::new_batch_with_core(
&logical,
distribution,
logical.get_out_column_index_order(),
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::InsertNode;
use risingwave_pb::plan_common::{DefaultColumns, IndexAndExpr};

use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
use crate::expr::Expr;
Expand All @@ -34,7 +35,7 @@ pub struct BatchInsert {
impl BatchInsert {
pub fn new(logical: generic::Insert<PlanRef>) -> Self {
assert_eq!(logical.input.distribution(), &Distribution::Single);
let base: PlanBase = PlanBase::new_batch_from_logical(
let base: PlanBase = PlanBase::new_batch_with_core(
&logical,
logical.input.distribution().clone(),
Order::any(),
Expand All @@ -48,7 +49,7 @@ impl Distill for BatchInsert {
fn distill<'a>(&self) -> XmlNode<'a> {
let vec = self
.logical
.fields_pretty(self.base.ctx.is_explain_verbose());
.fields_pretty(self.base.ctx().is_explain_verbose());
childless_record("BatchInsert", vec)
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::LimitNode;

use super::generic::PhysicalPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
Expand All @@ -32,7 +33,7 @@ pub struct BatchLimit {

impl BatchLimit {
pub fn new(logical: generic::Limit<PlanRef>) -> Self {
let base = PlanBase::new_batch_from_logical(
let base = PlanBase::new_batch_with_core(
&logical,
logical.input.distribution().clone(),
logical.input.order().clone(),
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/batch_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode};

use super::generic::{self};
use super::generic::{self, GenericPlanRef};
use super::utils::{childless_record, Distill};
use super::ExprRewritable;
use crate::expr::{Expr, ExprRewriter};
Expand Down Expand Up @@ -68,7 +68,7 @@ impl BatchLookupJoin {
assert!(eq_join_predicate.has_eq());
assert!(eq_join_predicate.eq_keys_are_type_aligned());
let dist = Self::derive_dist(logical.left.distribution(), &logical);
let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any());
let base = PlanBase::new_batch_with_core(&logical, dist, Order::any());
Self {
base,
logical,
Expand Down Expand Up @@ -114,7 +114,7 @@ impl BatchLookupJoin {

impl Distill for BatchLookupJoin {
fn distill<'a>(&self) -> XmlNode<'a> {
let verbose = self.base.ctx.is_explain_verbose();
let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.logical.join_type)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::NestedLoopJoinNode;

use super::generic::{self};
use super::generic::{self, GenericPlanRef};
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchPb, ToDistributedBatch};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
Expand All @@ -37,7 +37,7 @@ pub struct BatchNestedLoopJoin {
impl BatchNestedLoopJoin {
pub fn new(logical: generic::Join<PlanRef>) -> Self {
let dist = Self::derive_dist(logical.left.distribution(), logical.right.distribution());
let base = PlanBase::new_batch_from_logical(&logical, dist, Order::any());
let base = PlanBase::new_batch_with_core(&logical, dist, Order::any());
Self { base, logical }
}

Expand All @@ -51,7 +51,7 @@ impl BatchNestedLoopJoin {

impl Distill for BatchNestedLoopJoin {
fn distill<'a>(&self) -> XmlNode<'a> {
let verbose = self.base.ctx.is_explain_verbose();
let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.logical.join_type)));

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SortOverWindowNode;

use super::batch::BatchPlanRef;
use super::generic::PlanWindowFunction;
use super::utils::impl_distill_by_unit;
use super::{
Expand Down Expand Up @@ -47,7 +48,7 @@ impl BatchOverWindow {
.collect(),
);

let base = PlanBase::new_batch_from_logical(&logical, input_dist, order);
let base = PlanBase::new_batch_with_core(&logical, input_dist, order);
BatchOverWindow { base, logical }
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ProjectNode;
use risingwave_pb::expr::ExprNode;

use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
Expand All @@ -43,7 +44,7 @@ impl BatchProject {
.i2o_col_mapping()
.rewrite_provided_order(logical.input.order());

let base = PlanBase::new_batch_from_logical(&logical, distribution, order);
let base = PlanBase::new_batch_with_core(&logical, distribution, order);
BatchProject { base, logical }
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/batch_project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl BatchProjectSet {
.i2o_col_mapping()
.rewrite_provided_distribution(logical.input.distribution());

let base = PlanBase::new_batch_from_logical(
let base = PlanBase::new_batch_with_core(
&logical,
distribution,
logical.get_out_column_index_order(),
Expand Down
8 changes: 5 additions & 3 deletions src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use risingwave_pb::batch_plan::row_seq_scan_node::ChunkSize;
use risingwave_pb::batch_plan::{RowSeqScanNode, SysRowSeqScanNode};
use risingwave_pb::plan_common::PbColumnDesc;

use super::batch::BatchPlanRef;
use super::generic::{GenericPlanRef, PhysicalPlanRef};
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch};
use crate::catalog::ColumnId;
Expand All @@ -46,7 +48,7 @@ impl BatchSeqScan {
} else {
logical.get_out_column_index_order()
};
let base = PlanBase::new_batch_from_logical(&logical, dist, order);
let base = PlanBase::new_batch_with_core(&logical, dist, order);

{
// validate scan_range
Expand Down Expand Up @@ -180,7 +182,7 @@ fn range_to_string(name: &str, range: &(Bound<ScalarImpl>, Bound<ScalarImpl>)) -

impl Distill for BatchSeqScan {
fn distill<'a>(&self) -> XmlNode<'a> {
let verbose = self.base.ctx.is_explain_verbose();
let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(4);
vec.push(("table", Pretty::from(self.logical.table_name.clone())));
vec.push(("columns", self.logical.columns_pretty(verbose)));
Expand All @@ -196,7 +198,7 @@ impl Distill for BatchSeqScan {
if verbose {
let dist = Pretty::display(&DistributionDisplay {
distribution: self.distribution(),
input_schema: &self.base.schema,
input_schema: self.base.schema(),
});
vec.push(("distribution", dist));
}
Expand Down
11 changes: 7 additions & 4 deletions src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SortAggNode;

use super::generic::{self, PlanAggCall};
use super::generic::{self, GenericPlanRef, PlanAggCall};
use super::utils::impl_distill_by_unit;
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
use crate::expr::ExprRewriter;
Expand All @@ -32,7 +32,7 @@ pub struct BatchSimpleAgg {
impl BatchSimpleAgg {
pub fn new(logical: generic::Agg<PlanRef>) -> Self {
let input_dist = logical.input.distribution().clone();
let base = PlanBase::new_batch_from_logical(&logical, input_dist, Order::any());
let base = PlanBase::new_batch_with_core(&logical, input_dist, Order::any());
BatchSimpleAgg { base, logical }
}

Expand All @@ -41,8 +41,11 @@ impl BatchSimpleAgg {
}

fn two_phase_agg_enabled(&self) -> bool {
let session_ctx = self.base.ctx.session_ctx();
session_ctx.config().get_enable_two_phase_agg()
self.base
.ctx()
.session_ctx()
.config()
.get_enable_two_phase_agg()
}

pub(crate) fn can_two_phase_agg(&self) -> bool {
Expand Down
Loading
Loading