diff --git a/Cargo.lock b/Cargo.lock index 6cb0662b32155..cefe87e123256 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7251,6 +7251,7 @@ dependencies = [ "thiserror", "tokio-retry", "tokio-stream", + "tokio-util", "tower", "tower-http", "tracing", diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index d8401cc5a7f71..9299484c14d1c 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -66,6 +66,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "time", "signal", ] } +tokio-util = "0.7" tokio-retry = "0.3" tokio-stream = { version = "0.1", features = ["net"] } tonic = { workspace = true } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index cae431aa5b188..0ddd517b3dbf3 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -37,6 +37,7 @@ use tokio::sync::{oneshot, Mutex}; use tokio::task::JoinHandle; use tokio::time::MissedTickBehavior; use tokio::{select, time}; +use tokio_util::sync::CancellationToken; use crate::barrier::{BarrierScheduler, Command}; use crate::manager::{CatalogManagerRef, FragmentManagerRef, MetaSrvEnv, SourceId}; @@ -161,12 +162,17 @@ impl ConnectorSourceWorker

{ pub async fn run( &mut self, mut sync_call_rx: UnboundedReceiver>>, + cancel_token: CancellationToken, ) { let mut interval = time::interval(self.period); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - loop { + 'runner_loop: loop { select! { biased; + _ = cancel_token.cancelled() => { + tracing::debug!("source worker {} cancelled", self.source_name); + break 'runner_loop; + } tx = sync_call_rx.borrow_mut().recv() => { if let Some(tx) = tx { let _ = tx.send(self.tick().await); @@ -183,6 +189,7 @@ impl ConnectorSourceWorker

{ } } } + tracing::debug!("source worker {} watcher exit", self.source_name); } } @@ -216,6 +223,7 @@ struct ConnectorSourceWorkerHandle { handle: JoinHandle<()>, sync_call_tx: UnboundedSender>>, splits: SharedSplitMapRef, + cancel_token: CancellationToken, } impl ConnectorSourceWorkerHandle { @@ -726,6 +734,9 @@ impl SourceManager { let connector_properties = extract_prop_from_source(&source)?; + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + let handle = tokio::spawn(async move { let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL); ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -753,7 +764,7 @@ impl SourceManager { } }; - worker.run(sync_call_rx).await + worker.run(sync_call_rx, cancel_token_clone).await }); }); @@ -763,6 +774,7 @@ impl SourceManager { handle, sync_call_tx, splits: current_splits_ref, + cancel_token, }, ); Ok(()) @@ -778,6 +790,8 @@ impl SourceManager { let current_splits_ref = Arc::new(Mutex::new(SharedSplitMap { splits: None })); let connector_properties = extract_prop_from_source(source)?; let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); let handle = dispatch_source_prop!(connector_properties, prop, { let mut worker = ConnectorSourceWorker::create( &connector_client, @@ -809,7 +823,7 @@ impl SourceManager { })??; } - tokio::spawn(async move { worker.run(sync_call_rx).await }) + tokio::spawn(async move { worker.run(sync_call_rx, cancel_token_clone).await }) }); managed_sources.insert( @@ -818,6 +832,7 @@ impl SourceManager { handle, sync_call_tx, splits: current_splits_ref, + cancel_token, }, ); @@ -829,7 +844,8 @@ impl SourceManager { let mut core = self.core.lock().await; for source_id in source_ids { if let Some(handle) = core.managed_sources.remove(&source_id) { - handle.handle.abort(); + handle.cancel_token.cancel(); + _ = handle.handle.await; } } }