Skip to content

Commit

Permalink
Added comments and new struct for sink plan in handler files
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Dec 9, 2023
1 parent 703cfd6 commit 910b473
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 6 deletions.
1 change: 1 addition & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ message ReplaceTablePlan {
// The new materialization plan, where all schema are updated.
stream_plan.StreamFragmentGraph fragment_graph = 2;
// The mapping from the old columns to the new columns of the table.
// If no column modifications occur (such as for sinking into table), this will be None.
catalog.ColIndexMapping table_col_index_mapping = 3;
// Source catalog of table's associated source
catalog.Source source = 4;
Expand Down
26 changes: 21 additions & 5 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,19 @@ pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result<Query> {
})
}

#[allow(clippy::type_complexity)]
// used to store result of `gen_sink_plan`
pub struct SinkPlanContext {
pub query: Box<Query>,
pub sink_plan: PlanRef,
pub sink_catalog: SinkCatalog,
pub target_table_catalog: Option<Arc<TableCatalog>>,
}

pub fn gen_sink_plan(
session: &SessionImpl,
context: OptimizerContextRef,
stmt: CreateSinkStatement,
) -> Result<(Box<Query>, PlanRef, SinkCatalog, Option<Arc<TableCatalog>>)> {
) -> Result<SinkPlanContext> {
let db_name = session.database();
let (sink_schema_name, sink_table_name) =
Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;
Expand Down Expand Up @@ -256,7 +263,12 @@ pub fn gen_sink_plan(
}
};

Ok((query, sink_plan, sink_catalog, target_table_catalog))
Ok(SinkPlanContext {
query,
sink_plan,
sink_catalog,
target_table_catalog,
})
}

pub async fn handle_create_sink(
Expand All @@ -276,8 +288,12 @@ pub async fn handle_create_sink(
let (sink, graph, target_table_catalog) = {
let context = Rc::new(OptimizerContext::from_handler_args(handle_args));

let (query, plan, sink, target_table_catalog) =
gen_sink_plan(&session, context.clone(), stmt)?;
let SinkPlanContext {
query,
sink_plan: plan,
sink_catalog: sink,
target_table_catalog,
} = gen_sink_plan(&session, context.clone(), stmt)?;

let has_order_by = !query.order_by.is_empty();
if has_order_by {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn do_handle_explain(
.map(|x| x.0),

Statement::CreateSink { stmt } => {
gen_sink_plan(&session, context.clone(), stmt).map(|x| x.1)
gen_sink_plan(&session, context.clone(), stmt).map(|plan| plan.sink_plan)
}

Statement::CreateIndex {
Expand Down

0 comments on commit 910b473

Please sign in to comment.