diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index 37ed893b19b8c..5d999fdd29d4c 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -319,7 +319,6 @@ fn check_cycle_for_sink(
     visit_sink(session, &reader, &sinks, &sink_catalog, &mut visited_tables)
 }
 
-
 pub async fn handle_create_sink(
     handle_args: HandlerArgs,
     stmt: CreateSinkStatement,
@@ -352,10 +351,13 @@ pub async fn handle_create_sink(
 
         let mut graph = build_graph(plan);
 
-        graph.parallelism = session
-            .config()
-            .get_streaming_parallelism()
-            .map(|parallelism| Parallelism { parallelism });
+        graph.parallelism =
+            session
+                .config()
+                .streaming_parallelism()
+                .map(|parallelism| Parallelism {
+                    parallelism: parallelism.get(),
+                });
 
         (sink, graph, target_table_catalog)
     };
@@ -449,114 +451,6 @@ pub async fn handle_create_sink(
         });
     }
 
-    let (sink, graph) = {
-        let context = Rc::new(OptimizerContext::from_handler_args(handle_args));
-
-        let (query, mut plan, sink) = gen_sink_plan(&session, context.clone(), stmt)?;
-
-        if let Some(replace_table_plan) = &affected_table_change {
-            let affected_table_catalog = replace_table_plan.table.as_ref().unwrap();
-
-            for column in sink.full_columns() {
-                if column.is_generated() {
-                    return Err(RwError::from(ErrorCode::BindError(
-                        "The sink to table feature for Sinks with generated columns has not been implemented yet."
-                            .to_string(),
-                    )));
-                }
-            }
-
-            let user_defined_primary_key_table = !(affected_table_catalog.append_only
-                || affected_table_catalog.row_id_index.is_some());
-
-            if !(user_defined_primary_key_table
-                || sink.sink_type == SinkType::AppendOnly
-                || sink.sink_type == SinkType::ForceAppendOnly)
-            {
-                return Err(RwError::from(ErrorCode::BindError(
-                    "Only append-only sinks can sink to a table without primary keys.".to_string(),
-                )));
-            }
-
-            let mut exprs = vec![];
-
-            let table_columns = affected_table_catalog
-                .get_columns()
-                .iter()
-                .map(|col| ColumnCatalog::from(col.clone()))
-                .collect_vec();
-
-            let sink_visible_columns = sink
-                .full_columns()
-                .iter()
-                .enumerate()
-                .filter(|(_i, c)| !c.is_hidden())
-                .collect_vec();
-
-            for (idx, table_column) in table_columns.iter().enumerate() {
-                if table_column.is_generated() {
-                    continue;
-                }
-
-                let data_type = table_column.data_type();
-
-                if idx < sink_visible_columns.len() {
-                    let (sink_col_idx, sink_column) = sink_visible_columns[idx];
-
-                    let sink_col_type = sink_column.data_type();
-
-                    if data_type != sink_col_type {
-                        bail!(
-                            "column type mismatch: {:?} vs {:?}",
-                            data_type,
-                            sink_col_type
-                        );
-                    } else {
-                        exprs.push(ExprImpl::InputRef(Box::new(InputRef::new(
-                            sink_col_idx,
-                            data_type.clone(),
-                        ))));
-                    }
-                } else {
-                    exprs.push(ExprImpl::Literal(Box::new(Literal::new(
-                        None,
-                        data_type.clone(),
-                    ))));
-                };
-            }
-
-            let logical_project = generic::Project::new(exprs, plan);
-
-            plan = StreamProject::new(logical_project).into();
-
-            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(&table_columns)?;
-            if let Some(exprs) = exprs {
-                let logical_project = generic::Project::new(exprs, plan);
-                plan = StreamProject::new(logical_project).into();
-            }
-        };
-
-        let has_order_by = !query.order_by.is_empty();
-        if has_order_by {
-            context.warn_to_user(
-                r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
-                    .to_string(),
-            );
-        }
-
-        let mut graph = build_graph(plan);
-
-        graph.parallelism =
-            session
-                .config()
-                .streaming_parallelism()
-                .map(|parallelism| Parallelism {
-                    parallelism: parallelism.get(),
-                });
-
-        (sink, graph)
-    };
-
     let _job_guard = session
         .env()
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 6eb372557447d..a2b8f576a0716 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -1139,8 +1139,10 @@ pub async fn generate_stream_graph_for_table(
     let graph = StreamFragmentGraph {
         parallelism: session
             .config()
-            .get_streaming_parallelism()
-            .map(|parallelism| Parallelism { parallelism }),
+            .streaming_parallelism()
+            .map(|parallelism| Parallelism {
+                parallelism: parallelism.get(),
+            }),
         ..build_graph(plan)
     };
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 10a30f9dab724..ea9c21b2670c3 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -1098,16 +1098,6 @@ impl CatalogManager {
                 .collect()
         };
 
-        let check_sink_into_table_in_dependency = |relation_id: RelationId| -> Vec<Sink> {
-            let mut table_sinks = vec![];
-            for relation_info in relations_depend_on(relation_id) {
-                if let RelationInfo::Sink(s) = relation_info && s.target_table.is_some() {
-                    table_sinks.push(s);
-                }
-            }
-            table_sinks
-        };
-
         // Initial push into deque.
         match relation {
             RelationIdEnum::Table(table_id) => {
@@ -1168,14 +1158,6 @@
                         continue;
                     }
 
-                    let associated_sink_into_tables = check_sink_into_table_in_dependency(table_id);
-                    if !associated_sink_into_tables.is_empty() {
-                        bail!(
-                            "Found {} sink(s) into table in dependency, please drop them manually",
-                            associated_sink_into_tables.len()
-                        );
-                    }
-
                     let table_fragments = fragment_manager
                         .select_table_fragments_by_table_id(&table_id.into())
                         .await?;
@@ -1332,15 +1314,6 @@
                         continue;
                     }
 
-                    let associated_sink_into_tables =
-                        check_sink_into_table_in_dependency(source.id as RelationId);
-                    if !associated_sink_into_tables.is_empty() {
-                        bail!(
-                            "Found {} sink(s) into table in dependency, please drop them manually",
-                            associated_sink_into_tables.len()
-                        );
-                    }
-
                     // cdc source streaming job
                     if let Some(info) = source.info
                         && info.cdc_source_job
@@ -1380,14 +1353,6 @@
                         continue;
                     }
 
-                    let associated_sink_into_tables = check_sink_into_table_in_dependency(view.id);
-                    if !associated_sink_into_tables.is_empty() {
-                        bail!(
-                            "Found {} sink(s) into table in dependency, please drop them manually",
-                            associated_sink_into_tables.len()
-                        );
-                    }
-
                     if let Some(ref_count) = database_core.relation_ref_count.get(&view.id).cloned()
                     {
                         if ref_count > 0 {
@@ -1468,6 +1433,19 @@
             .map(|sink_id| sinks.remove(*sink_id).unwrap())
             .collect_vec();
 
+        if !matches!(relation, RelationIdEnum::Sink(_)) {
+            let table_sink = sinks_removed
+                .iter()
+                .filter(|sink| sink.target_table.is_some())
+                .collect_vec();
+            if !table_sink.is_empty() {
+                bail!(
+                    "Found {} sink(s) into table in dependency, please drop them manually",
+                    table_sink.len()
+                );
+            }
+        }
+
         let internal_tables = all_internal_table_ids
             .iter()
             .map(|internal_table_id| {
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 1c87b9acdbcd7..14190fae7a31b 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -696,7 +696,6 @@ impl DdlController {
                         hash_mapping: Some(actor_mapping.to_protobuf()),
                         dispatcher_id: union_fragment.fragment_id as _,
                         downstream_actor_id: downstream_actor_ids.clone(),
-                        downstream_table_name: None,
                     }],
                 );
             }
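
Reviewer note: below is a minimal, self-contained sketch of the consolidated check added in `src/meta/src/manager/catalog/mod.rs`. Instead of probing dependencies per relation through the removed `check_sink_into_table_in_dependency` closure, the drop path now inspects the full set of sinks the cascade would remove and rejects the drop when any of them writes into a table, unless the object being dropped is itself a sink. The `RelationIdEnum` variants, the `SinkEntry` struct, and `check_drop_allowed` here are simplified stand-ins for illustration, not the actual meta-catalog types or error machinery.

```rust
// Simplified stand-ins for the meta-catalog types (the real code works on
// protobuf catalogs and reports errors via `bail!`); names are illustrative.
#[derive(Debug)]
enum RelationIdEnum {
    // The real enum also covers sources, views, etc.; two variants suffice here.
    Table(u32),
    Sink(u32),
}

#[derive(Debug)]
struct SinkEntry {
    /// `Some(table_id)` when the sink writes into a table (CREATE SINK ... INTO <table>).
    target_table: Option<u32>,
}

/// Reject a cascading drop that would silently remove sinks writing into tables,
/// unless the object being dropped is itself a sink.
fn check_drop_allowed(
    relation: &RelationIdEnum,
    sinks_removed: &[SinkEntry],
) -> Result<(), String> {
    if !matches!(relation, RelationIdEnum::Sink(_)) {
        let table_sinks: Vec<_> = sinks_removed
            .iter()
            .filter(|sink| sink.target_table.is_some())
            .collect();
        if !table_sinks.is_empty() {
            return Err(format!(
                "Found {} sink(s) into table in dependency, please drop them manually",
                table_sinks.len()
            ));
        }
    }
    Ok(())
}

fn main() {
    let sinks = vec![
        SinkEntry { target_table: Some(42) }, // a sink created with INTO <table>
        SinkEntry { target_table: None },     // a regular external sink
    ];

    // Dropping a table whose cascade removes a sink-into-table is rejected...
    assert!(check_drop_allowed(&RelationIdEnum::Table(42), &sinks).is_err());
    // ...while dropping a sink itself is always allowed.
    assert!(check_drop_allowed(&RelationIdEnum::Sink(1), &sinks).is_ok());
}
```

Running the check once, after the cascade set is computed, covers tables, sources, and views with a single code path and also catches table sinks that are only reached transitively.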