diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index 985c70677bef..7e6bad0dec2e 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -298,6 +298,8 @@ impl CountdownTaskHandle { } } + /// Starts the [CountdownTask], + /// it will be ignored if the task started. async fn start(&self, heartbeat_interval_millis: u64) { if let Err(e) = self .tx @@ -364,16 +366,21 @@ impl CountdownTask { let countdown = tokio::time::sleep_until(far_future); tokio::pin!(countdown); let region_id = self.region_id; + + let mut started = false; loop { tokio::select! { command = self.rx.recv() => { match command { Some(CountdownCommand::Start(heartbeat_interval_millis)) => { - // Set first deadline in 4 heartbeats (roughly after 12 seconds from now if heartbeat - // interval is set to default 3 seconds), to make Datanode and Metasrv more tolerable to - // network or other jitters during startup. - let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4; - countdown.set(tokio::time::sleep_until(first_deadline)); + if !started { + // Set first deadline in 4 heartbeats (roughly after 12 seconds from now if heartbeat + // interval is set to default 3 seconds), to make Datanode and Metasrv more tolerable to + // network or other jitters during startup. + let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4; + countdown.set(tokio::time::sleep_until(first_deadline)); + started = true; + } }, Some(CountdownCommand::Reset((role, deadline))) => { let _ = self.region_server.set_writable(self.region_id, role.writable()); @@ -504,21 +511,16 @@ mod test { countdown_handle.deadline().await.unwrap() > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3) ); + tokio::time::sleep(Duration::from_millis(heartbeat_interval_millis * 4)).await; - // Reset deadline. - // A nearer deadline will be ignored. - countdown_handle - .reset_deadline( - RegionRole::Leader, - Instant::now() + Duration::from_millis(heartbeat_interval_millis), - ) - .await; + // No effect. + countdown_handle.start(heartbeat_interval_millis).await; assert!( countdown_handle.deadline().await.unwrap() - > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3) + > Instant::now() + Duration::from_secs(86400 * 365 * 29) ); - // Only a farther deadline will be accepted. + // Reset deadline. countdown_handle .reset_deadline( RegionRole::Leader, diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index ded795861476..e997139b5357 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -44,6 +44,7 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_distributed_handle_ddl_request() { + common_telemetry::init_default_ut_logging(); let instance = tests::create_distributed_instance("test_distributed_handle_ddl_request").await;