diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 621a68243036..649383f06d1b 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -16,9 +16,11 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_meta::peer::Peer; +use common_meta::DatanodeId; use common_runtime::JoinError; use servers::define_into_tonic_status; use snafu::{Location, Snafu}; +use store_api::storage::RegionId; use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; @@ -29,6 +31,17 @@ use crate::pubsub::Message; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display( + "Another procedure is opening the region: {} on peer: {}", + region_id, + peer_id + ))] + RegionOpeningRace { + location: Location, + peer_id: DatanodeId, + region_id: RegionId, + }, + #[snafu(display("Failed to create default catalog and schema"))] InitMetadata { location: Location, @@ -625,7 +638,8 @@ impl ErrorExt for Error { | Error::UnexpectedInstructionReply { .. } | Error::Unexpected { .. } | Error::Txn { .. } - | Error::TableIdChanged { .. } => StatusCode::Unexpected, + | Error::TableIdChanged { .. } + | Error::RegionOpeningRace { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidateTableCache { source, .. } => source.status_code(), Error::RequestDatanode { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 8ec21826952e..7bde0aa54b3e 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -36,6 +36,8 @@ use store_api::storage::RegionId; use self::migration_start::RegionMigrationStart; use crate::error::{Error, Result}; use crate::procedure::utils::region_lock_key; +use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef}; +use crate::service::mailbox::MailboxRef; /// It's shared in each step and available even after recovering. /// @@ -66,7 +68,15 @@ impl PersistentContext { /// /// The additional remote fetches are only required in the worst cases. #[derive(Debug, Clone, Default)] -pub struct VolatileContext {} +pub struct VolatileContext { + /// `opening_region_guard` will be set after the + /// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step. + /// + /// `opening_region_guard` should be consumed after + /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region + /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue) . + opening_region_guard: Option, +} /// Used to generate new [Context]. pub trait ContextFactory { @@ -77,6 +87,9 @@ pub trait ContextFactory { pub struct ContextFactoryImpl { volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, + opening_region_keeper: OpeningRegionKeeperRef, + mailbox: MailboxRef, + server_addr: String, } impl ContextFactory for ContextFactoryImpl { @@ -85,6 +98,9 @@ impl ContextFactory for ContextFactoryImpl { persistent_ctx, volatile_ctx: self.volatile_ctx, table_metadata_manager: self.table_metadata_manager, + opening_region_keeper: self.opening_region_keeper, + mailbox: self.mailbox, + server_addr: self.server_addr, } } } @@ -96,12 +112,15 @@ pub struct Context { persistent_ctx: PersistentContext, volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, + opening_region_keeper: OpeningRegionKeeperRef, + mailbox: MailboxRef, + server_addr: String, } impl Context { /// Returns address of meta server. pub fn server_addr(&self) -> &str { - todo!() + &self.server_addr } } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index c056ec741601..42e057b58f47 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -13,11 +13,23 @@ // limitations under the License. use std::any::Any; +use std::collections::HashMap; +use std::time::Duration; +use api::v1::meta::MailboxMessage; +use common_meta::ddl::utils::region_storage_path; +use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; +use common_meta::RegionIdent; use serde::{Deserialize, Serialize}; +use snafu::{location, Location, OptionExt, ResultExt}; -use crate::error::Result; +use crate::error::{self, Result}; +use crate::handler::HeartbeatMailbox; +use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; use crate::procedure::region_migration::{Context, State}; +use crate::service::mailbox::Channel; + +const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(1); #[derive(Debug, Serialize, Deserialize)] pub struct OpenCandidateRegion; @@ -25,11 +37,465 @@ pub struct OpenCandidateRegion; #[async_trait::async_trait] #[typetag::serde] impl State for OpenCandidateRegion { - async fn next(&mut self, _ctx: &mut Context) -> Result> { - todo!() + async fn next(&mut self, ctx: &mut Context) -> Result> { + let instruction = self.build_open_region_instruction(ctx).await?; + self.open_candidate_region(ctx, instruction).await?; + + Ok(Box::new(DowngradeLeaderRegion)) } fn as_any(&self) -> &dyn Any { self } } + +impl OpenCandidateRegion { + /// Builds open region instructions + /// + /// Abort(non-retry): + /// - Table Info is not found. + async fn build_open_region_instruction(&self, ctx: &Context) -> Result { + let pc = &ctx.persistent_ctx; + let cluster_id = pc.cluster_id; + let table_id = pc.region_id.table_id(); + let region_number = pc.region_id.region_number(); + let candidate = &pc.to_peer; + let table_info = ctx + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(|e| error::Error::RetryLater { + reason: e.to_string(), + location: location!(), + })? + .context(error::TableInfoNotFoundSnafu { table_id })? + .into_inner() + .table_info; + + // The region storage path is immutable after the region is created. + // Therefore, it's safe to store it in `VolatileContext` for future use. + let region_storage_path = + region_storage_path(&table_info.catalog_name, &table_info.schema_name); + + let engine = table_info.meta.engine; + let region_options: HashMap = (&table_info.meta.options).into(); + + let open_instruction = Instruction::OpenRegion(OpenRegion::new( + RegionIdent { + cluster_id, + datanode_id: candidate.id, + table_id, + region_number, + engine, + }, + ®ion_storage_path, + region_options, + )); + + Ok(open_instruction) + } + + /// Opens the candidate region. + /// + /// Abort(non-retry): + /// - The Datanode is unreachable(e.g., Candidate pusher is not found). + /// - Unexpected instruction reply. + /// - Another procedure is opening the candidate region. + /// + /// Retry: + /// - Exceeded deadline of open instruction. + /// - Datanode failed to open the candidate region. + async fn open_candidate_region( + &self, + ctx: &mut Context, + open_instruction: Instruction, + ) -> Result<()> { + let pc = &ctx.persistent_ctx; + let vc = &mut ctx.volatile_ctx; + let region_id = pc.region_id; + let candidate = &pc.to_peer; + + // Registers the opening region. + let guard = ctx + .opening_region_keeper + .register(candidate.id, region_id) + .context(error::RegionOpeningRaceSnafu { + peer_id: candidate.id, + region_id, + })?; + + debug_assert!(vc.opening_region_guard.is_none()); + vc.opening_region_guard = Some(guard); + + let msg = MailboxMessage::json_message( + &format!("Open candidate region: {}", region_id), + &format!("Meta@{}", ctx.server_addr()), + &format!("Datanode-{}@{}", candidate.id, candidate.addr), + common_time::util::current_time_millis(), + &open_instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: open_instruction.to_string(), + })?; + + let ch = Channel::Datanode(candidate.id); + let receiver = ctx + .mailbox + .send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT) + .await?; + + match receiver.await? { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect open region reply", + } + .fail(); + }; + + if result { + Ok(()) + } else { + error::RetryLaterSnafu { + reason: format!( + "Region {region_id} is not opened by Datanode {:?}, error: {error:?}", + candidate, + ), + } + .fail() + } + } + Err(error::Error::MailboxTimeout { .. }) => { + let reason = format!( + "Mailbox received timeout for open candidate region {region_id} on Datanode {:?}", + candidate, + ); + error::RetryLaterSnafu { reason }.fail() + } + Err(e) => Err(e), + } + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use api::v1::meta::mailbox_message::Payload; + use common_catalog::consts::MITO2_ENGINE; + use common_meta::key::test_utils::new_test_table_info; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use common_meta::DatanodeId; + use common_time::util::current_time_millis; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; + use crate::procedure::region_migration::test_util::TestingEnv; + use crate::procedure::region_migration::{ContextFactory, PersistentContext}; + + fn new_persistent_context() -> PersistentContext { + PersistentContext { + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + region_id: RegionId::new(1024, 1), + cluster_id: 0, + } + } + + fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction { + Instruction::OpenRegion(OpenRegion { + region_ident: RegionIdent { + cluster_id: 0, + datanode_id, + table_id: region_id.table_id(), + region_number: region_id.region_number(), + engine: MITO2_ENGINE.to_string(), + }, + region_storage_path: "/bar/foo/region/".to_string(), + options: Default::default(), + }) + } + + fn new_close_region_reply(id: u64) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply { + result: false, + error: None, + })) + .unwrap(), + )), + } + } + + fn new_open_region_reply(id: u64, result: bool, error: Option) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error })) + .unwrap(), + )), + } + } + + #[tokio::test] + async fn test_table_info_is_not_found_error() { + let state = OpenCandidateRegion; + let persistent_context = new_persistent_context(); + let env = TestingEnv::new(); + let ctx = env.context_factory().new_context(persistent_context); + + let err = state.build_open_region_instruction(&ctx).await.unwrap_err(); + + assert_matches!(err, Error::TableInfoNotFound { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_datanode_is_unreachable() { + let state = OpenCandidateRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let to_peer_id = persistent_context.to_peer.id; + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let open_instruction = new_mock_open_instruction(to_peer_id, region_id); + let err = state + .open_candidate_region(&mut ctx, open_instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::PusherNotFound { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_candidate_region_opening_error() { + let state = OpenCandidateRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let to_peer_id = persistent_context.to_peer.id; + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let opening_region_keeper = env.opening_region_keeper(); + let _guard = opening_region_keeper + .register(to_peer_id, region_id) + .unwrap(); + + let open_instruction = new_mock_open_instruction(to_peer_id, region_id); + let err = state + .open_candidate_region(&mut ctx, open_instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::RegionOpeningRace { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_unexpected_instruction_reply() { + let state = OpenCandidateRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let to_peer_id = persistent_context.to_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(to_peer_id, tx) + .await; + + // Sends an incorrect reply. + common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv(reply_id, Ok(new_close_region_reply(reply_id))) + .await + .unwrap(); + }); + + let open_instruction = new_mock_open_instruction(to_peer_id, region_id); + let err = state + .open_candidate_region(&mut ctx, open_instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::UnexpectedInstructionReply { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_instruction_exceeded_deadline() { + let state = OpenCandidateRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let to_peer_id = persistent_context.to_peer.id; + + let mut env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(to_peer_id, tx) + .await; + + // Sends an timeout error. + common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Err(error::MailboxTimeoutSnafu { id: reply_id }.build()), + ) + .await + .unwrap(); + }); + + let open_instruction = new_mock_open_instruction(to_peer_id, region_id); + let err = state + .open_candidate_region(&mut ctx, open_instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + } + + #[tokio::test] + async fn test_open_candidate_region_failed() { + let state = OpenCandidateRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let to_peer_id = persistent_context.to_peer.id; + let mut env = TestingEnv::new(); + + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(to_peer_id, tx) + .await; + + common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv( + reply_id, + Ok(new_open_region_reply( + reply_id, + false, + Some("test mocked".to_string()), + )), + ) + .await + .unwrap(); + }); + + let open_instruction = new_mock_open_instruction(to_peer_id, region_id); + let err = state + .open_candidate_region(&mut ctx, open_instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + assert!(err.to_string().contains("test mocked")); + } + + #[tokio::test] + async fn test_next_downgrade_leader_region_state() { + let mut state = Box::new(OpenCandidateRegion); + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let to_peer_id = persistent_context.to_peer.id; + let mut env = TestingEnv::new(); + + // Prepares table + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(persistent_context.region_id), + leader_peer: Some(Peer::empty(3)), + ..Default::default() + }]; + + env.table_metadata_manager() + .create_table_metadata(table_info, region_routes) + .await + .unwrap(); + + let mut ctx = env.context_factory().new_context(persistent_context); + let mailbox_ctx = env.mailbox_context(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(to_peer_id, tx) + .await; + + common_runtime::spawn_bg(async move { + let resp = rx.recv().await.unwrap().unwrap(); + let reply_id = resp.mailbox_message.unwrap().id; + mailbox + .on_recv(reply_id, Ok(new_open_region_reply(reply_id, true, None))) + .await + .unwrap(); + }); + + let next = state.next(&mut ctx).await.unwrap(); + let vc = ctx.volatile_ctx; + assert_eq!( + vc.opening_region_guard.unwrap().info(), + (to_peer_id, region_id) + ); + + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); + } +} diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index cfac3b4cb789..d880677af03c 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -14,16 +14,57 @@ use std::sync::Arc; +use api::v1::meta::{HeartbeatResponse, RequestHeader}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::sequence::Sequence; +use common_meta::DatanodeId; use common_procedure::{Context as ProcedureContext, ProcedureId}; use common_procedure_test::MockContextProvider; +use tokio::sync::mpsc::Sender; use super::ContextFactoryImpl; +use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; +use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef}; +use crate::service::mailbox::{Channel, MailboxRef}; + +/// The context of mailbox. +pub struct MailboxContext { + mailbox: MailboxRef, + // The pusher is used in the mailbox. + pushers: Pushers, +} + +impl MailboxContext { + pub fn new(sequence: Sequence) -> Self { + let pushers = Pushers::default(); + let mailbox = HeartbeatMailbox::create(pushers.clone(), sequence); + + Self { mailbox, pushers } + } + + /// Inserts a pusher for `datanode_id` + pub async fn insert_heartbeat_response_receiver( + &mut self, + datanode_id: DatanodeId, + tx: Sender>, + ) { + let pusher_id = Channel::Datanode(datanode_id).pusher_id(); + let pusher = Pusher::new(tx, &RequestHeader::default()); + let _ = self.pushers.insert(pusher_id, pusher).await; + } + + pub fn mailbox(&self) -> &MailboxRef { + &self.mailbox + } +} /// `TestingEnv` provides components during the tests. pub struct TestingEnv { table_metadata_manager: TableMetadataManagerRef, + mailbox_ctx: MailboxContext, + opening_region_keeper: OpeningRegionKeeperRef, + server_addr: String, } impl TestingEnv { @@ -32,8 +73,16 @@ impl TestingEnv { let kv_backend = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 1, kv_backend.clone()); + + let mailbox_ctx = MailboxContext::new(mailbox_sequence); + let opening_region_keeper = Arc::new(OpeningRegionKeeper::default()); + Self { table_metadata_manager, + opening_region_keeper, + mailbox_ctx, + server_addr: "localhost".to_string(), } } @@ -41,14 +90,28 @@ impl TestingEnv { pub fn context_factory(&self) -> ContextFactoryImpl { ContextFactoryImpl { table_metadata_manager: self.table_metadata_manager.clone(), + opening_region_keeper: self.opening_region_keeper.clone(), volatile_ctx: Default::default(), + mailbox: self.mailbox_ctx.mailbox().clone(), + server_addr: self.server_addr.to_string(), } } + /// Returns the mutable [MailboxContext]. + pub fn mailbox_context(&mut self) -> &mut MailboxContext { + &mut self.mailbox_ctx + } + + /// Returns the [TableMetadataManagerRef] pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { &self.table_metadata_manager } + /// Returns the [OpeningRegionKeeperRef] + pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef { + &self.opening_region_keeper + } + /// Returns a [ProcedureContext] with a random [ProcedureId] and a [MockContextProvider]. pub fn procedure_context() -> ProcedureContext { ProcedureContext { diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 993f40c08181..f9471f7e07e1 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -171,6 +171,13 @@ impl Drop for OpeningRegionGuard { } } +impl OpeningRegionGuard { + /// Returns opening region info. + pub fn info(&self) -> (DatanodeId, RegionId) { + (self.datanode_id, self.region_id) + } +} + pub type OpeningRegionKeeperRef = Arc; #[derive(Debug, Clone, Default)]