Skip to content

Commit

Permalink
feat: register the opening region
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 17, 2023
1 parent 62339b6 commit 1c613a7
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 15 deletions.
16 changes: 15 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::JoinError;
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
Expand All @@ -29,6 +31,17 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Another procedure is opening the region: {} on peer: {}",
region_id,
peer_id
))]
RegionOpening {
location: Location,
peer_id: DatanodeId,
region_id: RegionId,
},

#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
location: Location,
Expand Down Expand Up @@ -625,7 +638,8 @@ impl ErrorExt for Error {
| Error::UnexpectedInstructionReply { .. }
| Error::Unexpected { .. }
| Error::Txn { .. }
| Error::TableIdChanged { .. } => StatusCode::Unexpected,
| Error::TableIdChanged { .. }
| Error::RegionOpening { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),
Expand Down
14 changes: 13 additions & 1 deletion src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use store_api::storage::RegionId;
use self::migration_start::RegionMigrationStart;
use crate::error::{Error, Result};
use crate::procedure::utils::region_lock_key;
use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef};
use crate::service::mailbox::MailboxRef;

/// It's shared in each step and available even after recovering.
Expand Down Expand Up @@ -67,7 +68,15 @@ impl PersistentContext {
///
/// The additional remote fetches are only required in the worst cases.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {}
pub struct VolatileContext {
/// `opening_region_guard` will be set after the
/// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step.
///
/// `opening_region_guard` should be consumed after
/// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue) .
opening_region_guard: Option<OpeningRegionGuard>,
}

/// Used to generate new [Context].
pub trait ContextFactory {
Expand All @@ -78,6 +87,7 @@ pub trait ContextFactory {
pub struct ContextFactoryImpl {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
mailbox: MailboxRef,
server_addr: String,
}
Expand All @@ -88,6 +98,7 @@ impl ContextFactory for ContextFactoryImpl {
persistent_ctx,
volatile_ctx: self.volatile_ctx,
table_metadata_manager: self.table_metadata_manager,
opening_region_keeper: self.opening_region_keeper,
mailbox: self.mailbox,
server_addr: self.server_addr,
}
Expand All @@ -101,6 +112,7 @@ pub struct Context {
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
mailbox: MailboxRef,
server_addr: String,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,33 @@ impl OpenCandidateRegion {
/// Abort(non-retry):
/// - The Datanode is unreachable(e.g., Candidate pusher is not found).
/// - Unexpected instruction reply.
/// - Another procedure is opening the candidate region.
///
/// Retry:
/// - Exceeded deadline of open instruction.
/// - Datanode failed to open the candidate region.
async fn open_candidate_region(
&self,
ctx: &Context,
ctx: &mut Context,
open_instruction: Instruction,
) -> Result<()> {
let pc = &ctx.persistent_ctx;
let vc = &mut ctx.volatile_ctx;
let region_id = pc.region_id;
let candidate = &pc.to_peer;

// Registers the opening region.
let guard = ctx
.opening_region_keeper
.register(candidate.id, region_id)
.context(error::RegionOpeningSnafu {
peer_id: candidate.id,
region_id,
})?;

debug_assert!(vc.opening_region_guard.is_none());
vc.opening_region_guard = Some(guard);

let msg = MailboxMessage::json_message(
&format!("Open candidate region: {}", region_id),
&format!("Meta@{}", ctx.server_addr()),
Expand All @@ -132,8 +146,6 @@ impl OpenCandidateRegion {
.send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
.await?;

// TODO(weny): Registers the opening region.

match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
Expand Down Expand Up @@ -264,18 +276,44 @@ mod tests {
let region_id = persistent_context.region_id;
let to_peer_id = persistent_context.to_peer.id;
let env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context);
let mut ctx = env.context_factory().new_context(persistent_context);

let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
.open_candidate_region(&ctx, open_instruction)
.open_candidate_region(&mut ctx, open_instruction)
.await
.unwrap_err();

assert_matches!(err, Error::PusherNotFound { .. });
assert!(!err.is_retryable());
}

#[tokio::test]
async fn test_candidate_region_opening_error() {
let state = OpenCandidateRegion;
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let to_peer_id = persistent_context.to_peer.id;

let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let opening_region_keeper = env.opening_region_keeper();
let _guard = opening_region_keeper
.register(to_peer_id, region_id)
.unwrap();

let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
.open_candidate_region(&mut ctx, open_instruction)
.await
.unwrap_err();

assert_matches!(err, Error::RegionOpening { .. });
assert!(!err.is_retryable());
}

#[tokio::test]
async fn test_unexpected_instruction_reply() {
let state = OpenCandidateRegion;
Expand All @@ -286,7 +324,7 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id;

let mut env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context);
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();

Expand All @@ -308,7 +346,7 @@ mod tests {

let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
.open_candidate_region(&ctx, open_instruction)
.open_candidate_region(&mut ctx, open_instruction)
.await
.unwrap_err();

Expand All @@ -326,7 +364,7 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id;

let mut env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context);
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();

Expand All @@ -351,7 +389,7 @@ mod tests {

let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
.open_candidate_region(&ctx, open_instruction)
.open_candidate_region(&mut ctx, open_instruction)
.await
.unwrap_err();

Expand All @@ -369,7 +407,7 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();

let ctx = env.context_factory().new_context(persistent_context);
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
let mailbox = mailbox_ctx.mailbox().clone();

Expand Down Expand Up @@ -397,7 +435,7 @@ mod tests {

let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
let err = state
.open_candidate_region(&ctx, open_instruction)
.open_candidate_region(&mut ctx, open_instruction)
.await
.unwrap_err();

Expand All @@ -412,6 +450,7 @@ mod tests {
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();

Expand Down Expand Up @@ -448,6 +487,11 @@ mod tests {
});

let next = state.next(&mut ctx).await.unwrap();
let vc = ctx.volatile_ctx;
assert_eq!(
vc.opening_region_guard.unwrap().info(),
(to_peer_id, region_id)
);

let _ = next
.as_any()
Expand Down
13 changes: 11 additions & 2 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ use tokio::sync::mpsc::Sender;

use super::ContextFactoryImpl;
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef};
use crate::service::mailbox::{Channel, MailboxRef};

// TODO(weny): remove it.
#[allow(dead_code)]
/// The context of mailbox.
pub struct MailboxContext {
mailbox: MailboxRef,
Expand Down Expand Up @@ -64,6 +63,7 @@ impl MailboxContext {
pub struct TestingEnv {
table_metadata_manager: TableMetadataManagerRef,
mailbox_ctx: MailboxContext,
opening_region_keeper: OpeningRegionKeeperRef,
server_addr: String,
}

Expand All @@ -76,9 +76,11 @@ impl TestingEnv {
let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 1, kv_backend.clone());

let mailbox_ctx = MailboxContext::new(mailbox_sequence);
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());

Self {
table_metadata_manager,
opening_region_keeper,
mailbox_ctx,
server_addr: "localhost".to_string(),
}
Expand All @@ -88,6 +90,7 @@ impl TestingEnv {
pub fn context_factory(&self) -> ContextFactoryImpl {
ContextFactoryImpl {
table_metadata_manager: self.table_metadata_manager.clone(),
opening_region_keeper: self.opening_region_keeper.clone(),
volatile_ctx: Default::default(),
mailbox: self.mailbox_ctx.mailbox().clone(),
server_addr: self.server_addr.to_string(),
Expand All @@ -99,10 +102,16 @@ impl TestingEnv {
&mut self.mailbox_ctx
}

/// Returns the [TableMetadataManagerRef]
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}

/// Returns the [OpeningRegionKeeperRef]
pub fn opening_region_keeper(&self) -> &OpeningRegionKeeperRef {
&self.opening_region_keeper
}

/// Returns a [ProcedureContext] with a random [ProcedureId] and a [MockContextProvider].
pub fn procedure_context() -> ProcedureContext {
ProcedureContext {
Expand Down
7 changes: 7 additions & 0 deletions src/meta-srv/src/region/lease_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ impl Drop for OpeningRegionGuard {
}
}

impl OpeningRegionGuard {
/// Returns opening region info.
pub fn info(&self) -> (DatanodeId, RegionId) {
(self.datanode_id, self.region_id)
}
}

pub type OpeningRegionKeeperRef = Arc<OpeningRegionKeeper>;

#[derive(Debug, Clone, Default)]
Expand Down

0 comments on commit 1c613a7

Please sign in to comment.