Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion committed Sep 18, 2023
1 parent f304ed2 commit 422f02d
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"time",
"signal",
] }
tokio-util = "0.7"
tokio-retry = "0.3"
tokio-stream = { version = "0.1", features = ["net"] }
tonic = { workspace = true }
Expand Down
24 changes: 20 additions & 4 deletions src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use tokio::sync::{oneshot, Mutex};
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tokio::{select, time};
use tokio_util::sync::CancellationToken;

use crate::barrier::{BarrierScheduler, Command};
use crate::manager::{CatalogManagerRef, FragmentManagerRef, MetaSrvEnv, SourceId};
Expand Down Expand Up @@ -161,12 +162,17 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
pub async fn run(
&mut self,
mut sync_call_rx: UnboundedReceiver<oneshot::Sender<MetaResult<()>>>,
cancel_token: CancellationToken,
) {
let mut interval = time::interval(self.period);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
'runner_loop: loop {
select! {
biased;
_ = cancel_token.cancelled() => {
tracing::debug!("source worker {} cancelled", self.source_name);
break 'runner_loop;
}
tx = sync_call_rx.borrow_mut().recv() => {
if let Some(tx) = tx {
let _ = tx.send(self.tick().await);
Expand All @@ -183,6 +189,7 @@ impl<P: SourceProperties> ConnectorSourceWorker<P> {
}
}
}
tracing::debug!("source worker {} watcher exit", self.source_name);
}
}

Expand Down Expand Up @@ -216,6 +223,7 @@ struct ConnectorSourceWorkerHandle {
handle: JoinHandle<()>,
sync_call_tx: UnboundedSender<oneshot::Sender<MetaResult<()>>>,
splits: SharedSplitMapRef,
cancel_token: CancellationToken,
}

impl ConnectorSourceWorkerHandle {
Expand Down Expand Up @@ -726,6 +734,9 @@ impl SourceManager {

let connector_properties = extract_prop_from_source(&source)?;

let cancel_token = CancellationToken::new();
let cancel_token_clone = cancel_token.clone();

let handle = tokio::spawn(async move {
let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
Expand Down Expand Up @@ -753,7 +764,7 @@ impl SourceManager {
}
};

worker.run(sync_call_rx).await
worker.run(sync_call_rx, cancel_token_clone).await
});
});

Expand All @@ -763,6 +774,7 @@ impl SourceManager {
handle,
sync_call_tx,
splits: current_splits_ref,
cancel_token,
},
);
Ok(())
Expand All @@ -778,6 +790,8 @@ impl SourceManager {
let current_splits_ref = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
let connector_properties = extract_prop_from_source(source)?;
let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel();
let cancel_token = CancellationToken::new();
let cancel_token_clone = cancel_token.clone();
let handle = dispatch_source_prop!(connector_properties, prop, {
let mut worker = ConnectorSourceWorker::create(
&connector_client,
Expand Down Expand Up @@ -809,7 +823,7 @@ impl SourceManager {
})??;
}

tokio::spawn(async move { worker.run(sync_call_rx).await })
tokio::spawn(async move { worker.run(sync_call_rx, cancel_token_clone).await })
});

managed_sources.insert(
Expand All @@ -818,6 +832,7 @@ impl SourceManager {
handle,
sync_call_tx,
splits: current_splits_ref,
cancel_token,
},
);

Expand All @@ -829,7 +844,8 @@ impl SourceManager {
let mut core = self.core.lock().await;
for source_id in source_ids {
if let Some(handle) = core.managed_sources.remove(&source_id) {
handle.handle.abort();
handle.cancel_token.cancel();
_ = handle.handle.await;
}
}
}
Expand Down

0 comments on commit 422f02d

Please sign in to comment.