diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 2918fa8acac1..2393cd99a196 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -50,6 +50,10 @@ use crate::error::{ MissingMetasrvOptsSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, }; +use crate::event_listener::{ + new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef, + RegionServerEventReceiver, +}; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::{new_metasrv_client, HeartbeatTask}; use crate::region_server::RegionServer; @@ -63,6 +67,7 @@ pub struct Datanode { opts: DatanodeOptions, services: Option, heartbeat_task: Option, + region_event_receiver: Option, region_server: RegionServer, greptimedb_telemetry_task: Arc, } @@ -77,9 +82,11 @@ impl Datanode { self.start_services().await } - pub async fn start_heartbeat(&self) -> Result<()> { + pub async fn start_heartbeat(&mut self) -> Result<()> { if let Some(task) = &self.heartbeat_task { - task.start().await?; + // Safety: The event_receiver must exist. + let receiver = self.region_event_receiver.take().unwrap(); + task.start(receiver).await?; } Ok(()) } @@ -185,8 +192,24 @@ impl DatanodeBuilder { // build and initialize region server let log_store = Self::build_log_store(&self.opts).await?; - let region_server = - Self::new_region_server(&self.opts, self.plugins.clone(), log_store).await?; + let (region_event_listener, region_event_receiver) = match mode { + Mode::Distributed => { + let (tx, rx) = new_region_server_event_channel(); + (Box::new(tx) as RegionServerEventListenerRef, Some(rx)) + } + Mode::Standalone => ( + Box::new(NoopRegionServerEventListener) as RegionServerEventListenerRef, + None, + ), + }; + + let region_server = Self::new_region_server( + &self.opts, + self.plugins.clone(), + log_store, + region_event_listener, + ) + .await?; self.initialize_region_server(®ion_server, kv_backend, matches!(mode, Mode::Standalone)) .await?; @@ -219,6 +242,7 @@ impl DatanodeBuilder { heartbeat_task, region_server, greptimedb_telemetry_task, + region_event_receiver, }) } @@ -276,6 +300,7 @@ impl DatanodeBuilder { opts: &DatanodeOptions, plugins: Arc, log_store: Arc, + event_listener: RegionServerEventListenerRef, ) -> Result { let query_engine_factory = QueryEngineFactory::new_with_plugins( // query engine in datanode only executes plan with resolved table source. @@ -294,7 +319,8 @@ impl DatanodeBuilder { .context(RuntimeResourceSnafu)?, ); - let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone()); + let mut region_server = + RegionServer::new(query_engine.clone(), runtime.clone(), event_listener); let object_store = store::new_object_store(opts).await?; let engines = Self::build_store_engines(opts, log_store, object_store).await?; for engine in engines { diff --git a/src/datanode/src/event_listener.rs b/src/datanode/src/event_listener.rs new file mode 100644 index 000000000000..1c48f98d6c1d --- /dev/null +++ b/src/datanode/src/event_listener.rs @@ -0,0 +1,67 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::error; +use store_api::storage::RegionId; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; + +pub enum RegionServerEvent { + Registered(RegionId), + Deregistered(RegionId), +} + +pub trait RegionServerEventListener: Sync + Send { + /// Called *after* a new region was created/opened. + fn on_region_registered(&self, _region_id: RegionId) {} + + /// Called *after* a region was closed. + fn on_region_deregistered(&self, _region_id: RegionId) {} +} + +pub type RegionServerEventListenerRef = Box; + +pub struct NoopRegionServerEventListener; + +impl RegionServerEventListener for NoopRegionServerEventListener {} + +#[derive(Debug, Clone)] +pub struct RegionServerEventSender(pub(crate) UnboundedSender); + +impl RegionServerEventListener for RegionServerEventSender { + fn on_region_registered(&self, region_id: RegionId) { + if let Err(e) = self.0.send(RegionServerEvent::Registered(region_id)) { + error!( + "Failed to send registering region: {region_id} event, source: {}", + e + ); + } + } + + fn on_region_deregistered(&self, region_id: RegionId) { + if let Err(e) = self.0.send(RegionServerEvent::Deregistered(region_id)) { + error!( + "Failed to send deregistering region: {region_id} event, source: {}", + e + ); + } + } +} + +pub struct RegionServerEventReceiver(pub(crate) UnboundedReceiver); + +pub fn new_region_server_event_channel() -> (RegionServerEventSender, RegionServerEventReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + + (RegionServerEventSender(tx), RegionServerEventReceiver(rx)) +} diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 74dffcc240b5..4ac90d497dec 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -35,6 +35,7 @@ use self::handler::RegionHeartbeatResponseHandler; use crate::alive_keeper::RegionAliveKeeper; use crate::config::DatanodeOptions; use crate::error::{self, MetaClientInitSnafu, Result}; +use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver}; use crate::region_server::RegionServer; pub(crate) mod handler; @@ -136,7 +137,7 @@ impl HeartbeatTask { } /// Start heartbeat task, spawn background task. - pub async fn start(&self) -> Result<()> { + pub async fn start(&self, mut event_receiver: RegionServerEventReceiver) -> Result<()> { let running = self.running.clone(); if running .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) @@ -180,6 +181,32 @@ impl HeartbeatTask { }); let epoch = self.region_alive_keeper.epoch(); + let keeper = self.region_alive_keeper.clone(); + + common_runtime::spawn_bg(async move { + loop { + if !running.load(Ordering::Relaxed) { + info!("shutdown heartbeat task"); + break; + } + + match event_receiver.0.recv().await { + Some(RegionServerEvent::Registered(region_id)) => { + keeper.register_region(region_id).await; + } + Some(RegionServerEvent::Deregistered(region_id)) => { + keeper.deregister_region(region_id).await; + } + None => { + info!("region server event sender closed!"); + break; + } + } + } + }); + + let running = self.running.clone(); + common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 72f705c78c1b..890e5c38ce92 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -21,6 +21,7 @@ pub mod alive_keeper; pub mod config; pub mod datanode; pub mod error; +pub mod event_listener; mod greptimedb_telemetry; pub mod heartbeat; pub mod metrics; diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index b168d85c875b..040c4a595768 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -59,6 +59,7 @@ use crate::error::{ GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu, }; +use crate::event_listener::RegionServerEventListenerRef; #[derive(Clone)] pub struct RegionServer { @@ -66,9 +67,17 @@ pub struct RegionServer { } impl RegionServer { - pub fn new(query_engine: QueryEngineRef, runtime: Arc) -> Self { + pub fn new( + query_engine: QueryEngineRef, + runtime: Arc, + event_listener: RegionServerEventListenerRef, + ) -> Self { Self { - inner: Arc::new(RegionServerInner::new(query_engine, runtime)), + inner: Arc::new(RegionServerInner::new( + query_engine, + runtime, + event_listener, + )), } } @@ -185,15 +194,21 @@ struct RegionServerInner { region_map: DashMap, query_engine: QueryEngineRef, runtime: Arc, + event_listener: RegionServerEventListenerRef, } impl RegionServerInner { - pub fn new(query_engine: QueryEngineRef, runtime: Arc) -> Self { + pub fn new( + query_engine: QueryEngineRef, + runtime: Arc, + event_listener: RegionServerEventListenerRef, + ) -> Self { Self { engines: RwLock::new(HashMap::new()), region_map: DashMap::new(), query_engine, runtime, + event_listener, } } @@ -251,12 +266,14 @@ impl RegionServerInner { RegionChange::Register(_) => { info!("Region {region_id} is registered to engine {engine_type}"); self.region_map.insert(region_id, engine); + self.event_listener.on_region_registered(region_id); } RegionChange::Deregisters => { info!("Region {region_id} is deregistered from engine {engine_type}"); self.region_map .remove(®ion_id) .map(|(id, engine)| engine.set_writable(id, false)); + self.event_listener.on_region_deregistered(region_id); } } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 629603b82383..6cb9392caea2 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -36,6 +36,7 @@ use session::context::QueryContextRef; use table::TableRef; use tokio::sync::mpsc::{self, Receiver}; +use crate::event_listener::NoopRegionServerEventListener; use crate::region_server::RegionServer; use crate::Instance; @@ -142,5 +143,6 @@ pub fn mock_region_server() -> RegionServer { RegionServer::new( Arc::new(MockQueryEngine), Arc::new(Runtime::builder().build().unwrap()), + Box::new(NoopRegionServerEventListener), ) }