diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index 5f5fe9be1395..7ba4f2360cd0 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -34,6 +34,8 @@ use tokio::sync::{mpsc, Mutex}; use tokio::task::JoinHandle; use tokio::time::{Duration, Instant}; +use crate::error::{self, Result}; +use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver}; use crate::region_server::RegionServer; const MAX_CLOSE_RETRY_TIMES: usize = 10; @@ -54,7 +56,7 @@ pub struct RegionAliveKeeper { region_server: RegionServer, tasks: Arc>>>, heartbeat_interval_millis: u64, - started: AtomicBool, + started: Arc, /// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing /// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically @@ -69,7 +71,7 @@ impl RegionAliveKeeper { region_server, tasks: Arc::new(Mutex::new(HashMap::new())), heartbeat_interval_millis, - started: AtomicBool::new(false), + started: Arc::new(AtomicBool::new(false)), epoch: Instant::now(), } } @@ -141,17 +143,72 @@ impl RegionAliveKeeper { deadline } - pub async fn start(&self) { + pub async fn start( + self: &Arc, + event_receiver: Option, + ) -> Result<()> { + self.started.store(true, Ordering::Relaxed); + + if let Some(mut event_receiver) = event_receiver { + let keeper = self.clone(); + // Initializers region alive keeper. + // It makes sure all opened regions are registered to `RegionAliveKeeper.` + loop { + match event_receiver.0.try_recv() { + Ok(RegionServerEvent::Registered(region_id)) => { + keeper.register_region(region_id).await; + } + Ok(RegionServerEvent::Deregistered(region_id)) => { + keeper.deregister_region(region_id).await; + } + Err(mpsc::error::TryRecvError::Disconnected) => { + return error::UnexpectedSnafu { + violated: "RegionServerSender closed", + } + .fail() + } + Err(mpsc::error::TryRecvError::Empty) => { + break; + } + } + } + let running = self.started.clone(); + + // Watches changes + common_runtime::spawn_bg(async move { + loop { + if !running.load(Ordering::Relaxed) { + info!("shutdown heartbeat task"); + break; + } + + match event_receiver.0.recv().await { + Some(RegionServerEvent::Registered(region_id)) => { + keeper.register_region(region_id).await; + } + Some(RegionServerEvent::Deregistered(region_id)) => { + keeper.deregister_region(region_id).await; + } + None => { + info!("region server event sender closed!"); + break; + } + } + } + }); + } + let tasks = self.tasks.lock().await; for task in tasks.values() { task.start(self.heartbeat_interval_millis).await; } - self.started.store(true, Ordering::Relaxed); info!( "RegionAliveKeeper is started with region {:?}", tasks.keys().map(|x| x.to_string()).collect::>(), ); + + Ok(()) } pub fn epoch(&self) -> Instant { @@ -383,14 +440,14 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn region_alive_keeper() { let region_server = mock_region_server(); - let alive_keeper = RegionAliveKeeper::new(region_server, 300); + let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server, 300)); let region_id = RegionId::new(1, 2); // register a region before starting alive_keeper.register_region(region_id).await; assert!(alive_keeper.find_handle(region_id).await.is_some()); - alive_keeper.start().await; + alive_keeper.start(None).await.unwrap(); // started alive keeper should assign deadline to this region let deadline = alive_keeper.deadline(region_id).await.unwrap(); diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 2fb2d5bbbe9e..c2fa5820e436 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -35,7 +35,7 @@ use self::handler::RegionHeartbeatResponseHandler; use crate::alive_keeper::RegionAliveKeeper; use crate::config::DatanodeOptions; use crate::error::{self, MetaClientInitSnafu, Result}; -use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver}; +use crate::event_listener::RegionServerEventReceiver; use crate::region_server::RegionServer; pub(crate) mod handler; @@ -142,7 +142,7 @@ impl HeartbeatTask { /// Start heartbeat task, spawn background task. pub async fn start( &self, - mut event_receiver: RegionServerEventReceiver, + event_receiver: RegionServerEventReceiver, notify: Arc, ) -> Result<()> { let running = self.running.clone(); @@ -159,8 +159,6 @@ impl HeartbeatTask { let addr = resolve_addr(&self.server_addr, &self.server_hostname); info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."); - self.region_alive_keeper.start().await; - let meta_client = self.meta_client.clone(); let region_server_clone = self.region_server.clone(); @@ -184,31 +182,7 @@ impl HeartbeatTask { }); let epoch = self.region_alive_keeper.epoch(); - let keeper = self.region_alive_keeper.clone(); - - common_runtime::spawn_bg(async move { - loop { - if !running.load(Ordering::Relaxed) { - info!("shutdown heartbeat task"); - break; - } - - match event_receiver.0.recv().await { - Some(RegionServerEvent::Registered(region_id)) => { - keeper.register_region(region_id).await; - } - Some(RegionServerEvent::Deregistered(region_id)) => { - keeper.deregister_region(region_id).await; - } - None => { - info!("region server event sender closed!"); - break; - } - } - } - }); - - let running = self.running.clone(); + self.region_alive_keeper.start(Some(event_receiver)).await?; common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0));