Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed May 6, 2024
1 parent a1ab0cd commit c913894
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ fn assemble_materialize(
}))
.collect_vec();

PlanRoot::new(
PlanRoot::new_with_logical_plan(
logical_project,
// schema of logical_project is such that index columns come first.
// so we can use distributed_by_columns_len to represent distributed by columns indices.
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ fn gen_table_plan_inner(
.into();

let required_cols = FixedBitSet::with_capacity(columns.len());
let plan_root = PlanRoot::new(
let plan_root = PlanRoot::new_with_logical_plan(
source_node,
RequiredDist::Any,
Order::any(),
Expand Down Expand Up @@ -859,7 +859,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(

let scan_node: PlanRef = logical_scan.into();
let required_cols = FixedBitSet::with_capacity(columns.len());
let plan_root = PlanRoot::new(
let plan_root = PlanRoot::new_with_logical_plan(
scan_node,
RequiredDist::Any,
Order::any(),
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/handler/declare_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub fn create_batch_plan_for_cursor(
let out_fields = FixedBitSet::from_iter(0..batch_log_seq_scan.core().schema().len());
let out_names = batch_log_seq_scan.core().column_names();
// Here we just need a plan_root to call the method, only out_fields and out_names will be used
let mut plan_root = PlanRoot::new(
let plan_root = PlanRoot::new_with_batch_plan(
PlanRef::from(batch_log_seq_scan.clone()),
RequiredDist::single(),
Order::default(),
Expand All @@ -176,15 +176,15 @@ pub fn create_batch_plan_for_cursor(
let schema = batch_log_seq_scan.core().schema().clone();
let (batch_log_seq_scan, query_mode) = match handle_args.session.config().query_mode() {
QueryMode::Auto => (
plan_root.gen_batch_distributed_plan(PlanRef::from(batch_log_seq_scan))?,
plan_root.gen_batch_local_plan()?,
QueryMode::Local,
),
QueryMode::Local => (
plan_root.gen_batch_local_plan(PlanRef::from(batch_log_seq_scan))?,
plan_root.gen_batch_local_plan()?,
QueryMode::Local,
),
QueryMode::Distributed => (
plan_root.gen_batch_distributed_plan(PlanRef::from(batch_log_seq_scan))?,
plan_root.gen_batch_distributed_plan()?,
QueryMode::Distributed,
),
};
Expand Down
29 changes: 26 additions & 3 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,43 @@ pub enum PlanPhase {
}

impl PlanRoot {
pub fn new(
pub fn new_with_logical_plan(
plan: PlanRef,
required_dist: RequiredDist,
required_order: Order,
out_fields: FixedBitSet,
out_names: Vec<String>,
) -> Self {
assert_eq!(plan.convention(), Convention::Logical);
Self::new_inner(plan, PlanPhase::Logical, required_dist, required_order, out_fields, out_names)
}

pub fn new_with_batch_plan(
plan: PlanRef,
required_dist: RequiredDist,
required_order: Order,
out_fields: FixedBitSet,
out_names: Vec<String>,
) -> Self {
assert_eq!(plan.convention(), Convention::Batch);
Self::new_inner(plan, PlanPhase::Batch, required_dist, required_order, out_fields, out_names)
}

fn new_inner(
plan: PlanRef,
phase: PlanPhase,
required_dist: RequiredDist,
required_order: Order,
out_fields: FixedBitSet,
out_names: Vec<String>,
) -> Self {
let input_schema = plan.schema();
assert_eq!(input_schema.fields().len(), out_fields.len());
assert_eq!(out_fields.count_ones(..), out_names.len());

Self {
plan,
phase: PlanPhase::Logical,
phase,
required_dist,
required_order,
out_fields,
Expand Down Expand Up @@ -1122,7 +1145,7 @@ mod tests {
.into();
let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
let out_names = vec!["v1".into()];
let root = PlanRoot::new(
let root = PlanRoot::new_with_logical_plan(
values,
RequiredDist::Any,
Order::any(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/planner/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Planner {
plan.schema().names()
};

let root = PlanRoot::new(plan, dist, Order::any(), out_fields, out_names);
let root = PlanRoot::new_with_logical_plan(plan, dist, Order::any(), out_fields, out_names);
Ok(root)
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/planner/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Planner {
} else {
plan.schema().names()
};
let root = PlanRoot::new(plan, dist, Order::any(), out_fields, out_names);
let root = PlanRoot::new_with_logical_plan(plan, dist, Order::any(), out_fields, out_names);
Ok(root)
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/planner/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Planner {
// Do not output projected_row_id hidden column.
out_fields.set(0, false);
}
let root = PlanRoot::new(plan, RequiredDist::Any, order, out_fields, out_names);
let root = PlanRoot::new_with_logical_plan(plan, RequiredDist::Any, order, out_fields, out_names);
Ok(root)
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/planner/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl Planner {
plan.schema().names()
};

let root = PlanRoot::new(plan, dist, Order::any(), out_fields, out_names);
let root = PlanRoot::new_with_logical_plan(plan, dist, Order::any(), out_fields, out_names);
Ok(root)
}
}

0 comments on commit c913894

Please sign in to comment.