Skip to content

Commit

Permalink
feat: use explicitly set table id (#2945)
Browse files Browse the repository at this point in the history
* feat: make standalone table metadata allocator able to use explicitly set table id

* rebase

* fix: resolve PR comments
  • Loading branch information
MichaelScofield authored Dec 21, 2023
1 parent bad8918 commit 8776b12
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 42 deletions.
10 changes: 9 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,20 @@ pub struct StartCommand {

impl StartCommand {
fn load_options(&self, cli_options: &CliOptions) -> Result<Options> {
let mut opts: StandaloneOptions = Options::load_layered_options(
let opts: StandaloneOptions = Options::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
None,
)?;

self.convert_options(cli_options, opts)
}

pub fn convert_options(
&self,
cli_options: &CliOptions,
mut opts: StandaloneOptions,
) -> Result<Options> {
opts.mode = Mode::Standalone;

if let Some(dir) = &cli_options.log_dir {
Expand Down
7 changes: 2 additions & 5 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::meta::Partition;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionNumber, TableId};
use table::metadata::RawTableInfo;

use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::error::Result;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::ddl::{CreateTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::router::RegionRoute;

pub mod alter_table;
Expand Down Expand Up @@ -71,8 +69,7 @@ pub trait TableMetadataAllocator: Send + Sync {
async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
table_info: &mut RawTableInfo,
partitions: &[Partition],
task: &CreateTableTask,
) -> Result<TableMetadata>;
}

Expand Down
11 changes: 5 additions & 6 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,7 @@ async fn handle_create_table_task(
.table_metadata_allocator
.create(
&TableMetadataAllocatorContext { cluster_id },
&mut create_table_task.table_info,
&create_table_task.partitions,
&create_table_task,
)
.await?;

Expand All @@ -397,6 +396,8 @@ async fn handle_create_table_task(
region_wal_options,
} = table_meta;

create_table_task.table_info.ident.table_id = table_id;

let id = ddl_manager
.submit_create_table_task(
cluster_id,
Expand Down Expand Up @@ -454,9 +455,7 @@ impl DdlTaskExecutor for DdlManager {
mod tests {
use std::sync::Arc;

use api::v1::meta::Partition;
use common_procedure::local::LocalManager;
use table::metadata::RawTableInfo;

use super::DdlManager;
use crate::cache_invalidator::DummyCacheInvalidator;
Expand All @@ -471,6 +470,7 @@ mod tests {
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::rpc::ddl::CreateTableTask;
use crate::state_store::KvStateStore;

/// A dummy implemented [DatanodeManager].
Expand All @@ -491,8 +491,7 @@ mod tests {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
_table_info: &mut RawTableInfo,
_partitions: &[Partition],
_task: &CreateTableTask,
) -> Result<TableMetadata> {
unimplemented!()
}
Expand Down
11 changes: 6 additions & 5 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use table::metadata::{RawTableInfo, TableId};
use crate::error::{self, Result};
use crate::table_name::TableName;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum DdlTask {
CreateTable(CreateTableTask),
DropTable(DropTableTask),
Expand Down Expand Up @@ -100,6 +100,7 @@ impl TryFrom<Task> for DdlTask {
}
}

#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
pub task: DdlTask,
}
Expand Down Expand Up @@ -173,7 +174,7 @@ impl From<SubmitDdlTaskResponse> for PbSubmitDdlTaskResponse {
}
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct DropTableTask {
pub catalog: String,
pub schema: String,
Expand Down Expand Up @@ -224,7 +225,7 @@ impl TryFrom<PbDropTableTask> for DropTableTask {
}
}

#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct CreateTableTask {
pub create_table: CreateTableExpr,
pub partitions: Vec<Partition>,
Expand Down Expand Up @@ -319,7 +320,7 @@ impl<'de> Deserialize<'de> for CreateTableTask {
}
}

#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct AlterTableTask {
pub alter_table: AlterExpr,
}
Expand Down Expand Up @@ -389,7 +390,7 @@ impl<'de> Deserialize<'de> for AlterTableTask {
}
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct TruncateTableTask {
pub catalog: String,
pub schema: String,
Expand Down
51 changes: 40 additions & 11 deletions src/frontend/src/instance/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,26 @@
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::meta::Partition;
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use async_trait::async_trait;
use client::region::check_response_header;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::error::{self as meta_error, Result as MetaResult, UnsupportedSnafu};
use common_meta::peer::Peer;
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_meta::wal::options_allocator::build_region_wal_options;
use common_meta::wal::WalOptionsAllocator;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, tracing};
use common_telemetry::{debug, info, tracing};
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};

use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};

Expand Down Expand Up @@ -119,19 +118,49 @@ impl StandaloneTableMetadataAllocator {
wal_options_allocator,
}
}

async fn allocate_table_id(&self, task: &CreateTableTask) -> MetaResult<TableId> {
let table_id = if let Some(table_id) = &task.create_table.table_id {
let table_id = table_id.id;

ensure!(
!self
.table_id_sequence
.min_max()
.await
.contains(&(table_id as u64)),
UnsupportedSnafu {
operation: format!(
"create table by id {} that is reserved in this node",
table_id
)
}
);

info!(
"Received explicitly allocated table id {}, will use it directly.",
table_id
);

table_id
} else {
self.table_id_sequence.next().await? as TableId
};
Ok(table_id)
}
}

#[async_trait]
impl TableMetadataAllocator for StandaloneTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
raw_table_info: &mut RawTableInfo,
partitions: &[Partition],
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let table_id = self.table_id_sequence.next().await? as u32;
raw_table_info.ident.table_id = table_id;
let region_routes = partitions
let table_id = self.allocate_table_id(task).await?;

let region_routes = task
.partitions
.iter()
.enumerate()
.map(|(i, partition)| {
Expand Down
16 changes: 7 additions & 9 deletions src/meta-srv/src/table_meta_alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::meta::Partition;
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceRef;
use common_meta::wal::options_allocator::build_region_wal_options;
use common_meta::wal::WalOptionsAllocator;
use common_telemetry::{debug, warn};
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ};
use table::metadata::RawTableInfo;

use crate::error::{self, Result, TooManyPartitionsSnafu};
use crate::metasrv::{SelectorContext, SelectorRef};
Expand Down Expand Up @@ -58,13 +57,11 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
raw_table_info: &mut RawTableInfo,
partitions: &[Partition],
task: &CreateTableTask,
) -> MetaResult<TableMetadata> {
let (table_id, region_routes) = handle_create_region_routes(
ctx.cluster_id,
raw_table_info,
partitions,
task,
&self.ctx,
&self.selector,
&self.table_id_sequence,
Expand Down Expand Up @@ -96,12 +93,14 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator {
/// pre-allocates create table's table id and region routes.
async fn handle_create_region_routes(
cluster_id: u64,
table_info: &mut RawTableInfo,
partitions: &[Partition],
task: &CreateTableTask,
ctx: &SelectorContext,
selector: &SelectorRef,
table_id_sequence: &SequenceRef,
) -> Result<(TableId, Vec<RegionRoute>)> {
let table_info = &task.table_info;
let partitions = &task.partitions;

let mut peers = selector
.select(
cluster_id,
Expand Down Expand Up @@ -139,7 +138,6 @@ async fn handle_create_region_routes(
.next()
.await
.context(error::NextSequenceSnafu)? as u32;
table_info.ident.table_id = table_id;

ensure!(
partitions.len() <= MAX_REGION_SEQ as usize,
Expand Down
8 changes: 3 additions & 5 deletions tests-integration/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use api::v1::promql_request::Promql;
use api::v1::{
column, AddColumn, AddColumns, AlterExpr, Basic, Column, ColumnDataType, ColumnDef,
CreateTableExpr, InsertRequest, InsertRequests, PromInstantQuery, PromRangeQuery,
PromqlRequest, RequestHeader, SemanticType, TableId,
PromqlRequest, RequestHeader, SemanticType,
};
use auth::user_provider_from_option;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
use common_catalog::consts::MITO_ENGINE;
use common_query::Output;
use common_recordbatch::RecordBatches;
use servers::grpc::GrpcServerConfig;
Expand Down Expand Up @@ -447,9 +447,7 @@ fn testing_create_expr() -> CreateTableExpr {
primary_keys: vec!["host".to_string()],
create_if_not_exists: true,
table_options: Default::default(),
table_id: Some(TableId {
id: MIN_USER_TABLE_ID,
}),
table_id: None,
engine: MITO_ENGINE.to_string(),
}
}
Expand Down

0 comments on commit 8776b12

Please sign in to comment.