From 9620c77624060ea8f194a7e391215a74c4b6d38c Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 16 Nov 2023 04:28:29 +0000 Subject: [PATCH] feat: add open candidate region state --- .../src/procedure/region_migration.rs | 5 +- .../region_migration/open_candidate_region.rs | 432 +++++++++++++++++- .../procedure/region_migration/test_util.rs | 54 +++ 3 files changed, 486 insertions(+), 5 deletions(-) diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 7c91ca8a30ae..8bd5d6633955 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -36,6 +36,7 @@ use store_api::storage::RegionId; use self::migration_start::RegionMigrationStart; use crate::error::{Error, Result}; use crate::procedure::utils::region_lock_key; +use crate::service::mailbox::MailboxRef; /// It's shared in each step and available even after recovering. /// @@ -71,12 +72,14 @@ pub struct VolatileContext {} /// The context of procedure execution. pub struct Context { table_metadata_manager: TableMetadataManagerRef, + mailbox: MailboxRef, + server_addr: String, } impl Context { /// Returns address of meta server. pub fn server_addr(&self) -> &str { - todo!() + &self.server_addr } } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 16707ba01937..ffb2276d043b 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -13,11 +13,23 @@ // limitations under the License. 
use std::any::Any; +use std::collections::HashMap; +use std::time::Duration; +use api::v1::meta::MailboxMessage; +use common_meta::ddl::utils::region_storage_path; +use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; +use common_meta::RegionIdent; use serde::{Deserialize, Serialize}; +use snafu::{location, Location, OptionExt, ResultExt}; -use crate::error::Result; +use super::downgrade_leader_region::DowngradeLeaderRegion; +use crate::error::{self, Result}; +use crate::handler::HeartbeatMailbox; use crate::procedure::region_migration::{Context, PersistentContext, State, VolatileContext}; +use crate::service::mailbox::Channel; + +const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_millis(2000); #[derive(Debug, Serialize, Deserialize)] pub struct OpenCandidateRegion; @@ -25,16 +37,428 @@ pub struct OpenCandidateRegion; #[async_trait::async_trait] #[typetag::serde] impl State for OpenCandidateRegion { + /// Opens the candidate region on `to_peer`. async fn next( &mut self, - _ctx: &Context, - _pc: &mut PersistentContext, + ctx: &Context, + pc: &mut PersistentContext, _vc: &mut VolatileContext, ) -> Result<Box<dyn State>> { - todo!() + let instruction = self.build_open_region_instruction(ctx, pc).await?; + self.open_candidate_region(ctx, pc, instruction).await?; + + Ok(Box::new(DowngradeLeaderRegion)) } fn as_any(&self) -> &dyn Any { self } } + +impl OpenCandidateRegion { + /// Builds open region instructions + /// + /// Abort(non-retry): + /// - Table Info is not found. 
+ async fn build_open_region_instruction( + &self, + ctx: &Context, + pc: &PersistentContext, + ) -> Result<Instruction> { + let cluster_id = pc.cluster_id; + let table_id = pc.region_id.table_id(); + let region_number = pc.region_id.region_number(); + let candidate = &pc.to_peer; + let table_info = ctx + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(|e| error::Error::RetryLater { + reason: e.to_string(), + location: location!(), + })? + .context(error::TableInfoNotFoundSnafu { table_id })? + .into_inner() + .table_info; + + // The region storage path is immutable after the region is created. + // Therefore, it's safe to store it in `VolatileContext` for future use. + let region_storage_path = + region_storage_path(&table_info.catalog_name, &table_info.schema_name); + + let engine = table_info.meta.engine; + let region_options: HashMap<String, String> = (&table_info.meta.options).into(); + + let open_instruction = Instruction::OpenRegion(OpenRegion::new( + RegionIdent { + cluster_id, + datanode_id: candidate.id, + table_id, + region_number, + engine, + }, + &region_storage_path, + region_options, + )); + + Ok(open_instruction) + } + + /// Opens the candidate region. + /// + /// Abort(non-retry): + /// - The Datanode is unreachable(e.g., Candidate pusher is not found). + /// - Unexpected instruction reply. + /// + /// Retry: + /// - Exceeded deadline of open instruction. + /// - Datanode failed to open the candidate region. 
+ async fn open_candidate_region( + &self, + ctx: &Context, + pc: &PersistentContext, + open_instruction: Instruction, + ) -> Result<()> { + let region_id = pc.region_id; + let candidate = &pc.to_peer; + + let msg = MailboxMessage::json_message( + &format!("Open candidate region: {}", region_id), + &format!("Meta@{}", ctx.server_addr()), + &format!("Datanode-{}@{}", candidate.id, candidate.addr), + common_time::util::current_time_millis(), + &open_instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: open_instruction.to_string(), + })?; + + let ch = Channel::Datanode(candidate.id); + let receiver = ctx + .mailbox + .send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT) + .await?; + + // TODO(weny): Registers the opening region. + + match receiver.await? { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect open region reply", + } + .fail(); + }; + + if result { + Ok(()) + } else { + error::RetryLaterSnafu { + reason: format!( + "Region {region_id} is not opened by Datanode {:?}, error: {error:?}", + candidate, + ), + } + .fail() + } + } + Err(error::Error::MailboxTimeout { .. 
}) => { + let reason = format!( + "Mailbox received timeout for open candidate region {region_id} on Datanode {:?}", + candidate, + ); + error::RetryLaterSnafu { reason }.fail() + } + Err(e) => Err(e), + } + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use api::v1::meta::mailbox_message::Payload; + use common_catalog::consts::MITO2_ENGINE; + use common_meta::key::test_utils::new_test_table_info; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use common_meta::DatanodeId; + use common_time::util::current_time_millis; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + use crate::procedure::region_migration::test_util::TestingEnv; + + fn new_persistent_context() -> PersistentContext { + PersistentContext { + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + region_id: RegionId::new(1024, 1), + cluster_id: 0, + } + } + + fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction { + Instruction::OpenRegion(OpenRegion { + region_ident: RegionIdent { + cluster_id: 0, + datanode_id, + table_id: region_id.table_id(), + region_number: region_id.region_number(), + engine: MITO2_ENGINE.to_string(), + }, + region_storage_path: "/bar/foo/region/".to_string(), + options: Default::default(), + }) + } + + fn new_close_region_reply(id: u64) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply { + result: false, + error: None, + })) + .unwrap(), + )), + } + } + + fn new_open_region_reply(id: u64, result: bool, error: Option<String>) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: 
Some(Payload::Json( + serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error })) + .unwrap(), + )), + } + } + + #[tokio::test] + async fn test_table_info_is_not_found_error() { + let state = OpenCandidateRegion; + let persistent_context = new_persistent_context(); + let env = TestingEnv::new(); + let ctx = env.context(); + + let err = state + .build_open_region_instruction(&ctx, &persistent_context) + .await + .unwrap_err(); + + assert_matches!(err, Error::TableInfoNotFound { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_datanode_is_unreachable() { + let state = OpenCandidateRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let env = TestingEnv::new(); + let ctx = env.context(); + + let open_instruction = + new_mock_open_instruction(persistent_context.to_peer.id, persistent_context.region_id); + let err = state + .open_candidate_region(&ctx, &persistent_context, open_instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::PusherNotFound { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_unexpected_instruction_reply() { + let state = OpenCandidateRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let mut env = TestingEnv::new(); + let ctx = env.context(); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + let to_peer_id = persistent_context.to_peer.id; + mailbox_ctx + .insert_heartbeat_response_receiver(to_peer_id, tx) + .await; + + // Sends an incorrect reply. 
+ common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv(reply_id, Ok(new_close_region_reply(reply_id))) + .await + .unwrap(); + }); + + let open_instruction = new_mock_open_instruction(to_peer_id, persistent_context.region_id); + let err = state + .open_candidate_region(&ctx, &persistent_context, open_instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::UnexpectedInstructionReply { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_instruction_exceeded_deadline() { + let state = OpenCandidateRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let mut env = TestingEnv::new(); + let ctx = env.context(); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + let to_peer_id = persistent_context.to_peer.id; + mailbox_ctx + .insert_heartbeat_response_receiver(to_peer_id, tx) + .await; + + // Sends a timeout error. + common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Err(error::MailboxTimeoutSnafu { id: reply_id }.build()), + ) + .await + .unwrap(); + }); + + let open_instruction = new_mock_open_instruction(to_peer_id, persistent_context.region_id); + let err = state + .open_candidate_region(&ctx, &persistent_context, open_instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::RetryLater { .. 
}); + assert!(err.is_retryable()); + } + + #[tokio::test] + async fn test_open_candidate_region_failed() { + let state = OpenCandidateRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let mut env = TestingEnv::new(); + let ctx = env.context(); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + let to_peer_id = persistent_context.to_peer.id; + mailbox_ctx + .insert_heartbeat_response_receiver(to_peer_id, tx) + .await; + + common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Ok(new_open_region_reply( + reply_id, + false, + Some("test mocked".to_string()), + )), + ) + .await + .unwrap(); + }); + + let open_instruction = new_mock_open_instruction(to_peer_id, persistent_context.region_id); + let err = state + .open_candidate_region(&ctx, &persistent_context, open_instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::RetryLater { .. 
}); + assert!(err.is_retryable()); + assert!(err.to_string().contains("test mocked")); + } + + #[tokio::test] + async fn test_next_downgrade_leader_region_state() { + let mut state = Box::new(OpenCandidateRegion); + // from_peer: 1 + // to_peer: 2 + let mut persistent_context = new_persistent_context(); + let mut volatile_context = VolatileContext::default(); + let mut env = TestingEnv::new(); + + // Prepares table + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(persistent_context.region_id), + leader_peer: Some(Peer::empty(3)), + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let ctx = env.context(); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + let to_peer_id = persistent_context.to_peer.id; + mailbox_ctx + .insert_heartbeat_response_receiver(to_peer_id, tx) + .await; + + common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv(reply_id, Ok(new_open_region_reply(reply_id, true, None))) + .await + .unwrap(); + }); + + let next = state + .next(&ctx, &mut persistent_context, &mut volatile_context) + .await + .unwrap(); + + let _ = next + .as_any() + .downcast_ref::<DowngradeLeaderRegion>() + .unwrap(); + } +} diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 436645f4dc62..dfaf990f61b8 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -14,16 +14,57 @@ use std::sync::Arc; +use api::v1::meta::{HeartbeatResponse, RequestHeader}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; +use 
common_meta::sequence::Sequence; +use common_meta::DatanodeId; use common_procedure::{Context as ProcedureContext, ProcedureId}; use common_procedure_test::MockContextProvider; +use tokio::sync::mpsc::Sender; +use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; use crate::procedure::region_migration::Context; +use crate::service::mailbox::{Channel, MailboxRef}; + +// TODO(weny): remove it. +#[allow(dead_code)] +/// The context of mailbox. +pub struct MailboxContext { + mailbox: MailboxRef, + // The pusher is used in the mailbox. + pushers: Pushers, +} + +impl MailboxContext { + pub fn new(sequence: Sequence) -> Self { + let pushers = Pushers::default(); + let mailbox = HeartbeatMailbox::create(pushers.clone(), sequence); + + Self { mailbox, pushers } + } + + /// Inserts a pusher for `datanode_id` + pub async fn insert_heartbeat_response_receiver( + &mut self, + datanode_id: DatanodeId, + tx: Sender>, + ) { + let pusher_id = Channel::Datanode(datanode_id).pusher_id(); + let pusher = Pusher::new(tx, &RequestHeader::default()); + let _ = self.pushers.insert(pusher_id, pusher).await; + } + + pub fn mailbox(&self) -> &MailboxRef { + &self.mailbox + } +} /// `TestingEnv` provides components during the tests. 
pub struct TestingEnv { table_metadata_manager: TableMetadataManagerRef, + mailbox_ctx: MailboxContext, + server_addr: String, } impl TestingEnv { @@ -32,8 +73,14 @@ impl TestingEnv { let kv_backend = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 1, kv_backend.clone()); + + let mailbox_ctx = MailboxContext::new(mailbox_sequence); + Self { table_metadata_manager, + mailbox_ctx, + server_addr: "localhost".to_string(), } } @@ -41,9 +88,16 @@ impl TestingEnv { pub fn context(&self) -> Context { Context { table_metadata_manager: self.table_metadata_manager.clone(), + mailbox: self.mailbox_ctx.mailbox().clone(), + server_addr: self.server_addr.to_string(), } } + /// Returns the mutable [MailboxContext]. + pub fn mailbox_context(&mut self) -> &mut MailboxContext { + &mut self.mailbox_ctx + } + pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { &self.table_metadata_manager }