diff --git a/README.md b/README.md
index a9156560eb2a..bbb794764aef 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,9 @@
+> [!WARNING]
+> Our default branch has changed from `develop` to `main`. Please update your local repository to use the `main` branch.
+
## What is GreptimeDB
GreptimeDB is an open-source time-series database with a special focus on
diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index 1d51ff775d65..8c0ba9ecfa7a 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -14,6 +14,7 @@
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
+use std::time::Duration;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, RegionNumber};
@@ -140,11 +141,12 @@ pub struct UpgradeRegion {
pub region_id: RegionId,
/// The `last_entry_id` of old leader region.
pub last_entry_id: Option,
- /// The second of waiting for a wal replay.
+ /// The timeout of waiting for a wal replay.
///
/// `None` stands for no wait,
/// it's helpful to verify whether the leader region is ready.
- pub wait_for_replay_secs: Option,
+ #[serde(with = "humantime_serde")]
+ pub wait_for_replay_timeout: Option,
}
#[derive(Debug, Clone, Serialize, Deserialize, Display)]
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 07b8fe6f8349..95aa0bb22b58 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
+use std::sync::Arc;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
@@ -27,6 +28,19 @@ use table::error::Error as TableError;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
+ #[snafu(display("Failed to execute async task"))]
+ AsyncTaskExecute {
+ location: Location,
+ source: Arc,
+ },
+
+ #[snafu(display("Failed to watch change"))]
+ WatchAsyncTaskChange {
+ location: Location,
+ #[snafu(source)]
+ error: tokio::sync::watch::error::RecvError,
+ },
+
#[snafu(display("Failed to handle heartbeat response"))]
HandleHeartbeatResponse {
location: Location,
@@ -292,7 +306,11 @@ impl ErrorExt for Error {
| MissingWalDirConfig { .. }
| MissingKvBackend { .. } => StatusCode::InvalidArguments,
- PayloadNotExist { .. } | Unexpected { .. } => StatusCode::Unexpected,
+ PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => {
+ StatusCode::Unexpected
+ }
+
+ AsyncTaskExecute { source, .. } => source.status_code(),
// TODO(yingwen): Further categorize http error.
ParseAddr { .. }
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index 5a4502beb5ef..8845d0d5e850 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -41,6 +41,7 @@ use crate::metrics;
use crate::region_server::RegionServer;
pub(crate) mod handler;
+pub(crate) mod task_tracker;
pub struct HeartbeatTask {
node_id: u64,
diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs
index 5f4f5fa3ace8..c7533db5423c 100644
--- a/src/datanode/src/heartbeat/handler.rs
+++ b/src/datanode/src/heartbeat/handler.rs
@@ -17,130 +17,75 @@ use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
-use common_meta::instruction::{
- DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, OpenRegion, SimpleReply,
-};
+use common_meta::instruction::{Instruction, InstructionReply};
use common_meta::RegionIdent;
use common_telemetry::error;
use futures::future::BoxFuture;
use snafu::OptionExt;
-use store_api::path_utils::region_dir;
-use store_api::region_engine::SetReadonlyResponse;
-use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
-use crate::error;
+mod close_region;
+mod downgrade_region;
+mod open_region;
+mod upgrade_region;
+
+use super::task_tracker::TaskTracker;
use crate::region_server::RegionServer;
+
/// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion].
#[derive(Clone)]
pub struct RegionHeartbeatResponseHandler {
region_server: RegionServer,
+ catchup_tasks: TaskTracker<()>,
}
/// Handler of the instruction.
pub type InstructionHandler =
- Box BoxFuture<'static, InstructionReply> + Send>;
+ Box BoxFuture<'static, InstructionReply> + Send>;
+
+#[derive(Clone)]
+pub struct HandlerContext {
+ region_server: RegionServer,
+ catchup_tasks: TaskTracker<()>,
+}
+
+impl HandlerContext {
+ fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
+ RegionId::new(region_ident.table_id, region_ident.region_number)
+ }
+}
impl RegionHeartbeatResponseHandler {
/// Returns the [RegionHeartbeatResponseHandler].
pub fn new(region_server: RegionServer) -> Self {
- Self { region_server }
+ Self {
+ region_server,
+ catchup_tasks: TaskTracker::new(),
+ }
}
/// Builds the [InstructionHandler].
fn build_handler(instruction: Instruction) -> MetaResult {
match instruction {
- Instruction::OpenRegion(OpenRegion {
- region_ident,
- region_storage_path,
- region_options,
- region_wal_options,
- skip_wal_replay,
- }) => Ok(Box::new(move |region_server| {
- Box::pin(async move {
- let region_id = Self::region_ident_to_region_id(®ion_ident);
- // TODO(niebayes): extends region options with region_wal_options.
- let _ = region_wal_options;
- let request = RegionRequest::Open(RegionOpenRequest {
- engine: region_ident.engine,
- region_dir: region_dir(®ion_storage_path, region_id),
- options: region_options,
- skip_wal_replay,
- });
- let result = region_server.handle_request(region_id, request).await;
-
- let success = result.is_ok();
- let error = result.as_ref().map_err(|e| e.to_string()).err();
-
- InstructionReply::OpenRegion(SimpleReply {
- result: success,
- error,
- })
- })
+ Instruction::OpenRegion(open_region) => Ok(Box::new(move |handler_context| {
+ handler_context.handle_open_region_instruction(open_region)
})),
- Instruction::CloseRegion(region_ident) => Ok(Box::new(|region_server| {
- Box::pin(async move {
- let region_id = Self::region_ident_to_region_id(®ion_ident);
- let request = RegionRequest::Close(RegionCloseRequest {});
- let result = region_server.handle_request(region_id, request).await;
-
- match result {
- Ok(_) => InstructionReply::CloseRegion(SimpleReply {
- result: true,
- error: None,
- }),
- Err(error::Error::RegionNotFound { .. }) => {
- InstructionReply::CloseRegion(SimpleReply {
- result: true,
- error: None,
- })
- }
- Err(err) => InstructionReply::CloseRegion(SimpleReply {
- result: false,
- error: Some(err.to_string()),
- }),
- }
- })
+ Instruction::CloseRegion(close_region) => Ok(Box::new(|handler_context| {
+ handler_context.handle_close_region_instruction(close_region)
})),
- Instruction::DowngradeRegion(DowngradeRegion { region_id }) => {
- Ok(Box::new(move |region_server| {
- Box::pin(async move {
- match region_server.set_readonly_gracefully(region_id).await {
- Ok(SetReadonlyResponse::Success { last_entry_id }) => {
- InstructionReply::DowngradeRegion(DowngradeRegionReply {
- last_entry_id,
- exists: true,
- error: None,
- })
- }
- Ok(SetReadonlyResponse::NotFound) => {
- InstructionReply::DowngradeRegion(DowngradeRegionReply {
- last_entry_id: None,
- exists: false,
- error: None,
- })
- }
- Err(err) => InstructionReply::DowngradeRegion(DowngradeRegionReply {
- last_entry_id: None,
- exists: false,
- error: Some(err.to_string()),
- }),
- }
- })
+ Instruction::DowngradeRegion(downgrade_region) => {
+ Ok(Box::new(move |handler_context| {
+ handler_context.handle_downgrade_region_instruction(downgrade_region)
}))
}
- Instruction::UpgradeRegion(_) => {
- todo!()
- }
+ Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| {
+ handler_context.handle_upgrade_region_instruction(upgrade_region)
+ })),
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
InvalidHeartbeatResponseSnafu.fail()
}
}
}
-
- fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
- RegionId::new(region_ident.table_id, region_ident.region_number)
- }
}
#[async_trait]
@@ -162,9 +107,14 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let mailbox = ctx.mailbox.clone();
let region_server = self.region_server.clone();
+ let catchup_tasks = self.catchup_tasks.clone();
let handler = Self::build_handler(instruction)?;
let _handle = common_runtime::spawn_bg(async move {
- let reply = handler(region_server).await;
+ let reply = handler(HandlerContext {
+ region_server,
+ catchup_tasks,
+ })
+ .await;
if let Err(e) = mailbox.send((meta, reply)).await {
error!(e; "Failed to send reply to mailbox");
@@ -184,10 +134,12 @@ mod tests {
use common_meta::heartbeat::mailbox::{
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
};
+ use common_meta::instruction::{DowngradeRegion, OpenRegion};
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
- use store_api::region_request::RegionRequest;
+ use store_api::path_utils::region_dir;
+ use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Receiver};
diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs
new file mode 100644
index 000000000000..df72240789f3
--- /dev/null
+++ b/src/datanode/src/heartbeat/handler/close_region.rs
@@ -0,0 +1,54 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_error::ext::ErrorExt;
+use common_meta::instruction::{InstructionReply, SimpleReply};
+use common_meta::RegionIdent;
+use common_telemetry::warn;
+use futures_util::future::BoxFuture;
+use store_api::region_request::{RegionCloseRequest, RegionRequest};
+
+use crate::error;
+use crate::heartbeat::handler::HandlerContext;
+
+impl HandlerContext {
+ pub(crate) fn handle_close_region_instruction(
+ self,
+ region_ident: RegionIdent,
+ ) -> BoxFuture<'static, InstructionReply> {
+ Box::pin(async move {
+ let region_id = Self::region_ident_to_region_id(®ion_ident);
+ let request = RegionRequest::Close(RegionCloseRequest {});
+ let result = self.region_server.handle_request(region_id, request).await;
+
+ match result {
+ Ok(_) => InstructionReply::CloseRegion(SimpleReply {
+ result: true,
+ error: None,
+ }),
+ Err(error::Error::RegionNotFound { .. }) => {
+ warn!("Received a close region instruction from meta, but target region:{region_id} is not found.");
+ InstructionReply::CloseRegion(SimpleReply {
+ result: true,
+ error: None,
+ })
+ }
+ Err(err) => InstructionReply::CloseRegion(SimpleReply {
+ result: false,
+ error: Some(err.output_msg()),
+ }),
+ }
+ })
+ }
+}
diff --git a/src/datanode/src/heartbeat/handler/downgrade_region.rs b/src/datanode/src/heartbeat/handler/downgrade_region.rs
new file mode 100644
index 000000000000..dbf5d72ffd6e
--- /dev/null
+++ b/src/datanode/src/heartbeat/handler/downgrade_region.rs
@@ -0,0 +1,51 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_error::ext::ErrorExt;
+use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
+use futures_util::future::BoxFuture;
+use store_api::region_engine::SetReadonlyResponse;
+
+use crate::heartbeat::handler::HandlerContext;
+
+impl HandlerContext {
+ pub(crate) fn handle_downgrade_region_instruction(
+ self,
+ DowngradeRegion { region_id }: DowngradeRegion,
+ ) -> BoxFuture<'static, InstructionReply> {
+ Box::pin(async move {
+ match self.region_server.set_readonly_gracefully(region_id).await {
+ Ok(SetReadonlyResponse::Success { last_entry_id }) => {
+ InstructionReply::DowngradeRegion(DowngradeRegionReply {
+ last_entry_id,
+ exists: true,
+ error: None,
+ })
+ }
+ Ok(SetReadonlyResponse::NotFound) => {
+ InstructionReply::DowngradeRegion(DowngradeRegionReply {
+ last_entry_id: None,
+ exists: false,
+ error: None,
+ })
+ }
+ Err(err) => InstructionReply::DowngradeRegion(DowngradeRegionReply {
+ last_entry_id: None,
+ exists: true,
+ error: Some(err.output_msg()),
+ }),
+ }
+ })
+ }
+}
diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs
new file mode 100644
index 000000000000..131026be4469
--- /dev/null
+++ b/src/datanode/src/heartbeat/handler/open_region.rs
@@ -0,0 +1,55 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_error::ext::ErrorExt;
+use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
+use futures_util::future::BoxFuture;
+use store_api::path_utils::region_dir;
+use store_api::region_request::{RegionOpenRequest, RegionRequest};
+
+use crate::heartbeat::handler::HandlerContext;
+
+impl HandlerContext {
+ pub(crate) fn handle_open_region_instruction(
+ self,
+ OpenRegion {
+ region_ident,
+ region_storage_path,
+ region_options,
+ region_wal_options,
+ skip_wal_replay,
+ }: OpenRegion,
+ ) -> BoxFuture<'static, InstructionReply> {
+ Box::pin(async move {
+ let region_id = Self::region_ident_to_region_id(®ion_ident);
+ // TODO(niebayes): extends region options with region_wal_options.
+ let _ = region_wal_options;
+ let request = RegionRequest::Open(RegionOpenRequest {
+ engine: region_ident.engine,
+ region_dir: region_dir(®ion_storage_path, region_id),
+ options: region_options,
+ skip_wal_replay,
+ });
+ let result = self.region_server.handle_request(region_id, request).await;
+
+ let success = result.is_ok();
+ let error = result.as_ref().map_err(|e| e.output_msg()).err();
+
+ InstructionReply::OpenRegion(SimpleReply {
+ result: success,
+ error,
+ })
+ })
+ }
+}
diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs
new file mode 100644
index 000000000000..63a6d6c813f5
--- /dev/null
+++ b/src/datanode/src/heartbeat/handler/upgrade_region.rs
@@ -0,0 +1,363 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_error::ext::ErrorExt;
+use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
+use common_telemetry::warn;
+use futures_util::future::BoxFuture;
+use store_api::region_request::{RegionCatchupRequest, RegionRequest};
+
+use crate::heartbeat::handler::HandlerContext;
+use crate::heartbeat::task_tracker::WaitResult;
+
+impl HandlerContext {
+ pub(crate) fn handle_upgrade_region_instruction(
+ self,
+ UpgradeRegion {
+ region_id,
+ last_entry_id,
+ wait_for_replay_timeout,
+ }: UpgradeRegion,
+ ) -> BoxFuture<'static, InstructionReply> {
+ Box::pin(async move {
+ let Some(writable) = self.region_server.is_writable(region_id) else {
+ return InstructionReply::UpgradeRegion(UpgradeRegionReply {
+ ready: false,
+ exists: false,
+ error: None,
+ });
+ };
+
+ if writable {
+ return InstructionReply::UpgradeRegion(UpgradeRegionReply {
+ ready: true,
+ exists: true,
+ error: None,
+ });
+ }
+
+ let region_server_moved = self.region_server.clone();
+
+ // The catchup task costs almost nothing if the region is already writable.
+ // Therefore, it always tries to register a new catchup task.
+ let register_result = self
+ .catchup_tasks
+ .try_register(
+ region_id,
+ Box::pin(async move {
+ region_server_moved
+ .handle_request(
+ region_id,
+ RegionRequest::Catchup(RegionCatchupRequest {
+ set_writable: true,
+ entry_id: last_entry_id,
+ }),
+ )
+ .await?;
+
+ Ok(())
+ }),
+ )
+ .await;
+
+ if register_result.is_busy() {
+ warn!("Another catchup task is running for the region: {region_id}");
+ }
+
+ // Returns immediately
+ let Some(wait_for_replay_timeout) = wait_for_replay_timeout else {
+ return InstructionReply::UpgradeRegion(UpgradeRegionReply {
+ ready: false,
+ exists: true,
+ error: None,
+ });
+ };
+
+ // We don't care whether it is a newly registered task or an already running one.
+ let mut watcher = register_result.into_watcher();
+ let result = self
+ .catchup_tasks
+ .wait(&mut watcher, wait_for_replay_timeout)
+ .await;
+
+ match result {
+ WaitResult::Timeout => InstructionReply::UpgradeRegion(UpgradeRegionReply {
+ ready: false,
+ exists: true,
+ error: None,
+ }),
+ WaitResult::Finish(Ok(_)) => InstructionReply::UpgradeRegion(UpgradeRegionReply {
+ ready: true,
+ exists: true,
+ error: None,
+ }),
+ WaitResult::Finish(Err(err)) => {
+ InstructionReply::UpgradeRegion(UpgradeRegionReply {
+ ready: false,
+ exists: true,
+ error: Some(err.output_msg()),
+ })
+ }
+ }
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::assert_matches::assert_matches;
+ use std::time::Duration;
+
+ use common_meta::instruction::{InstructionReply, UpgradeRegion};
+ use store_api::region_engine::RegionRole;
+ use store_api::storage::RegionId;
+ use tokio::time::Instant;
+
+ use crate::error;
+ use crate::heartbeat::handler::HandlerContext;
+ use crate::heartbeat::task_tracker::TaskTracker;
+ use crate::tests::{mock_region_server, MockRegionEngine};
+
+ #[tokio::test]
+ async fn test_region_not_exist() {
+ let mut mock_region_server = mock_region_server();
+ let (mock_engine, _) = MockRegionEngine::new();
+ mock_region_server.register_engine(mock_engine);
+
+ let handler_context = HandlerContext {
+ region_server: mock_region_server,
+ catchup_tasks: TaskTracker::new(),
+ };
+
+ let region_id = RegionId::new(1024, 1);
+ let waits = vec![None, Some(Duration::from_millis(100u64))];
+
+ for wait_for_replay_timeout in waits {
+ let reply = handler_context
+ .clone()
+ .handle_upgrade_region_instruction(UpgradeRegion {
+ region_id,
+ last_entry_id: None,
+ wait_for_replay_timeout,
+ })
+ .await;
+ assert_matches!(reply, InstructionReply::UpgradeRegion(_));
+
+ if let InstructionReply::UpgradeRegion(reply) = reply {
+ assert!(!reply.exists);
+ assert!(reply.error.is_none());
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_region_writable() {
+ let mock_region_server = mock_region_server();
+ let region_id = RegionId::new(1024, 1);
+
+ let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
+ region_engine.mock_role = Some(Some(RegionRole::Leader));
+ region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
+ // Should be unreachable.
+ unreachable!();
+ }));
+ });
+ mock_region_server.register_test_region(region_id, mock_engine);
+
+ let handler_context = HandlerContext {
+ region_server: mock_region_server,
+ catchup_tasks: TaskTracker::new(),
+ };
+
+ let waits = vec![None, Some(Duration::from_millis(100u64))];
+
+ for wait_for_replay_timeout in waits {
+ let reply = handler_context
+ .clone()
+ .handle_upgrade_region_instruction(UpgradeRegion {
+ region_id,
+ last_entry_id: None,
+ wait_for_replay_timeout,
+ })
+ .await;
+ assert_matches!(reply, InstructionReply::UpgradeRegion(_));
+
+ if let InstructionReply::UpgradeRegion(reply) = reply {
+ assert!(reply.ready);
+ assert!(reply.exists);
+ assert!(reply.error.is_none());
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_region_not_ready() {
+ let mock_region_server = mock_region_server();
+ let region_id = RegionId::new(1024, 1);
+
+ let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
+ // Region is not ready.
+ region_engine.mock_role = Some(Some(RegionRole::Follower));
+ region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
+ // Note: Don't change.
+ region_engine.handle_request_delay = Some(Duration::from_secs(100));
+ });
+ mock_region_server.register_test_region(region_id, mock_engine);
+
+ let handler_context = HandlerContext {
+ region_server: mock_region_server,
+ catchup_tasks: TaskTracker::new(),
+ };
+
+ let waits = vec![None, Some(Duration::from_millis(100u64))];
+
+ for wait_for_replay_timeout in waits {
+ let reply = handler_context
+ .clone()
+ .handle_upgrade_region_instruction(UpgradeRegion {
+ region_id,
+ last_entry_id: None,
+ wait_for_replay_timeout,
+ })
+ .await;
+ assert_matches!(reply, InstructionReply::UpgradeRegion(_));
+
+ if let InstructionReply::UpgradeRegion(reply) = reply {
+ assert!(!reply.ready);
+ assert!(reply.exists);
+ assert!(reply.error.is_none());
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_region_not_ready_with_retry() {
+ let mock_region_server = mock_region_server();
+ let region_id = RegionId::new(1024, 1);
+
+ let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
+ // Region is not ready.
+ region_engine.mock_role = Some(Some(RegionRole::Follower));
+ region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
+ // Note: Don't change.
+ region_engine.handle_request_delay = Some(Duration::from_millis(300));
+ });
+ mock_region_server.register_test_region(region_id, mock_engine);
+
+ let waits = vec![
+ Some(Duration::from_millis(100u64)),
+ Some(Duration::from_millis(100u64)),
+ ];
+
+ let handler_context = HandlerContext {
+ region_server: mock_region_server,
+ catchup_tasks: TaskTracker::new(),
+ };
+
+ for wait_for_replay_timeout in waits {
+ let reply = handler_context
+ .clone()
+ .handle_upgrade_region_instruction(UpgradeRegion {
+ region_id,
+ last_entry_id: None,
+ wait_for_replay_timeout,
+ })
+ .await;
+ assert_matches!(reply, InstructionReply::UpgradeRegion(_));
+
+ if let InstructionReply::UpgradeRegion(reply) = reply {
+ assert!(!reply.ready);
+ assert!(reply.exists);
+ assert!(reply.error.is_none());
+ }
+ }
+
+ let timer = Instant::now();
+ let reply = handler_context
+ .handle_upgrade_region_instruction(UpgradeRegion {
+ region_id,
+ last_entry_id: None,
+ wait_for_replay_timeout: Some(Duration::from_millis(500)),
+ })
+ .await;
+ assert_matches!(reply, InstructionReply::UpgradeRegion(_));
+ // Must be less than 300 ms.
+ assert!(timer.elapsed().as_millis() < 300);
+
+ if let InstructionReply::UpgradeRegion(reply) = reply {
+ assert!(reply.ready);
+ assert!(reply.exists);
+ assert!(reply.error.is_none());
+ }
+ }
+
+ #[tokio::test]
+ async fn test_region_error() {
+ let mock_region_server = mock_region_server();
+ let region_id = RegionId::new(1024, 1);
+
+ let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(|region_engine| {
+ // Region is not ready.
+ region_engine.mock_role = Some(Some(RegionRole::Follower));
+ region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
+ error::UnexpectedSnafu {
+ violated: "mock_error".to_string(),
+ }
+ .fail()
+ }));
+ // Note: Don't change.
+ region_engine.handle_request_delay = Some(Duration::from_millis(100));
+ });
+ mock_region_server.register_test_region(region_id, mock_engine);
+
+ let handler_context = HandlerContext {
+ region_server: mock_region_server,
+ catchup_tasks: TaskTracker::new(),
+ };
+
+ let reply = handler_context
+ .clone()
+ .handle_upgrade_region_instruction(UpgradeRegion {
+ region_id,
+ last_entry_id: None,
+ wait_for_replay_timeout: None,
+ })
+ .await;
+ assert_matches!(reply, InstructionReply::UpgradeRegion(_));
+
+ // It didn't wait for the request handling to return, so it has no idea about the error.
+ if let InstructionReply::UpgradeRegion(reply) = reply {
+ assert!(!reply.ready);
+ assert!(reply.exists);
+ assert!(reply.error.is_none());
+ }
+
+ let reply = handler_context
+ .clone()
+ .handle_upgrade_region_instruction(UpgradeRegion {
+ region_id,
+ last_entry_id: None,
+ wait_for_replay_timeout: Some(Duration::from_millis(200)),
+ })
+ .await;
+ assert_matches!(reply, InstructionReply::UpgradeRegion(_));
+
+ if let InstructionReply::UpgradeRegion(reply) = reply {
+ assert!(!reply.ready);
+ assert!(reply.exists);
+ assert!(reply.error.is_some());
+ assert!(reply.error.unwrap().contains("mock_error"));
+ }
+ }
+}
diff --git a/src/datanode/src/heartbeat/task_tracker.rs b/src/datanode/src/heartbeat/task_tracker.rs
new file mode 100644
index 000000000000..626754722204
--- /dev/null
+++ b/src/datanode/src/heartbeat/task_tracker.rs
@@ -0,0 +1,279 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures_util::future::BoxFuture;
+use snafu::ResultExt;
+use store_api::storage::RegionId;
+use tokio::sync::watch::{self, Receiver};
+use tokio::sync::RwLock;
+
+use crate::error::{self, Error, Result};
+
+/// The state of an async task.
+#[derive(Debug, Default, Clone)]
+pub(crate) enum TaskState {
+ Error(Arc),
+ #[default]
+ Running,
+ Done(T),
+}
+
+pub(crate) type TaskWatcher = Receiver>;
+
+async fn wait(watcher: &mut TaskWatcher) -> Result {
+ loop {
+ watcher
+ .changed()
+ .await
+ .context(error::WatchAsyncTaskChangeSnafu)?;
+
+ let r = &*watcher.borrow();
+ match r {
+ TaskState::Error(err) => return Err(err.clone()).context(error::AsyncTaskExecuteSnafu),
+ TaskState::Running => {}
+ TaskState::Done(value) => return Ok(value.clone()),
+ }
+ }
+}
+
+/// The running async task.
+pub(crate) struct Task {
+ watcher: TaskWatcher,
+}
+
+pub(crate) struct TaskTrackerInner {
+ state: HashMap>,
+}
+
+impl Default for TaskTrackerInner {
+ fn default() -> Self {
+ TaskTrackerInner {
+ state: HashMap::new(),
+ }
+ }
+}
+
+/// Tracks the long-running async tasks.
+#[derive(Clone)]
+pub(crate) struct TaskTracker {
+ inner: Arc>>,
+}
+
+/// The result of registering an async task.
+pub(crate) enum RegisterResult {
+ // The watcher of the running task.
+ Busy(TaskWatcher),
+ // The watcher of the newly registered task.
+ Running(TaskWatcher),
+}
+
+impl RegisterResult {
+ pub(crate) fn into_watcher(self) -> TaskWatcher {
+ match self {
+ RegisterResult::Busy(inner) => inner,
+ RegisterResult::Running(inner) => inner,
+ }
+ }
+
+ /// Returns true if it's [RegisterResult::Busy].
+ pub(crate) fn is_busy(&self) -> bool {
+ matches!(self, RegisterResult::Busy(_))
+ }
+
+ #[cfg(test)]
+ /// Returns true if it's [RegisterResult::Running].
+ pub(crate) fn is_running(&self) -> bool {
+ matches!(self, RegisterResult::Running(_))
+ }
+}
+
+/// The result of waiting.
+pub(crate) enum WaitResult {
+ Timeout,
+ Finish(Result),
+}
+
+#[cfg(test)]
+impl WaitResult {
+ /// Returns true if it's [WaitResult::Timeout].
+ pub(crate) fn is_timeout(&self) -> bool {
+ matches!(self, WaitResult::Timeout)
+ }
+
+ /// Returns the inner result if it's [WaitResult::Finish], otherwise `None`.
+ pub(crate) fn into_finish(self) -> Option> {
+ match self {
+ WaitResult::Timeout => None,
+ WaitResult::Finish(result) => Some(result),
+ }
+ }
+}
+
+impl TaskTracker {
+ /// Returns an empty [TaskTracker].
+ pub(crate) fn new() -> Self {
+ Self {
+ inner: Arc::new(RwLock::new(TaskTrackerInner::default())),
+ }
+ }
+
+ /// Waits on a [TaskWatcher] for at most `timeout` and returns a [WaitResult].
+ pub(crate) async fn wait(
+ &self,
+ watcher: &mut TaskWatcher,
+ timeout: Duration,
+ ) -> WaitResult {
+ match tokio::time::timeout(timeout, wait(watcher)).await {
+ Ok(result) => WaitResult::Finish(result),
+ Err(_) => WaitResult::Timeout,
+ }
+ }
+
+ /// Tries to register a new async task, returns [RegisterResult::Busy] if previous task is running.
+ pub(crate) async fn try_register(
+ &self,
+ region_id: RegionId,
+ fut: BoxFuture<'static, Result>,
+ ) -> RegisterResult {
+ let mut inner = self.inner.write().await;
+ if let Some(task) = inner.state.get(®ion_id) {
+ RegisterResult::Busy(task.watcher.clone())
+ } else {
+ let moved_inner = self.inner.clone();
+ let (tx, rx) = watch::channel(TaskState::::Running);
+ common_runtime::spawn_bg(async move {
+ match fut.await {
+ Ok(result) => {
+ let _ = tx.send(TaskState::Done(result));
+ }
+ Err(err) => {
+ let _ = tx.send(TaskState::Error(Arc::new(err)));
+ }
+ };
+ moved_inner.write().await.state.remove(®ion_id);
+ });
+ inner.state.insert(
+ region_id,
+ Task {
+ watcher: rx.clone(),
+ },
+ );
+
+ RegisterResult::Running(rx.clone())
+ }
+ }
+
+ #[cfg(test)]
+ async fn watcher(&self, region_id: RegionId) -> Option> {
+ self.inner
+ .read()
+ .await
+ .state
+ .get(®ion_id)
+ .map(|task| task.watcher.clone())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use store_api::storage::RegionId;
+ use tokio::sync::oneshot;
+
+ use crate::heartbeat::task_tracker::{wait, TaskTracker};
+
+ #[derive(Debug, Clone, PartialEq, Eq)]
+ struct TestResult {
+ value: i32,
+ }
+
+ #[tokio::test]
+ async fn test_async_task_tracker_register() {
+ let tracker = TaskTracker::::new();
+ let region_id = RegionId::new(1024, 1);
+ let (tx, rx) = oneshot::channel::<()>();
+
+ let result = tracker
+ .try_register(
+ region_id,
+ Box::pin(async move {
+ let _ = rx.await;
+ Ok(TestResult { value: 1024 })
+ }),
+ )
+ .await;
+
+ assert!(result.is_running());
+
+ let result = tracker
+ .try_register(
+ region_id,
+ Box::pin(async move { Ok(TestResult { value: 1023 }) }),
+ )
+ .await;
+ assert!(result.is_busy());
+ let mut watcher = tracker.watcher(region_id).await.unwrap();
+ // Triggers first future return.
+ tx.send(()).unwrap();
+
+ assert_eq!(
+ TestResult { value: 1024 },
+ wait(&mut watcher).await.unwrap()
+ );
+ let result = tracker
+ .try_register(
+ region_id,
+ Box::pin(async move { Ok(TestResult { value: 1022 }) }),
+ )
+ .await;
+ assert!(result.is_running());
+ }
+
+ #[tokio::test]
+ async fn test_async_task_tracker_wait_timeout() {
+ let tracker = TaskTracker::::new();
+ let region_id = RegionId::new(1024, 1);
+ let (tx, rx) = oneshot::channel::<()>();
+
+ let result = tracker
+ .try_register(
+ region_id,
+ Box::pin(async move {
+ let _ = rx.await;
+ Ok(TestResult { value: 1024 })
+ }),
+ )
+ .await;
+
+ let mut watcher = result.into_watcher();
+ let result = tracker.wait(&mut watcher, Duration::from_millis(100)).await;
+ assert!(result.is_timeout());
+
+ // Triggers first future return.
+ tx.send(()).unwrap();
+ let result = tracker
+ .wait(&mut watcher, Duration::from_millis(100))
+ .await
+ .into_finish()
+ .unwrap()
+ .unwrap();
+ assert_eq!(TestResult { value: 1024 }, result);
+ assert!(tracker.watcher(region_id).await.is_none());
+ }
+}
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 94ad5870866f..26a14c997d57 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -139,6 +139,16 @@ impl RegionServer {
.collect()
}
+ pub fn is_writable(&self, region_id: RegionId) -> Option {
+ // TODO(weny): Find a better way.
+ self.inner.region_map.get(®ion_id).and_then(|engine| {
+ engine.role(region_id).map(|role| match role {
+ RegionRole::Follower => false,
+ RegionRole::Leader => true,
+ })
+ })
+ }
+
pub fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> {
let engine = self
.inner
@@ -178,6 +188,14 @@ impl RegionServer {
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
}
+
+ #[cfg(test)]
+ /// Registers a region for test purpose.
+ pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
+ self.inner
+ .region_map
+ .insert(region_id, RegionEngineWithStatus::Ready(engine));
+ }
}
#[async_trait]
diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs
index 37e8ea2ce77a..80351f55718b 100644
--- a/src/datanode/src/tests.rs
+++ b/src/datanode/src/tests.rs
@@ -14,6 +14,7 @@
use std::any::Any;
use std::sync::Arc;
+use std::time::Duration;
use async_trait::async_trait;
use common_error::ext::BoxedError;
@@ -93,7 +94,9 @@ pub type MockRequestHandler =
pub struct MockRegionEngine {
sender: Sender<(RegionId, RegionRequest)>,
- handle_request_mock_fn: Option,
+ pub(crate) handle_request_delay: Option,
+ pub(crate) handle_request_mock_fn: Option,
+ pub(crate) mock_role: Option