diff --git a/src/common/meta/src/distributed_time_constants.rs b/src/common/meta/src/distributed_time_constants.rs index 4c4148f184af..2b2b84fff81a 100644 --- a/src/common/meta/src/distributed_time_constants.rs +++ b/src/common/meta/src/distributed_time_constants.rs @@ -33,5 +33,8 @@ pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS; /// The lease seconds of metasrv leader. pub const META_LEASE_SECS: u64 = 3; -// In a lease, there are two opportunities for renewal. +/// In a lease, there are two opportunities for renewal. pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2; + +/// The default mailbox round-trip timeout. +pub const MAILBOX_RTT_SECS: u64 = 1; diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 7e787b41433e..8ad58552f267 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -111,6 +111,7 @@ impl OpenRegion { /// The instruction of downgrading leader region. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DowngradeRegion { + /// The [RegionId]. pub region_id: RegionId, } @@ -120,20 +121,67 @@ impl Display for DowngradeRegion { } } +/// Upgrades a follower region to leader region. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpgradeRegion { + /// The [RegionId]. + pub region_id: RegionId, + /// The `last_entry_id` of old leader region. + pub last_entry_id: Option, + /// The second of waiting for a wal replay. + /// + /// `None` stands for no wait, + /// it's helpful to verify whether the leader region is ready. + pub wait_for_replay_secs: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize, Display)] pub enum Instruction { + /// Opens a region. + /// + /// - Returns true if a specified region exists. OpenRegion(OpenRegion), + /// Closes a region. + /// + /// - Returns true if a specified region does not exist. CloseRegion(RegionIdent), + /// Upgrades a region. + UpgradeRegion(UpgradeRegion), + /// Downgrades a region. DowngradeRegion(DowngradeRegion), + /// Invalidates a specified table cache. InvalidateTableIdCache(TableId), + /// Invalidates a specified table name index cache. InvalidateTableNameCache(TableName), } +/// The reply of [UpgradeRegion]. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct UpgradeRegionReply { + /// Returns true if `last_entry_id` has been replayed to the latest. + pub ready: bool, + /// Indicates whether the region exists. + pub exists: bool, + /// Returns error if any. + pub error: Option, +} + +impl Display for UpgradeRegionReply { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "(ready={}, exists={}, error={:?})", + self.ready, self.exists, self.error + ) + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(tag = "type", rename_all = "snake_case")] pub enum InstructionReply { OpenRegion(SimpleReply), CloseRegion(SimpleReply), + UpgradeRegion(UpgradeRegionReply), InvalidateTableCache(SimpleReply), DowngradeRegion(DowngradeRegionReply), } @@ -143,6 +191,7 @@ impl Display for InstructionReply { match self { Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply), Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply), + Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply), Self::InvalidateTableCache(reply) => { write!(f, "InstructionReply::Invalidate({})", reply) } diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 9985fd6cf40e..ee243942e787 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -62,6 +62,9 @@ impl RegionHeartbeatResponseHandler { let close_region_req = RegionRequest::Close(RegionCloseRequest {}); Ok((region_id, close_region_req)) } + Instruction::UpgradeRegion(_) => { + todo!() + } Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => { InvalidHeartbeatResponseSnafu.fail() } @@ -86,6 +89,9 @@ impl RegionHeartbeatResponseHandler { result: false, error: None, }), + Instruction::UpgradeRegion(_) => { + todo!() + } Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => { InstructionReply::InvalidateTableCache(SimpleReply { result: false, @@ -118,6 +124,9 @@ impl RegionHeartbeatResponseHandler { reply.error = error; } }, + InstructionReply::UpgradeRegion(_) => { + todo!() + } InstructionReply::InvalidateTableCache(reply) => { reply.result = success; reply.error = error; diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 51bed22f1260..f3834cc5e9b5 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -16,7 +16,7 @@ use std::any::Any; use std::time::Duration; use api::v1::meta::MailboxMessage; -use common_meta::distributed_time_constants::REGION_LEASE_SECS; +use common_meta::distributed_time_constants::{MAILBOX_RTT_SECS, REGION_LEASE_SECS}; use common_meta::instruction::{ DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, }; @@ -31,7 +31,7 @@ use crate::handler::HeartbeatMailbox; use crate::procedure::region_migration::{Context, State}; use crate::service::mailbox::Channel; -const DOWNGRADE_LEADER_REGION_TIMEOUT: Duration = Duration::from_secs(1); +const DOWNGRADE_LEADER_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS); #[derive(Debug, Serialize, Deserialize)] pub struct DowngradeLeaderRegion { @@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion { tokio::time::sleep_until(*deadline).await; } - Ok(Box::new(UpgradeCandidateRegion)) + Ok(Box::::default()) } fn as_any(&self) -> &dyn Any { @@ -159,7 +159,7 @@ impl DowngradeLeaderRegion { } Err(error::Error::MailboxTimeout { .. }) => { let reason = format!( - "Mailbox received timeout for downgrade leader region {region_id} on Datanode {:?}", + "Mailbox received timeout for downgrade leader region {region_id} on datanode {:?}", leader, ); error::RetryLaterSnafu { reason }.fail() diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 33b465f46b35..bd92217d8722 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -18,6 +18,7 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_meta::ddl::utils::region_storage_path; +use common_meta::distributed_time_constants::MAILBOX_RTT_SECS; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::RegionIdent; use serde::{Deserialize, Serialize}; @@ -29,7 +30,7 @@ use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeader use crate::procedure::region_migration::{Context, State}; use crate::service::mailbox::Channel; -const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(1); +const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS); #[derive(Debug, Serialize, Deserialize)] pub struct OpenCandidateRegion; @@ -152,7 +153,7 @@ impl OpenCandidateRegion { } else { error::RetryLaterSnafu { reason: format!( - "Region {region_id} is not opened by Datanode {:?}, error: {error:?}", + "Region {region_id} is not opened by datanode {:?}, error: {error:?}", candidate, ), } @@ -161,7 +162,7 @@ impl OpenCandidateRegion { } Err(error::Error::MailboxTimeout { .. }) => { let reason = format!( - "Mailbox received timeout for open candidate region {region_id} on Datanode {:?}", + "Mailbox received timeout for open candidate region {region_id} on datanode {:?}", candidate, ); error::RetryLaterSnafu { reason }.fail() diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index 7d56c51390ae..a0ea0fa1f3b9 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -32,9 +32,9 @@ use crate::procedure::region_migration::{Context, State}; pub enum UpdateMetadata { /// Downgrades the leader region. Downgrade, - /// Upgrade the candidate region. + /// Upgrades the candidate region. Upgrade, - /// Rollback the downgraded leader region. + /// Rolls back the downgraded region. Rollback, } diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 8b15a0730f7a..e3eb6e2f1b9d 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -13,19 +13,55 @@ // limitations under the License. use std::any::Any; +use std::time::Duration; +use api::v1::meta::MailboxMessage; +use common_meta::distributed_time_constants::MAILBOX_RTT_SECS; +use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply}; +use common_telemetry::warn; use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use tokio::time::sleep; -use crate::error::Result; +use super::update_metadata::UpdateMetadata; +use crate::error::{self, Result}; +use crate::handler::HeartbeatMailbox; use crate::procedure::region_migration::{Context, State}; +use crate::service::mailbox::Channel; + #[derive(Debug, Serialize, Deserialize)] -pub struct UpgradeCandidateRegion; +pub struct UpgradeCandidateRegion { + // The optimistic retry times. + optimistic_retry: usize, + // The retry initial interval. + retry_initial_interval: Duration, + // The replay timeout of a instruction. + replay_timeout: Duration, + // If it's true it requires the candidate region MUST replay the WAL to the latest entry id. + // Otherwise, it will rollback to the old leader region. + require_ready: bool, +} + +impl Default for UpgradeCandidateRegion { + fn default() -> Self { + Self { + optimistic_retry: 3, + retry_initial_interval: Duration::from_millis(500), + replay_timeout: Duration::from_millis(1000), + require_ready: true, + } + } +} #[async_trait::async_trait] #[typetag::serde] impl State for UpgradeCandidateRegion { - async fn next(&mut self, _ctx: &mut Context) -> Result> { - todo!(); + async fn next(&mut self, ctx: &mut Context) -> Result> { + if self.upgrade_region_with_retry(ctx).await { + Ok(Box::new(UpdateMetadata::Upgrade)) + } else { + Ok(Box::new(UpdateMetadata::Rollback)) + } } fn as_any(&self) -> &dyn Any { @@ -33,4 +69,494 @@ impl State for UpgradeCandidateRegion { } } -impl UpgradeCandidateRegion {} +impl UpgradeCandidateRegion { + const UPGRADE_CANDIDATE_REGION_RTT: Duration = Duration::from_secs(MAILBOX_RTT_SECS); + + /// Returns the timeout of the upgrade candidate region. + /// + /// Equals `replay_timeout` + RTT + fn send_upgrade_candidate_region_timeout(&self) -> Duration { + self.replay_timeout + UpgradeCandidateRegion::UPGRADE_CANDIDATE_REGION_RTT + } + + /// Builds upgrade region instruction. + fn build_upgrade_region_instruction(&self, ctx: &Context) -> Instruction { + let pc = &ctx.persistent_ctx; + let region_id = pc.region_id; + let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id; + + Instruction::UpgradeRegion(UpgradeRegion { + region_id, + last_entry_id, + wait_for_replay_secs: Some(self.replay_timeout.as_secs()), + }) + } + + /// Tries to upgrade a candidate region. + /// + /// Retry: + /// - If `require_ready` is true, but the candidate region returns `ready` is false. + /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout. + /// + /// Abort: + /// - The candidate region doesn't exist. + /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable. + /// - [PushMessage](error::Error::PushMessage), The receiver is dropped. + /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible). + /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible). + /// - Invalid JSON (impossible). + async fn upgrade_region(&self, ctx: &Context, upgrade_instruction: &Instruction) -> Result<()> { + let pc = &ctx.persistent_ctx; + let region_id = pc.region_id; + let candidate = &pc.to_peer; + + let msg = MailboxMessage::json_message( + &format!("Upgrade candidate region: {}", region_id), + &format!("Meta@{}", ctx.server_addr()), + &format!("Datanode-{}@{}", candidate.id, candidate.addr), + common_time::util::current_time_millis(), + upgrade_instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: upgrade_instruction.to_string(), + })?; + + let ch = Channel::Datanode(candidate.id); + let receiver = ctx + .mailbox + .send(&ch, msg, self.send_upgrade_candidate_region_timeout()) + .await?; + + match receiver.await? { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + let InstructionReply::UpgradeRegion(UpgradeRegionReply { + ready, + exists, + error, + }) = reply + else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "Unexpected reply of the upgrade region instruction", + } + .fail(); + }; + + // Notes: The order of handling is important. + if error.is_some() { + return error::RetryLaterSnafu { + reason: format!( + "Failed to upgrade the region {} on datanode {:?}, error: {:?}", + region_id, candidate, error + ), + } + .fail(); + } + + ensure!( + exists, + error::UnexpectedSnafu { + violated: format!( + "Expected region {} doesn't exist on datanode {:?}", + region_id, candidate + ) + } + ); + + if self.require_ready && !ready { + return error::RetryLaterSnafu { + reason: format!( + "Candidate region {} still replaying the wal on datanode {:?}", + region_id, candidate + ), + } + .fail(); + } + + Ok(()) + } + Err(error::Error::MailboxTimeout { .. }) => { + let reason = format!( + "Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}", + candidate, + ); + error::RetryLaterSnafu { reason }.fail() + } + Err(err) => Err(err), + } + } + + /// Upgrades a candidate region. + /// + /// Returns true if the candidate region is upgraded successfully. + async fn upgrade_region_with_retry(&self, ctx: &Context) -> bool { + let upgrade_instruction = self.build_upgrade_region_instruction(ctx); + + let mut retry = 0; + let mut upgraded = false; + + loop { + if let Err(err) = self.upgrade_region(ctx, &upgrade_instruction).await { + retry += 1; + if err.is_retryable() && retry < self.optimistic_retry { + warn!("Failed to upgrade region, error: {err:?}, retry later"); + sleep(self.retry_initial_interval).await; + } else { + break; + } + } else { + upgraded = true; + break; + } + } + + upgraded + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use api::v1::meta::mailbox_message::Payload; + use common_meta::peer::Peer; + use common_time::util::current_time_millis; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + use crate::procedure::region_migration::test_util::{ + new_close_region_reply, send_mock_reply, TestingEnv, + }; + use crate::procedure::region_migration::{ContextFactory, PersistentContext}; + + fn new_persistent_context() -> PersistentContext { + PersistentContext { + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + region_id: RegionId::new(1024, 1), + cluster_id: 0, + } + } + + fn new_upgrade_region_reply( + id: u64, + ready: bool, + + exists: bool, + + error: Option, + ) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply { + ready, + exists, + error, + })) + .unwrap(), + )), + } + } + + #[tokio::test] + async fn test_datanode_is_unreachable() { + let state = UpgradeCandidateRegion::default(); + let persistent_context = new_persistent_context(); + let env = TestingEnv::new(); + let ctx = env.context_factory().new_context(persistent_context); + + let instruction = &state.build_upgrade_region_instruction(&ctx); + let err = state.upgrade_region(&ctx, instruction).await.unwrap_err(); + + assert_matches!(err, Error::PusherNotFound { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_pusher_dropped() { + let state = UpgradeCandidateRegion::default(); + let persistent_context = new_persistent_context(); + let to_peer_id = persistent_context.to_peer.id; + + let mut env = TestingEnv::new(); + let ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) + .await; + + drop(rx); + + let instruction = &state.build_upgrade_region_instruction(&ctx); + let err = state.upgrade_region(&ctx, instruction).await.unwrap_err(); + + assert_matches!(err, Error::PushMessage { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_unexpected_instruction_reply() { + let state = UpgradeCandidateRegion::default(); + let persistent_context = new_persistent_context(); + let to_peer_id = persistent_context.to_peer.id; + + let mut env = TestingEnv::new(); + let ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) + .await; + + send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id))); + + let instruction = &state.build_upgrade_region_instruction(&ctx); + let err = state.upgrade_region(&ctx, instruction).await.unwrap_err(); + assert_matches!(err, Error::UnexpectedInstructionReply { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_upgrade_region_failed() { + let state = UpgradeCandidateRegion::default(); + let persistent_context = new_persistent_context(); + let to_peer_id = persistent_context.to_peer.id; + + let mut env = TestingEnv::new(); + let ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) + .await; + + // A reply contains an error. + send_mock_reply(mailbox, rx, |id| { + Ok(new_upgrade_region_reply( + id, + true, + true, + Some("test mocked".to_string()), + )) + }); + + let instruction = &state.build_upgrade_region_instruction(&ctx); + let err = state.upgrade_region(&ctx, instruction).await.unwrap_err(); + + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + assert!(err.to_string().contains("test mocked")); + } + + #[tokio::test] + async fn test_upgrade_region_not_found() { + let state = UpgradeCandidateRegion::default(); + let persistent_context = new_persistent_context(); + let to_peer_id = persistent_context.to_peer.id; + + let mut env = TestingEnv::new(); + let ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) + .await; + + send_mock_reply(mailbox, rx, |id| { + Ok(new_upgrade_region_reply(id, true, false, None)) + }); + + let instruction = &state.build_upgrade_region_instruction(&ctx); + let err = state.upgrade_region(&ctx, instruction).await.unwrap_err(); + + assert_matches!(err, Error::Unexpected { .. }); + assert!(!err.is_retryable()); + assert!(err.to_string().contains("doesn't exist")); + } + + #[tokio::test] + async fn test_upgrade_region_require_ready() { + let mut state = UpgradeCandidateRegion { + require_ready: true, + ..Default::default() + }; + + let persistent_context = new_persistent_context(); + let to_peer_id = persistent_context.to_peer.id; + + let mut env = TestingEnv::new(); + let ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) + .await; + + send_mock_reply(mailbox, rx, |id| { + Ok(new_upgrade_region_reply(id, false, true, None)) + }); + + let instruction = &state.build_upgrade_region_instruction(&ctx); + let err = state.upgrade_region(&ctx, instruction).await.unwrap_err(); + + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + assert!(err.to_string().contains("still replaying the wal")); + + // Sets the `require_ready` to false. + state.require_ready = false; + + let mailbox = mailbox_ctx.mailbox().clone(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) + .await; + + send_mock_reply(mailbox, rx, |id| { + Ok(new_upgrade_region_reply(id, false, true, None)) + }); + + let instruction = &state.build_upgrade_region_instruction(&ctx); + state.upgrade_region(&ctx, instruction).await.unwrap(); + } + + #[tokio::test] + async fn test_upgrade_region_with_retry_ok() { + let mut state = Box::::default(); + state.retry_initial_interval = Duration::from_millis(100); + let persistent_context = new_persistent_context(); + let to_peer_id = persistent_context.to_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) + .await; + + common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Err(error::MailboxTimeoutSnafu { id: reply_id }.build()), + ) + .await + .unwrap(); + + // retry: 1 + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Ok(new_upgrade_region_reply(reply_id, false, true, None)), + ) + .await + .unwrap(); + + // retry: 2 + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Ok(new_upgrade_region_reply(reply_id, true, true, None)), + ) + .await + .unwrap(); + }); + + let next = state.next(&mut ctx).await.unwrap(); + + let update_metadata = next.as_any().downcast_ref::().unwrap(); + + assert_matches!(update_metadata, UpdateMetadata::Upgrade); + } + + #[tokio::test] + async fn test_upgrade_region_with_retry_failed() { + let mut state = Box::::default(); + state.retry_initial_interval = Duration::from_millis(100); + let persistent_context = new_persistent_context(); + let to_peer_id = persistent_context.to_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx) + .await; + + common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Err(error::MailboxTimeoutSnafu { id: reply_id }.build()), + ) + .await + .unwrap(); + + // retry: 1 + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Ok(new_upgrade_region_reply(reply_id, false, true, None)), + ) + .await + .unwrap(); + + // retry: 2 + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Ok(new_upgrade_region_reply(reply_id, false, false, None)), + ) + .await + .unwrap(); + }); + + let next = state.next(&mut ctx).await.unwrap(); + + let update_metadata = next.as_any().downcast_ref::().unwrap(); + assert_matches!(update_metadata, UpdateMetadata::Rollback); + } +}