From be0e5d3c80113b1049df8c13ef71399b11bdd73b Mon Sep 17 00:00:00 2001
From: Noel Kwan
Date: Sat, 28 Oct 2023 00:30:17 +0800
Subject: [PATCH] fmt

---
 .../optimizer/plan_node/stream_table_scan.rs  |   3 +-
 src/meta/src/manager/catalog/mod.rs           |  10 ++
 src/meta/src/stream/stream_manager.rs         | 138 +++++++++---------
 .../executor/backfill/no_shuffle_backfill.rs  |   2 +-
 .../recovery/background_ddl.rs                |  16 +-
 5 files changed, 85 insertions(+), 84 deletions(-)

diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 124ee7fa2dc1b..96d0335920657 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -289,8 +289,7 @@ impl StreamTableScan {
         let ctx = self.base.ctx();
         let config = ctx.session_ctx().config();
         let rate_limit = config.get_streaming_rate_limit();
-        let snapshot_read_delay =
-            config.get_backfill_snapshot_read_delay();
+        let snapshot_read_delay = config.get_backfill_snapshot_read_delay();
 
         PbStreamNode {
             fields: self.schema().to_prost(),
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index d2007dcab45d6..27a5ea3f0b746 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -892,6 +892,7 @@ impl CatalogManager {
         }
         commit_meta!(self, tables)?;
 
+        tracing::debug!(id = ?table.id, "notifying frontend");
         let version = self
             .notify_frontend(
                 Operation::Add,
@@ -2517,6 +2518,15 @@ impl CatalogManager {
             .await
     }
 
+    pub async fn table_is_created(&self, table_id: TableId) -> bool {
+        let guard = self.core.lock().await;
+        return if let Some(table) = guard.database.tables.get(&table_id) {
+            table.get_stream_job_status() != Ok(StreamJobStatus::Creating)
+        } else {
+            false
+        };
+    }
+
     pub async fn get_tables(&self, table_ids: &[TableId]) -> Vec<Table> {
         let mut tables = vec![];
         let guard = self.core.lock().await;
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index ecf04107c0e88..6dede77f175e8 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -263,86 +263,80 @@ impl GlobalStreamManager {
             .in_current_span();
         tokio::spawn(fut);
 
-        let res = try {
-            while let Some(state) = receiver.recv().await {
-                match state {
-                    CreatingState::Failed { reason } => {
-                        tracing::debug!(id=?table_id, "stream job failed");
-                        self.creating_job_info.delete_job(table_id).await;
-                        return Err(reason);
-                    }
-                    CreatingState::Canceling { finish_tx } => {
-                        tracing::debug!(id=?table_id, "cancelling streaming job");
-                        if let Ok(table_fragments) = self
-                            .fragment_manager
-                            .select_table_fragments_by_table_id(&table_id)
+        while let Some(state) = receiver.recv().await {
+            match state {
+                CreatingState::Failed { reason } => {
+                    tracing::debug!(id=?table_id, "stream job failed");
+                    self.creating_job_info.delete_job(table_id).await;
+                    return Err(reason);
+                }
+                CreatingState::Canceling { finish_tx } => {
+                    tracing::debug!(id=?table_id, "cancelling streaming job");
+                    if let Ok(table_fragments) = self
+                        .fragment_manager
+                        .select_table_fragments_by_table_id(&table_id)
                         .await
                     {
                         // try to cancel buffered creating command.
                         if self
                             .barrier_scheduler
                             .try_cancel_scheduled_create(table_id)
                             .await
                         {
-                                tracing::debug!(
-                                    "cancelling streaming job {table_id} in buffer queue."
-                                );
-                                let node_actors = table_fragments.worker_actor_ids();
-                                let cluster_info =
-                                    self.cluster_manager.get_streaming_cluster_info().await;
-                                let node_actors = node_actors
-                                    .into_iter()
-                                    .map(|(id, actor_ids)| {
-                                        (
-                                            cluster_info.worker_nodes.get(&id).cloned().unwrap(),
-                                            actor_ids,
-                                        )
-                                    })
-                                    .collect_vec();
-                                let futures = node_actors.into_iter().map(|(node, actor_ids)| {
-                                    let request_id = Uuid::new_v4().to_string();
-                                    async move {
-                                        let client =
-                                            self.env.stream_client_pool().get(&node).await?;
-                                        let request = DropActorsRequest {
-                                            request_id,
-                                            actor_ids,
-                                        };
-                                        client.drop_actors(request).await
-                                    }
-                                });
-                                try_join_all(futures).await?;
-
-                                self.fragment_manager
-                                    .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(
-                                        table_id,
-                                    )))
-                                    .await?;
-                            }
-                            if !table_fragments.is_created() {
-                                tracing::debug!(
-                                    "cancelling streaming job {table_id} by issue cancel command."
-                                );
-                                self.barrier_scheduler
-                                    .run_command(Command::CancelStreamingJob(table_fragments))
-                                    .await?;
-                            }
-                            let _ = finish_tx.send(()).inspect_err(|_| {
-                                tracing::warn!("failed to notify cancelled: {table_id}")
-                            });
-                            self.creating_job_info.delete_job(table_id).await;
-                            return Err(MetaError::cancelled("create".into()));
-                        }
-                    }
-                    CreatingState::Created => {
-                        self.creating_job_info.delete_job(table_id).await;
-                        return Ok(());
-                    }
-                }
-            }
-        };
-
-        res
+                            tracing::debug!("cancelling streaming job {table_id} in buffer queue.");
+                            let node_actors = table_fragments.worker_actor_ids();
+                            let cluster_info =
+                                self.cluster_manager.get_streaming_cluster_info().await;
+                            let node_actors = node_actors
+                                .into_iter()
+                                .map(|(id, actor_ids)| {
+                                    (
+                                        cluster_info.worker_nodes.get(&id).cloned().unwrap(),
+                                        actor_ids,
+                                    )
+                                })
+                                .collect_vec();
+                            let futures = node_actors.into_iter().map(|(node, actor_ids)| {
+                                let request_id = Uuid::new_v4().to_string();
+                                async move {
+                                    let client = self.env.stream_client_pool().get(&node).await?;
+                                    let request = DropActorsRequest {
+                                        request_id,
+                                        actor_ids,
+                                    };
+                                    client.drop_actors(request).await
+                                }
+                            });
+                            try_join_all(futures).await?;
+
+                            self.fragment_manager
+                                .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(
+                                    table_id,
+                                )))
+                                .await?;
+                        }
+                        if !table_fragments.is_created() {
+                            tracing::debug!(
+                                "cancelling streaming job {table_id} by issue cancel command."
+                            );
+                            self.barrier_scheduler
+                                .run_command(Command::CancelStreamingJob(table_fragments))
+                                .await?;
+                        }
+                        let _ = finish_tx.send(()).inspect_err(|_| {
+                            tracing::warn!("failed to notify cancelled: {table_id}")
+                        });
+                        self.creating_job_info.delete_job(table_id).await;
+                        return Err(MetaError::cancelled("create".into()));
+                    }
+                }
+                CreatingState::Created => {
+                    self.creating_job_info.delete_job(table_id).await;
+                    return Ok(());
+                }
+            }
+        }
+        Ok(())
     }
 
     async fn build_actors(
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index b5c21cb1a3402..4fc9ed9a95647 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -305,7 +305,7 @@ where
                             tokio::time::sleep(Duration::from_millis(
                                 self.snapshot_read_delay as u64,
                             ))
-                                .await;
+                            .await;
                             // Raise the current position.
                             // As snapshot read streams are ordered by pk, so we can
                             // just use the last row to update `current_pos`.
diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
index 7e17b8596b80b..c935007fc5dab 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs
@@ -24,8 +24,7 @@ const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
 const SEED_TABLE: &str = "INSERT INTO t SELECT generate_series FROM generate_series(1, 100000);";
 const SET_BACKGROUND_DDL: &str = "SET BACKGROUND_DDL=true;";
 const SET_STREAMING_RATE_LIMIT: &str = "SET STREAMING_RATE_LIMIT=4000;";
-const SET_BACKFILL_SNAPSHOT_READ_DELAY: &str =
-    "SET BACKFILL_SNAPSHOT_READ_DELAY=100;";
+const SET_BACKFILL_SNAPSHOT_READ_DELAY: &str = "SET BACKFILL_SNAPSHOT_READ_DELAY=100;";
 const CREATE_MV1: &str = "CREATE MATERIALIZED VIEW mv1 as SELECT * FROM t;";
 
 async fn kill_cn_and_wait_recover(cluster: &Cluster) {
@@ -63,11 +62,12 @@ async fn cancel_stream_jobs(session: &mut Session) -> Result<Vec<u32>> {
     tracing::info!("selected streaming jobs to cancel {:?}", ids);
     tracing::info!("cancelling streaming jobs");
     let ids = ids.split('\n').collect::<Vec<_>>().join(",");
-    let result = session
-        .run(&format!("cancel jobs {};", ids))
-        .await?;
+    let result = session.run(&format!("cancel jobs {};", ids)).await?;
     tracing::info!("cancelled streaming jobs, {:#?}", result);
-    let ids = result.split('\n').map(|s| s.parse::<u32>().unwrap()).collect_vec();
+    let ids = result
+        .split('\n')
+        .map(|s| s.parse::<u32>().unwrap())
+        .collect_vec();
     Ok(ids)
 }
 
@@ -150,9 +150,7 @@ async fn test_background_ddl_cancel() -> Result<()> {
     session.run(SEED_TABLE).await?;
     session.run(SET_BACKGROUND_DDL).await?;
     session.run(SET_STREAMING_RATE_LIMIT).await?;
-    session
-        .run(SET_BACKFILL_SNAPSHOT_READ_DELAY)
-        .await?;
+    session.run(SET_BACKFILL_SNAPSHOT_READ_DELAY).await?;
 
     for _ in 0..5 {
         session.run(CREATE_MV1).await?;
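
Note on the new `CatalogManager::table_is_created` helper in this patch: it reports a table as created only when the table exists in the catalog and its stream job status has moved past `StreamJobStatus::Creating`, so an id that was never committed also returns `false`. Below is a minimal, self-contained sketch of that check; `Catalog`, `TableId`, and the simplified `StreamJobStatus` here are stand-ins for the real meta-node types, not the actual RisingWave API.

// Sketch only: mirrors the status comparison from the diff above.
// Requires tokio with the "macros" and "rt" features.
use std::collections::HashMap;

use tokio::sync::Mutex;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum StreamJobStatus {
    Creating,
    Created,
}

type TableId = u32;

struct Catalog {
    tables: Mutex<HashMap<TableId, StreamJobStatus>>,
}

impl Catalog {
    // A table counts as created only if it exists and its stream job
    // status is no longer `Creating`; unknown ids yield `false`.
    async fn table_is_created(&self, table_id: TableId) -> bool {
        let guard = self.tables.lock().await;
        match guard.get(&table_id) {
            Some(status) => *status != StreamJobStatus::Creating,
            None => false,
        }
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let catalog = Catalog {
        tables: Mutex::new(HashMap::from([
            (1, StreamJobStatus::Creating), // background DDL still backfilling
            (2, StreamJobStatus::Created),  // stream job finished
        ])),
    };
    assert!(!catalog.table_is_created(1).await);
    assert!(catalog.table_is_created(2).await);
    assert!(!catalog.table_is_created(3).await); // id not in catalog
}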