Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add open candidate region step #2757

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::JoinError;
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
Expand All @@ -29,6 +31,17 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Another procedure is opening the region: {} on peer: {}",
region_id,
peer_id
))]
RegionOpeningRace {
location: Location,
peer_id: DatanodeId,
region_id: RegionId,
},

#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
location: Location,
Expand Down Expand Up @@ -625,7 +638,8 @@ impl ErrorExt for Error {
| Error::UnexpectedInstructionReply { .. }
| Error::Unexpected { .. }
| Error::Txn { .. }
| Error::TableIdChanged { .. } => StatusCode::Unexpected,
| Error::TableIdChanged { .. }
| Error::RegionOpeningRace { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),
Expand Down
23 changes: 21 additions & 2 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use store_api::storage::RegionId;
use self::migration_start::RegionMigrationStart;
use crate::error::{Error, Result};
use crate::procedure::utils::region_lock_key;
use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef};
use crate::service::mailbox::MailboxRef;

/// It's shared in each step and available even after recovering.
///
Expand Down Expand Up @@ -66,7 +68,15 @@ impl PersistentContext {
///
/// The additional remote fetches are only required in the worst cases.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {}
pub struct VolatileContext {
/// `opening_region_guard` will be set after the
/// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step.
///
/// `opening_region_guard` should be consumed after
/// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue) .
opening_region_guard: Option<OpeningRegionGuard>,
}

/// Used to generate new [Context].
pub trait ContextFactory {
Expand All @@ -77,6 +87,9 @@ pub trait ContextFactory {
pub struct ContextFactoryImpl {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
mailbox: MailboxRef,
server_addr: String,
}

impl ContextFactory for ContextFactoryImpl {
Expand All @@ -85,6 +98,9 @@ impl ContextFactory for ContextFactoryImpl {
persistent_ctx,
volatile_ctx: self.volatile_ctx,
table_metadata_manager: self.table_metadata_manager,
opening_region_keeper: self.opening_region_keeper,
mailbox: self.mailbox,
server_addr: self.server_addr,
}
}
}
Expand All @@ -96,12 +112,15 @@ pub struct Context {
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: OpeningRegionKeeperRef,
mailbox: MailboxRef,
server_addr: String,
}

impl Context {
/// Returns address of meta server.
pub fn server_addr(&self) -> &str {
todo!()
&self.server_addr
}
}

Expand Down
Loading
Loading