diff --git a/Cargo.lock b/Cargo.lock index ba35273b8166..7978408bc55a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1540,6 +1540,7 @@ dependencies = [ "servers", "session", "snafu", + "store-api", "substrait 0.4.3", "table", "temp-env", diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 1ccd8fe21200..c28b2982877b 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -58,6 +58,7 @@ serde_json.workspace = true servers.workspace = true session.workspace = true snafu.workspace = true +store-api.workspace = true substrait.workspace = true table.workspace = true tokio.workspace = true diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index 7ec5956adca7..0f73d2d2fbc7 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::future::Future; use std::sync::Arc; use std::time::Duration; @@ -22,12 +22,15 @@ use clap::Parser; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::peer::Peer; +use common_meta::region_meta::wal_meta::{KeyName, RegionWalMeta, RegionWalMetaKey}; +use common_meta::region_meta::RegionMeta; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::table_name::TableName; use common_telemetry::info; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use rand::Rng; +use store_api::storage::{RegionId, RegionNumber}; use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType}; use self::metadata::TableMetadataBencher; @@ -160,3 +163,26 @@ fn create_region_routes() -> Vec<RegionRoute> { regions } + +fn create_region_meta_map(table_id: TableId) -> HashMap<RegionNumber, RegionMeta> { + // The region ids must match those used in `create_region_routes`.
+ (0..64u64) + .map(|region_id| { + let key = RegionId::from_u64(region_id).region_number(); + let value = new_bench_region_meta(table_id, region_id); + (key, value) + }) + .collect() +} + +fn new_bench_region_meta(table_id: TableId, region_id: u64) -> RegionMeta { + let topic_key = RegionWalMetaKey::new( + table_id, + RegionId::from_u64(region_id).region_number(), + KeyName::KafkaTopic, + ) + .to_string(); + let topic_value = "test_topic".to_string(); + let wal_meta = RegionWalMeta::with_metas([(topic_key, topic_value)]); + RegionMeta { wal_meta } +} diff --git a/src/cmd/src/cli/bench/metadata.rs b/src/cmd/src/cli/bench/metadata.rs index 11492c7c1f0b..b164b73ff3ab 100644 --- a/src/cmd/src/cli/bench/metadata.rs +++ b/src/cmd/src/cli/bench/metadata.rs @@ -17,7 +17,7 @@ use std::time::Instant; use common_meta::key::TableMetadataManagerRef; use common_meta::table_name::TableName; -use super::{bench_self_recorded, create_region_routes, create_table_info}; +use super::{bench_self_recorded, create_region_meta_map, create_region_routes, create_table_info}; pub struct TableMetadataBencher { table_metadata_manager: TableMetadataManagerRef, @@ -44,11 +44,12 @@ impl TableMetadataBencher { let table_name = TableName::new("bench_catalog", "bench_schema", table_name); let table_info = create_table_info(i, table_name); let region_routes = create_region_routes(); + let region_meta_map = create_region_meta_map(i); let start = Instant::now(); self.table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, region_meta_map) .await .unwrap(); diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 6a996bca6b20..eae72dc83c3c 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -394,6 +394,8 @@ impl MigrateTableMetadata { ); let region_distribution: RegionDistribution = value.regions_id_map.clone().into_iter().collect(); + // TODO(niebayes): Requires properly construct a region_meta_map. + let region_metas = Vec::new(); let datanode_table_kvs = region_distribution .into_iter() @@ -405,6 +407,7 @@ impl MigrateTableMetadata { DatanodeTableValue::new( table_id, regions, + region_metas.clone(), RegionInfo { engine: engine.to_string(), region_storage_path: region_storage_path.clone(), diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 85b4bb8d23da..6d286fffebef 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; + use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{ CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader, @@ -25,7 +27,7 @@ use common_telemetry::tracing_context::TracingContext; use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, RegionNumber}; use strum::AsRefStr; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId}; @@ -35,6 +37,7 @@ use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::key::table_name::TableNameKey; use crate::metrics; +use crate::region_meta::RegionMeta; use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; @@ -78,6 +81,10 @@ impl CreateTableProcedure { &self.creator.data.region_routes } + pub fn region_meta_map(&self) -> &HashMap { + &self.creator.data.region_meta_map + } + /// Checks whether the table exists. async fn on_prepare(&mut self) -> Result { let expr = &self.creator.data.task.create_table; @@ -235,8 +242,10 @@ impl CreateTableProcedure { let raw_table_info = self.table_info().clone(); let region_routes = self.region_routes().clone(); + let region_meta_map = self.region_meta_map().clone(); + manager - .create_table_metadata(raw_table_info, region_routes) + .create_table_metadata(raw_table_info, region_routes, region_meta_map) .await?; info!("Created table metadata for table {table_id}"); @@ -293,6 +302,8 @@ impl TableCreator { cluster_id, task, region_routes, + // TODO(niebayes): Allocate region metadata in `TableMetadataAllocator`. + region_meta_map: HashMap::new(), }, } } @@ -313,6 +324,7 @@ pub struct CreateTableData { pub state: CreateTableState, pub task: CreateTableTask, pub region_routes: Vec, + pub region_meta_map: HashMap, pub cluster_id: u64, } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 4e3c712082d5..45954d7326e8 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -266,6 +266,12 @@ pub enum Error { #[snafu(display("Retry later"))] RetryLater { source: BoxedError }, + + #[snafu(display("The length of region metas is not identical with that of regions, num_region_metas: {}, num_regions: {}", num_region_metas, num_regions))] + RegionMetaLengthMismatched { + num_region_metas: usize, + num_regions: usize, + }, } pub type Result = std::result::Result; @@ -323,6 +329,8 @@ impl ErrorExt for Error { RetryLater { source, .. } => source.status_code(), InvalidCatalogValue { source, .. } => source.status_code(), ConvertAlterTableRequest { source, .. } => source.status_code(), + + RegionMetaLengthMismatched { .. 
} => StatusCode::InvalidArguments, } } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index c66ead5b8002..5d6e67c59443 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -84,6 +84,7 @@ use crate::ddl::utils::region_storage_path; use crate::error::{self, Result, SerdeJsonSnafu}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; +use crate::region_meta::RegionMeta; use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus}; use crate::DatanodeId; @@ -364,6 +365,7 @@ impl TableMetadataManager { &self, mut table_info: RawTableInfo, region_routes: Vec, + region_meta_map: HashMap, ) -> Result<()> { let region_numbers = region_routes .iter() @@ -399,9 +401,12 @@ impl TableMetadataManager { &engine, ®ion_storage_path, region_options, + region_meta_map, distribution, )?; + // TODO(niebayes): Build a txn to persist region metas. + // Creates table route. let table_route_value = TableRouteValue::new(region_routes); let (create_table_route_txn, on_create_table_route_failure) = self @@ -746,6 +751,7 @@ mod tests { use bytes::Bytes; use futures::TryStreamExt; + use store_api::storage::{RegionId, RegionNumber, TableId}; use table::metadata::{RawTableInfo, TableInfo}; use super::datanode_table::DatanodeTableKey; @@ -758,11 +764,16 @@ mod tests { use crate::key::{to_removed_key, DeserializedValueWithBytes, TableMetadataManager}; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; + use crate::region_meta::wal_meta::{KeyName, RegionWalMeta, RegionWalMetaKey}; + use crate::region_meta::RegionMeta; use crate::rpc::router::{region_distribution, Region, RegionRoute, RegionStatus}; + use crate::DatanodeId; #[test] fn test_deserialized_value_with_bytes() { - let region_route = new_test_region_route(); + let datanode_id = 1; + let region_id = 1; + let region_route = new_test_region_route(region_id, datanode_id); let region_routes = vec![region_route.clone()]; let expected_region_routes = @@ -795,59 +806,93 @@ mod tests { assert_eq!(removed, to_removed_key(key)); } - fn new_test_region_route() -> RegionRoute { - new_region_route(1, 2) - } - - fn new_region_route(region_id: u64, datanode: u64) -> RegionRoute { + fn new_test_region_route(region_id: u64, datanode_id: DatanodeId) -> RegionRoute { RegionRoute { region: Region { id: region_id.into(), - name: "r1".to_string(), + name: format!("r{region_id}"), partition: None, attrs: BTreeMap::new(), }, - leader_peer: Some(Peer::new(datanode, "a2")), + leader_peer: Some(Peer::new(datanode_id, format!("a{region_id}"))), follower_peers: vec![], leader_status: None, } } - fn new_test_table_info(region_numbers: impl Iterator) -> TableInfo { - test_utils::new_test_table_info(10, region_numbers) + fn new_test_table_info(table_id: TableId, region_numbers: Vec) -> TableInfo { + test_utils::new_test_table_info(table_id, region_numbers) + } + + fn new_test_region_meta_map( + table_id: TableId, + region_ids: Vec, + ) -> HashMap { + region_ids + .into_iter() + .map(|region_id| { + let region_number = RegionId::from_u64(region_id).region_number(); + + let topic_key = + RegionWalMetaKey::new(table_id, region_number, KeyName::KafkaTopic).to_string(); + let topic_value = "test_topic".to_string(); + let wal_meta = RegionWalMeta::with_metas([(topic_key, topic_value)]); + + let region_meta = RegionMeta { wal_meta }; + + (region_number, region_meta) + }) + .collect() } #[tokio::test] async fn test_create_table_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); let 
table_metadata_manager = TableMetadataManager::new(mem_kv); - let region_route = new_test_region_route(); + + let datanode_id = 1; + let table_id = 1; + let region_id = 1; + let region_number = RegionId::from_u64(region_id).region_number(); + + let region_route = new_test_region_route(region_id, datanode_id); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let region_meta_map = new_test_region_meta_map(table_id, vec![region_id]); + let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); + // creates metadata. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata( + table_info.clone(), + region_routes.clone(), + region_meta_map.clone(), + ) .await .unwrap(); - // if metadata was already created, it should be ok. + + // creating metadata with the same key and content is idempotent and deemed ok. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata( + table_info.clone(), + region_routes.clone(), + region_meta_map.clone(), + ) .await .unwrap(); + + // creating metadata with the same key but different content would incur an error. let mut modified_region_routes = region_routes.clone(); modified_region_routes.push(region_route.clone()); - // if remote metadata was exists, it should return an error. assert!(table_metadata_manager - .create_table_metadata(table_info.clone(), modified_region_routes) + .create_table_metadata(table_info.clone(), modified_region_routes, region_meta_map) .await .is_err()); + // ensures the metadata is not overwritten. let (remote_table_info, remote_table_route) = table_metadata_manager .get_full_table_info(10) .await .unwrap(); - assert_eq!( remote_table_info.unwrap().into_inner().table_info, table_info @@ -862,18 +907,23 @@ mod tests { async fn test_delete_table_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); - let region_route = new_test_region_route(); + + let datanode_id = 1; + let table_id = 1; + let region_id = 1; + let region_number = RegionId::from_u64(region_id).region_number(); + + let region_route = new_test_region_route(region_id, datanode_id); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); - let table_id = table_info.ident.table_id; - let datanode_id = 2; + let region_meta_map = new_test_region_meta_map(table_id, vec![region_id]); + let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); + let table_route_value = DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone())); // creates metadata. 
table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata(table_info.clone(), region_routes.clone(), region_meta_map) .await .unwrap(); @@ -937,29 +987,38 @@ mod tests { async fn test_rename_table() { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); - let region_route = new_test_region_route(); + + let datanode_id = 1; + let table_id = 1; + let region_id = 1; + let region_number = RegionId::from_u64(region_id).region_number(); + + let region_route = new_test_region_route(region_id, datanode_id); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); - let table_id = table_info.ident.table_id; + let region_meta_map = new_test_region_meta_map(table_id, vec![region_id]); + let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); + // creates metadata. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata(table_info.clone(), region_routes.clone(), region_meta_map) .await .unwrap(); + + // renames the table. let new_table_name = "another_name".to_string(); let table_info_value = DeserializedValueWithBytes::from_inner(TableInfoValue::new(table_info.clone())); - table_metadata_manager .rename_table(table_info_value.clone(), new_table_name.clone()) .await .unwrap(); - // if remote metadata was updated, it should be ok. + + // renaming a table to the same name is idempotent and deemed ok. table_metadata_manager .rename_table(table_info_value.clone(), new_table_name.clone()) .await .unwrap(); + let mut modified_table_info = table_info.clone(); modified_table_info.name = "hi".to_string(); let modified_table_info_value = @@ -1005,14 +1064,20 @@ mod tests { async fn test_update_table_info() { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); - let region_route = new_test_region_route(); + + let datanode_id = 1; + let table_id = 1; + let region_id = 1; + let region_number = RegionId::from_u64(region_id).region_number(); + + let region_route = new_test_region_route(region_id, datanode_id); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); - let table_id = table_info.ident.table_id; + let region_meta_map = new_test_region_meta_map(table_id, vec![region_id]); + let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); + // creates metadata. 
table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata(table_info.clone(), region_routes.clone(), region_meta_map) .await .unwrap(); let mut new_table_info = table_info.clone(); @@ -1057,42 +1122,33 @@ mod tests { async fn test_update_table_leader_region_status() { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); - let datanode = 1; + + let datanode_id = 1; + let table_id = 1; + let region_ids = [1, 2]; + let region_routes = vec![ - RegionRoute { - region: Region { - id: 1.into(), - name: "r1".to_string(), - partition: None, - attrs: BTreeMap::new(), - }, - leader_peer: Some(Peer::new(datanode, "a2")), - leader_status: Some(RegionStatus::Downgraded), - follower_peers: vec![], - }, - RegionRoute { - region: Region { - id: 2.into(), - name: "r2".to_string(), - partition: None, - attrs: BTreeMap::new(), - }, - leader_peer: Some(Peer::new(datanode, "a1")), - leader_status: None, - follower_peers: vec![], - }, + new_test_region_route(region_ids[0], datanode_id), + new_test_region_route(region_ids[1], datanode_id), ]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); - let table_id = table_info.ident.table_id; - let current_table_route_value = - DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone())); + let region_meta_map = new_test_region_meta_map(table_id, region_ids.to_vec()); + let table_info: RawTableInfo = new_test_table_info( + table_id, + region_ids + .iter() + .map(|id| RegionId::from_u64(*id).region_number()) + .collect(), + ) + .into(); + // creates metadata. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata(table_info.clone(), region_routes.clone(), region_meta_map) .await .unwrap(); + let current_table_route_value = + DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone())); table_metadata_manager .update_leader_region_status(table_id, ¤t_table_route_value, |region_route| { if region_route.leader_status.is_some() { @@ -1143,28 +1199,38 @@ mod tests { async fn test_update_table_route() { let mem_kv = Arc::new(MemoryKvBackend::default()); let table_metadata_manager = TableMetadataManager::new(mem_kv); - let region_route = new_test_region_route(); + + let datanode_id = 1; + let table_id = 1; + let region_id = 1; + let region_number = RegionId::from_u64(region_id).region_number(); + + let region_route = new_test_region_route(region_id, datanode_id); let region_routes = vec![region_route.clone()]; - let table_info: RawTableInfo = - new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); - let table_id = table_info.ident.table_id; + let region_meta_map = new_test_region_meta_map(table_id, vec![region_id]); + let table_info: RawTableInfo = new_test_table_info(table_id, vec![region_number]).into(); + let engine = table_info.meta.engine.as_str(); let region_storage_path = region_storage_path(&table_info.catalog_name, &table_info.schema_name); let current_table_route_value = DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone())); + // creates metadata. 
table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata(table_info.clone(), region_routes.clone(), region_meta_map) .await .unwrap(); assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await; + + // updates metadata with new region routes. let new_region_routes = vec![ - new_region_route(1, 1), - new_region_route(2, 2), - new_region_route(3, 3), + new_test_region_route(1, 1), + new_test_region_route(2, 2), + new_test_region_route(3, 3), ]; // it should be ok. + // TODO(niebayes): Requires integrating region metas. table_metadata_manager .update_table_route( table_id, @@ -1202,7 +1268,7 @@ mod tests { .inner .update(new_region_routes.clone()), ); - let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)]; + let new_region_routes = vec![new_test_region_route(2, 4), new_test_region_route(5, 5)]; // it should be ok. table_metadata_manager .update_table_route( @@ -1224,10 +1290,10 @@ mod tests { // The ABA problem. let wrong_table_route_value = DeserializedValueWithBytes::from_inner(current_table_route_value.update(vec![ - new_region_route(1, 1), - new_region_route(2, 2), - new_region_route(3, 3), - new_region_route(4, 4), + new_test_region_route(1, 1), + new_test_region_route(2, 2), + new_test_region_route(3, 3), + new_test_region_route(4, 4), ])); assert!(table_metadata_manager .update_table_route( @@ -1244,4 +1310,6 @@ mod tests { .await .is_err()); } + + // TODO(niebayes): Maybe add necessary tests for integrating region metas. } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 1872f36b4fa7..2a75e6db24cf 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -29,6 +29,7 @@ use crate::key::{ use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use crate::region_meta::RegionMeta; use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; use crate::DatanodeId; @@ -101,16 +102,25 @@ impl TableMetaKey for DatanodeTableKey { pub struct DatanodeTableValue { pub table_id: TableId, pub regions: Vec, + /// Region's unique metadata or options. + pub region_metas: Vec, + /// Regions' common metadata or options. #[serde(flatten)] pub region_info: RegionInfo, version: u64, } impl DatanodeTableValue { - pub fn new(table_id: TableId, regions: Vec, region_info: RegionInfo) -> Self { + pub fn new( + table_id: TableId, + regions: Vec, + region_metas: Vec, + region_info: RegionInfo, + ) -> Self { Self { table_id, regions, + region_metas, region_info, version: 0, } @@ -165,15 +175,27 @@ impl DatanodeTableManager { engine: &str, region_storage_path: &str, region_options: HashMap, + region_meta_map: HashMap, distribution: RegionDistribution, ) -> Result { let txns = distribution .into_iter() .map(|(datanode_id, regions)| { + let region_metas = regions + .iter() + .map(|region_number| { + region_meta_map + .get(region_number) + .cloned() + .unwrap_or_default() + }) + .collect::>(); + let key = DatanodeTableKey::new(datanode_id, table_id); let val = DatanodeTableValue::new( table_id, regions, + region_metas, RegionInfo { engine: engine.to_string(), region_storage_path: region_storage_path.to_string(), @@ -201,6 +223,9 @@ impl DatanodeTableManager { ) -> Result { let mut opts = Vec::new(); + // TODO(niebayes): Properly construct or fetch region metas. 
+ let new_region_metas = Vec::new(); + // Removes the old datanode table key value pairs for current_datanode in current_region_distribution.keys() { if !new_region_distribution.contains_key(current_datanode) { @@ -221,8 +246,13 @@ impl DatanodeTableManager { if need_update { let key = DatanodeTableKey::new(datanode, table_id); let raw_key = key.as_raw_key(); - let val = DatanodeTableValue::new(table_id, regions, region_info.clone()) - .try_as_raw_value()?; + let val = DatanodeTableValue::new( + table_id, + regions, + new_region_metas.clone(), + region_info.clone(), + ) + .try_as_raw_value()?; opts.push(TxnOp::Put(raw_key, val)); } } @@ -256,10 +286,19 @@ impl DatanodeTableManager { #[cfg(test)] mod tests { use super::*; + use crate::region_meta::wal_meta::{KeyName, RegionWalMeta, RegionWalMetaKey}; + + fn new_test_region_meta(table_id: TableId, region_number: RegionNumber) -> RegionMeta { + let topic_key = + RegionWalMetaKey::new(table_id, region_number, KeyName::KafkaTopic).to_string(); + let topic_value = "test_topic".to_string(); + let wal_meta = RegionWalMeta::with_metas([(topic_key, topic_value)]); + RegionMeta { wal_meta } + } #[test] fn test_serde() { - let key = DatanodeTableKey { + let key: DatanodeTableKey = DatanodeTableKey { datanode_id: 1, table_id: 2, }; @@ -269,10 +308,11 @@ mod tests { let value = DatanodeTableValue { table_id: 42, regions: vec![1, 2, 3], + region_metas: Vec::new(), region_info: RegionInfo::default(), version: 1, }; - let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"version":1}"#; + let literal = br#"{"table_id":42,"regions":[1,2,3],"region_metas":[],"engine":"","region_storage_path":"","region_options":{},"version":1}"#; let raw_value = value.try_as_raw_value().unwrap(); assert_eq!(raw_value, literal); @@ -281,11 +321,57 @@ mod tests { assert_eq!(actual, value); // test serde default - let raw_str = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#; + let raw_str = br#"{"table_id":42,"regions":[1,2,3],"region_metas":[],"version":1}"#; let parsed = DatanodeTableValue::try_from_raw_value(raw_str); assert!(parsed.is_ok()); } + #[test] + fn test_serde_with_region_metas() { + let table_id = 42; + let region_numbers = vec![1, 2, 3]; + let region_metas = region_numbers + .iter() + .map(|region_number| new_test_region_meta(table_id, *region_number)) + .collect(); + + let value = DatanodeTableValue { + table_id: 42, + regions: region_numbers, + region_metas, + region_info: RegionInfo::default(), + version: 1, + }; + + let literal = br#" + { + "table_id": 42, + "regions": [1, 2, 3], + "region_metas": [ + {"wal_meta": {"42/1/kafka_topic": "test_topic"}}, + {"wal_meta": {"42/2/kafka_topic": "test_topic"}}, + {"wal_meta": {"42/3/kafka_topic": "test_topic"}} + ], + "engine": "", + "region_storage_path": "", + "region_options": {}, + "version": 1 + } + "#; + let literal = String::from_utf8(literal.to_vec()) + .unwrap() + .chars() + .filter(|c| !c.is_whitespace()) + .collect::(); + let literal = literal.as_bytes(); + + let raw_value = value.try_as_raw_value().unwrap(); + assert_eq!(raw_value, literal); + + let parsed = DatanodeTableValue::try_from_raw_value(literal).unwrap(); + assert_eq!(parsed, value); + } + #[test] fn test_strip_table_id() { fn test_err(raw_key: &[u8]) { diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 996f793b10de..d5f369f6c6a3 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -29,6 +29,7 @@ pub mod kv_backend; pub mod metrics; pub mod 
peer; pub mod range_stream; +pub mod region_meta; pub mod rpc; pub mod sequence; pub mod state_store; diff --git a/src/common/meta/src/region_meta.rs b/src/common/meta/src/region_meta.rs new file mode 100644 index 000000000000..a6d0347985d6 --- /dev/null +++ b/src/common/meta/src/region_meta.rs @@ -0,0 +1,26 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod wal_meta; + +use serde::{Deserialize, Serialize}; + +use crate::region_meta::wal_meta::RegionWalMeta; + +/// Stores a region's unique metadata. Any common metadata or options among regions shall not be stored in the struct. +#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq)] +pub struct RegionMeta { + /// The region's unique wal metadata. + pub wal_meta: RegionWalMeta, +} diff --git a/src/common/meta/src/region_meta/wal_meta.rs b/src/common/meta/src/region_meta/wal_meta.rs new file mode 100644 index 000000000000..8c9ade25821e --- /dev/null +++ b/src/common/meta/src/region_meta/wal_meta.rs @@ -0,0 +1,84 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::fmt::Display; + +use serde::{Deserialize, Serialize}; +use store_api::storage::{RegionNumber, TableId}; + +/// The name of a wal meta key. +#[derive(Debug)] +pub enum KeyName { + /// Kafka topic. + KafkaTopic, +} + +impl Display for KeyName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let name_str = match self { + KeyName::KafkaTopic => "kafka_topic", + }; + f.write_str(name_str) + } +} + +/// The key to identify a wal meta. +pub struct RegionWalMetaKey { + table_id: TableId, + region_number: RegionNumber, + name: KeyName, +} + +impl RegionWalMetaKey { + /// Construct a wal meta key with the given table id, region number, and key name. + pub fn new(table_id: TableId, region_number: RegionNumber, name: KeyName) -> Self { + Self { + table_id, + region_number, + name, + } + } +} + +impl ToString for RegionWalMetaKey { + fn to_string(&self) -> String { + format!("{}/{}/{}", self.table_id, self.region_number, self.name) + } +} + +/// A region's unique wal metadata. +// FIXME(niebayes): Shall String or RegionWalMetaKey be used as the key type? 
+#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct RegionWalMeta(HashMap<String, String>); + +impl RegionWalMeta { + pub fn with_metas<T>(metas: T) -> Self + where + T: IntoIterator<Item = (String, String)>, + { + Self(HashMap::from_iter(metas)) + } + + /// Gets the value associated with the given wal meta key. + pub fn get(&self, key: &RegionWalMetaKey) -> Option<&String> { + self.0.get(&key.to_string()) + } + + /// Inserts a key-value pair, with the key given as a wal meta key and the value as a string. + /// This insertion always succeeds since the given key-value pair overwrites any existing one. + pub fn insert(&mut self, key: RegionWalMetaKey, value: String) { + self.0.insert(key.to_string(), value); + } +} diff --git a/src/datanode/src/datanode.rs index f8a658f55aed..9331409ece7f 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -508,22 +508,45 @@ mod tests { use common_meta::key::datanode_table::DatanodeTableManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; + use common_meta::region_meta::wal_meta::{KeyName, RegionWalMeta, RegionWalMetaKey}; + use common_meta::region_meta::RegionMeta; use store_api::region_request::RegionRequest; - use store_api::storage::RegionId; + use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::config::DatanodeOptions; use crate::datanode::DatanodeBuilder; use crate::tests::{mock_region_server, MockRegionEngine}; + fn new_test_region_meta_map( + table_id: TableId, + region_numbers: Vec<RegionNumber>, + ) -> HashMap<RegionNumber, RegionMeta> { + region_numbers + .into_iter() + .map(|region_number| { + let topic_key = + RegionWalMetaKey::new(table_id, region_number, KeyName::KafkaTopic).to_string(); + let topic_value = "test_topic".to_string(); + let wal_meta = RegionWalMeta::with_metas([(topic_key, topic_value)]); + let region_meta = RegionMeta { wal_meta }; + (region_number, region_meta) + }) + .collect() + } + async fn setup_table_datanode(kv: &KvBackendRef) { let mgr = DatanodeTableManager::new(kv.clone()); + let table_id = 1028; + let region_numbers = vec![0, 1, 2]; + let txn = mgr .build_create_txn( 1028, "mock", "foo/bar/weny", HashMap::from([("foo".to_string(), "bar".to_string())]), - BTreeMap::from([(0, vec![0, 1, 2])]), + new_test_region_meta_map(table_id, region_numbers.clone()), + BTreeMap::from([(0, region_numbers)]), ) .unwrap(); diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 67fcba4519e4..a0011a9db8c8 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -221,7 +221,7 @@ mod test { let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); @@ -351,7 +351,7 @@ mod test { let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index ab61da316c8d..d6faa1817e57 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -129,6 +129,7 @@ impl RegionMigrationStart { #[cfg(test)]
mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -183,7 +184,7 @@ mod tests { }; env.table_metadata_manager() - .create_table_metadata(table_info, vec![region_route]) + .create_table_metadata(table_info, vec![region_route], HashMap::new()) .await .unwrap(); @@ -217,7 +218,7 @@ mod tests { }]; env.table_metadata_manager() - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); @@ -251,7 +252,7 @@ mod tests { }]; env.table_metadata_manager() - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); @@ -278,7 +279,7 @@ mod tests { }]; env.table_metadata_manager() - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 42e057b58f47..a6f09704c1f5 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -463,7 +463,7 @@ mod tests { }]; env.table_metadata_manager() - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index f41b66f4c09e..f2c6b388ffed 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -95,6 +95,7 @@ impl UpdateMetadata { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -162,7 +163,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); @@ -215,7 +216,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 49d09a2ad3fc..422b24a94e5e 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -368,7 +368,7 @@ async fn test_submit_alter_region_requests() { let table_info = test_data::new_table_info(); context .table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata(table_info.clone(), region_routes.clone(), HashMap::new()) .await .unwrap(); diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index f9471f7e07e1..923066f06255 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -241,7 +241,7 @@ impl OpeningRegionKeeper { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; use std::sync::Arc; use 
common_meta::key::test_utils::new_test_table_info; @@ -310,7 +310,7 @@ mod tests { let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); @@ -370,7 +370,7 @@ mod tests { let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); @@ -420,7 +420,7 @@ mod tests { let keeper = new_test_keeper(); let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); @@ -457,7 +457,7 @@ mod tests { let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); @@ -506,7 +506,7 @@ mod tests { let keeper = new_test_keeper(); let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 4428ec625822..42c309f471ba 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use chrono::DateTime; @@ -143,7 +144,7 @@ pub(crate) async fn prepare_table_region_and_info_value( region_route_factory(4, 3), ]; table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::new()) .await .unwrap(); } diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index e5e24b9b9596..03929a1d9c88 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; @@ -21,6 +21,8 @@ use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; +use common_meta::region_meta::wal_meta::{KeyName, RegionWalMeta, RegionWalMetaKey}; +use common_meta::region_meta::RegionMeta; use common_meta::rpc::router::{Region, RegionRoute}; use common_query::prelude::Expr; use datafusion_expr::expr_fn::{and, binary_expr, col, or}; @@ -37,7 +39,7 @@ use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use partition::partition::{PartitionBound, PartitionDef}; use partition::range::RangePartitionRule; use partition::PartitionRuleRef; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionNumber, TableId}; use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; use table::meter_insert_request; use table::requests::InsertRequest; @@ -80,6 +82,23 @@ pub fn new_test_table_info( .unwrap() } +fn new_test_region_meta_map( + table_id: TableId, + region_numbers: Vec, +) -> HashMap { + region_numbers + .into_iter() + .map(|region_number| { + let topic_key = + RegionWalMetaKey::new(table_id, region_number, KeyName::KafkaTopic).to_string(); + let topic_value = "test_topic".to_string(); + let wal_meta = RegionWalMeta::with_metas([(topic_key, topic_value)]); + let region_meta = RegionMeta { wal_meta }; + (region_number, region_meta) + }) + .collect() +} + /// Create a partition rule manager with two tables, one is partitioned by single column, and /// the other one is two. The tables are under default catalog and schema. /// @@ -102,9 +121,13 @@ pub(crate) async fn create_partition_rule_manager( let table_metadata_manager = TableMetadataManager::new(kv_backend.clone()); let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend)); + // TODO(niebayes): Maybe make region numbers coincident with region ids. + let table_id = 1; + let region_numbers = vec![1u32, 2, 3]; + table_metadata_manager .create_table_metadata( - new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter()).into(), + new_test_table_info(table_id, "table_1", region_numbers.clone().into_iter()).into(), vec![ RegionRoute { region: Region { @@ -161,13 +184,17 @@ pub(crate) async fn create_partition_rule_manager( leader_status: None, }, ], + new_test_region_meta_map(table_id, region_numbers), ) .await .unwrap(); + let table_id = 2; + let region_numbers = vec![1u32, 2, 3]; + table_metadata_manager .create_table_metadata( - new_test_table_info(2, "table_2", vec![0u32, 1, 2].into_iter()).into(), + new_test_table_info(table_id, "table_2", region_numbers.clone().into_iter()).into(), vec![ RegionRoute { region: Region { @@ -230,6 +257,7 @@ pub(crate) async fn create_partition_rule_manager( leader_status: None, }, ], + new_test_region_meta_map(table_id, region_numbers), ) .await .unwrap();
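
Usage sketch (not part of the patch): a minimal example of how the region metadata API added above fits together, assuming the `common_meta` and `store_api` paths shown in this diff and that `KeyName::KafkaTopic` renders as "kafka_topic". The helper name `build_region_meta_map`, the ids, and the "test_topic" value are illustrative only; the resulting map is the shape now accepted by `TableMetadataManager::create_table_metadata` and `DatanodeTableManager::build_create_txn`.

use std::collections::HashMap;

use common_meta::region_meta::wal_meta::{KeyName, RegionWalMeta, RegionWalMetaKey};
use common_meta::region_meta::RegionMeta;
use store_api::storage::{RegionNumber, TableId};

// Builds a per-region metadata map keyed by region number, mirroring the
// `new_test_region_meta_map` helpers added in the tests of this patch.
fn build_region_meta_map(
    table_id: TableId,
    region_numbers: &[RegionNumber],
    topic: &str,
) -> HashMap<RegionNumber, RegionMeta> {
    region_numbers
        .iter()
        .map(|&region_number| {
            // Keys render as "<table_id>/<region_number>/kafka_topic".
            let key = RegionWalMetaKey::new(table_id, region_number, KeyName::KafkaTopic);
            let wal_meta = RegionWalMeta::with_metas([(key.to_string(), topic.to_string())]);
            (region_number, RegionMeta { wal_meta })
        })
        .collect()
}

fn main() {
    let region_meta_map = build_region_meta_map(42, &[1, 2, 3], "test_topic");
    // Reads the Kafka topic back for region 1 of table 42.
    let key = RegionWalMetaKey::new(42, 1, KeyName::KafkaTopic);
    assert_eq!(
        region_meta_map[&1].wal_meta.get(&key),
        Some(&"test_topic".to_string())
    );
}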