Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add update metadata step for upgrading candidate region #2811

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ impl TableMetadataManager {
&self,
table_id: TableId,
region_info: RegionInfo,
current_table_route_value: DeserializedValueWithBytes<TableRouteValue>,
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
new_region_routes: Vec<RegionRoute>,
new_region_options: &HashMap<String, String>,
) -> Result<()> {
Expand All @@ -606,7 +606,7 @@ impl TableMetadataManager {

let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
.build_update_txn(table_id, &current_table_route_value, &new_table_route_value)?;
.build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;

let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);

Expand Down Expand Up @@ -1173,7 +1173,7 @@ mod tests {
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
current_table_route_value.clone(),
&current_table_route_value,
new_region_routes.clone(),
&HashMap::new(),
)
Expand All @@ -1190,7 +1190,7 @@ mod tests {
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
current_table_route_value.clone(),
&current_table_route_value,
new_region_routes.clone(),
&HashMap::new(),
)
Expand All @@ -1212,7 +1212,7 @@ mod tests {
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
current_table_route_value.clone(),
&current_table_route_value,
new_region_routes.clone(),
&HashMap::new(),
)
Expand All @@ -1237,7 +1237,7 @@ mod tests {
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
wrong_table_route_value,
&wrong_table_route_value,
new_region_routes,
&HashMap::new(),
)
Expand Down
9 changes: 8 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to find table route for {region_id}"))]
RegionRouteNotFound {
region_id: RegionId,
location: Location,
},

#[snafu(display("Table info not found: {}", table_id))]
TableInfoNotFound {
table_id: TableId,
Expand Down Expand Up @@ -658,7 +664,8 @@ impl ErrorExt for Error {
| Error::Unexpected { .. }
| Error::Txn { .. }
| Error::TableIdChanged { .. }
| Error::RegionOpeningRace { .. } => StatusCode::Unexpected,
| Error::RegionOpeningRace { .. }
| Error::RegionRouteNotFound { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#![feature(async_closure)]
#![feature(result_flattening)]
#![feature(assert_matches)]
#![feature(option_take_if)]
#![feature(extract_if)]

pub mod bootstrap;
mod cache_invalidator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl UpdateRegionMetadata {
region_storage_path: self.region_storage_path.to_string(),
region_options: self.region_options.clone(),
},
table_route_value,
&table_route_value,
new_region_routes,
&self.region_options,
)
Expand Down
54 changes: 48 additions & 6 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::any::Any;
use std::fmt::Debug;
use std::time::Duration;

use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::peer::Peer;
Expand Down Expand Up @@ -81,8 +82,13 @@ pub struct VolatileContext {
/// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
opening_region_guard: Option<OpeningRegionGuard>,
/// `table_route_info` is stored via previous steps for future use.
table_route_info: Option<DeserializedValueWithBytes<TableRouteValue>>,
/// `table_route` is stored via previous steps for future use.
table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
/// `table_info` is stored via previous steps for future use.
///
/// `table_info` should remain unchanged during the procedure;
/// no other DDL procedure executed concurrently for the current table.
table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
/// The deadline of leader region lease.
leader_region_lease_deadline: Option<Instant>,
/// The last_entry_id of leader region.
Expand Down Expand Up @@ -153,15 +159,15 @@ impl Context {
&self.server_addr
}

/// Returns the `table_route_value` of [VolatileContext] if any.
/// Returns the `table_route` of [VolatileContext] if any.
/// Otherwise, returns the value retrieved from remote.
///
/// Retry:
/// - Failed to retrieve the metadata of table.
pub async fn get_table_route_value(
&mut self,
) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
let table_route_value = &mut self.volatile_ctx.table_route_info;
let table_route_value = &mut self.volatile_ctx.table_route;

if table_route_value.is_none() {
let table_id = self.persistent_ctx.region_id.table_id();
Expand All @@ -183,9 +189,45 @@ impl Context {
Ok(table_route_value.as_ref().unwrap())
}

/// Removes the `table_route_value` of [VolatileContext], returns true if any.
/// Removes the `table_route` of [VolatileContext], returns true if any.
pub fn remove_table_route_value(&mut self) -> bool {
let value = self.volatile_ctx.table_route_info.take();
let value = self.volatile_ctx.table_route.take();
value.is_some()
}

/// Returns the `table_info` of [VolatileContext] if any.
/// Otherwise, returns the value retrieved from remote.
///
/// Retry:
/// - Failed to retrieve the metadata of table.
pub async fn get_table_info_value(
&mut self,
) -> Result<&DeserializedValueWithBytes<TableInfoValue>> {
let table_info_value = &mut self.volatile_ctx.table_info;

if table_info_value.is_none() {
let table_id = self.persistent_ctx.region_id.table_id();
let table_info = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(|e| error::Error::RetryLater {
reason: e.to_string(),
location: location!(),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?;

*table_info_value = Some(table_info);
}

Ok(table_info_value.as_ref().unwrap())
}

/// Removes the `table_info` of [VolatileContext], returns true if any.
pub fn remove_table_info_value(&mut self) -> bool {
let value = self.volatile_ctx.table_info.take();
value.is_some()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,11 @@ mod tests {

use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};

fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
cluster_id: 0,
}
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
}

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_meta::ddl::utils::region_storage_path;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::RegionIdent;
use serde::{Deserialize, Serialize};
use snafu::{location, Location, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};

use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
Expand Down Expand Up @@ -54,38 +54,28 @@ impl OpenCandidateRegion {
///
/// Abort(non-retry):
/// - Table Info is not found.
async fn build_open_region_instruction(&self, ctx: &Context) -> Result<Instruction> {
async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
let pc = &ctx.persistent_ctx;
let cluster_id = pc.cluster_id;
let table_id = pc.region_id.table_id();
let region_number = pc.region_id.region_number();
let candidate = &pc.to_peer;
let table_info = ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(|e| error::Error::RetryLater {
reason: e.to_string(),
location: location!(),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?
.into_inner()
.table_info;
let candidate_id = pc.to_peer.id;

let table_info_value = ctx.get_table_info_value().await?;
let table_info = &table_info_value.table_info;

// The region storage path is immutable after the region is created.
// Therefore, it's safe to store it in `VolatileContext` for future use.
let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);

let engine = table_info.meta.engine;
let engine = table_info.meta.engine.clone();
let region_options: HashMap<String, String> = (&table_info.meta.options).into();

let open_instruction = Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
cluster_id,
datanode_id: candidate.id,
datanode_id: candidate_id,
table_id,
region_number,
engine,
Expand Down Expand Up @@ -198,17 +188,12 @@ mod tests {
use crate::error::Error;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::test_util::{
new_close_region_reply, send_mock_reply, TestingEnv,
self, new_close_region_reply, send_mock_reply, TestingEnv,
};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};

fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
cluster_id: 0,
}
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
}

fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
Expand Down Expand Up @@ -244,9 +229,12 @@ mod tests {
let state = OpenCandidateRegion;
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context);
let mut ctx = env.context_factory().new_context(persistent_context);

let err = state.build_open_region_instruction(&ctx).await.unwrap_err();
let err = state
.build_open_region_instruction(&mut ctx)
.await
.unwrap_err();

assert_matches!(err, Error::TableInfoNotFound { .. });
assert!(!err.is_retryable());
Expand Down
15 changes: 15 additions & 0 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::sequence::Sequence;
use common_meta::DatanodeId;
use common_procedure::{Context as ProcedureContext, ProcedureId};
use common_procedure_test::MockContextProvider;
use common_time::util::current_time_millis;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};

use super::ContextFactoryImpl;
use crate::error::Result;
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::procedure::region_migration::PersistentContext;
use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef};
use crate::service::mailbox::{Channel, MailboxRef};

Expand Down Expand Up @@ -127,6 +130,7 @@ impl TestingEnv {
}
}

/// Generates a [InstructionReply::CloseRegion] reply.
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {
id,
Expand All @@ -144,6 +148,7 @@ pub fn new_close_region_reply(id: u64) -> MailboxMessage {
}
}

/// Sends a mock reply.
pub fn send_mock_reply(
mailbox: MailboxRef,
mut rx: MockHeartbeatReceiver,
Expand All @@ -155,3 +160,13 @@ pub fn send_mock_reply(
mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap();
});
}

/// Generates a [PersistentContext].
pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(from),
to_peer: Peer::empty(to),
region_id,
cluster_id: 0,
}
}
Loading
Loading