diff --git a/src/meta/src/rpc/election/etcd.rs b/src/meta/src/rpc/election/etcd.rs index 84c88c66a9c6f..f30d8253cb95d 100644 --- a/src/meta/src/rpc/election/etcd.rs +++ b/src/meta/src/rpc/election/etcd.rs @@ -23,9 +23,9 @@ use tokio::sync::{oneshot, watch}; use tokio::time; use tokio_stream::StreamExt; -use crate::rpc::election::META_ELECTION_KEY; +use crate::rpc::election::{ElectionClient, ElectionMember, META_ELECTION_KEY}; use crate::storage::WrappedEtcdClient; -use crate::{ElectionClient, ElectionMember, MetaResult}; +use crate::MetaResult; pub struct EtcdElectionClient { id: String, diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index e208222de4908..fa3159f6657ec 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -23,7 +23,6 @@ use std::time::Duration; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_connector::dispatch_source_prop; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SourceProperties, @@ -53,7 +52,6 @@ pub struct SourceManager { barrier_scheduler: BarrierScheduler, core: Mutex, metrics: Arc, - runtime: Arc, } const MAX_FAIL_CNT: u32 = 10; @@ -501,20 +499,11 @@ impl SourceManager { fragment_manager: FragmentManagerRef, metrics: Arc, ) -> MetaResult { - let runtime = tokio::runtime::Builder::new_multi_thread() - .thread_name("risingwave-source-manager") - .enable_all() - .build() - .map_err(|e| anyhow!(e))?; - - let runtime: Arc = Arc::new(runtime.into()); - let mut managed_sources = HashMap::new(); { let sources = catalog_manager.list_sources().await; for source in sources { Self::create_source_worker_async( - runtime.clone(), env.connector_client(), source, &mut managed_sources, @@ -548,7 +537,6 @@ impl SourceManager { core, paused: Mutex::new(()), metrics, - runtime, }) } @@ -691,7 +679,6 @@ impl SourceManager { tracing::warn!("source {} already registered", source.get_id()); } else { Self::create_source_worker( - self.runtime.clone(), self.env.connector_client(), source, &mut core.managed_sources, @@ -704,7 +691,6 @@ impl SourceManager { } fn create_source_worker_async( - runtime: Arc, connector_client: Option, source: Source, managed_sources: &mut HashMap, @@ -720,7 +706,7 @@ impl SourceManager { let connector_properties = extract_prop_from_source(&source)?; let enable_scale_in = connector_properties.enable_split_scale_in(); - let handle = runtime.spawn(async move { + let handle = tokio::spawn(async move { let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL); ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -764,7 +750,6 @@ impl SourceManager { } async fn create_source_worker( - runtime: Arc, connector_client: Option, source: &Source, managed_sources: &mut HashMap, @@ -806,7 +791,7 @@ impl SourceManager { })??; } - runtime.spawn(async move { worker.run(sync_call_rx).await }) + tokio::spawn(async move { worker.run(sync_call_rx).await }) }); managed_sources.insert(