From 8908e368ecceda8b2aeb04bebf7191419a878c09 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Wed, 27 Sep 2023 14:52:34 +0800 Subject: [PATCH] Add `background_shutdown_runtime` dependency to `EtcdElectionClient` and `SourceManager` (#123) --- src/meta/src/rpc/election/etcd.rs | 43 ++++++++------------------- src/meta/src/rpc/election/mod.rs | 7 +++++ src/meta/src/stream/source_manager.rs | 20 +++++++++++-- 3 files changed, 36 insertions(+), 34 deletions(-) diff --git a/src/meta/src/rpc/election/etcd.rs b/src/meta/src/rpc/election/etcd.rs index bdb6616d3298..ecf772710045 100644 --- a/src/meta/src/rpc/election/etcd.rs +++ b/src/meta/src/rpc/election/etcd.rs @@ -20,6 +20,7 @@ use std::time::Duration; use anyhow::anyhow; use etcd_client::{ConnectOptions, Error, GetOptions, LeaderKey, ResignOptions}; use risingwave_common::bail; +use risingwave_common::util::runtime::BackgroundShutdownRuntime; use serde::Serialize; use tokio::runtime::Runtime; use tokio::sync::watch::Receiver; @@ -27,36 +28,15 @@ use tokio::sync::{oneshot, watch}; use tokio::time; use tokio_stream::StreamExt; +use crate::rpc::election::META_ELECTION_KEY; use crate::storage::WrappedEtcdClient; -use crate::MetaResult; - -const META_ELECTION_KEY: &str = "__meta_election_"; - -#[derive(Debug, Serialize)] -pub struct ElectionMember { - pub id: String, - pub is_leader: bool, -} - -#[async_trait::async_trait] -pub trait ElectionClient: Send + Sync + 'static { - fn id(&self) -> MetaResult; - async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()>; - fn subscribe(&self) -> Receiver; - async fn leader(&self) -> MetaResult>; - async fn get_members(&self) -> MetaResult>; - async fn is_leader(&self) -> bool; - - fn runtime_ref(&self) -> Option> { - None - } -} +use crate::{ElectionClient, ElectionMember, MetaResult}; pub struct EtcdElectionClient { id: String, is_leader_sender: watch::Sender, client: WrappedEtcdClient, - runtime: Arc, + runtime: Arc, } #[async_trait::async_trait] @@ -355,18 +335,19 @@ impl EtcdElectionClient { let client = WrappedEtcdClient::connect(endpoints, options, auth_enabled).await?; - let runtime = Runtime::new().map_err(|e| { - anyhow!( - "create runtime for election client failed {}", - e.to_string() - ) - })?; + let runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("risingwave-election-client") + .enable_all() + .build() + .map_err(|e| anyhow!(e))?; + + let runtime: Arc = Arc::new(runtime.into()); Ok(Self { id, is_leader_sender: sender, client, - runtime: Arc::new(runtime), + runtime, }) } } diff --git a/src/meta/src/rpc/election/mod.rs b/src/meta/src/rpc/election/mod.rs index 9835c554b3fd..1c2ab4a5ab53 100644 --- a/src/meta/src/rpc/election/mod.rs +++ b/src/meta/src/rpc/election/mod.rs @@ -14,6 +14,9 @@ pub mod etcd; pub mod sql; +use std::sync::Arc; + +use risingwave_common::util::runtime::BackgroundShutdownRuntime; use serde::Serialize; use tokio::sync::watch::Receiver; @@ -35,4 +38,8 @@ pub trait ElectionClient: Send + Sync + 'static { async fn leader(&self) -> MetaResult>; async fn get_members(&self) -> MetaResult>; async fn is_leader(&self) -> bool; + + fn runtime_ref(&self) -> Option> { + None + } } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index d6a7377f1992..871228f31dce 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -24,6 +24,7 @@ use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_connector::dispatch_source_prop; +use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SourceProperties, SplitEnumerator, SplitId, SplitImpl, SplitMetaData, @@ -52,6 +53,7 @@ pub struct SourceManager { barrier_scheduler: BarrierScheduler, core: Mutex, metrics: Arc, + runtime: Arc, } const MAX_FAIL_CNT: u32 = 10; @@ -496,11 +498,20 @@ impl SourceManager { fragment_manager: FragmentManagerRef, metrics: Arc, ) -> MetaResult { + let runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("risingwave-source-manager") + .enable_all() + .build() + .map_err(|e| anyhow!(e))?; + + let runtime: Arc = Arc::new(runtime.into()); + let mut managed_sources = HashMap::new(); { let sources = catalog_manager.list_sources().await; for source in sources { Self::create_source_worker_async( + runtime.clone(), env.connector_client(), source, &mut managed_sources, @@ -534,6 +545,7 @@ impl SourceManager { core, paused: Mutex::new(()), metrics, + runtime, }) } @@ -675,6 +687,7 @@ impl SourceManager { tracing::warn!("source {} already registered", source.get_id()); } else { Self::create_source_worker( + self.runtime.clone(), self.env.connector_client(), source, &mut core.managed_sources, @@ -687,6 +700,7 @@ impl SourceManager { } fn create_source_worker_async( + runtime: Arc, connector_client: Option, source: Source, managed_sources: &mut HashMap, @@ -701,8 +715,7 @@ impl SourceManager { let source_id = source.id; let connector_properties = extract_prop_from_source(&source)?; - - let handle = tokio::spawn(async move { + let handle = runtime.spawn(async move { let mut ticker = time::interval(Self::DEFAULT_SOURCE_TICK_INTERVAL); ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -745,6 +758,7 @@ impl SourceManager { } async fn create_source_worker( + runtime: Arc, connector_client: Option, source: &Source, managed_sources: &mut HashMap, @@ -785,7 +799,7 @@ impl SourceManager { })??; } - tokio::spawn(async move { worker.run(sync_call_rx).await }) + runtime.spawn(async move { worker.run(sync_call_rx).await }) }); managed_sources.insert(