diff --git a/e2e_test/backfill/sink/create_sink.slt b/e2e_test/backfill/sink/create_sink.slt
index f98cd869bc9ae..bc9fba04da5c8 100644
--- a/e2e_test/backfill/sink/create_sink.slt
+++ b/e2e_test/backfill/sink/create_sink.slt
@@ -11,6 +11,9 @@ SET STREAMING_RATE_LIMIT = 1000;
 statement ok
 insert into t select * from generate_series(1, 10000);
 
+statement ok
+SET BACKGROUND_DDL=true;
+
 statement ok
 create sink s as select x.v1 as v1
 from t x join t y
@@ -23,3 +26,6 @@ with (
   allow.auto.create.topics=true,
 ) FORMAT DEBEZIUM ENCODE JSON;
+
+statement ok
+SET BACKGROUND_DDL=false;
diff --git a/e2e_test/streaming/rate_limit/snapshot_amplification.slt b/e2e_test/streaming/rate_limit/snapshot_amplification.slt
index 231bc9b0eb94f..f704bc637c02c 100644
--- a/e2e_test/streaming/rate_limit/snapshot_amplification.slt
+++ b/e2e_test/streaming/rate_limit/snapshot_amplification.slt
@@ -17,6 +17,9 @@ INSERT INTO table select 1 from generate_series(1, 100000);
 statement ok
 flush;
 
+statement ok
+SET BACKGROUND_DDL=true;
+
 statement ok
 CREATE SINK sink AS
   SELECT x.i1 as i1 FROM table x
@@ -24,6 +27,9 @@ CREATE SINK sink AS
   JOIN table s2 ON x.i1 = s2.i1
   WITH (connector = 'blackhole');
 
+statement ok
+SET BACKGROUND_DDL=false;
+
 # Let sink amplify...
 skipif in-memory
 sleep 1s
diff --git a/e2e_test/streaming/rate_limit/upstream_amplification.slt b/e2e_test/streaming/rate_limit/upstream_amplification.slt
index 71be801a78fc2..63528472050a8 100644
--- a/e2e_test/streaming/rate_limit/upstream_amplification.slt
+++ b/e2e_test/streaming/rate_limit/upstream_amplification.slt
@@ -18,6 +18,9 @@ WITH (
   datagen.rows.per.second = '10000'
 ) FORMAT PLAIN ENCODE JSON;
 
+statement ok
+SET BACKGROUND_DDL=true;
+
 statement ok
 CREATE SINK sink AS
   SELECT x.i1 as i1 FROM source_table x
@@ -26,6 +29,9 @@ CREATE SINK sink AS
   JOIN source_table s3 ON x.i1 = s3.i1
   WITH (connector = 'blackhole');
 
+statement ok
+SET BACKGROUND_DDL=false;
+
 # The following sequence of FLUSH should be fast, since barrier should be able to bypass sink.
 # Otherwise, these FLUSH will take a long time to complete, and trigger timeout.
 statement ok
diff --git a/src/connector/src/sink/catalog/desc.rs b/src/connector/src/sink/catalog/desc.rs
index ed17ebd1504ed..0fb466e5a1742 100644
--- a/src/connector/src/sink/catalog/desc.rs
+++ b/src/connector/src/sink/catalog/desc.rs
@@ -107,7 +107,7 @@ impl SinkDesc {
             target_table: self.target_table,
             created_at_cluster_version: None,
             initialized_at_cluster_version: None,
-            create_type: CreateType::Foreground,
+            create_type: self.create_type,
         }
     }
 
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 36245fcf7518d..73157f91c2d0c 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -23,6 +23,7 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::hash::ActorMapping;
 use risingwave_connector::source::SplitImpl;
 use risingwave_hummock_sdk::HummockEpoch;
+use risingwave_pb::catalog::CreateType;
 use risingwave_pb::meta::table_fragments::PbActorStatus;
 use risingwave_pb::meta::PausedReason;
 use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
@@ -164,6 +165,7 @@ pub enum Command {
         init_split_assignment: SplitAssignment,
         definition: String,
         ddl_type: DdlType,
+        create_type: CreateType,
         replace_table: Option<ReplaceTablePlan>,
     },
     /// `CancelStreamingJob` command generates a `Stop` barrier including the actors of the given
diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index a33380ea1cdd5..746a263b06317 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 
 use risingwave_common::catalog::TableId;
 use risingwave_common::util::epoch::Epoch;
+use risingwave_pb::catalog::CreateType;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
@@ -430,48 +431,56 @@ impl CreateMviewProgressTracker {
             return Some(TrackingJob::New(command));
         }
 
-        let (creating_mv_id, upstream_mv_count, upstream_total_key_count, definition, ddl_type) =
-            if let Command::CreateStreamingJob {
-                table_fragments,
-                dispatchers,
-                upstream_root_actors,
-                definition,
-                ddl_type,
-                ..
-            } = &command.context.command
-            {
-                // Keep track of how many times each upstream MV appears.
-                let mut upstream_mv_count = HashMap::new();
-                for (table_id, actors) in upstream_root_actors {
-                    assert!(!actors.is_empty());
-                    let dispatch_count: usize = dispatchers
-                        .iter()
-                        .filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id))
-                        .map(|(_, v)| v.len())
-                        .sum();
-                    upstream_mv_count.insert(*table_id, dispatch_count / actors.len());
-                }
-
-                let upstream_total_key_count: u64 = upstream_mv_count
+        let (
+            creating_mv_id,
+            upstream_mv_count,
+            upstream_total_key_count,
+            definition,
+            ddl_type,
+            create_type,
+        ) = if let Command::CreateStreamingJob {
+            table_fragments,
+            dispatchers,
+            upstream_root_actors,
+            definition,
+            ddl_type,
+            create_type,
+            ..
+        } = &command.context.command
+        {
+            // Keep track of how many times each upstream MV appears.
+            let mut upstream_mv_count = HashMap::new();
+            for (table_id, actors) in upstream_root_actors {
+                assert!(!actors.is_empty());
+                let dispatch_count: usize = dispatchers
                     .iter()
-                    .map(|(upstream_mv, count)| {
-                        *count as u64
-                            * version_stats
-                                .table_stats
-                                .get(&upstream_mv.table_id)
-                                .map_or(0, |stat| stat.total_key_count as u64)
-                    })
+                    .filter(|(upstream_actor_id, _)| actors.contains(upstream_actor_id))
+                    .map(|(_, v)| v.len())
                     .sum();
-                (
-                    table_fragments.table_id(),
-                    upstream_mv_count,
-                    upstream_total_key_count,
-                    definition.to_string(),
-                    ddl_type,
-                )
-            } else {
-                unreachable!("Must be CreateStreamingJob.");
-            };
+                upstream_mv_count.insert(*table_id, dispatch_count / actors.len());
+            }
+
+            let upstream_total_key_count: u64 = upstream_mv_count
+                .iter()
+                .map(|(upstream_mv, count)| {
+                    *count as u64
+                        * version_stats
+                            .table_stats
+                            .get(&upstream_mv.table_id)
+                            .map_or(0, |stat| stat.total_key_count as u64)
+                })
+                .sum();
+            (
+                table_fragments.table_id(),
+                upstream_mv_count,
+                upstream_total_key_count,
+                definition.to_string(),
+                ddl_type,
+                create_type,
+            )
+        } else {
+            unreachable!("Must be CreateStreamingJob.");
+        };
 
         for &actor in &actors {
             self.actor_map.insert(actor, creating_mv_id);
@@ -483,7 +492,7 @@ impl CreateMviewProgressTracker {
             upstream_total_key_count,
             definition,
         );
-        if *ddl_type == DdlType::Sink {
+        if *ddl_type == DdlType::Sink && *create_type == CreateType::Background {
             // We return the original tracking job immediately.
             // This is because sink can be decoupled with backfill progress.
             // We don't need to wait for sink to finish backfill.
diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs
index e042a222823f0..a29e6e923de2b 100644
--- a/src/meta/src/manager/streaming_job.rs
+++ b/src/meta/src/manager/streaming_job.rs
@@ -290,6 +290,7 @@ impl StreamingJob {
             Self::MaterializedView(table) => {
                 table.get_create_type().unwrap_or(CreateType::Foreground)
             }
+            Self::Sink(s, _) => s.get_create_type().unwrap_or(CreateType::Foreground),
             _ => CreateType::Foreground,
         }
     }
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index b2207e1b02faf..94b97abcdc007 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -461,6 +461,7 @@ impl GlobalStreamManager {
             definition: definition.to_string(),
             ddl_type,
             replace_table: replace_table_command,
+            create_type,
         };
         tracing::debug!("sending Command::CreateStreamingJob");
         if let Err(err) = self.barrier_scheduler.run_command(command).await {
diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
index 38ce77c6ed79e..93dc9ebb88a70 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
@@ -440,7 +440,7 @@ async fn test_foreground_index_cancel() -> Result<()> {
 }
 
 #[tokio::test]
-async fn test_sink_create() -> Result<()> {
+async fn test_background_sink_create() -> Result<()> {
     init_logger();
     let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
     let mut session = cluster.start_session();
@@ -450,6 +450,7 @@ async fn test_sink_create() -> Result<()> {
     let mut session2 = cluster.start_session();
 
     tokio::spawn(async move {
+        session2.run(SET_BACKGROUND_DDL).await.unwrap();
         session2.run(SET_RATE_LIMIT_2).await.unwrap();
         session2
             .run("CREATE SINK s FROM t WITH (connector='blackhole');")