From 8776b1204bf2754a47e12dde8d292bd9d581dd55 Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Thu, 21 Dec 2023 13:51:43 +0800 Subject: [PATCH] feat: use explicitly set table id (#2945) * feat: make standalone table metadata allocator able to use explicitly set table id * rebase * fix: resolve PR comments --- src/cmd/src/standalone.rs | 10 ++++- src/common/meta/src/ddl.rs | 7 +--- src/common/meta/src/ddl_manager.rs | 11 +++--- src/common/meta/src/rpc/ddl.rs | 11 +++--- src/frontend/src/instance/standalone.rs | 51 +++++++++++++++++++------ src/meta-srv/src/table_meta_alloc.rs | 16 ++++---- tests-integration/tests/grpc.rs | 8 ++-- 7 files changed, 72 insertions(+), 42 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index fb0e23ebefd9..bf15c6b6eb30 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -249,12 +249,20 @@ pub struct StartCommand { impl StartCommand { fn load_options(&self, cli_options: &CliOptions) -> Result { - let mut opts: StandaloneOptions = Options::load_layered_options( + let opts: StandaloneOptions = Options::load_layered_options( self.config_file.as_deref(), self.env_prefix.as_ref(), None, )?; + self.convert_options(cli_options, opts) + } + + pub fn convert_options( + &self, + cli_options: &CliOptions, + mut opts: StandaloneOptions, + ) -> Result { opts.mode = Mode::Standalone; if let Some(dir) = &cli_options.log_dir { diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 5e03354fcca9..793df3f9c4d6 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -15,17 +15,15 @@ use std::collections::HashMap; use std::sync::Arc; -use api::v1::meta::Partition; use common_telemetry::tracing_context::W3cTrace; use store_api::storage::{RegionNumber, TableId}; -use table::metadata::RawTableInfo; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; use crate::error::Result; use crate::key::TableMetadataManagerRef; use crate::region_keeper::MemoryRegionKeeperRef; -use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use crate::rpc::ddl::{CreateTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::router::RegionRoute; pub mod alter_table; @@ -71,8 +69,7 @@ pub trait TableMetadataAllocator: Send + Sync { async fn create( &self, ctx: &TableMetadataAllocatorContext, - table_info: &mut RawTableInfo, - partitions: &[Partition], + task: &CreateTableTask, ) -> Result; } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 8d94b8bfcac7..471de7ac852f 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -386,8 +386,7 @@ async fn handle_create_table_task( .table_metadata_allocator .create( &TableMetadataAllocatorContext { cluster_id }, - &mut create_table_task.table_info, - &create_table_task.partitions, + &create_table_task, ) .await?; @@ -397,6 +396,8 @@ async fn handle_create_table_task( region_wal_options, } = table_meta; + create_table_task.table_info.ident.table_id = table_id; + let id = ddl_manager .submit_create_table_task( cluster_id, @@ -454,9 +455,7 @@ impl DdlTaskExecutor for DdlManager { mod tests { use std::sync::Arc; - use api::v1::meta::Partition; use common_procedure::local::LocalManager; - use table::metadata::RawTableInfo; use super::DdlManager; use crate::cache_invalidator::DummyCacheInvalidator; @@ -471,6 +470,7 @@ mod tests { use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; use crate::region_keeper::MemoryRegionKeeper; + use crate::rpc::ddl::CreateTableTask; use crate::state_store::KvStateStore; /// A dummy implemented [DatanodeManager]. @@ -491,8 +491,7 @@ mod tests { async fn create( &self, _ctx: &TableMetadataAllocatorContext, - _table_info: &mut RawTableInfo, - _partitions: &[Partition], + _task: &CreateTableTask, ) -> Result { unimplemented!() } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index b9bbd762b14d..935afac311e0 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -32,7 +32,7 @@ use table::metadata::{RawTableInfo, TableId}; use crate::error::{self, Result}; use crate::table_name::TableName; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum DdlTask { CreateTable(CreateTableTask), DropTable(DropTableTask), @@ -100,6 +100,7 @@ impl TryFrom for DdlTask { } } +#[derive(Clone)] pub struct SubmitDdlTaskRequest { pub task: DdlTask, } @@ -173,7 +174,7 @@ impl From for PbSubmitDdlTaskResponse { } } -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct DropTableTask { pub catalog: String, pub schema: String, @@ -224,7 +225,7 @@ impl TryFrom for DropTableTask { } } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct CreateTableTask { pub create_table: CreateTableExpr, pub partitions: Vec, @@ -319,7 +320,7 @@ impl<'de> Deserialize<'de> for CreateTableTask { } } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct AlterTableTask { pub alter_table: AlterExpr, } @@ -389,7 +390,7 @@ impl<'de> Deserialize<'de> for AlterTableTask { } } -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct TruncateTableTask { pub catalog: String, pub schema: String, diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 95229d343694..37279cb5771d 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -15,27 +15,26 @@ use std::collections::HashMap; use std::sync::Arc; -use api::v1::meta::Partition; use api::v1::region::{QueryRequest, RegionRequest, RegionResponse}; use async_trait::async_trait; use client::region::check_response_header; use common_error::ext::BoxedError; use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef}; use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext}; -use common_meta::error::{self as meta_error, Result as MetaResult}; +use common_meta::error::{self as meta_error, Result as MetaResult, UnsupportedSnafu}; use common_meta::peer::Peer; +use common_meta::rpc::ddl::CreateTableTask; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; use common_meta::wal::options_allocator::build_region_wal_options; use common_meta::wal::WalOptionsAllocator; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing_context::{FutureExt, TracingContext}; -use common_telemetry::{debug, tracing}; +use common_telemetry::{debug, info, tracing}; use datanode::region_server::RegionServer; use servers::grpc::region_server::RegionServerHandler; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; -use table::metadata::RawTableInfo; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::storage::{RegionId, TableId}; use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result}; @@ -119,6 +118,36 @@ impl StandaloneTableMetadataAllocator { wal_options_allocator, } } + + async fn allocate_table_id(&self, task: &CreateTableTask) -> MetaResult { + let table_id = if let Some(table_id) = &task.create_table.table_id { + let table_id = table_id.id; + + ensure!( + !self + .table_id_sequence + .min_max() + .await + .contains(&(table_id as u64)), + UnsupportedSnafu { + operation: format!( + "create table by id {} that is reserved in this node", + table_id + ) + } + ); + + info!( + "Received explicitly allocated table id {}, will use it directly.", + table_id + ); + + table_id + } else { + self.table_id_sequence.next().await? as TableId + }; + Ok(table_id) + } } #[async_trait] @@ -126,12 +155,12 @@ impl TableMetadataAllocator for StandaloneTableMetadataAllocator { async fn create( &self, _ctx: &TableMetadataAllocatorContext, - raw_table_info: &mut RawTableInfo, - partitions: &[Partition], + task: &CreateTableTask, ) -> MetaResult { - let table_id = self.table_id_sequence.next().await? as u32; - raw_table_info.ident.table_id = table_id; - let region_routes = partitions + let table_id = self.allocate_table_id(task).await?; + + let region_routes = task + .partitions .iter() .enumerate() .map(|(i, partition)| { diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index 7a4183e58a88..f317496ad2ac 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::Partition; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext}; use common_meta::error::{self as meta_error, Result as MetaResult}; +use common_meta::rpc::ddl::CreateTableTask; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; use common_meta::wal::options_allocator::build_region_wal_options; @@ -24,7 +24,6 @@ use common_meta::wal::WalOptionsAllocator; use common_telemetry::{debug, warn}; use snafu::{ensure, ResultExt}; use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ}; -use table::metadata::RawTableInfo; use crate::error::{self, Result, TooManyPartitionsSnafu}; use crate::metasrv::{SelectorContext, SelectorRef}; @@ -58,13 +57,11 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator { async fn create( &self, ctx: &TableMetadataAllocatorContext, - raw_table_info: &mut RawTableInfo, - partitions: &[Partition], + task: &CreateTableTask, ) -> MetaResult { let (table_id, region_routes) = handle_create_region_routes( ctx.cluster_id, - raw_table_info, - partitions, + task, &self.ctx, &self.selector, &self.table_id_sequence, @@ -96,12 +93,14 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator { /// pre-allocates create table's table id and region routes. async fn handle_create_region_routes( cluster_id: u64, - table_info: &mut RawTableInfo, - partitions: &[Partition], + task: &CreateTableTask, ctx: &SelectorContext, selector: &SelectorRef, table_id_sequence: &SequenceRef, ) -> Result<(TableId, Vec)> { + let table_info = &task.table_info; + let partitions = &task.partitions; + let mut peers = selector .select( cluster_id, @@ -139,7 +138,6 @@ async fn handle_create_region_routes( .next() .await .context(error::NextSequenceSnafu)? as u32; - table_info.ident.table_id = table_id; ensure!( partitions.len() <= MAX_REGION_SEQ as usize, diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index a0d19908f503..7f099adb253b 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -17,11 +17,11 @@ use api::v1::promql_request::Promql; use api::v1::{ column, AddColumn, AddColumns, AlterExpr, Basic, Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, PromInstantQuery, PromRangeQuery, - PromqlRequest, RequestHeader, SemanticType, TableId, + PromqlRequest, RequestHeader, SemanticType, }; use auth::user_provider_from_option; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE}; +use common_catalog::consts::MITO_ENGINE; use common_query::Output; use common_recordbatch::RecordBatches; use servers::grpc::GrpcServerConfig; @@ -447,9 +447,7 @@ fn testing_create_expr() -> CreateTableExpr { primary_keys: vec!["host".to_string()], create_if_not_exists: true, table_options: Default::default(), - table_id: Some(TableId { - id: MIN_USER_TABLE_ID, - }), + table_id: None, engine: MITO_ENGINE.to_string(), } }