From 4aed67f43eab44aa594f4a37fe2aa5461c4807ca Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Thu, 18 Apr 2024 11:41:35 +0800 Subject: [PATCH] fix(meta): only return for background sink once initial barrier collected (#16367) --- src/meta/src/rpc/ddl_controller.rs | 9 ++++++--- src/meta/src/rpc/ddl_controller_v2.rs | 9 ++++++--- src/tests/simulation/src/slt.rs | 11 +++++++++-- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index c4a6e8c6ed62c..78fb65089eb40 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -774,8 +774,11 @@ impl DdlController { } }; - match create_type { - CreateType::Foreground | CreateType::Unspecified => { + match (create_type, &stream_job) { + (CreateType::Foreground, _) + | (CreateType::Unspecified, _) + // FIXME(kwannoel): Unify background stream's creation path with MV below. + | (CreateType::Background, &StreamingJob::Sink(_, _)) => { self.create_streaming_job_inner( mgr, stream_job, @@ -785,7 +788,7 @@ impl DdlController { ) .await } - CreateType::Background => { + (CreateType::Background, _) => { let ctrl = self.clone(); let mgr = mgr.clone(); let stream_job_id = stream_job.id(); diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index c0ab2f13f2ff6..3e948e88e2821 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -204,8 +204,11 @@ impl DdlController { // create streaming jobs. let stream_job_id = streaming_job.id(); - match streaming_job.create_type() { - CreateType::Unspecified | CreateType::Foreground => { + match (streaming_job.create_type(), streaming_job) { + (CreateType::Unspecified, _) + | (CreateType::Foreground, _) + // FIXME(kwannoel): Unify background stream's creation path with MV below. + | (CreateType::Background, StreamingJob::Sink(_, _)) => { let replace_table_job_info = ctx.replace_table_job_info.as_ref().map( |(streaming_job, ctx, table_fragments)| { ( @@ -241,7 +244,7 @@ impl DdlController { Ok(version) } - CreateType::Background => { + (CreateType::Background, _) => { let ctrl = self.clone(); let mgr = mgr.clone(); let fut = async move { diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index 9e341c7151901..ec1aca82d36cc 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -223,6 +223,10 @@ pub async fn run_slt_task( // NOTE(kwannoel): For background ddl let mut background_ddl_enabled = false; + // If background ddl is set to true within the test case, prevent random setting of background_ddl to true. + // We can revert it back to false only if we encounter a record that sets background_ddl to false. + let mut manual_background_ddl_enabled = false; + for record in sqllogictest::parse_file(path).expect("failed to parse file") { // uncomment to print metrics for task counts // let metrics = madsim::runtime::Handle::current().metrics(); @@ -254,8 +258,10 @@ pub async fn run_slt_task( }; tracing::debug!(?cmd, "Running"); - if matches!(cmd, SqlCmd::SetBackgroundDdl { .. }) && background_ddl_rate > 0.0 { - panic!("We cannot run background_ddl statement with background_ddl_rate > 0.0, since it could be reset"); + if background_ddl_rate > 0.0 + && let SqlCmd::SetBackgroundDdl { enable } = cmd + { + manual_background_ddl_enabled = enable; } // For each background ddl compatible statement, provide a chance for background_ddl=true. @@ -266,6 +272,7 @@ pub async fn run_slt_task( .. } = &record && matches!(cmd, SqlCmd::CreateMaterializedView { .. }) + && !manual_background_ddl_enabled { let background_ddl_setting = rng.gen_bool(background_ddl_rate); let set_background_ddl = Record::Statement {