diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 621a68243036..bfbe2497ae14 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -16,9 +16,11 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_meta::peer::Peer; +use common_meta::DatanodeId; use common_runtime::JoinError; use servers::define_into_tonic_status; use snafu::{Location, Snafu}; +use store_api::storage::RegionId; use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; @@ -29,6 +31,17 @@ use crate::pubsub::Message; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display( + "Another procedure is opening the region: {} on peer: {}", + region_id, + peer_id + ))] + RegionOpening { + location: Location, + peer_id: DatanodeId, + region_id: RegionId, + }, + #[snafu(display("Failed to create default catalog and schema"))] InitMetadata { location: Location, @@ -625,7 +638,8 @@ impl ErrorExt for Error { | Error::UnexpectedInstructionReply { .. } | Error::Unexpected { .. } | Error::Txn { .. } - | Error::TableIdChanged { .. } => StatusCode::Unexpected, + | Error::TableIdChanged { .. } + | Error::RegionOpening { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidateTableCache { source, .. } => source.status_code(), Error::RequestDatanode { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index a12a39a0571e..7bde0aa54b3e 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -36,6 +36,7 @@ use store_api::storage::RegionId; use self::migration_start::RegionMigrationStart; use crate::error::{Error, Result}; use crate::procedure::utils::region_lock_key; +use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef}; use crate::service::mailbox::MailboxRef; /// It's shared in each step and available even after recovering. @@ -67,7 +68,15 @@ impl PersistentContext { /// /// The additional remote fetches are only required in the worst cases. #[derive(Debug, Clone, Default)] -pub struct VolatileContext {} +pub struct VolatileContext { + /// `opening_region_guard` will be set after the + /// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step. + /// + /// `opening_region_guard` should be consumed after + /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region + /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue) . + opening_region_guard: Option, +} /// Used to generate new [Context]. pub trait ContextFactory { @@ -78,6 +87,7 @@ pub trait ContextFactory { pub struct ContextFactoryImpl { volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, + opening_region_keeper: OpeningRegionKeeperRef, mailbox: MailboxRef, server_addr: String, } @@ -88,6 +98,7 @@ impl ContextFactory for ContextFactoryImpl { persistent_ctx, volatile_ctx: self.volatile_ctx, table_metadata_manager: self.table_metadata_manager, + opening_region_keeper: self.opening_region_keeper, mailbox: self.mailbox, server_addr: self.server_addr, } @@ -101,6 +112,7 @@ pub struct Context { persistent_ctx: PersistentContext, volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, + opening_region_keeper: OpeningRegionKeeperRef, mailbox: MailboxRef, server_addr: String, } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 7243295e30fb..89158363a175 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -102,19 +102,33 @@ impl OpenCandidateRegion { /// Abort(non-retry): /// - The Datanode is unreachable(e.g., Candidate pusher is not found). /// - Unexpected instruction reply. + /// - Another procedure is opening the candidate region. /// /// Retry: /// - Exceeded deadline of open instruction. /// - Datanode failed to open the candidate region. async fn open_candidate_region( &self, - ctx: &Context, + ctx: &mut Context, open_instruction: Instruction, ) -> Result<()> { let pc = &ctx.persistent_ctx; + let vc = &mut ctx.volatile_ctx; let region_id = pc.region_id; let candidate = &pc.to_peer; + // Registers the opening region. + let guard = ctx + .opening_region_keeper + .register(candidate.id, region_id) + .context(error::RegionOpeningSnafu { + peer_id: candidate.id, + region_id, + })?; + + debug_assert!(vc.opening_region_guard.is_none()); + vc.opening_region_guard = Some(guard); + let msg = MailboxMessage::json_message( &format!("Open candidate region: {}", region_id), &format!("Meta@{}", ctx.server_addr()), @@ -132,8 +146,6 @@ impl OpenCandidateRegion { .send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT) .await?; - // TODO(weny): Registers the opening region. - match receiver.await? { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; @@ -264,11 +276,11 @@ mod tests { let region_id = persistent_context.region_id; let to_peer_id = persistent_context.to_peer.id; let env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); let open_instruction = new_mock_open_instruction(to_peer_id, region_id); let err = state - .open_candidate_region(&ctx, open_instruction) + .open_candidate_region(&mut ctx, open_instruction) .await .unwrap_err(); @@ -276,6 +288,32 @@ mod tests { assert!(!err.is_retryable()); } + #[tokio::test] + async fn test_candidate_region_opening_error() { + let state = OpenCandidateRegion; + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let to_peer_id = persistent_context.to_peer.id; + + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + let opening_region_keeper = env.opening_region_keeper(); + let _guard = opening_region_keeper + .register(to_peer_id, region_id) + .unwrap(); + + let open_instruction = new_mock_open_instruction(to_peer_id, region_id); + let err = state + .open_candidate_region(&mut ctx, open_instruction) + .await + .unwrap_err(); + + assert_matches!(err, Error::RegionOpening { .. }); + assert!(!err.is_retryable()); + } + #[tokio::test] async fn test_unexpected_instruction_reply() { let state = OpenCandidateRegion; @@ -286,7 +324,7 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -308,7 +346,7 @@ mod tests { let open_instruction = new_mock_open_instruction(to_peer_id, region_id); let err = state - .open_candidate_region(&ctx, open_instruction) + .open_candidate_region(&mut ctx, open_instruction) .await .unwrap_err(); @@ -326,7 +364,7 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -351,7 +389,7 @@ mod tests { let open_instruction = new_mock_open_instruction(to_peer_id, region_id); let err = state - .open_candidate_region(&ctx, open_instruction) + .open_candidate_region(&mut ctx, open_instruction) .await .unwrap_err(); @@ -369,7 +407,7 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); - let ctx = env.context_factory().new_context(persistent_context); + let mut ctx = env.context_factory().new_context(persistent_context); let mailbox_ctx = env.mailbox_context(); let mailbox = mailbox_ctx.mailbox().clone(); @@ -397,7 +435,7 @@ mod tests { let open_instruction = new_mock_open_instruction(to_peer_id, region_id); let err = state - .open_candidate_region(&ctx, open_instruction) + .open_candidate_region(&mut ctx, open_instruction) .await .unwrap_err(); @@ -412,6 +450,7 @@ mod tests { // from_peer: 1 // to_peer: 2 let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); @@ -448,6 +487,11 @@ mod tests { }); let next = state.next(&mut ctx).await.unwrap(); + let vc = ctx.volatile_ctx; + assert_eq!( + vc.opening_region_guard.unwrap().info(), + (to_peer_id, region_id) + ); let _ = next .as_any() diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 2182ebf7e684..d880677af03c 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -25,10 +25,9 @@ use tokio::sync::mpsc::Sender; use super::ContextFactoryImpl; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; +use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef}; use crate::service::mailbox::{Channel, MailboxRef}; -// TODO(weny): remove it. -#[allow(dead_code)] /// The context of mailbox. pub struct MailboxContext { mailbox: MailboxRef, @@ -64,6 +63,7 @@ impl MailboxContext { pub struct TestingEnv { table_metadata_manager: TableMetadataManagerRef, mailbox_ctx: MailboxContext, + opening_region_keeper: OpeningRegionKeeperRef, server_addr: String, } @@ -76,9 +76,11 @@ impl TestingEnv { let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 1, kv_backend.clone()); let mailbox_ctx = MailboxContext::new(mailbox_sequence); + let opening_region_keeper = Arc::new(OpeningRegionKeeper::default()); Self { table_metadata_manager, + opening_region_keeper, mailbox_ctx, server_addr: "localhost".to_string(), } @@ -88,6 +90,7 @@ impl TestingEnv { pub fn context_factory(&self) -> ContextFactoryImpl { ContextFactoryImpl { table_metadata_manager: self.table_metadata_manager.clone(), + opening_region_keeper: self.opening_region_keeper.clone(), volatile_ctx: Default::default(), mailbox: self.mailbox_ctx.mailbox().clone(), server_addr: self.server_addr.to_string(), @@ -99,10 +102,16 @@ impl TestingEnv { &mut self.mailbox_ctx } + /// Returns the [TableMetadataManagerRef] pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { &self.table_metadata_manager } + /// Returns the [OpeningRegionKeeperRef] + pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef { + &self.opening_region_keeper + } + /// Returns a [ProcedureContext] with a random [ProcedureId] and a [MockContextProvider]. pub fn procedure_context() -> ProcedureContext { ProcedureContext { diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 993f40c08181..f9471f7e07e1 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -171,6 +171,13 @@ impl Drop for OpeningRegionGuard { } } +impl OpeningRegionGuard { + /// Returns opening region info. + pub fn info(&self) -> (DatanodeId, RegionId) { + (self.datanode_id, self.region_id) + } +} + pub type OpeningRegionKeeperRef = Arc; #[derive(Debug, Clone, Default)]