Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 5, 2023
1 parent 7933936 commit f3e2afe
Showing 1 changed file with 33 additions and 34 deletions.
67 changes: 33 additions & 34 deletions src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ impl CountdownTaskHandle {
}
}

/// Starts the [CountdownTask],
/// it will be ignored if the task started.
async fn start(&self, heartbeat_interval_millis: u64) {
if let Err(e) = self
.tx
Expand Down Expand Up @@ -355,7 +357,6 @@ struct CountdownTask {
}

impl CountdownTask {
///
async fn run(&mut self) {
// 30 years. See `Instant::far_future`.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
Expand All @@ -365,35 +366,29 @@ impl CountdownTask {
let countdown = tokio::time::sleep_until(far_future);
tokio::pin!(countdown);
let region_id = self.region_id;

let mut started = false;
loop {
tokio::select! {
command = self.rx.recv() => {
match command {
Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
// Set first deadline in 4 heartbeats (roughly after 12 seconds from now if heartbeat
// interval is set to default 3 seconds), to make Datanode and Metasrv more tolerable to
// network or other jitters during startup.
let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
countdown.set(tokio::time::sleep_until(first_deadline));
if !started {
// Set first deadline in 4 heartbeats (roughly after 12 seconds from now if heartbeat
// interval is set to default 3 seconds), to make Datanode and Metasrv more tolerable to
// network or other jitters during startup.
let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
countdown.set(tokio::time::sleep_until(first_deadline));
started = true;
}
},
Some(CountdownCommand::Reset((role, deadline))) => {
// The first-time granted regions might be ignored because the `first_deadline` is larger than the `region_lease_timeout`.
// Therefore, we set writable at the outside.
// TODO(weny): Considers setting `first_deadline` to `region_lease_timeout`.
let _ = self.region_server.set_writable(self.region_id, role.writable());

if countdown.deadline() < deadline {
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later.",
(deadline - Instant::now()).as_secs_f32(),
);
countdown.set(tokio::time::sleep_until(deadline));
}
// Else the countdown could be either:
// - not started yet;
// - during startup protection;
// - received a lagging heartbeat message.
// All can be safely ignored.
trace!(
"Reset deadline of region {region_id} to approximately {} seconds later.",
(deadline - Instant::now()).as_secs_f32(),
);
countdown.set(tokio::time::sleep_until(deadline));
},
None => {
info!(
Expand All @@ -413,7 +408,7 @@ impl CountdownTask {
let _ = self.region_server.set_writable(self.region_id, false);
metrics::LEASE_EXPIRED_REGION
.with_label_values(&[&format!("{}", self.region_id)])
.add(1);
.inc();
// resets the countdown.
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
countdown.as_mut().reset(far_future);
Expand Down Expand Up @@ -477,14 +472,23 @@ mod test {
region_id: region_id.as_u64(),
role: api::v1::meta::RegionRole::Leader.into(),
}],
Instant::now() + Duration::from_millis(500),
Instant::now() + Duration::from_millis(200),
)
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());
let deadline = alive_keeper.deadline(region_id).await.unwrap();
assert!(deadline >= Instant::now());
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);

info!("Wait for lease expired");
// Sleep to wait lease expired.
tokio::time::sleep(Duration::from_millis(200)).await;
assert!(alive_keeper.find_handle(region_id).await.is_some());
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);

let deadline = alive_keeper.deadline(region_id).await.unwrap();
assert!(deadline > Instant::now() + Duration::from_secs(86400 * 365 * 29));
}

#[tokio::test(flavor = "multi_thread")]
Expand All @@ -507,21 +511,16 @@ mod test {
countdown_handle.deadline().await.unwrap()
> Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
);
tokio::time::sleep(Duration::from_millis(heartbeat_interval_millis * 4)).await;

// Reset deadline.
// A nearer deadline will be ignored.
countdown_handle
.reset_deadline(
RegionRole::Leader,
Instant::now() + Duration::from_millis(heartbeat_interval_millis),
)
.await;
// No effect.
countdown_handle.start(heartbeat_interval_millis).await;
assert!(
countdown_handle.deadline().await.unwrap()
> Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
> Instant::now() + Duration::from_secs(86400 * 365 * 29)
);

// Only a farther deadline will be accepted.
// Reset deadline.
countdown_handle
.reset_deadline(
RegionRole::Leader,
Expand Down

0 comments on commit f3e2afe

Please sign in to comment.