Commit 1342d6f
fix(meta): drop TableCatalog in cancel_stream_jobs (#13938)
kwannoel authored Dec 13, 2023
1 parent 8db6191 commit 1342d6f
Showing 5 changed files with 124 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/meta/node/src/server.rs
@@ -525,6 +525,7 @@ pub async fn start_service_as_election_leader(
             cluster_manager.clone(),
             source_manager.clone(),
             hummock_manager.clone(),
+            catalog_manager.clone(),
         )
         .unwrap(),
     );
2 changes: 2 additions & 0 deletions src/meta/src/barrier/command.rs
@@ -111,6 +111,8 @@ pub enum Command {
     /// Barriers from the actors to be dropped will STILL be collected.
     /// After the barrier is collected, it notifies the local stream manager of compute nodes to
     /// drop actors, and then delete the table fragments info from meta store.
+    /// The `TableId`s here are the ids of the stream jobs themselves;
+    /// they do not include internal table ids.
     DropStreamingJobs(HashSet<TableId>),
 
     /// `CreateStreamingJob` command generates a `Add` barrier by given info.
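To make the new doc comment concrete, here is a minimal sketch (not part of the commit) of how a `DropStreamingJobs` command would carry only the job's own id. It assumes the `TableFragments` accessors `table_id()` and `internal_table_ids()` that appear in the hunks below:

```rust
use std::collections::HashSet;

// Hypothetical helper: buffer only the stream job's own TableId for dropping.
// Internal table ids are deliberately omitted; they are cleaned up separately
// through the catalog manager (see the stream_manager.rs hunk below).
fn drop_command(table_fragments: &TableFragments) -> Command {
    let mut table_ids = HashSet::new();
    table_ids.insert(table_fragments.table_id()); // the job's id only
    Command::DropStreamingJobs(table_ids)
}
```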
18 changes: 13 additions & 5 deletions src/meta/src/barrier/schedule.rs
@@ -93,7 +93,7 @@ impl ScheduledQueue {
         // TODO: this is just a workaround to allow dropping streaming jobs when the cluster is under recovery,
         // we need to refine it when catalog and streaming metadata can be handled in a transactional way.
         if let QueueStatus::Blocked(reason) = &self.status &&
-            !matches!(scheduled.command, Command::DropStreamingJobs(_)) {
+            !matches!(scheduled.command, Command::DropStreamingJobs(_) | Command::CancelStreamingJob(_)) {
             return Err(MetaError::unavailable(reason));
         }
         self.queue.push_back(scheduled);
@@ -402,10 +402,18 @@ impl ScheduledBarriers {
             notifiers, command, ..
         }) = queue.queue.pop_front()
         {
-            let Command::DropStreamingJobs(table_ids) = command else {
-                unreachable!("only drop streaming jobs should be buffered");
-            };
-            to_drop_tables.extend(table_ids);
+            match command {
+                Command::DropStreamingJobs(table_ids) => {
+                    to_drop_tables.extend(table_ids);
+                }
+                Command::CancelStreamingJob(table_fragments) => {
+                    let table_id = table_fragments.table_id();
+                    to_drop_tables.insert(table_id);
+                }
+                _ => {
+                    unreachable!("only drop or cancel streaming jobs should be buffered");
+                }
+            }
             notifiers.into_iter().for_each(|mut notify| {
                 notify.notify_collected();
                 notify.notify_finished();
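The condition guarding `push` reads as a small predicate; here is a sketch (not in the commit, assuming these two variants are the only user-initiated teardown commands) of what is allowed through a blocked queue:

```rust
// Hypothetical predicate mirroring the condition above: while the queue is
// blocked during recovery, only teardown commands may still be scheduled.
fn may_bypass_blocked_queue(command: &Command) -> bool {
    matches!(
        command,
        Command::DropStreamingJobs(_) | Command::CancelStreamingJob(_)
    )
}
```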
12 changes: 11 additions & 1 deletion src/meta/src/stream/stream_manager.rs
@@ -32,7 +32,9 @@ use uuid::Uuid;
 use super::{Locations, ScaleController, ScaleControllerRef};
 use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan};
 use crate::hummock::HummockManagerRef;
-use crate::manager::{ClusterManagerRef, DdlType, FragmentManagerRef, MetaSrvEnv, StreamingJob};
+use crate::manager::{
+    CatalogManagerRef, ClusterManagerRef, DdlType, FragmentManagerRef, MetaSrvEnv, StreamingJob,
+};
 use crate::model::{ActorId, TableFragments};
 use crate::stream::SourceManagerRef;
 use crate::{MetaError, MetaResult};
@@ -194,6 +196,9 @@ pub struct GlobalStreamManager {
     /// Maintains streaming sources from external system like kafka
     pub source_manager: SourceManagerRef,
 
+    /// Catalog manager for cleaning up state from deleted stream jobs
+    pub catalog_manager: CatalogManagerRef,
+
     /// Creating streaming job info.
     creating_job_info: CreatingStreamingJobInfoRef,
 
@@ -212,6 +217,7 @@ impl GlobalStreamManager {
         cluster_manager: ClusterManagerRef,
         source_manager: SourceManagerRef,
         hummock_manager: HummockManagerRef,
+        catalog_manager: CatalogManagerRef,
     ) -> MetaResult<Self> {
         let scale_controller = Arc::new(ScaleController::new(
             fragment_manager.clone(),
@@ -229,6 +235,7 @@
             creating_job_info: Arc::new(CreatingStreamingJobInfo::default()),
             reschedule_lock: RwLock::new(()),
             scale_controller,
+            catalog_manager,
         })
     }
 
@@ -667,6 +674,8 @@ impl GlobalStreamManager {
                 id
             )))?;
         }
+        self.catalog_manager.cancel_create_table_procedure(id.into(), fragment.internal_table_ids()).await?;
+
         self.barrier_scheduler
             .run_command(Command::CancelStreamingJob(fragment))
             .await?;
@@ -943,6 +952,7 @@ mod tests {
             cluster_manager.clone(),
             source_manager.clone(),
             hummock_manager,
+            catalog_manager.clone(),
         )?;
 
         let (join_handle_2, shutdown_tx_2) = GlobalBarrierManager::start(barrier_manager);
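For orientation, a condensed sketch (not verbatim from the commit) of the resulting cancel path, using only the signatures visible in the hunks above — the point of the fix is that the in-flight `TableCatalog` is dropped before the cancel barrier runs:

```rust
// Sketch: cancelling a stream job now first removes the in-flight catalog
// entries (the job's table plus its internal tables), then tears the job
// down via a cancel barrier.
async fn cancel_flow(
    mgr: &GlobalStreamManager,
    id: TableId,
    fragment: TableFragments,
) -> MetaResult<()> {
    mgr.catalog_manager
        .cancel_create_table_procedure(id.into(), fragment.internal_table_ids())
        .await?;
    mgr.barrier_scheduler
        .run_command(Command::CancelStreamingJob(fragment))
        .await?;
    Ok(())
}
```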
100 changes: 97 additions & 3 deletions
@@ -15,7 +15,7 @@
 use std::time::Duration;
 
 use anyhow::Result;
-use itertools::Itertools;
+use risingwave_common::error::anyhow_error;
 use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts, Session};
 use tokio::time::sleep;
 
@@ -76,8 +76,11 @@ async fn cancel_stream_jobs(session: &mut Session) -> Result<Vec<u32>> {
tracing::info!("cancelled streaming jobs, {}", result);
let ids = result
.split('\n')
.map(|s| s.parse::<u32>().unwrap())
.collect_vec();
.map(|s| {
s.parse::<u32>()
.map_err(|_e| anyhow_error!("failed to parse {}", s))
})
.collect::<Result<Vec<_>>>()?;
Ok(ids)
}

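The `.collect::<Result<Vec<_>>>()?` turns an iterator of `Result`s into a single `Result<Vec<_>>`, short-circuiting on the first parse failure instead of panicking. A standalone sketch of the same pattern using plain `anyhow` (since `anyhow_error!` is a `risingwave_common` macro):

```rust
use anyhow::{anyhow, Result};

// Parse one id per line, propagating the first failure as an error
// rather than panicking on malformed output.
fn parse_ids(result: &str) -> Result<Vec<u32>> {
    result
        .split('\n')
        .map(|s| s.parse::<u32>().map_err(|_| anyhow!("failed to parse {}", s)))
        .collect()
}

// parse_ids("1\n2") == Ok(vec![1, 2]); parse_ids("1\nx") yields an Err.
```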
@@ -275,6 +278,97 @@ async fn test_ddl_cancel() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_high_barrier_latency_cancel() -> Result<()> {
+    init_logger();
+    let mut cluster = Cluster::start(Configuration::for_scale()).await?;
+    let mut session = cluster.start_session();
+
+    // 100,000 fact records
+    session.run("CREATE TABLE fact (v1 int)").await?;
+    session
+        .run("INSERT INTO fact select 1 from generate_series(1, 100000)")
+        .await?;
+
+    // Amplification factor of 1000 per record.
+    session.run("CREATE TABLE dimension (v1 int)").await?;
+    session
+        .run("INSERT INTO dimension select 1 from generate_series(1, 1000)")
+        .await?;
+    session.flush().await?;
+
+    // With a rate limit of 10 rows/s and an amplification factor of 1000,
+    // we should expect about 10,000 rows/s.
+    // That should be enough to cause barrier latency to spike.
+    session.run("SET STREAMING_RATE_LIMIT=10").await?;
+
+    tracing::info!("seeded base tables");
+
+    // Create a high-barrier-latency scenario:
+    // keep creating mv1 until it is created.
+    loop {
+        session.run(SET_BACKGROUND_DDL).await?;
+        session.run("CREATE MATERIALIZED VIEW mv1 as select fact.v1 from fact join dimension on fact.v1 = dimension.v1").await?;
+        tracing::info!("created mv in background");
+        sleep(Duration::from_secs(1)).await;
+
+        kill_cn_and_wait_recover(&cluster).await;
+
+        // Check if the mv stream job was created in the background
+        match session
+            .run("select * from rw_catalog.rw_ddl_progress;")
+            .await
+        {
+            Ok(s) if s.is_empty() => {
+                // MV was dropped
+                continue;
+            }
+            Err(e) => {
+                if e.to_string().contains("in creating procedure") {
+                    // MV already created and recovered.
+                    break;
+                } else {
+                    return Err(e);
+                }
+            }
+            Ok(s) => {
+                tracing::info!("created mv stream job with status: {}", s);
+                break;
+            }
+        }
+    }
+
+    tracing::info!("restarted cn: trigger stream job recovery");
+
+    // Attempt to cancel
+    let mut session2 = cluster.start_session();
+    let handle = tokio::spawn(async move {
+        let result = cancel_stream_jobs(&mut session2).await;
+        assert!(result.is_err())
+    });
+
+    sleep(Duration::from_secs(2)).await;
+    kill_cn_and_wait_recover(&cluster).await;
+    tracing::info!("restarted cn: cancel should take effect");
+
+    handle.await.unwrap();
+
+    // Creating an MV with the same relation name should succeed,
+    // since the previous job should have been cancelled.
+    tracing::info!("recreating mv");
+    session.run("SET BACKGROUND_DDL=false").await?;
+    session
+        .run("CREATE MATERIALIZED VIEW mv1 as values(1)")
+        .await?;
+    tracing::info!("recreated mv");
+
+    session.run(DROP_MV1).await?;
+    session.run("DROP TABLE fact").await?;
+    session.run("DROP TABLE dimension").await?;
+
+    Ok(())
+}
+
 // When the cluster stops, foreground DDL jobs must be cancelled.
 #[tokio::test]
 async fn test_foreground_ddl_no_recovery() -> Result<()> {
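The sizing in the new test can be sanity-checked with quick arithmetic (assumptions: the rate limit caps fact-side backfill at 10 rows/s, and each fact row joins 1000 dimension rows):

```rust
// Back-of-the-envelope check of the comment in the test above.
fn main() {
    let rate_limit = 10u64; // rows/s, from SET STREAMING_RATE_LIMIT=10
    let amplification = 1_000u64; // dimension rows joined per fact row
    let fact_rows = 100_000u64;
    // 10 * 1000 = 10,000 output rows/s, enough to back up barriers.
    assert_eq!(rate_limit * amplification, 10_000);
    // 100,000 / 10 = 10,000 s of backfill work: ample time to race the
    // cancel request against recovery.
    assert_eq!(fact_rows / rate_limit, 10_000);
}
```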
