diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 8ad58552f267..c6cef18cbb7e 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -96,14 +96,21 @@ pub struct OpenRegion { pub region_ident: RegionIdent, pub region_storage_path: String, pub options: HashMap, + pub region_wal_options: HashMap, } impl OpenRegion { - pub fn new(region_ident: RegionIdent, path: &str, options: HashMap) -> Self { + pub fn new( + region_ident: RegionIdent, + path: &str, + options: HashMap, + region_wal_options: HashMap, + ) -> Self { Self { region_ident, region_storage_path: path.to_string(), options, + region_wal_options, } } } @@ -218,12 +225,13 @@ mod tests { }, "test/foo", HashMap::new(), + HashMap::new(), )); let serialized = serde_json::to_string(&open_region).unwrap(); assert_eq!( - r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","options":{}}}"#, + r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","options":{},"region_wal_options":{}}}"#, serialized ); diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 2c4c50711e4a..431b0c20c0e5 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -47,8 +47,9 @@ pub struct RegionInfo { #[serde(default)] pub region_options: HashMap, /// The per-region wal options. + /// Key: region number (in string representation). Value: the encoded wal options of the region. #[serde(default)] - pub region_wal_options: HashMap, + pub region_wal_options: HashMap, } pub struct DatanodeTableKey { @@ -179,7 +180,7 @@ impl DatanodeTableManager { .filter_map(|region_number| { region_wal_options .get(region_number) - .map(|wal_options| (*region_number, wal_options.clone())) + .map(|wal_options| (region_number.to_string(), wal_options.clone())) }) .collect(); @@ -223,6 +224,7 @@ impl DatanodeTableManager { opts.push(TxnOp::Delete(raw_key)) } } + // TODO(niebayes): maybe also check any update on region_wal_options. let need_update_options = region_info.region_options != *new_region_options; for (datanode, regions) in new_region_distribution.into_iter() { let need_update = @@ -300,6 +302,41 @@ mod tests { assert!(parsed.is_ok()); } + // This test intends to ensure both the `serde_json::to_string` + `serde_json::from_str` + // and `serde_json::to_vec` + `serde_json::from_slice` works for `DatanodeTableValue`. + // Warning: if the key of `region_wal_options` is of type non-String, this test would fail. + #[test] + fn test_serde_with_region_info() { + let region_info = RegionInfo { + engine: "test_engine".to_string(), + region_storage_path: "test_storage_path".to_string(), + region_options: HashMap::from([ + ("a".to_string(), "aa".to_string()), + ("b".to_string(), "bb".to_string()), + ("c".to_string(), "cc".to_string()), + ]), + region_wal_options: HashMap::from([ + ("1".to_string(), "aaa".to_string()), + ("2".to_string(), "bbb".to_string()), + ("3".to_string(), "ccc".to_string()), + ]), + }; + let table_value = DatanodeTableValue { + table_id: 1, + regions: vec![], + region_info, + version: 1, + }; + + let encoded = serde_json::to_string(&table_value).unwrap(); + let decoded = serde_json::from_str(&encoded).unwrap(); + assert_eq!(table_value, decoded); + + let encoded = serde_json::to_vec(&table_value).unwrap(); + let decoded = serde_json::from_slice(&encoded).unwrap(); + assert_eq!(table_value, decoded); + } + #[test] fn test_strip_table_id() { fn test_err(raw_key: &[u8]) { diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index cd1d692a4dd7..c07fb03f5319 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -54,9 +54,12 @@ impl RegionHeartbeatResponseHandler { region_ident, region_storage_path, options, + region_wal_options, }) => Ok(Box::new(|region_server| { Box::pin(async move { let region_id = Self::region_ident_to_region_id(®ion_ident); + // TODO(niebayes): extends options with region_wal_options. + let _ = region_wal_options; let request = RegionRequest::Open(RegionOpenRequest { engine: region_ident.engine, region_dir: region_dir(®ion_storage_path, region_id), @@ -239,6 +242,7 @@ mod tests { }, path, HashMap::new(), + HashMap::new(), )) } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index f96226f5e0a7..d47fc8384cdf 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -621,6 +621,7 @@ mod tests { opening_region, &path, HashMap::new(), + HashMap::new(), ))) .unwrap(), )) diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index 7d9737a6440c..c9eff42eccc9 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -24,7 +24,6 @@ use common_meta::RegionIdent; use common_telemetry::{debug, info}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionNumber; use super::update_metadata::UpdateRegionMetadata; use super::{RegionFailoverContext, State}; @@ -42,10 +41,10 @@ pub(super) struct ActivateRegion { // the new leader node needs to remark the failed region as "inactive" // to prevent it from renewing the lease. remark_inactive_region: bool, - // An `None` option stands for uninitialized. + // An `None` option stands for unintialized. region_storage_path: Option, region_options: Option>, - region_wal_options: Option>, + region_wal_options: Option>, } impl ActivateRegion { @@ -85,14 +84,19 @@ impl ActivateRegion { }; info!("Activating region: {candidate_ident:?}"); let region_options: HashMap = (&table_info.meta.options).into(); + // TODO(niebayes): properly fetch or construct region wal options. + let region_wal_options = HashMap::new(); let instruction = Instruction::OpenRegion(OpenRegion::new( candidate_ident.clone(), ®ion_storage_path, region_options.clone(), + region_wal_options.clone(), )); self.region_storage_path = Some(region_storage_path); self.region_options = Some(region_options); + self.region_wal_options = Some(region_wal_options); + let msg = MailboxMessage::json_message( "Activate Region", &format!("Metasrv@{}", ctx.selector_ctx.server_addr), @@ -231,6 +235,7 @@ mod tests { }, &env.path, HashMap::new(), + HashMap::new(), ))) .unwrap(), )) @@ -265,7 +270,7 @@ mod tests { .unwrap(); assert_eq!( format!("{next_state:?}"), - r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public", region_options: {} }"# + r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public", region_options: {}, region_wal_options: {} }"# ); } @@ -301,6 +306,7 @@ mod tests { }, &env.path, HashMap::new(), + HashMap::new(), ))) .unwrap(), )) diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index c120b30a26da..b148c3b275ae 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -277,7 +277,7 @@ mod tests { .unwrap(); assert_eq!( format!("{next_state:?}"), - r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None }"# + r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None, region_wal_options: None }"# ); } diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 39c21991a561..31313ef4f44c 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -23,7 +23,6 @@ use common_meta::RegionIdent; use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionNumber; use super::invalidate_cache::InvalidateCache; use super::{RegionFailoverContext, State}; @@ -36,7 +35,7 @@ pub(super) struct UpdateRegionMetadata { candidate: Peer, region_storage_path: String, region_options: HashMap, - region_wal_options: HashMap, + region_wal_options: HashMap, } impl UpdateRegionMetadata { @@ -44,7 +43,7 @@ impl UpdateRegionMetadata { candidate: Peer, region_storage_path: String, region_options: HashMap, - region_wal_options: HashMap, + region_wal_options: HashMap, ) -> Self { Self { candidate, diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index ea5176299f93..38bf7aadf7af 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -76,6 +76,8 @@ impl OpenCandidateRegion { let engine = table_info.meta.engine.clone(); let region_options: HashMap = (&table_info.meta.options).into(); + // TODO(niebayes): properly fetch or construct region wal options. + let region_wal_options = HashMap::new(); let open_instruction = Instruction::OpenRegion(OpenRegion::new( RegionIdent { @@ -87,6 +89,7 @@ impl OpenCandidateRegion { }, ®ion_storage_path, region_options, + region_wal_options, )); Ok(open_instruction) @@ -211,6 +214,7 @@ mod tests { }, region_storage_path: "/bar/foo/region/".to_string(), options: Default::default(), + region_wal_options: Default::default(), }) }