Skip to content

Commit

Permalink
feat: add update metadata step for upgrading candidate region (#2811)
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu authored Nov 29, 2023
1 parent cce5edc commit ae81535
Show file tree
Hide file tree
Showing 11 changed files with 697 additions and 248 deletions.
12 changes: 6 additions & 6 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ impl TableMetadataManager {
&self,
table_id: TableId,
region_info: RegionInfo,
current_table_route_value: DeserializedValueWithBytes<TableRouteValue>,
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
new_region_routes: Vec<RegionRoute>,
new_region_options: &HashMap<String, String>,
) -> Result<()> {
Expand All @@ -606,7 +606,7 @@ impl TableMetadataManager {

let (update_table_route_txn, on_update_table_route_failure) = self
.table_route_manager()
.build_update_txn(table_id, &current_table_route_value, &new_table_route_value)?;
.build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;

let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]);

Expand Down Expand Up @@ -1173,7 +1173,7 @@ mod tests {
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
current_table_route_value.clone(),
&current_table_route_value,
new_region_routes.clone(),
&HashMap::new(),
)
Expand All @@ -1190,7 +1190,7 @@ mod tests {
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
current_table_route_value.clone(),
&current_table_route_value,
new_region_routes.clone(),
&HashMap::new(),
)
Expand All @@ -1212,7 +1212,7 @@ mod tests {
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
current_table_route_value.clone(),
&current_table_route_value,
new_region_routes.clone(),
&HashMap::new(),
)
Expand All @@ -1237,7 +1237,7 @@ mod tests {
region_storage_path: region_storage_path.to_string(),
region_options: HashMap::new(),
},
wrong_table_route_value,
&wrong_table_route_value,
new_region_routes,
&HashMap::new(),
)
Expand Down
9 changes: 8 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to find table route for {region_id}"))]
RegionRouteNotFound {
region_id: RegionId,
location: Location,
},

#[snafu(display("Table info not found: {}", table_id))]
TableInfoNotFound {
table_id: TableId,
Expand Down Expand Up @@ -658,7 +664,8 @@ impl ErrorExt for Error {
| Error::Unexpected { .. }
| Error::Txn { .. }
| Error::TableIdChanged { .. }
| Error::RegionOpeningRace { .. } => StatusCode::Unexpected,
| Error::RegionOpeningRace { .. }
| Error::RegionRouteNotFound { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#![feature(async_closure)]
#![feature(result_flattening)]
#![feature(assert_matches)]
#![feature(option_take_if)]
#![feature(extract_if)]

pub mod bootstrap;
mod cache_invalidator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl UpdateRegionMetadata {
region_storage_path: self.region_storage_path.to_string(),
region_options: self.region_options.clone(),
},
table_route_value,
&table_route_value,
new_region_routes,
&self.region_options,
)
Expand Down
54 changes: 48 additions & 6 deletions src/meta-srv/src/procedure/region_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::any::Any;
use std::fmt::Debug;
use std::time::Duration;

use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::peer::Peer;
Expand Down Expand Up @@ -81,8 +82,13 @@ pub struct VolatileContext {
/// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
opening_region_guard: Option<OpeningRegionGuard>,
/// `table_route_info` is stored via previous steps for future use.
table_route_info: Option<DeserializedValueWithBytes<TableRouteValue>>,
/// `table_route` is stored via previous steps for future use.
table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
/// `table_info` is stored via previous steps for future use.
///
/// `table_info` is expected to remain unchanged during the procedure;
/// this assumes that no other DDL procedure is executed concurrently for the current table.
table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
/// The deadline of leader region lease.
leader_region_lease_deadline: Option<Instant>,
/// The last_entry_id of leader region.
Expand Down Expand Up @@ -153,15 +159,15 @@ impl Context {
&self.server_addr
}

/// Returns the `table_route_value` of [VolatileContext] if any.
/// Returns the `table_route` of [VolatileContext] if any.
/// Otherwise, returns the value retrieved from remote.
///
/// Retry:
/// - Failed to retrieve the metadata of table.
pub async fn get_table_route_value(
&mut self,
) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
let table_route_value = &mut self.volatile_ctx.table_route_info;
let table_route_value = &mut self.volatile_ctx.table_route;

if table_route_value.is_none() {
let table_id = self.persistent_ctx.region_id.table_id();
Expand All @@ -183,9 +189,45 @@ impl Context {
Ok(table_route_value.as_ref().unwrap())
}

/// Removes the `table_route_value` of [VolatileContext], returns true if any.
/// Removes the `table_route` of [VolatileContext], returns true if any.
pub fn remove_table_route_value(&mut self) -> bool {
let value = self.volatile_ctx.table_route_info.take();
let value = self.volatile_ctx.table_route.take();
value.is_some()
}

/// Returns the `table_info` of [VolatileContext] if any.
/// Otherwise, returns the value retrieved from remote.
///
/// Retry:
/// - Failed to retrieve the metadata of table.
pub async fn get_table_info_value(
&mut self,
) -> Result<&DeserializedValueWithBytes<TableInfoValue>> {
let table_info_value = &mut self.volatile_ctx.table_info;

if table_info_value.is_none() {
let table_id = self.persistent_ctx.region_id.table_id();
let table_info = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(|e| error::Error::RetryLater {
reason: e.to_string(),
location: location!(),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?;

*table_info_value = Some(table_info);
}

Ok(table_info_value.as_ref().unwrap())
}

/// Clears the `table_info` slot of [VolatileContext].
///
/// Returns `true` if the slot held a value before the call, `false` otherwise.
pub fn remove_table_info_value(&mut self) -> bool {
    self.volatile_ctx.table_info.take().is_some()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,11 @@ mod tests {

use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};

fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
cluster_id: 0,
}
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
}

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_meta::ddl::utils::region_storage_path;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::RegionIdent;
use serde::{Deserialize, Serialize};
use snafu::{location, Location, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};

use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
Expand Down Expand Up @@ -54,38 +54,28 @@ impl OpenCandidateRegion {
///
/// Abort(non-retry):
/// - Table Info is not found.
async fn build_open_region_instruction(&self, ctx: &Context) -> Result<Instruction> {
async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
let pc = &ctx.persistent_ctx;
let cluster_id = pc.cluster_id;
let table_id = pc.region_id.table_id();
let region_number = pc.region_id.region_number();
let candidate = &pc.to_peer;
let table_info = ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(|e| error::Error::RetryLater {
reason: e.to_string(),
location: location!(),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?
.into_inner()
.table_info;
let candidate_id = pc.to_peer.id;

let table_info_value = ctx.get_table_info_value().await?;
let table_info = &table_info_value.table_info;

// The region storage path is immutable after the region is created.
// Therefore, it's safe to store it in `VolatileContext` for future use.
let region_storage_path =
region_storage_path(&table_info.catalog_name, &table_info.schema_name);

let engine = table_info.meta.engine;
let engine = table_info.meta.engine.clone();
let region_options: HashMap<String, String> = (&table_info.meta.options).into();

let open_instruction = Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
cluster_id,
datanode_id: candidate.id,
datanode_id: candidate_id,
table_id,
region_number,
engine,
Expand Down Expand Up @@ -198,17 +188,12 @@ mod tests {
use crate::error::Error;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::test_util::{
new_close_region_reply, send_mock_reply, TestingEnv,
self, new_close_region_reply, send_mock_reply, TestingEnv,
};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};

fn new_persistent_context() -> PersistentContext {
PersistentContext {
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
cluster_id: 0,
}
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
}

fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
Expand Down Expand Up @@ -244,9 +229,12 @@ mod tests {
let state = OpenCandidateRegion;
let persistent_context = new_persistent_context();
let env = TestingEnv::new();
let ctx = env.context_factory().new_context(persistent_context);
let mut ctx = env.context_factory().new_context(persistent_context);

let err = state.build_open_region_instruction(&ctx).await.unwrap_err();
let err = state
.build_open_region_instruction(&mut ctx)
.await
.unwrap_err();

assert_matches!(err, Error::TableInfoNotFound { .. });
assert!(!err.is_retryable());
Expand Down
15 changes: 15 additions & 0 deletions src/meta-srv/src/procedure/region_migration/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
use common_meta::instruction::{InstructionReply, SimpleReply};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::sequence::Sequence;
use common_meta::DatanodeId;
use common_procedure::{Context as ProcedureContext, ProcedureId};
use common_procedure_test::MockContextProvider;
use common_time::util::current_time_millis;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};

use super::ContextFactoryImpl;
use crate::error::Result;
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::procedure::region_migration::PersistentContext;
use crate::region::lease_keeper::{OpeningRegionKeeper, OpeningRegionKeeperRef};
use crate::service::mailbox::{Channel, MailboxRef};

Expand Down Expand Up @@ -127,6 +130,7 @@ impl TestingEnv {
}
}

/// Generates a [InstructionReply::CloseRegion] reply.
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
MailboxMessage {
id,
Expand All @@ -144,6 +148,7 @@ pub fn new_close_region_reply(id: u64) -> MailboxMessage {
}
}

/// Sends a mock reply.
pub fn send_mock_reply(
mailbox: MailboxRef,
mut rx: MockHeartbeatReceiver,
Expand All @@ -155,3 +160,13 @@ pub fn send_mock_reply(
mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap();
});
}

/// Builds a [PersistentContext] for tests.
///
/// `from` and `to` are the ids passed to [Peer::empty] for the source and destination
/// peers; `region_id` is stored as-is and `cluster_id` is fixed to `0`.
pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> PersistentContext {
    let from_peer = Peer::empty(from);
    let to_peer = Peer::empty(to);

    PersistentContext {
        from_peer,
        to_peer,
        region_id,
        cluster_id: 0,
    }
}
Loading

0 comments on commit ae81535

Please sign in to comment.