From f9a68495909d7d89b0b7bcf73ccee94fc2972460 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Fri, 15 Sep 2023 17:07:54 +0800 Subject: [PATCH] feat: region storage path (#2404) * feat: region storage path * Update src/common/meta/src/key/datanode_table.rs Co-authored-by: Weny Xu * chore: by cr * feat: upgrade proto --------- Co-authored-by: Weny Xu --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/cmd/src/cli/upgrade.rs | 12 +++++- src/common/meta/src/ddl/create_table.rs | 10 ++--- src/common/meta/src/ddl/utils.rs | 5 +++ src/common/meta/src/key.rs | 28 +++++++++++-- src/common/meta/src/key/datanode_table.rs | 40 +++++++++++++++---- src/file-table-engine/src/engine/immutable.rs | 10 +++-- src/file-table-engine/src/test_util.rs | 4 +- src/meta-srv/src/error.rs | 4 +- .../region_failover/update_metadata.rs | 24 ++++++++++- src/meta-srv/src/procedure/tests.rs | 3 +- src/mito/src/engine.rs | 6 +-- src/mito/src/engine/procedure/create.rs | 4 +- src/mito/src/engine/tests.rs | 17 ++++++-- src/store-api/src/path_utils.rs | 16 +++++--- src/store-api/src/region_request.rs | 4 +- 17 files changed, 143 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4e509461924..d8562be5a6d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4104,7 +4104,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=81495b166b2c8909f05b3fcaa09eb299bb43a995#81495b166b2c8909f05b3fcaa09eb299bb43a995" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e81a60e817a348ee5b7dfbd991f86d35cd068ce5#e81a60e817a348ee5b7dfbd991f86d35cd068ce5" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index 88be3cb9687d..2d6832e1e036 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "81495b166b2c8909f05b3fcaa09eb299bb43a995" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e81a60e817a348ee5b7dfbd991f86d35cd068ce5" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 32d43065ebe0..c7bf1b693cf3 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use clap::Parser; use client::api::v1::meta::TableRouteValue; +use common_meta::ddl::utils::region_storage_path; use common_meta::error as MetaError; use common_meta::key::catalog_name::{CatalogNameKey, CatalogNameValue}; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; @@ -387,6 +388,10 @@ impl MigrateTableMetadata { async fn create_datanode_table_keys(&self, value: &TableGlobalValue) { let table_id = value.table_id(); let engine = value.table_info.meta.engine.as_str(); + let region_storage_path = region_storage_path( + &value.table_info.catalog_name, + &value.table_info.schema_name, + ); let region_distribution: RegionDistribution = value.regions_id_map.clone().into_iter().collect(); @@ -397,7 +402,12 @@ impl MigrateTableMetadata { info!("Creating DatanodeTableKey '{k}' => {regions:?}"); ( k, - DatanodeTableValue::new(table_id, regions, engine.to_string()), + DatanodeTableValue::new( + table_id, + regions, + engine.to_string(), + region_storage_path.clone(), + ), ) }) .collect::>(); diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index dbaf0f82ce53..904dcc9148e3 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -29,7 +29,7 @@ use strum::AsRefStr; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId}; -use crate::ddl::utils::{handle_operate_region_error, handle_retry_error}; +use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path}; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::key::table_name::TableNameKey; @@ -161,8 +161,7 @@ impl CreateTableProcedure { column_defs, primary_key, create_if_not_exists: true, - catalog: String::new(), - schema: String::new(), + path: String::new(), options: create_table_expr.table_options.clone(), }) } @@ -174,6 +173,7 @@ impl CreateTableProcedure { let create_table_expr = &create_table_data.task.create_table; let catalog = &create_table_expr.catalog_name; let schema = &create_table_expr.schema_name; + let storage_path = region_storage_path(catalog, schema); let request_template = self.create_region_request_template()?; @@ -191,9 +191,7 @@ impl CreateTableProcedure { let mut create_region_request = request_template.clone(); create_region_request.region_id = region_id.as_u64(); - create_region_request.catalog = catalog.to_string(); - create_region_request.schema = schema.to_string(); - + create_region_request.path = storage_path.clone(); PbRegionRequest::Create(create_region_request) }) .collect::>(); diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index 785f9d26e18e..edc31b80c4ff 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -42,3 +42,8 @@ pub fn handle_retry_error(e: Error) -> ProcedureError { ProcedureError::external(e) } } + +#[inline] +pub fn region_storage_path(catalog: &str, schema: &str) -> String { + format!("{}/{}", catalog, schema) +} diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 3e5f5d82fde6..b616725f366f 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -71,6 +71,7 @@ use table_name::{TableNameKey, TableNameManager, TableNameValue}; use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue}; use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue}; use self::table_route::{TableRouteManager, TableRouteValue}; +use crate::ddl::utils::region_storage_path; use crate::error::{self, Result, SerdeJsonSnafu}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; @@ -242,6 +243,8 @@ impl TableMetadataManager { table_info.meta.region_numbers = region_numbers; let table_id = table_info.ident.table_id; let engine = table_info.meta.engine.clone(); + let region_storage_path = + region_storage_path(&table_info.catalog_name, &table_info.schema_name); // Creates table name. let table_name = TableNameKey::new( @@ -261,9 +264,12 @@ impl TableMetadataManager { // Creates datanode table key value pairs. let distribution = region_distribution(®ion_routes)?; - let create_datanode_table_txn = - self.datanode_table_manager() - .build_create_txn(table_id, &engine, distribution)?; + let create_datanode_table_txn = self.datanode_table_manager().build_create_txn( + table_id, + &engine, + ®ion_storage_path, + distribution, + )?; // Creates table route. let table_route_value = TableRouteValue::new(region_routes); @@ -441,6 +447,7 @@ impl TableMetadataManager { &self, table_id: TableId, engine: &str, + region_storage_path: &str, current_table_route_value: TableRouteValue, new_region_routes: Vec, ) -> Result<()> { @@ -452,6 +459,7 @@ impl TableMetadataManager { let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( table_id, engine, + region_storage_path, current_region_distribution, new_region_distribution, )?; @@ -554,6 +562,7 @@ mod tests { use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder}; use super::datanode_table::DatanodeTableKey; + use crate::ddl::utils::region_storage_path; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; @@ -867,6 +876,8 @@ mod tests { new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); let table_id = table_info.ident.table_id; let engine = table_info.meta.engine.as_str(); + let region_storage_path = + region_storage_path(&table_info.catalog_name, &table_info.schema_name); let current_table_route_value = TableRouteValue::new(region_routes.clone()); // creates metadata. table_metadata_manager @@ -884,6 +895,7 @@ mod tests { .update_table_route( table_id, engine, + ®ion_storage_path, current_table_route_value.clone(), new_region_routes.clone(), ) @@ -896,6 +908,7 @@ mod tests { .update_table_route( table_id, engine, + ®ion_storage_path, current_table_route_value.clone(), new_region_routes.clone(), ) @@ -909,6 +922,7 @@ mod tests { .update_table_route( table_id, engine, + ®ion_storage_path, current_table_route_value.clone(), new_region_routes.clone(), ) @@ -925,7 +939,13 @@ mod tests { new_region_route(4, 4), ]); assert!(table_metadata_manager - .update_table_route(table_id, engine, wrong_table_route_value, new_region_routes) + .update_table_route( + table_id, + engine, + ®ion_storage_path, + wrong_table_route_value, + new_region_routes + ) .await .is_err()); } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 83f4130336ba..64053da75274 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -87,15 +87,23 @@ pub struct DatanodeTableValue { pub regions: Vec, #[serde(default)] pub engine: String, + #[serde(default)] + pub region_storage_path: String, version: u64, } impl DatanodeTableValue { - pub fn new(table_id: TableId, regions: Vec, engine: String) -> Self { + pub fn new( + table_id: TableId, + regions: Vec, + engine: String, + region_storage_path: String, + ) -> Self { Self { table_id, regions, engine, + region_storage_path, version: 0, } } @@ -147,13 +155,19 @@ impl DatanodeTableManager { &self, table_id: TableId, engine: &str, + region_storage_path: &str, distribution: RegionDistribution, ) -> Result { let txns = distribution .into_iter() .map(|(datanode_id, regions)| { let key = DatanodeTableKey::new(datanode_id, table_id); - let val = DatanodeTableValue::new(table_id, regions, engine.to_string()); + let val = DatanodeTableValue::new( + table_id, + regions, + engine.to_string(), + region_storage_path.to_string(), + ); Ok(TxnOp::Put(key.as_raw_key(), val.try_as_raw_value()?)) }) @@ -169,6 +183,7 @@ impl DatanodeTableManager { &self, table_id: TableId, engine: &str, + region_storage_path: &str, current_region_distribution: RegionDistribution, new_region_distribution: RegionDistribution, ) -> Result { @@ -189,16 +204,26 @@ impl DatanodeTableManager { if *current_region != regions { let key = DatanodeTableKey::new(datanode, table_id); let raw_key = key.as_raw_key(); - let val = DatanodeTableValue::new(table_id, regions, engine.to_string()) - .try_as_raw_value()?; + let val = DatanodeTableValue::new( + table_id, + regions, + engine.to_string(), + region_storage_path.to_string(), + ) + .try_as_raw_value()?; opts.push(TxnOp::Put(raw_key, val)); } } else { // New datanodes let key = DatanodeTableKey::new(datanode, table_id); let raw_key = key.as_raw_key(); - let val = DatanodeTableValue::new(table_id, regions, engine.to_string()) - .try_as_raw_value()?; + let val = DatanodeTableValue::new( + table_id, + regions, + engine.to_string(), + region_storage_path.to_string(), + ) + .try_as_raw_value()?; opts.push(TxnOp::Put(raw_key, val)); } } @@ -246,9 +271,10 @@ mod tests { table_id: 42, regions: vec![1, 2, 3], engine: Default::default(), + region_storage_path: Default::default(), version: 1, }; - let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","version":1}"#; + let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","version":1}"#; let raw_value = value.try_as_raw_value().unwrap(); assert_eq!(raw_value, literal); diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs index 660ba969cd70..8b0f8c71b45e 100644 --- a/src/file-table-engine/src/engine/immutable.rs +++ b/src/file-table-engine/src/engine/immutable.rs @@ -23,7 +23,7 @@ use common_telemetry::{debug, logging}; use datatypes::schema::Schema; use object_store::ObjectStore; use snafu::ResultExt; -use store_api::path_utils::table_dir; +use store_api::path_utils::table_dir_with_catalog_and_schema; use table::engine::{EngineContext, TableEngine, TableEngineProcedure, TableReference}; use table::error::TableOperationSnafu; use table::metadata::{TableId, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; @@ -245,7 +245,7 @@ impl EngineInner { let table_schema = Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?); - let table_dir = table_dir(&catalog_name, &schema_name, table_id); + let table_dir = table_dir_with_catalog_and_schema(&catalog_name, &schema_name, table_id); let table_full_name = table_ref.to_string(); @@ -345,7 +345,8 @@ impl EngineInner { } let table_id = request.table_id; - let table_dir = table_dir(&catalog_name, &schema_name, table_id); + let table_dir = + table_dir_with_catalog_and_schema(&catalog_name, &schema_name, table_id); let (metadata, table_info) = self .recover_table_manifest_and_info(&table_full_name, &table_dir) @@ -388,7 +389,8 @@ impl EngineInner { let _lock = self.table_mutex.lock().await; if let Some(table) = self.get_table(req.table_id) { let table_id = table.table_info().ident.table_id; - let table_dir = table_dir(&req.catalog_name, &req.schema_name, table_id); + let table_dir = + table_dir_with_catalog_and_schema(&req.catalog_name, &req.schema_name, table_id); delete_table_manifest( &table_full_name, diff --git a/src/file-table-engine/src/test_util.rs b/src/file-table-engine/src/test_util.rs index 28a852c86c30..261f826cf725 100644 --- a/src/file-table-engine/src/test_util.rs +++ b/src/file-table-engine/src/test_util.rs @@ -20,7 +20,7 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef}; use object_store::services::Fs; use object_store::ObjectStore; -use store_api::path_utils::table_dir; +use store_api::path_utils::table_dir_with_catalog_and_schema; use table::engine::{EngineContext, TableEngine}; use table::metadata::{RawTableInfo, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::{self, CreateTableRequest, TableOptions}; @@ -143,7 +143,7 @@ pub async fn setup_test_engine_and_table(prefix: &str) -> TestEngineComponents { let table_info = table_ref.table_info(); - let table_dir = table_dir( + let table_dir = table_dir_with_catalog_and_schema( &table_info.catalog_name, &table_info.schema_name, table_info.ident.table_id, diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index b48bc5f95244..37f206f897bb 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -253,9 +253,9 @@ pub enum Error { location: Location, }, - #[snafu(display("Table info not found: {}", table_name))] + #[snafu(display("Table info not found: {}", table_id))] TableInfoNotFound { - table_name: String, + table_id: TableId, location: Location, }, diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index a58c687b2e4d..0191cd6218ac 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -13,6 +13,7 @@ // limitations under the License. use async_trait::async_trait; +use common_meta::ddl::utils::region_storage_path; use common_meta::key::table_route::TableRouteKey; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; @@ -23,7 +24,9 @@ use snafu::{OptionExt, ResultExt}; use super::invalidate_cache::InvalidateCache; use super::{RegionFailoverContext, State}; -use crate::error::{self, Result, RetryLaterSnafu, TableRouteNotFoundSnafu}; +use crate::error::{ + self, Result, RetryLaterSnafu, TableInfoNotFoundSnafu, TableRouteNotFoundSnafu, +}; use crate::lock::keys::table_metadata_lock_key; use crate::lock::Opts; @@ -68,6 +71,17 @@ impl UpdateRegionMetadata { .context(error::TableMetadataManagerSnafu)? .context(TableRouteNotFoundSnafu { table_id })?; + let table_info = ctx + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu)? + .context(TableInfoNotFoundSnafu { table_id })? + .table_info; + let region_storage_patch = + region_storage_path(&table_info.catalog_name, &table_info.schema_name); + let mut new_region_routes = table_route_value.region_routes.clone(); for region_route in new_region_routes.iter_mut() { @@ -84,7 +98,13 @@ impl UpdateRegionMetadata { ); ctx.table_metadata_manager - .update_table_route(table_id, engine, table_route_value, new_region_routes) + .update_table_route( + table_id, + engine, + ®ion_storage_patch, + table_route_value, + new_region_routes, + ) .await .context(error::UpdateTableRouteSnafu)?; diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 8ff2ac86b641..9f11a6ef92ae 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -144,8 +144,7 @@ fn test_create_region_request_template() { ], primary_key: vec![2, 1], create_if_not_exists: true, - catalog: String::new(), - schema: String::new(), + path: String::new(), options: HashMap::new(), }; assert_eq!(template, expected); diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 09f081673ae6..5fc54d01c590 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -31,7 +31,7 @@ use key_lock::KeyLock; use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use storage::manifest::manifest_compress_type; -use store_api::path_utils::{region_name, table_dir}; +use store_api::path_utils::{region_name, table_dir_with_catalog_and_schema}; use store_api::storage::{ CloseOptions, ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId, CompactionStrategy, EngineContext as StorageEngineContext, OpenOptions, RegionNumber, @@ -433,7 +433,7 @@ impl MitoEngineInner { let table_id = request.table_id; let engine_ctx = StorageEngineContext::default(); - let table_dir = table_dir(catalog_name, schema_name, table_id); + let table_dir = table_dir_with_catalog_and_schema(catalog_name, schema_name, table_id); let Some((manifest, table_info)) = self .recover_table_manifest_and_info(table_name, &table_dir) @@ -523,7 +523,7 @@ impl MitoEngineInner { let name = &table_info.name; let table_id = table_info.ident.table_id; - let table_dir = table_dir(catalog, schema, table_id); + let table_dir = table_dir_with_catalog_and_schema(catalog, schema, table_id); let table_ref = TableReference { catalog, schema, diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs index ab8ef6040518..489942c84d9e 100644 --- a/src/mito/src/engine/procedure/create.rs +++ b/src/mito/src/engine/procedure/create.rs @@ -23,7 +23,7 @@ use common_telemetry::metric::Timer; use datatypes::schema::{Schema, SchemaRef}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; -use store_api::path_utils::table_dir; +use store_api::path_utils::table_dir_with_catalog_and_schema; use store_api::storage::{ ColumnId, CompactionStrategy, CreateOptions, EngineContext, OpenOptions, RegionDescriptorBuilder, RegionId, RegionNumber, StorageEngine, @@ -208,7 +208,7 @@ impl TableCreator { /// - Callers MUST acquire the table lock first. /// - The procedure may call this method multiple times. pub(crate) async fn create_table(&mut self) -> Result { - let table_dir = table_dir( + let table_dir = table_dir_with_catalog_and_schema( &self.data.request.catalog_name, &self.data.request.schema_name, self.data.request.id, diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index be533a8ca9c1..43616136c385 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -29,6 +29,7 @@ use storage::config::EngineConfig as StorageEngineConfig; use storage::region::RegionImpl; use storage::EngineImpl; use store_api::manifest::Manifest; +use store_api::path_utils::table_dir_with_catalog_and_schema; use store_api::storage::{ReadContext, ScanRequest}; use table::metadata::TableType; use table::requests::{ @@ -176,11 +177,11 @@ fn test_region_name() { fn test_table_dir() { assert_eq!( "data/greptime/public/1024/", - table_dir("greptime", "public", 1024) + table_dir_with_catalog_and_schema("greptime", "public", 1024) ); assert_eq!( "data/0x4354a1/prometheus/1024/", - table_dir("0x4354a1", "prometheus", 1024) + table_dir_with_catalog_and_schema("0x4354a1", "prometheus", 1024) ); } @@ -880,7 +881,11 @@ async fn test_flush_table_all_regions() { let region_name = region_name(table_id, 0); let table_info = table.table_info(); - let table_dir = table_dir(&table_info.catalog_name, &table_info.schema_name, table_id); + let table_dir = table_dir_with_catalog_and_schema( + &table_info.catalog_name, + &table_info.schema_name, + table_id, + ); let region_dir = format!( "{}/{}/{}", @@ -914,7 +919,11 @@ async fn test_flush_table_with_region_id() { let region_name = region_name(table_id, 0); let table_info = table.table_info(); - let table_dir = table_dir(&table_info.catalog_name, &table_info.schema_name, table_id); + let table_dir = table_dir_with_catalog_and_schema( + &table_info.catalog_name, + &table_info.schema_name, + table_id, + ); let region_dir = format!( "{}/{}/{}", diff --git a/src/store-api/src/path_utils.rs b/src/store-api/src/path_utils.rs index 7f35709b0ed0..59dd640024a4 100644 --- a/src/store-api/src/path_utils.rs +++ b/src/store-api/src/path_utils.rs @@ -32,15 +32,21 @@ pub fn region_name(table_id: TableId, region_number: RegionNumber) -> String { format!("{table_id}_{region_number:010}") } +// TODO(jeremy): There are still some dependencies on it. Someone will be here soon to remove it. +pub fn table_dir_with_catalog_and_schema(catalog: &str, schema: &str, table_id: TableId) -> String { + let path = format!("{}/{}", catalog, schema); + table_dir(&path, table_id) +} + #[inline] -pub fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> String { - format!("{DATA_DIR}{catalog_name}/{schema_name}/{table_id}/") +pub fn table_dir(path: &str, table_id: TableId) -> String { + format!("{DATA_DIR}{path}/{table_id}/") } -pub fn region_dir(catalog_name: &str, schema_name: &str, region_id: RegionId) -> String { +pub fn region_dir(path: &str, region_id: RegionId) -> String { format!( "{}{}", - table_dir(catalog_name, schema_name, region_id.table_id()), + table_dir(path, region_id.table_id()), region_name(region_id.table_id(), region_id.region_number()) ) } @@ -53,7 +59,7 @@ mod tests { fn test_region_dir() { let region_id = RegionId::new(42, 1); assert_eq!( - region_dir("my_catalog", "my_schema", region_id), + region_dir("my_catalog/my_schema", region_id), "data/my_catalog/my_schema/42/42_0000000001" ); } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 7fb65cdc6e18..cae637830fb5 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -70,7 +70,7 @@ impl RegionRequest { .map(ColumnMetadata::try_from_column_def) .collect::>>()?; let region_id = create.region_id.into(); - let region_dir = region_dir(&create.catalog, &create.schema, region_id); + let region_dir = region_dir(&create.path, region_id); Ok(vec![( region_id, Self::Create(RegionCreateRequest { @@ -89,7 +89,7 @@ impl RegionRequest { )]), region_request::Body::Open(open) => { let region_id = open.region_id.into(); - let region_dir = region_dir(&open.catalog, &open.schema, region_id); + let region_dir = region_dir(&open.path, region_id); Ok(vec![( region_id, Self::Open(RegionOpenRequest {