diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index d335907c5a4f..304463c59fea 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -59,9 +59,34 @@ impl Display for SimpleReply { } } +impl Display for OpenRegion { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "OpenRegion(region_ident={}, region_storage_path={})", + self.region_ident, self.region_storage_path + ) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OpenRegion { + pub region_ident: RegionIdent, + pub region_storage_path: String, +} + +impl OpenRegion { + pub fn new(region_ident: RegionIdent, path: &str) -> Self { + Self { + region_ident, + region_storage_path: path.to_string(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Display)] pub enum Instruction { - OpenRegion(RegionIdent), + OpenRegion(OpenRegion), CloseRegion(RegionIdent), InvalidateTableIdCache(TableId), InvalidateTableNameCache(TableName), @@ -93,18 +118,21 @@ mod tests { #[test] fn test_serialize_instruction() { - let open_region = Instruction::OpenRegion(RegionIdent { - cluster_id: 1, - datanode_id: 2, - table_id: 1024, - region_number: 1, - engine: "mito2".to_string(), - }); + let open_region = Instruction::OpenRegion(OpenRegion::new( + RegionIdent { + cluster_id: 1, + datanode_id: 2, + table_id: 1024, + region_number: 1, + engine: "mito2".to_string(), + }, + "test/foo", + )); let serialized = serde_json::to_string(&open_region).unwrap(); assert_eq!( - r#"{"OpenRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#, + r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo"}}"#, serialized ); diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 7eba2a87000f..dac1efca90db 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -19,11 +19,12 @@ use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult}; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::RegionIdent; use common_query::Output; use common_telemetry::error; use snafu::OptionExt; +use store_api::path_utils::region_dir; use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; @@ -43,11 +44,14 @@ impl RegionHeartbeatResponseHandler { fn instruction_to_request(instruction: Instruction) -> MetaResult<(RegionId, RegionRequest)> { match instruction { - Instruction::OpenRegion(region_ident) => { + Instruction::OpenRegion(OpenRegion { + region_ident, + region_storage_path, + }) => { let region_id = Self::region_ident_to_region_id(®ion_ident); let open_region_req = RegionRequest::Open(RegionOpenRequest { engine: region_ident.engine, - region_dir: "".to_string(), + region_dir: region_dir(®ion_storage_path, region_id), options: HashMap::new(), }); Ok((region_id, open_region_req)) diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 6cb9392caea2..3407390b463a 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; -use common_meta::instruction::{Instruction, InstructionReply, RegionIdent}; +use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, RegionIdent}; use common_query::prelude::ScalarUdf; use common_query::Output; use common_runtime::Runtime; @@ -90,13 +90,16 @@ fn close_region_instruction() -> Instruction { } fn open_region_instruction() -> Instruction { - Instruction::OpenRegion(RegionIdent { - table_id: 1024, - region_number: 0, - cluster_id: 1, - datanode_id: 2, - engine: "mito2".to_string(), - }) + Instruction::OpenRegion(OpenRegion::new( + RegionIdent { + table_id: 1024, + region_number: 0, + cluster_id: 1, + datanode_id: 2, + engine: "mito2".to_string(), + }, + "path/dir", + )) } pub struct MockQueryEngine; diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 27a4fa1d2450..8d3636591264 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -386,7 +386,9 @@ mod tests { use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader}; - use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_meta::ddl::utils::region_storage_path; + use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::key::TableMetadataManager; use common_meta::sequence::Sequence; use common_meta::DatanodeId; @@ -426,6 +428,7 @@ mod tests { pub context: RegionFailoverContext, pub heartbeat_receivers: HashMap>>, pub pushers: Pushers, + pub path: String, } impl TestingEnv { @@ -549,6 +552,7 @@ mod tests { }, pushers, heartbeat_receivers, + path: region_storage_path(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME).to_string(), } } } @@ -606,7 +610,11 @@ mod tests { let (candidate_tx, mut candidate_rx) = tokio::sync::mpsc::channel(1); for (datanode_id, mut recv) in env.heartbeat_receivers.into_iter() { let mailbox_clone = env.context.mailbox.clone(); - let failed_region_clone = failed_region.clone(); + let opening_region = RegionIdent { + datanode_id, + ..failed_region.clone() + }; + let path = env.path.to_string(); let candidate_tx = candidate_tx.clone(); let _handle = common_runtime::spawn_bg(async move { let resp = recv.recv().await.unwrap().unwrap(); @@ -614,9 +622,10 @@ mod tests { assert_eq!( received.payload, Some(Payload::Json( - serde_json::to_string(&Instruction::OpenRegion( - failed_region_clone.clone() - )) + serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new( + opening_region, + &path + ))) .unwrap(), )) ); diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index 7fcb939c25b7..666efa6522f8 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -16,17 +16,18 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use async_trait::async_trait; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::ddl::utils::region_storage_path; +use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::peer::Peer; use common_meta::RegionIdent; use common_telemetry::{debug, info}; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use super::update_metadata::UpdateRegionMetadata; use super::{RegionFailoverContext, State}; use crate::error::{ - Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, + self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, }; use crate::handler::HeartbeatMailbox; use crate::inactive_region_manager::InactiveRegionManager; @@ -36,20 +37,49 @@ use crate::service::mailbox::{Channel, MailboxReceiver}; #[derive(Serialize, Deserialize, Debug)] pub(super) struct ActivateRegion { candidate: Peer, + region_storage_path: Option, } impl ActivateRegion { pub(super) fn new(candidate: Peer) -> Self { - Self { candidate } + Self { + candidate, + region_storage_path: None, + } } async fn send_open_region_message( - &self, + &mut self, ctx: &RegionFailoverContext, failed_region: &RegionIdent, timeout: Duration, ) -> Result { - let instruction = Instruction::OpenRegion(failed_region.clone()); + let table_id = failed_region.table_id; + // TODO(weny): considers fetching table info only once. + let table_info = ctx + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu)? + .context(error::TableInfoNotFoundSnafu { table_id })? + .table_info; + + let region_storage_path = + region_storage_path(&table_info.catalog_name, &table_info.schema_name); + + let candidate_ident = RegionIdent { + datanode_id: self.candidate.id, + ..failed_region.clone() + }; + info!("Activating region: {candidate_ident:?}"); + + let instruction = Instruction::OpenRegion(OpenRegion::new( + candidate_ident.clone(), + ®ion_storage_path, + )); + + self.region_storage_path = Some(region_storage_path); let msg = MailboxMessage::json_message( "Activate Region", @@ -72,12 +102,8 @@ impl ActivateRegion { // command in time, it was considered an inactive node by metasrv, then it replied, and the // current region failed over again, and the node was selected as a candidate, so it needs // to clear its previous state first. - let candidate = RegionIdent { - datanode_id: self.candidate.id, - ..failed_region.clone() - }; InactiveRegionManager::new(&ctx.in_memory) - .deregister_inactive_region(&candidate) + .deregister_inactive_region(&candidate_ident) .await?; let ch = Channel::Datanode(self.candidate.id); @@ -85,7 +111,7 @@ impl ActivateRegion { } async fn handle_response( - &self, + &mut self, mailbox_receiver: MailboxReceiver, failed_region: &RegionIdent, ) -> Result> { @@ -102,7 +128,14 @@ impl ActivateRegion { .fail(); }; if result { - Ok(Box::new(UpdateRegionMetadata::new(self.candidate.clone()))) + Ok(Box::new(UpdateRegionMetadata::new( + self.candidate.clone(), + self.region_storage_path + .clone() + .context(error::UnexpectedSnafu { + violated: "expected region_storage_path", + })?, + ))) } else { // The region could be just indeed cannot be opened by the candidate, retry // would be in vain. Then why not just end the failover procedure? Because we @@ -135,7 +168,6 @@ impl State for ActivateRegion { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result> { - info!("Activating region: {failed_region:?}"); let mailbox_receiver = self .send_open_region_message(ctx, failed_region, OPEN_REGION_MESSAGE_TIMEOUT) .await?; @@ -160,7 +192,7 @@ mod tests { let failed_region = env.failed_region(1).await; let candidate = 2; - let state = ActivateRegion::new(Peer::new(candidate, "")); + let mut state = ActivateRegion::new(Peer::new(candidate, "")); let mailbox_receiver = state .send_open_region_message(&env.context, &failed_region, Duration::from_millis(100)) .await @@ -179,7 +211,14 @@ mod tests { assert_eq!( received.payload, Some(Payload::Json( - serde_json::to_string(&Instruction::OpenRegion(failed_region.clone())).unwrap(), + serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new( + RegionIdent { + datanode_id: candidate, + ..failed_region.clone() + }, + &env.path + ))) + .unwrap(), )) ); @@ -212,7 +251,7 @@ mod tests { .unwrap(); assert_eq!( format!("{next_state:?}"), - r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" } }"# + r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public" }"# ); } @@ -224,7 +263,7 @@ mod tests { let failed_region = env.failed_region(1).await; let candidate = 2; - let state = ActivateRegion::new(Peer::new(candidate, "")); + let mut state = ActivateRegion::new(Peer::new(candidate, "")); let mailbox_receiver = state .send_open_region_message(&env.context, &failed_region, Duration::from_millis(100)) .await @@ -241,7 +280,14 @@ mod tests { assert_eq!( received.payload, Some(Payload::Json( - serde_json::to_string(&Instruction::OpenRegion(failed_region.clone())).unwrap() + serde_json::to_string(&Instruction::OpenRegion(OpenRegion::new( + RegionIdent { + datanode_id: candidate, + ..failed_region.clone() + }, + &env.path + ))) + .unwrap(), )) ); diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index ce575f62d05f..0b4df7505d55 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -226,7 +226,7 @@ mod tests { .unwrap(); assert_eq!( format!("{next_state:?}"), - r#"ActivateRegion { candidate: Peer { id: 2, addr: "" } }"# + r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, region_storage_path: None }"# ); } @@ -268,7 +268,7 @@ mod tests { // Timeout or not, proceed to `ActivateRegion`. assert_eq!( format!("{next_state:?}"), - r#"ActivateRegion { candidate: Peer { id: 2, addr: "" } }"# + r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, region_storage_path: None }"# ); } } diff --git a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs index 6c26f22286a6..a68092763144 100644 --- a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs +++ b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs @@ -96,6 +96,7 @@ mod tests { mut heartbeat_receivers, context, pushers, + .. } = env; for frontend_id in 4..=7 { diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 0191cd6218ac..6bb352849f38 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -13,7 +13,6 @@ // limitations under the License. use async_trait::async_trait; -use common_meta::ddl::utils::region_storage_path; use common_meta::key::table_route::TableRouteKey; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; @@ -24,20 +23,22 @@ use snafu::{OptionExt, ResultExt}; use super::invalidate_cache::InvalidateCache; use super::{RegionFailoverContext, State}; -use crate::error::{ - self, Result, RetryLaterSnafu, TableInfoNotFoundSnafu, TableRouteNotFoundSnafu, -}; +use crate::error::{self, Result, RetryLaterSnafu, TableRouteNotFoundSnafu}; use crate::lock::keys::table_metadata_lock_key; use crate::lock::Opts; #[derive(Serialize, Deserialize, Debug)] pub(super) struct UpdateRegionMetadata { candidate: Peer, + region_storage_path: String, } impl UpdateRegionMetadata { - pub(super) fn new(candidate: Peer) -> Self { - Self { candidate } + pub(super) fn new(candidate: Peer, region_storage_path: String) -> Self { + Self { + candidate, + region_storage_path, + } } /// Updates the metadata of the table. @@ -71,17 +72,6 @@ impl UpdateRegionMetadata { .context(error::TableMetadataManagerSnafu)? .context(TableRouteNotFoundSnafu { table_id })?; - let table_info = ctx - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(error::TableMetadataManagerSnafu)? - .context(TableInfoNotFoundSnafu { table_id })? - .table_info; - let region_storage_patch = - region_storage_path(&table_info.catalog_name, &table_info.schema_name); - let mut new_region_routes = table_route_value.region_routes.clone(); for region_route in new_region_routes.iter_mut() { @@ -101,7 +91,7 @@ impl UpdateRegionMetadata { .update_table_route( table_id, engine, - ®ion_storage_patch, + &self.region_storage_path, table_route_value, new_region_routes, ) @@ -184,7 +174,7 @@ mod tests { let env = TestingEnvBuilder::new().build().await; let failed_region = env.failed_region(1).await; - let mut state = UpdateRegionMetadata::new(Peer::new(2, "")); + let mut state = UpdateRegionMetadata::new(Peer::new(2, ""), env.path.clone()); let next_state = state.next(&env.context, &failed_region).await.unwrap(); assert_eq!(format!("{next_state:?}"), "InvalidateCache"); @@ -197,7 +187,7 @@ mod tests { async fn test(env: TestingEnv, failed_region: u32, candidate: u64) -> Vec { let failed_region = env.failed_region(failed_region).await; - let state = UpdateRegionMetadata::new(Peer::new(candidate, "")); + let state = UpdateRegionMetadata::new(Peer::new(candidate, ""), env.path.clone()); state .update_table_route(&env.context, &failed_region) .await @@ -335,17 +325,17 @@ mod tests { let failed_region_2 = env.failed_region(2).await; let table_id = failed_region_1.table_id; - + let path = env.path.clone(); let _ = futures::future::join_all(vec![ tokio::spawn(async move { - let state = UpdateRegionMetadata::new(Peer::new(2, "")); + let state = UpdateRegionMetadata::new(Peer::new(2, ""), path); state .update_metadata(&ctx_1, &failed_region_1) .await .unwrap(); }), tokio::spawn(async move { - let state = UpdateRegionMetadata::new(Peer::new(3, "")); + let state = UpdateRegionMetadata::new(Peer::new(3, ""), env.path.clone()); state .update_metadata(&ctx_2, &failed_region_2) .await