Skip to content

Commit

Permalink
refactor: watch changes in RegionAliveKeeper
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 17, 2023
1 parent 381ffab commit 0470e91
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 35 deletions.
69 changes: 63 additions & 6 deletions src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};

use crate::error::{self, Result};
use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
use crate::region_server::RegionServer;

const MAX_CLOSE_RETRY_TIMES: usize = 10;
Expand All @@ -54,7 +56,7 @@ pub struct RegionAliveKeeper {
region_server: RegionServer,
tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
heartbeat_interval_millis: u64,
started: AtomicBool,
started: Arc<AtomicBool>,

/// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
/// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically
Expand All @@ -69,7 +71,7 @@ impl RegionAliveKeeper {
region_server,
tasks: Arc::new(Mutex::new(HashMap::new())),
heartbeat_interval_millis,
started: AtomicBool::new(false),
started: Arc::new(AtomicBool::new(false)),
epoch: Instant::now(),
}
}
Expand Down Expand Up @@ -141,17 +143,72 @@ impl RegionAliveKeeper {
deadline
}

pub async fn start(&self) {
pub async fn start(
self: &Arc<Self>,
event_receiver: Option<RegionServerEventReceiver>,
) -> Result<()> {
self.started.store(true, Ordering::Relaxed);

if let Some(mut event_receiver) = event_receiver {
let keeper = self.clone();
// Initializers region alive keeper.
// It makes sure all opened regions are registered to `RegionAliveKeeper.`
loop {
match event_receiver.0.try_recv() {
Ok(RegionServerEvent::Registered(region_id)) => {
keeper.register_region(region_id).await;
}
Ok(RegionServerEvent::Deregistered(region_id)) => {
keeper.deregister_region(region_id).await;
}
Err(mpsc::error::TryRecvError::Disconnected) => {
return error::UnexpectedSnafu {
violated: "RegionServerEventSender closed",
}
.fail()
}
Err(mpsc::error::TryRecvError::Empty) => {
break;
}
}
}
let running = self.started.clone();

// Watches changes
common_runtime::spawn_bg(async move {
loop {
if !running.load(Ordering::Relaxed) {
info!("shutdown heartbeat task");
break;
}

match event_receiver.0.recv().await {
Some(RegionServerEvent::Registered(region_id)) => {
keeper.register_region(region_id).await;
}
Some(RegionServerEvent::Deregistered(region_id)) => {
keeper.deregister_region(region_id).await;
}
None => {
info!("region server event sender closed!");
break;
}
}
}
});
}

let tasks = self.tasks.lock().await;
for task in tasks.values() {
task.start(self.heartbeat_interval_millis).await;
}
self.started.store(true, Ordering::Relaxed);

info!(
"RegionAliveKeeper is started with region {:?}",
tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
);

Ok(())
}

pub fn epoch(&self) -> Instant {
Expand Down Expand Up @@ -383,14 +440,14 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn region_alive_keeper() {
let region_server = mock_region_server();
let alive_keeper = RegionAliveKeeper::new(region_server, 300);
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server, 300));
let region_id = RegionId::new(1, 2);

// register a region before starting
alive_keeper.register_region(region_id).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());

alive_keeper.start().await;
alive_keeper.start(None).await.unwrap();

// started alive keeper should assign deadline to this region
let deadline = alive_keeper.deadline(region_id).await.unwrap();
Expand Down
32 changes: 3 additions & 29 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
use crate::event_listener::RegionServerEventReceiver;
use crate::region_server::RegionServer;

pub(crate) mod handler;
Expand Down Expand Up @@ -142,7 +142,7 @@ impl HeartbeatTask {
/// Start heartbeat task, spawn background task.
pub async fn start(
&self,
mut event_receiver: RegionServerEventReceiver,
event_receiver: RegionServerEventReceiver,
notify: Arc<Notify>,
) -> Result<()> {
let running = self.running.clone();
Expand All @@ -159,8 +159,6 @@ impl HeartbeatTask {
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");

self.region_alive_keeper.start().await;

let meta_client = self.meta_client.clone();
let region_server_clone = self.region_server.clone();

Expand All @@ -184,31 +182,7 @@ impl HeartbeatTask {
});
let epoch = self.region_alive_keeper.epoch();

let keeper = self.region_alive_keeper.clone();

common_runtime::spawn_bg(async move {
loop {
if !running.load(Ordering::Relaxed) {
info!("shutdown heartbeat task");
break;
}

match event_receiver.0.recv().await {
Some(RegionServerEvent::Registered(region_id)) => {
keeper.register_region(region_id).await;
}
Some(RegionServerEvent::Deregistered(region_id)) => {
keeper.deregister_region(region_id).await;
}
None => {
info!("region server event sender closed!");
break;
}
}
}
});

let running = self.running.clone();
self.region_alive_keeper.start(Some(event_receiver)).await?;

common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
Expand Down

0 comments on commit 0470e91

Please sign in to comment.