From cce5edc88e91f5279ee90a17e72970db9097ad2e Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 29 Nov 2023 18:17:28 +0900 Subject: [PATCH] feat: add downgrade leader region step (#2792) * feat: add downgrade leader region step * chore: apply suggestions from CR * chore: rename exist to exists * chore: apply suggestions from CR --- src/common/meta/src/instruction.rs | 38 ++ src/datanode/src/heartbeat/handler.rs | 12 + .../src/procedure/region_migration.rs | 26 + .../downgrade_leader_region.rs | 491 +++++++++++++++++- .../region_migration/migration_start.rs | 2 +- .../region_migration/open_candidate_region.rs | 81 +-- .../procedure/region_migration/test_util.rs | 39 +- .../region_migration/update_metadata.rs | 7 +- .../upgrade_candidate_region.rs | 36 ++ 9 files changed, 661 insertions(+), 71 deletions(-) create mode 100644 src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 5119eedc2e62..7e787b41433e 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -48,6 +48,27 @@ impl Display for RegionIdent { } } +/// The result of downgrade leader region. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct DowngradeRegionReply { + /// Returns the `last_entry_id` if available. + pub last_entry_id: Option, + /// Indicates whether the region exists. + pub exists: bool, + /// Return error if any during the operation. + pub error: Option, +} + +impl Display for DowngradeRegionReply { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "(last_entry_id={:?}, exists={}, error={:?})", + self.last_entry_id, self.exists, self.error + ) + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] pub struct SimpleReply { pub result: bool, @@ -87,10 +108,23 @@ impl OpenRegion { } } +/// The instruction of downgrading leader region. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DowngradeRegion { + pub region_id: RegionId, +} + +impl Display for DowngradeRegion { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "DowngradeRegion(region_id={})", self.region_id) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Display)] pub enum Instruction { OpenRegion(OpenRegion), CloseRegion(RegionIdent), + DowngradeRegion(DowngradeRegion), InvalidateTableIdCache(TableId), InvalidateTableNameCache(TableName), } @@ -101,6 +135,7 @@ pub enum InstructionReply { OpenRegion(SimpleReply), CloseRegion(SimpleReply), InvalidateTableCache(SimpleReply), + DowngradeRegion(DowngradeRegionReply), } impl Display for InstructionReply { @@ -111,6 +146,9 @@ impl Display for InstructionReply { Self::InvalidateTableCache(reply) => { write!(f, "InstructionReply::Invalidate({})", reply) } + Self::DowngradeRegion(reply) => { + write!(f, "InstructionReply::DowngradeRegion({})", reply) + } } } } diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index c4a2d57d07ad..9985fd6cf40e 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -65,6 +65,10 @@ impl RegionHeartbeatResponseHandler { Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => { InvalidHeartbeatResponseSnafu.fail() } + Instruction::DowngradeRegion(_) => { + // TODO(weny): add it later. + todo!() + } } } @@ -88,6 +92,10 @@ impl RegionHeartbeatResponseHandler { error: None, }) } + Instruction::DowngradeRegion(_) => { + // TODO(weny): add it later. + todo!() + } } } @@ -114,6 +122,10 @@ impl RegionHeartbeatResponseHandler { reply.result = success; reply.error = error; } + InstructionReply::DowngradeRegion(_) => { + // TODO(weny): add it later. 
+ todo!() + } } template diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index cabd1f7805ab..295aab78acbd 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -19,9 +19,11 @@ pub(crate) mod open_candidate_region; #[cfg(test)] pub(crate) mod test_util; pub(crate) mod update_metadata; +pub(crate) mod upgrade_candidate_region; use std::any::Any; use std::fmt::Debug; +use std::time::Duration; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; @@ -34,6 +36,7 @@ use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use serde::{Deserialize, Serialize}; use snafu::{location, Location, OptionExt, ResultExt}; use store_api::storage::RegionId; +use tokio::time::Instant; use self::migration_start::RegionMigrationStart; use crate::error::{self, Error, Result}; @@ -80,6 +83,29 @@ pub struct VolatileContext { opening_region_guard: Option, /// `table_route_info` is stored via previous steps for future use. table_route_info: Option>, + /// The deadline of leader region lease. + leader_region_lease_deadline: Option, + /// The last_entry_id of leader region. + leader_region_last_entry_id: Option, +} + +impl VolatileContext { + /// Sets the `leader_region_lease_deadline` if it does not exist. + pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) { + if self.leader_region_lease_deadline.is_none() { + self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout); + } + } + + /// Resets the `leader_region_lease_deadline`. + pub fn reset_leader_region_lease_deadline(&mut self) { + self.leader_region_lease_deadline = None; + } + + /// Sets the `leader_region_last_entry_id`. + pub fn set_last_entry_id(&mut self, last_entry_id: u64) { + self.leader_region_last_entry_id = Some(last_entry_id) + } } /// Used to generate new [Context]. 
diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index c0ff94330723..2c6a25fa6630 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -13,23 +13,506 @@ // limitations under the License. use std::any::Any; +use std::time::Duration; +use api::v1::meta::MailboxMessage; +use common_meta::distributed_time_constants::REGION_LEASE_SECS; +use common_meta::instruction::{ + DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, +}; +use common_telemetry::warn; use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use tokio::time::sleep; -use crate::error::Result; +use super::upgrade_candidate_region::UpgradeCandidateRegion; +use crate::error::{self, Result}; +use crate::handler::HeartbeatMailbox; use crate::procedure::region_migration::{Context, State}; +use crate::service::mailbox::Channel; + +const DOWNGRADE_LEADER_REGION_TIMEOUT: Duration = Duration::from_secs(1); #[derive(Debug, Serialize, Deserialize)] -pub struct DowngradeLeaderRegion; +pub struct DowngradeLeaderRegion { + // The optimistic retry times. + optimistic_retry: usize, + // The retry initial interval. + retry_initial_interval: Duration, +} + +impl Default for DowngradeLeaderRegion { + fn default() -> Self { + Self { + optimistic_retry: 3, + retry_initial_interval: Duration::from_millis(500), + } + } +} #[async_trait::async_trait] #[typetag::serde] impl State for DowngradeLeaderRegion { - async fn next(&mut self, _ctx: &mut Context) -> Result<Box<dyn State>> { - todo!() + async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> { + // Ensures the `leader_region_lease_deadline` must exist after recovering. + ctx.volatile_ctx + .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS)); + self.downgrade_region_with_retry(ctx).await; + + // The deadline is cleared once the downgrade succeeds, so a remaining deadline means the fast path failed and the leader region lease must be waited out.
+ if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() { + tokio::time::sleep_until(*deadline).await; + } + + Ok(Box::new(UpgradeCandidateRegion)) } fn as_any(&self) -> &dyn Any { self } } + +impl DowngradeLeaderRegion { + /// Builds downgrade region instruction. + fn build_downgrade_region_instruction(&self, ctx: &Context) -> Instruction { + let pc = &ctx.persistent_ctx; + let region_id = pc.region_id; + Instruction::DowngradeRegion(DowngradeRegion { region_id }) + } + + /// Tries to downgrade a leader region. + /// + /// Retry: + /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout. + /// - Failed to downgrade region on the Datanode. + /// + /// Abort: + /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable. + /// - [PushMessage](error::Error::PushMessage), The receiver is dropped. + /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible). + /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply). + /// - Invalid JSON. + async fn downgrade_region( + &self, + ctx: &mut Context, + downgrade_instruction: &Instruction, + ) -> Result<()> { + let pc = &ctx.persistent_ctx; + let region_id = pc.region_id; + let leader = &pc.from_peer; + + let msg = MailboxMessage::json_message( + &format!("Downgrade leader region: {}", region_id), + &format!("Meta@{}", ctx.server_addr()), + &format!("Datanode-{}@{}", leader.id, leader.addr), + common_time::util::current_time_millis(), + downgrade_instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: downgrade_instruction.to_string(), + })?; + + let ch = Channel::Datanode(leader.id); + let receiver = ctx + .mailbox + .send(&ch, msg, DOWNGRADE_LEADER_REGION_TIMEOUT) + .await?; + + match receiver.await? 
{ + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + let InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id, + exists, + error, + }) = reply + else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect downgrade region reply", + } + .fail(); + }; + + if error.is_some() { + return error::RetryLaterSnafu { + reason: format!( + "Failed to downgrade the region {} on Datanode {:?}, error: {:?}", + region_id, leader, error + ), + } + .fail(); + } + + if !exists { + warn!( + "Trying to downgrade the region {} on Datanode {}, but region doesn't exist!", + region_id, leader + ); + } + + if let Some(last_entry_id) = last_entry_id { + ctx.volatile_ctx.set_last_entry_id(last_entry_id); + } + + Ok(()) + } + Err(error::Error::MailboxTimeout { .. }) => { + let reason = format!( + "Mailbox received timeout for downgrade leader region {region_id} on Datanode {:?}", + leader, + ); + error::RetryLaterSnafu { reason }.fail() + } + Err(err) => Err(err), + } + } + + /// Downgrades a leader region. + /// + /// Fast path: + /// - Waits for the reply of downgrade instruction. + /// + /// Slow path: + /// - Waits for the lease of the leader region expired. + async fn downgrade_region_with_retry(&self, ctx: &mut Context) { + let instruction = self.build_downgrade_region_instruction(ctx); + + let mut retry = 0; + + loop { + if let Err(err) = self.downgrade_region(ctx, &instruction).await { + retry += 1; + if err.is_retryable() && retry < self.optimistic_retry { + warn!("Failed to downgrade region, error: {err:?}, retry later"); + sleep(self.retry_initial_interval).await; + } else { + break; + } + } else { + // Resets the deadline. 
+ ctx.volatile_ctx.reset_leader_region_lease_deadline(); + break; + } + } + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use api::v1::meta::mailbox_message::Payload; + use common_meta::peer::Peer; + use common_time::util::current_time_millis; + use store_api::storage::RegionId; + use tokio::time::Instant; + + use super::*; + use crate::error::Error; + use crate::procedure::region_migration::test_util::{ + new_close_region_reply, send_mock_reply, TestingEnv, + }; + use crate::procedure::region_migration::{ContextFactory, PersistentContext}; + + fn new_persistent_context() -> PersistentContext { + PersistentContext { + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + region_id: RegionId::new(1024, 1), + cluster_id: 0, + } + } + + fn new_downgrade_region_reply( + id: u64, + last_entry_id: Option, + exist: bool, + error: Option, + ) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id, + exists: exist, + error, + })) + .unwrap(), + )), + } + } + + #[tokio::test] + async fn test_datanode_is_unreachable() { + let state = DowngradeLeaderRegion::default(); + let persistent_context = new_persistent_context(); + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let instruction = &state.build_downgrade_region_instruction(&ctx); + let err = state + .downgrade_region(&mut ctx, instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::PusherNotFound { .. 
}); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_pusher_dropped() { + let state = DowngradeLeaderRegion::default(); + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(from_peer_id, tx) + .await; + + drop(rx); + + let instruction = &state.build_downgrade_region_instruction(&ctx); + let err = state + .downgrade_region(&mut ctx, instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::PushMessage { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_unexpected_instruction_reply() { + let state = DowngradeLeaderRegion::default(); + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(from_peer_id, tx) + .await; + + // Sends an incorrect reply. + send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id))); + + let instruction = &state.build_downgrade_region_instruction(&ctx); + let err = state + .downgrade_region(&mut ctx, instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::UnexpectedInstructionReply { .. 
}); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_instruction_exceeded_deadline() { + let state = DowngradeLeaderRegion::default(); + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(from_peer_id, tx) + .await; + + send_mock_reply(mailbox, rx, |id| { + Err(error::MailboxTimeoutSnafu { id }.build()) + }); + + let instruction = &state.build_downgrade_region_instruction(&ctx); + let err = state + .downgrade_region(&mut ctx, instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + } + + #[tokio::test] + async fn test_downgrade_region_failed() { + let state = DowngradeLeaderRegion::default(); + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(from_peer_id, tx) + .await; + + send_mock_reply(mailbox, rx, |id| { + Ok(new_downgrade_region_reply( + id, + None, + false, + Some("test mocked".to_string()), + )) + }); + + let instruction = &state.build_downgrade_region_instruction(&ctx); + let err = state + .downgrade_region(&mut ctx, instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::RetryLater { .. 
}); + assert!(err.is_retryable()); + assert!(err.to_string().contains("test mocked")); + } + + #[tokio::test] + async fn test_downgrade_region_with_retry_fast_path() { + let state = DowngradeLeaderRegion::default(); + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(from_peer_id, tx) + .await; + + common_runtime::spawn_bg(async move { + // retry: 0. + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Err(error::MailboxTimeoutSnafu { id: reply_id }.build()), + ) + .await + .unwrap(); + + // retry: 1. + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)), + ) + .await + .unwrap(); + }); + + state.downgrade_region_with_retry(&mut ctx).await; + assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1)); + assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none()); + } + + #[tokio::test] + async fn test_downgrade_region_with_retry_slow_path() { + let state = DowngradeLeaderRegion { + optimistic_retry: 3, + retry_initial_interval: Duration::from_millis(100), + }; + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(from_peer_id, 
tx) + .await; + + common_runtime::spawn_bg(async move { + for _ in 0..3 { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Err(error::MailboxTimeoutSnafu { id: reply_id }.build()), + ) + .await + .unwrap(); + } + }); + + ctx.volatile_ctx + .set_leader_region_lease_deadline(Duration::from_secs(5)); + let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap(); + state.downgrade_region_with_retry(&mut ctx).await; + assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None); + // Should remain no change. + assert_eq!( + ctx.volatile_ctx.leader_region_lease_deadline.unwrap(), + expected_deadline + ) + } + + #[tokio::test] + async fn test_next_upgrade_candidate_state() { + let mut state = Box::::default(); + let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(from_peer_id, tx) + .await; + + send_mock_reply(mailbox, rx, |id| { + Ok(new_downgrade_region_reply(id, Some(1), true, None)) + }); + + let timer = Instant::now(); + let next = state.next(&mut ctx).await.unwrap(); + let elapsed = timer.elapsed().as_secs(); + assert!(elapsed < REGION_LEASE_SECS / 2); + assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1)); + assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none()); + + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); + } +} diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index ab61da316c8d..47eb0abb980f 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ 
b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -47,7 +47,7 @@ impl State for RegionMigrationStart { if self.check_leader_region_on_peer(&region_route, to_peer)? { Ok(Box::new(RegionMigrationEnd)) } else if self.check_candidate_region_on_peer(&region_route, to_peer) { - Ok(Box::new(DowngradeLeaderRegion)) + Ok(Box::<DowngradeLeaderRegion>::default()) } else { Ok(Box::new(OpenCandidateRegion)) } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 42e057b58f47..fc4d9c7d9a45 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -41,7 +41,7 @@ impl State for OpenCandidateRegion { let instruction = self.build_open_region_instruction(ctx).await?; self.open_candidate_region(ctx, instruction).await?; - Ok(Box::new(DowngradeLeaderRegion)) + Ok(Box::<DowngradeLeaderRegion>::default()) } fn as_any(&self) -> &dyn Any { @@ -197,7 +197,9 @@ mod tests { use super::*; use crate::error::Error; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; - use crate::procedure::region_migration::test_util::TestingEnv; + use crate::procedure::region_migration::test_util::{ + new_close_region_reply, send_mock_reply, TestingEnv, + }; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; fn new_persistent_context() -> PersistentContext { @@ -223,23 +225,6 @@ mod tests { }) } - fn new_close_region_reply(id: u64) -> MailboxMessage { - MailboxMessage { - id, - subject: "mock".to_string(), - from: "datanode".to_string(), - to: "meta".to_string(), - timestamp_millis: current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply { - result: false, - error: None, - })) - .unwrap(), - )), - } - } - fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> MailboxMessage { MailboxMessage { id, @@
-328,21 +313,14 @@ mod tests { let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); - let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx .insert_heartbeat_response_receiver(to_peer_id, tx) .await; // Sends an incorrect reply. - common_runtime::spawn_bg(async move { - let resp = rx.recv().await.unwrap().unwrap(); - let reply_id = resp.mailbox_message.unwrap().id; - mailbox - .on_recv(reply_id, Ok(new_close_region_reply(reply_id))) - .await - .unwrap(); - }); + send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id))); let open_instruction = new_mock_open_instruction(to_peer_id, region_id); let err = state @@ -368,23 +346,15 @@ mod tests { let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); - let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx .insert_heartbeat_response_receiver(to_peer_id, tx) .await; // Sends an timeout error. 
- common_runtime::spawn_bg(async move { - let resp = rx.recv().await.unwrap().unwrap(); - let reply_id = resp.mailbox_message.unwrap().id; - mailbox - .on_recv( - reply_id, - Err(error::MailboxTimeoutSnafu { id: reply_id }.build()), - ) - .await - .unwrap(); + send_mock_reply(mailbox, rx, |id| { + Err(error::MailboxTimeoutSnafu { id }.build()) }); let open_instruction = new_mock_open_instruction(to_peer_id, region_id); @@ -411,26 +381,18 @@ mod tests { let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); - let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx .insert_heartbeat_response_receiver(to_peer_id, tx) .await; - common_runtime::spawn_bg(async move { - let resp = rx.recv().await.unwrap().unwrap(); - let reply_id = resp.mailbox_message.unwrap().id; - mailbox - .on_recv( - reply_id, - Ok(new_open_region_reply( - reply_id, - false, - Some("test mocked".to_string()), - )), - ) - .await - .unwrap(); + send_mock_reply(mailbox, rx, |id| { + Ok(new_open_region_reply( + id, + false, + Some("test mocked".to_string()), + )) }); let open_instruction = new_mock_open_instruction(to_peer_id, region_id); @@ -471,20 +433,13 @@ mod tests { let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); - let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let (tx, rx) = tokio::sync::mpsc::channel(1); mailbox_ctx .insert_heartbeat_response_receiver(to_peer_id, tx) .await; - common_runtime::spawn_bg(async move { - let resp = rx.recv().await.unwrap().unwrap(); - let reply_id = resp.mailbox_message.unwrap().id; - mailbox - .on_recv(reply_id, Ok(new_open_region_reply(reply_id, true, None))) - .await - .unwrap(); - }); + send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None))); let next = state.next(&mut ctx).await.unwrap(); let vc = ctx.volatile_ctx; diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs 
b/src/meta-srv/src/procedure/region_migration/test_util.rs index d880677af03c..277c9f8d90fa 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -14,20 +14,26 @@ use std::sync::Arc; -use api::v1::meta::{HeartbeatResponse, RequestHeader}; +use api::v1::meta::mailbox_message::Payload; +use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader}; +use common_meta::instruction::{InstructionReply, SimpleReply}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::Sequence; use common_meta::DatanodeId; use common_procedure::{Context as ProcedureContext, ProcedureId}; use common_procedure_test::MockContextProvider; -use tokio::sync::mpsc::Sender; +use common_time::util::current_time_millis; +use tokio::sync::mpsc::{Receiver, Sender}; use super::ContextFactoryImpl; +use crate::error::Result; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef}; use crate::service::mailbox::{Channel, MailboxRef}; +pub type MockHeartbeatReceiver = Receiver>; + /// The context of mailbox. 
pub struct MailboxContext { mailbox: MailboxRef, @@ -120,3 +126,32 @@ impl TestingEnv { } } } + +pub fn new_close_region_reply(id: u64) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply { + result: false, + error: None, + })) + .unwrap(), + )), + } +} + +pub fn send_mock_reply( + mailbox: MailboxRef, + mut rx: MockHeartbeatReceiver, + msg: impl FnOnce(u64) -> Result<MailboxMessage> + Send + 'static, +) { + common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap(); + }); +} diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index f41b66f4c09e..9c8d4b85b7e5 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -13,7 +13,9 @@ // limitations under the License.
use std::any::Any; +use std::time::Duration; +use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::rpc::router::RegionStatus; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -36,7 +38,7 @@ impl State for UpdateMetadata { UpdateMetadata::Downgrade => { self.downgrade_leader_region(ctx).await?; - Ok(Box::new(DowngradeLeaderRegion)) + Ok(Box::::default()) } } } @@ -88,6 +90,9 @@ impl UpdateMetadata { debug_assert!(ctx.remove_table_route_value()); + ctx.volatile_ctx + .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS)); + Ok(()) } } diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs new file mode 100644 index 000000000000..8b15a0730f7a --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -0,0 +1,36 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::procedure::region_migration::{Context, State}; +#[derive(Debug, Serialize, Deserialize)] +pub struct UpgradeCandidateRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for UpgradeCandidateRegion { + async fn next(&mut self, _ctx: &mut Context) -> Result> { + todo!(); + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl UpgradeCandidateRegion {}