diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs
index ff665d5cea617..403ab87568033 100644
--- a/src/meta/node/src/server.rs
+++ b/src/meta/node/src/server.rs
@@ -525,6 +525,7 @@ pub async fn start_service_as_election_leader(
             cluster_manager.clone(),
             source_manager.clone(),
             hummock_manager.clone(),
+            catalog_manager.clone(),
         )
         .unwrap(),
     );
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 4a8876c9efbef..5f881e8d33138 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -111,6 +111,8 @@ pub enum Command {
    /// Barriers from the actors to be dropped will STILL be collected.
    /// After the barrier is collected, it notifies the local stream manager of compute nodes to
    /// drop actors, and then delete the table fragments info from meta store.
+    /// The `TableId`s here are the ids of the stream jobs.
+    /// They do not include internal table ids.
     DropStreamingJobs(HashSet<TableId>),
 
     /// `CreateStreamingJob` command generates a `Add` barrier by given info.
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index 26e67b5e3cb5c..21f24b920b0b1 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -93,7 +93,7 @@ impl ScheduledQueue {
         // TODO: this is just a workaround to allow dropping streaming jobs when the cluster is under recovery,
         // we need to refine it when catalog and streaming metadata can be handled in a transactional way.
         if let QueueStatus::Blocked(reason) = &self.status &&
-            !matches!(scheduled.command, Command::DropStreamingJobs(_)) {
+            !matches!(scheduled.command, Command::DropStreamingJobs(_) | Command::CancelStreamingJob(_)) {
             return Err(MetaError::unavailable(reason));
         }
         self.queue.push_back(scheduled);
@@ -402,10 +402,18 @@
             notifiers, command, ..
         }) = queue.queue.pop_front()
         {
-            let Command::DropStreamingJobs(table_ids) = command else {
-                unreachable!("only drop streaming jobs should be buffered");
-            };
-            to_drop_tables.extend(table_ids);
+            match command {
+                Command::DropStreamingJobs(table_ids) => {
+                    to_drop_tables.extend(table_ids);
+                }
+                Command::CancelStreamingJob(table_fragments) => {
+                    let table_id = table_fragments.table_id();
+                    to_drop_tables.insert(table_id);
+                }
+                _ => {
+                    unreachable!("only drop streaming jobs should be buffered");
+                }
+            }
             notifiers.into_iter().for_each(|mut notify| {
                 notify.notify_collected();
                 notify.notify_finished();
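Note on the scheduler change above: while recovery keeps the barrier queue blocked, both drop and cancel commands are now allowed through, and the drain path collects table ids from either variant. The following is a self-contained sketch of that gating pattern, not the real meta-service code: `Command`, `QueueStatus`, and `ScheduledQueue` here are simplified stand-ins (the real `CancelStreamingJob` carries `TableFragments`, and the real code uses a let-chain).

```rust
use std::collections::{HashSet, VecDeque};

type TableId = u32;

enum Command {
    DropStreamingJobs(HashSet<TableId>),
    CancelStreamingJob(TableId), // the real command carries `TableFragments`
    Other,
}

enum QueueStatus {
    Ready,
    Blocked(String),
}

struct ScheduledQueue {
    queue: VecDeque<Command>,
    status: QueueStatus,
}

impl ScheduledQueue {
    fn push(&mut self, cmd: Command) -> Result<(), String> {
        // While recovery keeps the queue blocked, only commands that tear a
        // streaming job down are admitted; anything else is rejected with
        // the blocking reason.
        if let QueueStatus::Blocked(reason) = &self.status {
            if !matches!(
                cmd,
                Command::DropStreamingJobs(_) | Command::CancelStreamingJob(_)
            ) {
                return Err(reason.clone());
            }
        }
        self.queue.push_back(cmd);
        Ok(())
    }
}

fn main() {
    let mut q = ScheduledQueue {
        queue: VecDeque::new(),
        status: QueueStatus::Blocked("cluster is under recovery".into()),
    };
    // Blocked: only drop/cancel get buffered.
    assert!(q.push(Command::Other).is_err());
    assert!(q.push(Command::CancelStreamingJob(42)).is_ok());
    assert!(q.push(Command::DropStreamingJobs(HashSet::new())).is_ok());
    // Once recovery finishes, everything is admitted again.
    q.status = QueueStatus::Ready;
    assert!(q.push(Command::Other).is_ok());
    assert_eq!(q.queue.len(), 3);
}
```

The or-pattern in `matches!` is the whole change: a cancel issued during recovery is buffered instead of being rejected with "unavailable".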
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index d2d7e2f041545..785cdb1879d15 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -32,7 +32,9 @@ use uuid::Uuid;
 use super::{Locations, ScaleController, ScaleControllerRef};
 use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan};
 use crate::hummock::HummockManagerRef;
-use crate::manager::{ClusterManagerRef, DdlType, FragmentManagerRef, MetaSrvEnv, StreamingJob};
+use crate::manager::{
+    CatalogManagerRef, ClusterManagerRef, DdlType, FragmentManagerRef, MetaSrvEnv, StreamingJob,
+};
 use crate::model::{ActorId, TableFragments};
 use crate::stream::SourceManagerRef;
 use crate::{MetaError, MetaResult};
@@ -194,6 +196,9 @@ pub struct GlobalStreamManager {
     /// Maintains streaming sources from external system like kafka
     pub source_manager: SourceManagerRef,
 
+    /// Catalog manager for cleaning up state from deleted stream jobs
+    pub catalog_manager: CatalogManagerRef,
+
     /// Creating streaming job info.
     creating_job_info: CreatingStreamingJobInfoRef,
 
@@ -212,6 +217,7 @@ impl GlobalStreamManager {
         cluster_manager: ClusterManagerRef,
         source_manager: SourceManagerRef,
         hummock_manager: HummockManagerRef,
+        catalog_manager: CatalogManagerRef,
     ) -> MetaResult<Self> {
         let scale_controller = Arc::new(ScaleController::new(
             fragment_manager.clone(),
@@ -229,6 +235,7 @@ impl GlobalStreamManager {
             creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
             reschedule_lock: RwLock::new(()),
             scale_controller,
+            catalog_manager,
         })
     }
 
@@ -667,6 +674,8 @@ impl GlobalStreamManager {
                     id
                 )))?;
             }
+            self.catalog_manager.cancel_create_table_procedure(id.into(), fragment.internal_table_ids()).await?;
+
             self.barrier_scheduler
                 .run_command(Command::CancelStreamingJob(fragment))
                 .await?;
@@ -943,6 +952,7 @@ mod tests {
                 cluster_manager.clone(),
                 source_manager.clone(),
                 hummock_manager,
+                catalog_manager.clone(),
             )?;
 
             let (join_handle_2, shutdown_tx_2) = GlobalBarrierManager::start(barrier_manager);
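The stream-manager change above threads a `CatalogManagerRef` into `GlobalStreamManager` so that cancelling a creating job cleans up its catalog state, including internal tables, before the `CancelStreamingJob` command is scheduled. Below is a minimal, synchronous sketch of that ordering. The types (`CatalogManager`, `BarrierScheduler`, `TableFragments`, `Command`) are hypothetical stand-ins rather than the real meta-service structs, and the real calls are async and fallible.

```rust
type TableId = u32;

struct TableFragments {
    table_id: TableId,
    internal_table_ids: Vec<TableId>,
}

enum Command {
    CancelStreamingJob(TableId),
}

struct CatalogManager {
    creating_tables: Vec<TableId>,
}

impl CatalogManager {
    // Stand-in for `cancel_create_table_procedure`: forget the job's table
    // and any internal tables it registered while being created.
    fn cancel_create_table_procedure(&mut self, id: TableId, internal: Vec<TableId>) {
        self.creating_tables
            .retain(|t| *t != id && !internal.contains(t));
    }
}

#[derive(Default)]
struct BarrierScheduler {
    commands: Vec<Command>,
}

impl BarrierScheduler {
    fn run_command(&mut self, cmd: Command) {
        self.commands.push(cmd);
    }
}

fn cancel_streaming_job(
    catalog: &mut CatalogManager,
    scheduler: &mut BarrierScheduler,
    fragments: TableFragments,
) {
    // 1. Drop the catalog entries for the cancelled job first, so the catalog
    //    is consistent even while the barrier path is slow or blocked.
    catalog.cancel_create_table_procedure(fragments.table_id, fragments.internal_table_ids);
    // 2. Then hand the actual teardown to the barrier scheduler.
    scheduler.run_command(Command::CancelStreamingJob(fragments.table_id));
}

fn main() {
    let mut catalog = CatalogManager {
        creating_tables: vec![1, 100, 101],
    };
    let mut scheduler = BarrierScheduler::default();
    cancel_streaming_job(
        &mut catalog,
        &mut scheduler,
        TableFragments {
            table_id: 1,
            internal_table_ids: vec![100, 101],
        },
    );
    assert!(catalog.creating_tables.is_empty());
    assert_eq!(scheduler.commands.len(), 1);
}
```

Cleaning the catalog before scheduling the barrier command is presumably what lets the test below recreate a relation with the same name right after cancelling, even while barriers are lagging.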
diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
index 4e1ef135f839c..7d86335ed5950 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
@@ -15,7 +15,7 @@
 use std::time::Duration;
 
 use anyhow::Result;
-use itertools::Itertools;
+use risingwave_common::error::anyhow_error;
 use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts, Session};
 use tokio::time::sleep;
 
@@ -76,8 +76,11 @@ async fn cancel_stream_jobs(session: &mut Session) -> Result<Vec<u32>> {
     tracing::info!("cancelled streaming jobs, {}", result);
     let ids = result
         .split('\n')
-        .map(|s| s.parse::<u32>().unwrap())
-        .collect_vec();
+        .map(|s| {
+            s.parse::<u32>()
+                .map_err(|_e| anyhow_error!("failed to parse {}", s))
+        })
+        .collect::<Result<Vec<_>>>()?;
     Ok(ids)
 }
 
@@ -275,6 +278,97 @@ async fn test_ddl_cancel() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_high_barrier_latency_cancel() -> Result<()> {
+    init_logger();
+    let mut cluster = Cluster::start(Configuration::for_scale()).await?;
+    let mut session = cluster.start_session();
+
+    // 100,000 fact records
+    session.run("CREATE TABLE fact (v1 int)").await?;
+    session
+        .run("INSERT INTO fact select 1 from generate_series(1, 100000)")
+        .await?;
+
+    // Amplification factor of 1000 per record.
+    session.run("CREATE TABLE dimension (v1 int)").await?;
+    session
+        .run("INSERT INTO dimension select 1 from generate_series(1, 1000)")
+        .await?;
+    session.flush().await?;
+
+    // With a rate limit of 10 and an amplification factor of 1000,
+    // we should expect about 10,000 rows/s.
+    // That should be enough to cause barrier latency to spike.
+    session.run("SET STREAMING_RATE_LIMIT=10").await?;
+
+    tracing::info!("seeded base tables");
+
+    // Create a high barrier latency scenario.
+    // Keep creating mv1 until it is created.
+    loop {
+        session.run(SET_BACKGROUND_DDL).await?;
+        session.run("CREATE MATERIALIZED VIEW mv1 as select fact.v1 from fact join dimension on fact.v1 = dimension.v1").await?;
+        tracing::info!("created mv in background");
+        sleep(Duration::from_secs(1)).await;
+
+        kill_cn_and_wait_recover(&cluster).await;
+
+        // Check if the mv stream job is created in the background
+        match session
+            .run("select * from rw_catalog.rw_ddl_progress;")
+            .await
+        {
+            Ok(s) if s.is_empty() => {
+                // MV was dropped
+                continue;
+            }
+            Err(e) => {
+                if e.to_string().contains("in creating procedure") {
+                    // MV already created and recovered.
+                    break;
+                } else {
+                    return Err(e);
+                }
+            }
+            Ok(s) => {
+                tracing::info!("created mv stream job with status: {}", s);
+                break;
+            }
+        }
+    }
+
+    tracing::info!("restarted cn: trigger stream job recovery");
+
+    // Attempt to cancel
+    let mut session2 = cluster.start_session();
+    let handle = tokio::spawn(async move {
+        let result = cancel_stream_jobs(&mut session2).await;
+        assert!(result.is_err())
+    });
+
+    sleep(Duration::from_secs(2)).await;
+    kill_cn_and_wait_recover(&cluster).await;
+    tracing::info!("restarted cn: cancel should take effect");
+
+    handle.await.unwrap();
+
+    // Creating an MV with the same relation name should succeed,
+    // since the previous job should be cancelled.
+    tracing::info!("recreating mv");
+    session.run("SET BACKGROUND_DDL=false").await?;
+    session
+        .run("CREATE MATERIALIZED VIEW mv1 as values(1)")
+        .await?;
+    tracing::info!("recreated mv");
+
+    session.run(DROP_MV1).await?;
+    session.run("DROP TABLE fact").await?;
+    session.run("DROP TABLE dimension").await?;
+
+    Ok(())
+}
+
 // When cluster stop, foreground ddl job must be cancelled.
 #[tokio::test]
 async fn test_foreground_ddl_no_recovery() -> Result<()> {
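One more note on the `cancel_stream_jobs` helper changed earlier in this file: it now propagates parse failures instead of unwrapping. Below is a std-only sketch of the same collect-into-`Result` pattern, using a plain `String` error in place of `risingwave_common`'s `anyhow_error!`; the function name here is hypothetical.

```rust
// Map each parse failure into an error instead of unwrapping, and let
// `collect` turn an iterator of `Result<u32, _>` into `Result<Vec<u32>, _>`,
// which the caller can propagate with `?`.
fn parse_cancelled_ids(result: &str) -> Result<Vec<u32>, String> {
    result
        .split('\n')
        .map(|s| {
            s.trim()
                .parse::<u32>()
                .map_err(|_| format!("failed to parse {}", s))
        })
        .collect()
}

fn main() {
    assert_eq!(parse_cancelled_ids("1\n2\n3"), Ok(vec![1, 2, 3]));
    assert!(parse_cancelled_ids("1\nnot-a-number").is_err());
}
```

Collecting into a `Result` stops at the first bad line, so the error names the offending value rather than panicking inside the test helper.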