From 22b1ab981ca68e9c619f4a9f2f006304b3a29883 Mon Sep 17 00:00:00 2001 From: wu Date: Tue, 12 Sep 2023 14:45:18 +0800 Subject: [PATCH] feat: alter column for table with connector (#12164) --- ci/scripts/e2e-source-test.sh | 4 + e2e_test/source/basic/alter/kafka.slt | 28 +++++- .../basic/alter/kafka_after_new_data.slt | 15 ++++ .../basic/alter/kafka_after_new_data_2.slt | 14 +++ proto/ddl_service.proto | 2 + scripts/source/alter_data/kafka_alter.3 | 1 + src/frontend/src/catalog/catalog_service.rs | 4 +- .../src/handler/alter_source_column.rs | 6 ++ .../src/handler/alter_table_column.rs | 89 ++++++++++++++----- src/frontend/src/handler/create_source.rs | 2 +- src/frontend/src/handler/util.rs | 1 + src/frontend/src/test_utils.rs | 1 + src/meta/src/barrier/command.rs | 9 ++ src/meta/src/manager/catalog/mod.rs | 54 ++++++++++- src/meta/src/rpc/ddl_controller.rs | 12 +-- src/meta/src/rpc/service/ddl_service.rs | 75 ++++++++++------ src/meta/src/stream/stream_manager.rs | 6 ++ src/rpc_client/src/meta_client.rs | 2 + 18 files changed, 258 insertions(+), 67 deletions(-) create mode 100644 e2e_test/source/basic/alter/kafka_after_new_data_2.slt create mode 100644 scripts/source/alter_data/kafka_alter.3 diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index def1368641641..3dc25892a4615 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -141,6 +141,10 @@ chmod +x ./scripts/source/prepare_data_after_alter.sh ./scripts/source/prepare_data_after_alter.sh 2 sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data.slt' +echo "--- e2e, kafka alter source again" +./scripts/source/prepare_data_after_alter.sh 3 +sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data_2.slt' + echo "--- Run CH-benCHmark" ./risedev slt -p 4566 -d dev './e2e_test/ch_benchmark/batch/ch_benchmark.slt' ./risedev slt -p 4566 -d dev './e2e_test/ch_benchmark/streaming/*.slt' diff --git a/e2e_test/source/basic/alter/kafka.slt b/e2e_test/source/basic/alter/kafka.slt index 6e2b7b88d2727..7b355f6407e52 100644 --- a/e2e_test/source/basic/alter/kafka.slt +++ b/e2e_test/source/basic/alter/kafka.slt @@ -14,13 +14,22 @@ CREATE SOURCE s2 (v2 varchar) with ( scan.startup.mode = 'earliest' ) FORMAT PLAIN ENCODE JSON; +statement ok +CREATE TABLE t (v1 int) with ( + connector = 'kafka', + topic = 'kafka_alter', + properties.bootstrap.server = 'message_queue:29092', + scan.startup.mode = 'earliest' +) FORMAT PLAIN ENCODE JSON; + + statement ok create materialized view mv1 as select * from s1; statement ok create materialized view mv2 as select * from s2; -sleep 10s +sleep 5s statement ok flush; @@ -35,6 +44,11 @@ select * from s2; ---- 11 +query I +select * from t; +---- +1 + # alter source statement ok alter source s1 add column v2 varchar; @@ -49,7 +63,10 @@ create materialized view mv3 as select * from s1; statement ok create materialized view mv4 as select * from s2; -sleep 10s +statement ok +alter table t add column v2 varchar; + +sleep 5s statement ok flush; @@ -84,6 +101,11 @@ select * from mv4 ---- 11 NULL +query IT +select * from t +---- +1 NULL + # alter source again statement ok alter source s1 add column v3 int; @@ -91,7 +113,7 @@ alter source s1 add column v3 int; statement ok create materialized view mv5 as select * from s1; -sleep 10s +sleep 5s statement ok flush; diff --git a/e2e_test/source/basic/alter/kafka_after_new_data.slt b/e2e_test/source/basic/alter/kafka_after_new_data.slt index 2b0ab659766e9..5a73b749079f9 100644 --- a/e2e_test/source/basic/alter/kafka_after_new_data.slt +++ b/e2e_test/source/basic/alter/kafka_after_new_data.slt @@ -45,6 +45,21 @@ select * from mv5 1 11 111 2 22 222 +query IT rowsort +select * from t +---- +1 NULL +2 22 + +statement ok +alter table t add column v3 int; + +query IT rowsort +select * from t +---- +1 NULL NULL +2 22 NULL + statement ok drop materialized view mv1 diff --git a/e2e_test/source/basic/alter/kafka_after_new_data_2.slt b/e2e_test/source/basic/alter/kafka_after_new_data_2.slt new file mode 100644 index 0000000000000..c10634d259138 --- /dev/null +++ b/e2e_test/source/basic/alter/kafka_after_new_data_2.slt @@ -0,0 +1,14 @@ +sleep 5s + +statement ok +flush; + +query IT rowsort +select * from t +---- +1 NULL NULL +2 22 NULL +3 33 333 + +statement ok +drop table t; \ No newline at end of file diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 35ae7e0b01bb4..27c9f2ee82f83 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -239,6 +239,8 @@ message ReplaceTablePlanRequest { stream_plan.StreamFragmentGraph fragment_graph = 2; // The mapping from the old columns to the new columns of the table. catalog.ColIndexMapping table_col_index_mapping = 3; + // Source catalog of table's associated source + catalog.Source source = 4; } message ReplaceTablePlanResponse { diff --git a/scripts/source/alter_data/kafka_alter.3 b/scripts/source/alter_data/kafka_alter.3 new file mode 100644 index 0000000000000..fb9015ae75caf --- /dev/null +++ b/scripts/source/alter_data/kafka_alter.3 @@ -0,0 +1 @@ +{"v1": 3, "v2": "33", "v3": 333} \ No newline at end of file diff --git a/src/frontend/src/catalog/catalog_service.rs b/src/frontend/src/catalog/catalog_service.rs index df3fce39004a1..14a9f9ad104cc 100644 --- a/src/frontend/src/catalog/catalog_service.rs +++ b/src/frontend/src/catalog/catalog_service.rs @@ -81,6 +81,7 @@ pub trait CatalogWriter: Send + Sync { async fn replace_table( &self, + source: Option, table: PbTable, graph: StreamFragmentGraph, mapping: ColIndexMapping, @@ -229,13 +230,14 @@ impl CatalogWriter for CatalogWriterImpl { async fn replace_table( &self, + source: Option, table: PbTable, graph: StreamFragmentGraph, mapping: ColIndexMapping, ) -> Result<()> { let version = self .meta_client - .replace_table(table, graph, mapping) + .replace_table(source, table, graph, mapping) .await?; self.wait_version(version).await } diff --git a/src/frontend/src/handler/alter_source_column.rs b/src/frontend/src/handler/alter_source_column.rs index be139fd6d6976..6e13a16185bf2 100644 --- a/src/frontend/src/handler/alter_source_column.rs +++ b/src/frontend/src/handler/alter_source_column.rs @@ -68,6 +68,12 @@ pub async fn handle_alter_source_column( None.into(), ))); } + SourceEncode::Json if catalog.info.use_schema_registry => { + return Err(RwError::from(ErrorCode::NotImplemented( + "Alter source with schema registry".into(), + None.into(), + ))); + } SourceEncode::Invalid | SourceEncode::Native => { return Err(RwError::from(ErrorCode::NotSupported( format!("Alter source with encode {:?}", encode), diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index be314befdfae3..35524e70e7f1e 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -17,16 +17,21 @@ use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_pb::catalog::table::OptionalAssociatedSourceId; use risingwave_pb::catalog::Table; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; -use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement}; +use risingwave_sqlparser::ast::{ + AlterTableOperation, ColumnOption, Encode, ObjectName, SourceSchemaV2, Statement, +}; use risingwave_sqlparser::parser::Parser; +use super::create_source::get_json_schema_location; use super::create_table::{gen_create_table_plan, ColumnIdGenerator}; use super::{HandlerArgs, RwPgResponse}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::table_catalog::TableType; +use crate::handler::create_table::gen_create_table_plan_with_source; use crate::{build_graph, Binder, OptimizerContext, TableCatalog}; /// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or @@ -51,13 +56,6 @@ pub async fn handle_alter_table_column( reader.get_table_by_name(db_name, schema_path, &real_table_name)?; match table.table_type() { - // Do not allow altering a table with a connector. It should be done passively according - // to the messages from the connector. - TableType::Table if table.has_associated_source() => { - Err(ErrorCode::InvalidInputSyntax(format!( - "cannot alter table \"{table_name}\" because it has a connector" - )))? - } TableType::Table => {} _ => Err(ErrorCode::InvalidInputSyntax(format!( @@ -82,9 +80,26 @@ pub async fn handle_alter_table_column( .context("unable to parse original table definition")? .try_into() .unwrap(); - let Statement::CreateTable { columns, .. } = &mut definition else { + let Statement::CreateTable { + columns, + source_schema, + .. + } = &mut definition + else { panic!("unexpected statement: {:?}", definition); }; + let source_schema = source_schema + .clone() + .map(|source_schema| source_schema.into_source_schema_v2()); + + if let Some(source_schema) = &source_schema { + if schema_has_schema_registry(source_schema) { + return Err(RwError::from(ErrorCode::NotImplemented( + "Alter table with source having schema registry".into(), + None.into(), + ))); + } + } match operation { AlterTableOperation::AddColumn { @@ -170,20 +185,32 @@ pub async fn handle_alter_table_column( panic!("unexpected statement type: {:?}", definition); }; - let (graph, table) = { + let (graph, table, source) = { let context = OptimizerContext::from_handler_args(handler_args); - let (plan, source, table) = gen_create_table_plan( - context, - table_name, - columns, - constraints, - col_id_gen, - source_watermarks, - append_only, - )?; - - // We should already have rejected the case where the table has a connector. - assert!(source.is_none()); + let (plan, source, table) = match source_schema { + Some(source_schema) => { + gen_create_table_plan_with_source( + context, + table_name, + columns, + constraints, + source_schema, + source_watermarks, + col_id_gen, + append_only, + ) + .await? + } + None => gen_create_table_plan( + context, + table_name, + columns, + constraints, + col_id_gen, + source_watermarks, + append_only, + )?, + }; // TODO: avoid this backward conversion. if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() { @@ -203,10 +230,13 @@ pub async fn handle_alter_table_column( // Fill the original table ID. let table = Table { id: original_catalog.id().table_id(), + optional_associated_source_id: original_catalog + .associated_source_id() + .map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())), ..table }; - (graph, table) + (graph, table, source) }; // Calculate the mapping from the original columns to the new columns. @@ -226,12 +256,23 @@ pub async fn handle_alter_table_column( let catalog_writer = session.catalog_writer()?; catalog_writer - .replace_table(table, graph, col_index_mapping) + .replace_table(source, table, graph, col_index_mapping) .await?; Ok(PgResponse::empty_result(StatementType::ALTER_TABLE)) } +fn schema_has_schema_registry(schema: &SourceSchemaV2) -> bool { + match schema.row_encode { + Encode::Avro | Encode::Protobuf => true, + Encode::Json => { + let mut options = schema.gen_options().unwrap(); + matches!(get_json_schema_location(&mut options), Ok(Some(_))) + } + _ => false, + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 78e85fca77e0c..a8f59bd5ccc61 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -252,7 +252,7 @@ fn consume_string_from_options( )))) } -fn get_json_schema_location( +pub fn get_json_schema_location( row_options: &mut BTreeMap, ) -> Result> { let schema_location = try_consume_string_from_options(row_options, "schema.location"); diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs index 05bab2ea5404a..92704120b1ed9 100644 --- a/src/frontend/src/handler/util.rs +++ b/src/frontend/src/handler/util.rs @@ -256,6 +256,7 @@ pub fn get_connection_name(with_properties: &BTreeMap) -> Option .get(CONNECTION_NAME_KEY) .map(|s| s.to_lowercase()) } + #[cfg(test)] mod tests { use bytes::BytesMut; diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index e934bed502f42..086c4f0c496e3 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -263,6 +263,7 @@ impl CatalogWriter for MockCatalogWriter { async fn replace_table( &self, + _source: Option, table: PbTable, _graph: StreamFragmentGraph, _mapping: ColIndexMapping, diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index d0deac65b3207..8d8076e56a233 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -139,6 +139,7 @@ pub enum Command { new_table_fragments: TableFragments, merge_updates: Vec, dispatchers: HashMap>, + init_split_assignment: SplitAssignment, }, /// `SourceSplitAssignment` generates Plain(Mutation::Splits) for pushing initialized splits or @@ -352,6 +353,7 @@ impl CommandContext { old_table_fragments, merge_updates, dispatchers, + init_split_assignment, .. } => { let dropped_actors = old_table_fragments.actor_ids(); @@ -368,10 +370,16 @@ impl CommandContext { }) .collect(); + let actor_splits = init_split_assignment + .values() + .flat_map(build_actor_connector_splits) + .collect(); + Some(Mutation::Update(UpdateMutation { actor_new_dispatchers, merge_update: merge_updates.clone(), dropped_actors, + actor_splits, ..Default::default() })) } @@ -761,6 +769,7 @@ impl CommandContext { new_table_fragments, merge_updates, dispatchers, + .. } => { let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id())); diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 0e3fa33d0eec1..6d512c1133d0d 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -2020,7 +2020,10 @@ impl CatalogManager { } /// This is used for `ALTER TABLE ADD/DROP COLUMN`. - pub async fn start_replace_table_procedure(&self, table: &Table) -> MetaResult<()> { + pub async fn start_replace_table_procedure(&self, stream_job: &StreamingJob) -> MetaResult<()> { + let StreamingJob::Table(source, table) = stream_job else { + unreachable!("unexpected job: {stream_job:?}") + }; let core = &mut *self.core.lock().await; let database_core = &mut core.database; database_core.ensure_database_id(table.database_id)?; @@ -2043,6 +2046,13 @@ impl CatalogManager { if database_core.has_in_progress_creation(&key) { bail!("table is in altering procedure"); } else { + if let Some(source) = source { + let source_key = (source.database_id, source.schema_id, source.name.clone()); + if database_core.has_in_progress_creation(&source_key) { + bail!("source is in altering procedure"); + } + database_core.mark_creating(&source_key); + } database_core.mark_creating(&key); Ok(()) } @@ -2051,20 +2061,38 @@ impl CatalogManager { /// This is used for `ALTER TABLE ADD/DROP COLUMN`. pub async fn finish_replace_table_procedure( &self, + source: &Option, table: &Table, table_col_index_mapping: ColIndexMapping, ) -> MetaResult { let core = &mut *self.core.lock().await; let database_core = &mut core.database; let mut tables = BTreeMapTransaction::new(&mut database_core.tables); + let mut sources = BTreeMapTransaction::new(&mut database_core.sources); let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes); let key = (table.database_id, table.schema_id, table.name.clone()); + assert!( tables.contains_key(&table.id) && database_core.in_progress_creation_tracker.contains(&key), "table must exist and be in altering procedure" ); + if let Some(source) = source { + let source_key = (source.database_id, source.schema_id, source.name.clone()); + assert!( + sources.contains_key(&source.id) + && database_core + .in_progress_creation_tracker + .contains(&source_key), + "source must exist and be in altering procedure" + ); + sources.insert(source.id, source.clone()); + database_core + .in_progress_creation_tracker + .remove(&source_key); + } + let index_ids: Vec<_> = indexes .tree_ref() .iter() @@ -2091,7 +2119,7 @@ impl CatalogManager { database_core.in_progress_creation_tracker.remove(&key); tables.insert(table.id, table.clone()); - commit_meta!(self, tables, indexes)?; + commit_meta!(self, tables, indexes, sources)?; // Group notification let version = self @@ -2102,6 +2130,9 @@ impl CatalogManager { relation_info: RelationInfo::Table(table.to_owned()).into(), }] .into_iter() + .chain(source.iter().map(|source| Relation { + relation_info: RelationInfo::Source(source.to_owned()).into(), + })) .chain(updated_indexes.into_iter().map(|index| Relation { relation_info: RelationInfo::Index(index).into(), })) @@ -2114,7 +2145,13 @@ impl CatalogManager { } /// This is used for `ALTER TABLE ADD/DROP COLUMN`. - pub async fn cancel_replace_table_procedure(&self, table: &Table) -> MetaResult<()> { + pub async fn cancel_replace_table_procedure( + &self, + stream_job: &StreamingJob, + ) -> MetaResult<()> { + let StreamingJob::Table(source, table) = stream_job else { + unreachable!("unexpected job: {stream_job:?}") + }; let core = &mut *self.core.lock().await; let database_core = &mut core.database; let key = (table.database_id, table.schema_id, table.name.clone()); @@ -2127,6 +2164,17 @@ impl CatalogManager { "table must exist and must be in altering procedure" ); + if let Some(source) = source { + let source_key = (source.database_id, source.schema_id, source.name.clone()); + assert!( + database_core.sources.contains_key(&source.id) + && database_core.has_in_progress_creation(&source_key), + "source must exist and must be in altering procedure" + ); + + database_core.unmark_creating(&source_key); + } + // TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must // occur after it's created. We may need to add a new tracker for `alter` procedure.s database_core.unmark_creating(&key); diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index a11fe815609cd..254e1ae5eca29 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -836,7 +836,7 @@ impl DdlController { // 3. Mark current relation as "updating". self.catalog_manager - .start_replace_table_procedure(stream_job.table().unwrap()) + .start_replace_table_procedure(stream_job) .await?; Ok(fragment_graph) @@ -947,22 +947,18 @@ impl DdlController { stream_job: &StreamingJob, table_col_index_mapping: ColIndexMapping, ) -> MetaResult { - let StreamingJob::Table(None, table) = stream_job else { + let StreamingJob::Table(source, table) = stream_job else { unreachable!("unexpected job: {stream_job:?}") }; self.catalog_manager - .finish_replace_table_procedure(table, table_col_index_mapping) + .finish_replace_table_procedure(source, table, table_col_index_mapping) .await } async fn cancel_replace_table(&self, stream_job: &StreamingJob) -> MetaResult<()> { - let StreamingJob::Table(None, table) = stream_job else { - unreachable!("unexpected job: {stream_job:?}") - }; - self.catalog_manager - .cancel_replace_table_procedure(table) + .cancel_replace_table_procedure(stream_job) .await } diff --git a/src/meta/src/rpc/service/ddl_service.rs b/src/meta/src/rpc/service/ddl_service.rs index a744ba910198d..8cf8f3e419a50 100644 --- a/src/meta/src/rpc/service/ddl_service.rs +++ b/src/meta/src/rpc/service/ddl_service.rs @@ -25,11 +25,12 @@ use risingwave_pb::catalog::connection::private_link_service::{ use risingwave_pb::catalog::connection::PbPrivateLinkService; use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::table::OptionalAssociatedSourceId; -use risingwave_pb::catalog::{connection, Connection}; +use risingwave_pb::catalog::{connection, Connection, PbSource, PbTable}; use risingwave_pb::ddl_service::ddl_service_server::DdlService; use risingwave_pb::ddl_service::drop_table_request::PbSourceId; use risingwave_pb::ddl_service::*; use risingwave_pb::stream_plan::stream_node::NodeBody; +use risingwave_pb::stream_plan::PbStreamFragmentGraph; use tonic::{Request, Response, Status}; use crate::barrier::BarrierManagerRef; @@ -413,30 +414,7 @@ impl DdlService for DdlServiceImpl { if let Some(source) = &mut source { // Generate source id. let source_id = self.gen_unique_id::<{ IdCategory::Table }>().await?; // TODO: Use source category - source.id = source_id; - - let mut source_count = 0; - for fragment in fragment_graph.fragments.values_mut() { - visit_fragment(fragment, |node_body| { - if let NodeBody::Source(source_node) = node_body { - // TODO: Refactor using source id. - source_node.source_inner.as_mut().unwrap().source_id = source_id; - source_count += 1; - } - }); - } - assert_eq!( - source_count, 1, - "require exactly 1 external stream source when creating table with a connector" - ); - - // Fill in the correct table id for source. - source.optional_associated_table_id = - Some(OptionalAssociatedTableId::AssociatedTableId(table_id)); - - // Fill in the correct source id for mview. - mview.optional_associated_source_id = - Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)); + fill_table_source(source, source_id, &mut mview, table_id, &mut fragment_graph); } let mut stream_job = StreamingJob::Table(source, mview); @@ -530,10 +508,19 @@ impl DdlService for DdlServiceImpl { ) -> Result, Status> { let req = request.into_inner(); - let stream_job = StreamingJob::Table(None, req.table.unwrap()); - let fragment_graph = req.fragment_graph.unwrap(); + let mut source = req.source; + let mut fragment_graph = req.fragment_graph.unwrap(); + let mut table = req.table.unwrap(); + if let Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)) = + table.optional_associated_source_id + { + let source = source.as_mut().unwrap(); + let table_id = table.id; + fill_table_source(source, source_id, &mut table, table_id, &mut fragment_graph); + } let table_col_index_mapping = ColIndexMapping::from_protobuf(&req.table_col_index_mapping.unwrap()); + let stream_job = StreamingJob::Table(source, table); let version = self .ddl_controller @@ -760,3 +747,37 @@ impl DdlServiceImpl { Ok(()) } } + +fn fill_table_source( + source: &mut PbSource, + source_id: u32, + table: &mut PbTable, + table_id: u32, + fragment_graph: &mut PbStreamFragmentGraph, +) { + // If we're creating a table with connector, we should additionally fill its ID first. + source.id = source_id; + + let mut source_count = 0; + for fragment in fragment_graph.fragments.values_mut() { + visit_fragment(fragment, |node_body| { + if let NodeBody::Source(source_node) = node_body { + // TODO: Refactor using source id. + source_node.source_inner.as_mut().unwrap().source_id = source_id; + source_count += 1; + } + }); + } + assert_eq!( + source_count, 1, + "require exactly 1 external stream source when creating table with a connector" + ); + + // Fill in the correct table id for source. + source.optional_associated_table_id = + Some(OptionalAssociatedTableId::AssociatedTableId(table_id)); + + // Fill in the correct source id for mview. + table.optional_associated_source_id = + Some(OptionalAssociatedSourceId::AssociatedSourceId(source_id)); +} diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 558149787c85f..df642802361ad 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -483,6 +483,11 @@ impl GlobalStreamManager { let dummy_table_id = table_fragments.table_id(); + let init_split_assignment = self + .source_manager + .pre_allocate_splits(&dummy_table_id) + .await?; + if let Err(err) = self .barrier_scheduler .run_config_change_command_with_pause(Command::ReplaceTable { @@ -490,6 +495,7 @@ impl GlobalStreamManager { new_table_fragments: table_fragments, merge_updates, dispatchers, + init_split_assignment, }) .await { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 2b87ae995a564..a6a8b1d5baff4 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -429,11 +429,13 @@ impl MetaClient { pub async fn replace_table( &self, + source: Option, table: PbTable, graph: StreamFragmentGraph, table_col_index_mapping: ColIndexMapping, ) -> Result { let request = ReplaceTablePlanRequest { + source, table: Some(table), fragment_graph: Some(graph), table_col_index_mapping: Some(table_col_index_mapping.to_protobuf()),