From 7e6a73e2d85fe2d62c42269c3e924787e60e1937 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 12 Sep 2023 12:01:39 +0800 Subject: [PATCH] feat: consolidate Insert request related partitioning and distributed processing operations into Inserter (#2346) * refactor: RegionRequest as param of RegionRequestHandler.handle Signed-off-by: Zhenchi * feat: partition insert & delete reqs for both standalone and distributed mode Signed-off-by: Zhenchi * chore: nit change Signed-off-by: Zhenchi * fix: wrong function nameg Signed-off-by: Zhenchi * feat: do request in inserter & deleter Signed-off-by: Zhenchi * feat: remove RegionRequestHandler.handle Signed-off-by: Zhenchi * refactor: rename table_creator Signed-off-by: Zhenchi * chore: nit change Signed-off-by: Zhenchi * refactor: address comments Signed-off-by: Zhenchi * chore: nit change Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- src/client/src/region_handler.rs | 10 +- src/common/meta/src/ddl_manager.rs | 8 +- src/frontend/src/delete.rs | 144 +++++++-- src/frontend/src/error.rs | 15 +- src/frontend/src/insert.rs | 266 ++++++++++++---- src/frontend/src/instance.rs | 121 +++----- src/frontend/src/instance/distributed.rs | 70 +---- .../src/instance/distributed/deleter.rs | 284 ------------------ .../src/instance/distributed/inserter.rs | 275 ----------------- src/frontend/src/instance/grpc.rs | 39 +++ src/frontend/src/instance/standalone.rs | 54 ++-- src/frontend/src/lib.rs | 1 + src/frontend/src/region_req_factory.rs | 43 +++ src/frontend/src/req_convert/common.rs | 2 + .../src/req_convert/common/partitioner.rs | 69 +++++ .../src/req_convert/delete/column_to_row.rs | 2 +- .../src/req_convert/delete/row_to_region.rs | 33 +- .../src/req_convert/delete/table_to_region.rs | 211 ++++++------- .../src/req_convert/insert/column_to_row.rs | 2 +- .../src/req_convert/insert/row_to_region.rs | 33 +- .../src/req_convert/insert/stmt_to_region.rs | 29 +- .../src/req_convert/insert/table_to_region.rs | 211 ++++++------- src/frontend/src/statement.rs | 64 +--- src/frontend/src/statement/copy_table_from.rs | 3 +- src/frontend/src/statement/dml.rs | 24 +- src/frontend/src/table.rs | 6 +- src/meta-srv/src/lib.rs | 2 +- src/meta-srv/src/metasrv/builder.rs | 6 +- .../{table_creator.rs => table_meta_alloc.rs} | 0 src/partition/src/manager.rs | 36 ++- src/partition/src/splitter.rs | 91 ++---- 31 files changed, 890 insertions(+), 1264 deletions(-) delete mode 100644 src/frontend/src/instance/distributed/deleter.rs delete mode 100644 src/frontend/src/instance/distributed/inserter.rs create mode 100644 src/frontend/src/region_req_factory.rs create mode 100644 src/frontend/src/req_convert/common/partitioner.rs rename src/meta-srv/src/{table_creator.rs => table_meta_alloc.rs} (100%) diff --git a/src/client/src/region_handler.rs b/src/client/src/region_handler.rs index 9238389f6d0a..a3977d8fd6b3 100644 --- a/src/client/src/region_handler.rs +++ b/src/client/src/region_handler.rs @@ -14,22 +14,14 @@ use std::sync::Arc; -use api::v1::region::{region_request, QueryRequest}; +use api::v1::region::QueryRequest; use async_trait::async_trait; -use common_meta::datanode_manager::AffectedRows; use common_recordbatch::SendableRecordBatchStream; -use session::context::QueryContextRef; use crate::error::Result; #[async_trait] pub trait RegionRequestHandler: Send + Sync { - async fn handle( - &self, - request: region_request::Body, - ctx: QueryContextRef, - ) -> Result; - // TODO(ruihang): add trace id and span id in the request. async fn do_get(&self, request: QueryRequest) -> Result; } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 1814b38645d5..b4787611d71e 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -49,7 +49,7 @@ pub struct DdlManager { datanode_manager: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_creator: TableMetadataAllocatorRef, + table_meta_allocator: TableMetadataAllocatorRef, } impl DdlManager { @@ -58,14 +58,14 @@ impl DdlManager { datanode_clients: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_creator: TableMetadataAllocatorRef, + table_meta_allocator: TableMetadataAllocatorRef, ) -> Self { Self { procedure_manager, datanode_manager: datanode_clients, cache_invalidator, table_metadata_manager, - table_creator, + table_meta_allocator, } } @@ -333,7 +333,7 @@ async fn handle_create_table_task( mut create_table_task: CreateTableTask, ) -> Result { let (table_id, region_routes) = ddl_manager - .table_creator + .table_meta_allocator .create( &TableMetadataAllocatorContext { cluster_id }, &mut create_table_task.table_info, diff --git a/src/frontend/src/delete.rs b/src/frontend/src/delete.rs index 3c9c6850865d..ec88b22346f5 100644 --- a/src/frontend/src/delete.rs +++ b/src/frontend/src/delete.rs @@ -12,37 +12,49 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use std::{iter, mem}; -use api::v1::region::region_request; -use api::v1::{DeleteRequests, RowDeleteRequest, RowDeleteRequests}; -use catalog::CatalogManager; -use client::region_handler::RegionRequestHandler; +use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader}; +use api::v1::{DeleteRequests, RowDeleteRequests}; +use catalog::CatalogManagerRef; +use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; +use common_meta::peer::Peer; use common_query::Output; +use futures_util::future; +use metrics::counter; +use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; +use table::requests::DeleteRequest as TableDeleteRequest; use table::TableRef; use crate::error::{ - CatalogSnafu, InvalidDeleteRequestSnafu, MissingTimeIndexColumnSnafu, RequestDatanodeSnafu, - Result, TableNotFoundSnafu, + CatalogSnafu, FindRegionLeaderSnafu, InvalidDeleteRequestSnafu, JoinTaskSnafu, + MissingTimeIndexColumnSnafu, RequestDeletesSnafu, Result, TableNotFoundSnafu, }; -use crate::req_convert::delete::{ColumnToRow, RowToRegion}; +use crate::region_req_factory::RegionRequestFactory; +use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion}; -pub(crate) struct Deleter<'a> { - catalog_manager: &'a dyn CatalogManager, - region_request_handler: &'a dyn RegionRequestHandler, +pub struct Deleter { + catalog_manager: CatalogManagerRef, + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, } -impl<'a> Deleter<'a> { +pub type DeleterRef = Arc; + +impl Deleter { pub fn new( - catalog_manager: &'a dyn CatalogManager, - region_request_handler: &'a dyn RegionRequestHandler, + catalog_manager: CatalogManagerRef, + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, ) -> Self { Self { catalog_manager, - region_request_handler, + partition_manager, + datanode_manager, } } @@ -67,31 +79,99 @@ impl<'a> Deleter<'a> { .map(|r| !r.rows.is_empty()) .unwrap_or_default() }); - validate_row_count_match(&requests)?; + validate_column_count_match(&requests)?; let requests = self.trim_columns(requests, &ctx).await?; - let region_request = RowToRegion::new(self.catalog_manager, &ctx) - .convert(requests) - .await?; - let region_request = region_request::Body::Deletes(region_request); + let deletes = RowToRegion::new( + self.catalog_manager.as_ref(), + self.partition_manager.as_ref(), + &ctx, + ) + .convert(requests) + .await?; - let affected_rows = self - .region_request_handler - .handle(region_request, ctx) - .await - .context(RequestDatanodeSnafu)?; + let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?; Ok(Output::AffectedRows(affected_rows as _)) } + + pub async fn handle_table_delete( + &self, + request: TableDeleteRequest, + ctx: QueryContextRef, + ) -> Result { + let catalog = request.catalog_name.as_str(); + let schema = request.schema_name.as_str(); + let table = request.table_name.as_str(); + let table = self.get_table(catalog, schema, table).await?; + let table_info = table.table_info(); + + let deletes = TableToRegion::new(&table_info, &self.partition_manager) + .convert(request) + .await?; + self.do_request(deletes, ctx.trace_id(), 0).await + } } -impl<'a> Deleter<'a> { +impl Deleter { + async fn do_request( + &self, + requests: RegionDeleteRequests, + trace_id: u64, + span_id: u64, + ) -> Result { + let header = RegionRequestHeader { trace_id, span_id }; + let request_factory = RegionRequestFactory::new(header); + + let tasks = self + .group_requests_by_peer(requests) + .await? + .into_iter() + .map(|(peer, deletes)| { + let request = request_factory.build_delete(deletes); + let datanode_manager = self.datanode_manager.clone(); + common_runtime::spawn_write(async move { + datanode_manager + .datanode(&peer) + .await + .handle(request) + .await + .context(RequestDeletesSnafu) + }) + }); + let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?; + + let affected_rows = results.into_iter().sum::>()?; + counter!(crate::metrics::DIST_DELETE_ROW_COUNT, affected_rows); + Ok(affected_rows) + } + + async fn group_requests_by_peer( + &self, + requests: RegionDeleteRequests, + ) -> Result> { + let mut deletes: HashMap = HashMap::new(); + + for req in requests.requests { + let peer = self + .partition_manager + .find_region_leader(req.region_id.into()) + .await + .context(FindRegionLeaderSnafu)?; + deletes.entry(peer).or_default().requests.push(req); + } + + Ok(deletes) + } + async fn trim_columns( &self, mut requests: RowDeleteRequests, ctx: &QueryContextRef, ) -> Result { for req in &mut requests.deletes { - let table = self.get_table(req, ctx).await?; + let catalog = ctx.current_catalog(); + let schema = ctx.current_schema(); + let table = self.get_table(catalog, schema, &req.table_name).await?; let key_column_names = self.key_column_names(&table)?; let rows = req.rows.as_mut().unwrap(); @@ -142,25 +222,25 @@ impl<'a> Deleter<'a> { Ok(key_column_names) } - async fn get_table(&self, req: &RowDeleteRequest, ctx: &QueryContextRef) -> Result { + async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result { self.catalog_manager - .table(ctx.current_catalog(), ctx.current_schema(), &req.table_name) + .table(catalog, schema, table) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { - table_name: req.table_name.clone(), + table_name: common_catalog::format_full_table_name(catalog, schema, table), }) } } -fn validate_row_count_match(requests: &RowDeleteRequests) -> Result<()> { +fn validate_column_count_match(requests: &RowDeleteRequests) -> Result<()> { for request in &requests.deletes { let rows = request.rows.as_ref().unwrap(); let column_count = rows.schema.len(); ensure!( rows.rows.iter().all(|r| r.values.len() == column_count), InvalidDeleteRequestSnafu { - reason: "row count mismatch" + reason: "column count mismatch" } ) } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 3fd905577759..5bbe13042ccb 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -258,6 +258,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to find leader for region, source: {}", source))] + FindRegionLeader { + source: partition::error::Error, + location: Location, + }, + #[snafu(display("Failed to create table info, source: {}", source))] CreateTableInfo { #[snafu(backtrace)] @@ -683,6 +689,9 @@ pub enum Error { column, ))] ColumnNoneDefaultValue { column: String, location: Location }, + + #[snafu(display("Invalid region request, reason: {}", reason))] + InvalidRegionRequest { reason: String }, } pub type Result = std::result::Result; @@ -761,7 +770,8 @@ impl ErrorExt for Error { | Error::BuildDfLogicalPlan { .. } | Error::BuildTableMeta { .. } | Error::VectorToGrpcColumn { .. } - | Error::MissingInsertBody { .. } => StatusCode::Internal, + | Error::MissingInsertBody { .. } + | Error::InvalidRegionRequest { .. } => StatusCode::Internal, Error::IncompleteGrpcResult { .. } | Error::ContextValueNotFound { .. } @@ -808,7 +818,8 @@ impl ErrorExt for Error { | Error::FindTablePartitionRule { source, .. } | Error::FindTableRoute { source, .. } | Error::SplitInsert { source, .. } - | Error::SplitDelete { source, .. } => source.status_code(), + | Error::SplitDelete { source, .. } + | Error::FindRegionLeader { source, .. } => source.status_code(), Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, diff --git a/src/frontend/src/insert.rs b/src/frontend/src/insert.rs index b5c82a5da041..a37809cc2cfb 100644 --- a/src/frontend/src/insert.rs +++ b/src/frontend/src/insert.rs @@ -12,48 +12,59 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; +use std::sync::Arc; + use api::v1::alter_expr::Kind; -use api::v1::region::region_request; -use api::v1::{AlterExpr, ColumnSchema, InsertRequests, RowInsertRequest, RowInsertRequests}; -use catalog::CatalogManager; -use client::region_handler::RegionRequestHandler; +use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader}; +use api::v1::{ + AlterExpr, ColumnSchema, CreateTableExpr, InsertRequests, RowInsertRequest, RowInsertRequests, +}; +use catalog::CatalogManagerRef; use common_catalog::consts::default_engine; use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; +use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; +use common_meta::peer::Peer; use common_query::Output; -use common_telemetry::info; +use common_telemetry::{error, info}; use datatypes::schema::Schema; +use futures_util::future; +use metrics::counter; +use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; use snafu::prelude::*; +use sql::statements::insert::Insert; use table::engine::TableReference; +use table::requests::InsertRequest as TableInsertRequest; use table::TableRef; use crate::error::{ - CatalogSnafu, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu, RequestDatanodeSnafu, - Result, + CatalogSnafu, FindNewColumnsOnInsertionSnafu, FindRegionLeaderSnafu, InvalidInsertRequestSnafu, + JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu, }; use crate::expr_factory::CreateExprFactory; -use crate::req_convert::insert::{ColumnToRow, RowToRegion}; +use crate::region_req_factory::RegionRequestFactory; +use crate::req_convert::insert::{ColumnToRow, RowToRegion, StatementToRegion, TableToRegion}; use crate::statement::StatementExecutor; -pub(crate) struct Inserter<'a> { - catalog_manager: &'a dyn CatalogManager, - create_expr_factory: &'a CreateExprFactory, - statement_executor: &'a StatementExecutor, - region_request_handler: &'a dyn RegionRequestHandler, +pub struct Inserter { + catalog_manager: CatalogManagerRef, + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, } -impl<'a> Inserter<'a> { +pub type InserterRef = Arc; + +impl Inserter { pub fn new( - catalog_manager: &'a dyn CatalogManager, - create_expr_factory: &'a CreateExprFactory, - statement_executor: &'a StatementExecutor, - region_request_handler: &'a dyn RegionRequestHandler, + catalog_manager: CatalogManagerRef, + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, ) -> Self { Self { catalog_manager, - create_expr_factory, - statement_executor, - region_request_handler, + partition_manager, + datanode_manager, } } @@ -61,15 +72,18 @@ impl<'a> Inserter<'a> { &self, requests: InsertRequests, ctx: QueryContextRef, + statement_executor: &StatementExecutor, ) -> Result { let row_inserts = ColumnToRow::convert(requests)?; - self.handle_row_inserts(row_inserts, ctx).await + self.handle_row_inserts(row_inserts, ctx, statement_executor) + .await } pub async fn handle_row_inserts( &self, mut requests: RowInsertRequests, ctx: QueryContextRef, + statement_executor: &StatementExecutor, ) -> Result { // remove empty requests requests.inserts.retain(|req| { @@ -78,25 +92,110 @@ impl<'a> Inserter<'a> { .map(|r| !r.rows.is_empty()) .unwrap_or_default() }); - validate_row_count_match(&requests)?; + validate_column_count_match(&requests)?; - self.create_or_alter_tables_on_demand(&requests, &ctx) + self.create_or_alter_tables_on_demand(&requests, &ctx, statement_executor) .await?; - let region_request = RowToRegion::new(self.catalog_manager, &ctx) - .convert(requests) + let inserts = RowToRegion::new( + self.catalog_manager.as_ref(), + self.partition_manager.as_ref(), + &ctx, + ) + .convert(requests) + .await?; + + let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?; + Ok(Output::AffectedRows(affected_rows as _)) + } + + pub async fn handle_table_insert( + &self, + request: TableInsertRequest, + ctx: QueryContextRef, + ) -> Result { + let catalog = request.catalog_name.as_str(); + let schema = request.schema_name.as_str(); + let table_name = request.table_name.as_str(); + let table = self.get_table(catalog, schema, table_name).await?; + let table = table.with_context(|| TableNotFoundSnafu { + table_name: common_catalog::format_full_table_name(catalog, schema, table_name), + })?; + let table_info = table.table_info(); + + let inserts = TableToRegion::new(&table_info, &self.partition_manager) + .convert(request) .await?; - let region_request = region_request::Body::Inserts(region_request); - let affected_rows = self - .region_request_handler - .handle(region_request, ctx) - .await - .context(RequestDatanodeSnafu)?; + let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?; + Ok(affected_rows as _) + } + + pub async fn handle_statement_insert( + &self, + insert: &Insert, + ctx: &QueryContextRef, + ) -> Result { + let inserts = + StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx) + .convert(insert) + .await?; + + let affected_rows = self.do_request(inserts, ctx.trace_id(), 0).await?; Ok(Output::AffectedRows(affected_rows as _)) } } -impl<'a> Inserter<'a> { +impl Inserter { + async fn do_request( + &self, + requests: RegionInsertRequests, + trace_id: u64, + span_id: u64, + ) -> Result { + let header = RegionRequestHeader { trace_id, span_id }; + let request_factory = RegionRequestFactory::new(header); + + let tasks = self + .group_requests_by_peer(requests) + .await? + .into_iter() + .map(|(peer, inserts)| { + let request = request_factory.build_insert(inserts); + let datanode_manager = self.datanode_manager.clone(); + common_runtime::spawn_write(async move { + datanode_manager + .datanode(&peer) + .await + .handle(request) + .await + .context(RequestInsertsSnafu) + }) + }); + let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?; + + let affected_rows = results.into_iter().sum::>()?; + counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows); + Ok(affected_rows) + } + + async fn group_requests_by_peer( + &self, + requests: RegionInsertRequests, + ) -> Result> { + let mut inserts: HashMap = HashMap::new(); + + for req in requests.requests { + let peer = self + .partition_manager + .find_region_leader(req.region_id.into()) + .await + .context(FindRegionLeaderSnafu)?; + inserts.entry(peer).or_default().requests.push(req); + } + + Ok(inserts) + } + // check if tables already exist: // - if table does not exist, create table by inferred CreateExpr // - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr` @@ -104,15 +203,20 @@ impl<'a> Inserter<'a> { &self, requests: &RowInsertRequests, ctx: &QueryContextRef, + statement_executor: &StatementExecutor, ) -> Result<()> { // TODO(jeremy): create and alter in batch? for req in &requests.inserts { - match self.get_table(req, ctx).await? { + let catalog = ctx.current_catalog(); + let schema = ctx.current_schema(); + let table = self.get_table(catalog, schema, &req.table_name).await?; + match table { Some(table) => { validate_request_with_table(req, &table)?; - self.alter_table_on_demand(req, table, ctx).await? + self.alter_table_on_demand(req, table, ctx, statement_executor) + .await? } - None => self.create_table(req, ctx).await?, + None => self.create_table(req, ctx, statement_executor).await?, } } @@ -121,11 +225,12 @@ impl<'a> Inserter<'a> { async fn get_table( &self, - req: &RowInsertRequest, - ctx: &QueryContextRef, + catalog: &str, + schema: &str, + table: &str, ) -> Result> { self.catalog_manager - .table(ctx.current_catalog(), ctx.current_schema(), &req.table_name) + .table(catalog, schema, table) .await .context(CatalogSnafu) } @@ -135,9 +240,11 @@ impl<'a> Inserter<'a> { req: &RowInsertRequest, table: TableRef, ctx: &QueryContextRef, + statement_executor: &StatementExecutor, ) -> Result<()> { let catalog_name = ctx.current_catalog(); let schema_name = ctx.current_schema(); + let table_name = table.table_info().name.clone(); let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); let column_exprs = ColumnExpr::from_column_schemas(request_schema); @@ -147,7 +254,6 @@ impl<'a> Inserter<'a> { return Ok(()); }; - let table_name = table.table_info().name.clone(); info!( "Adding new columns: {:?} to table: {}.{}.{}", add_columns, catalog_name, schema_name, table_name @@ -160,54 +266,75 @@ impl<'a> Inserter<'a> { kind: Some(Kind::AddColumns(add_columns)), }; - self.statement_executor - .alter_table_inner(alter_table_expr) - .await?; + let res = statement_executor.alter_table_inner(alter_table_expr).await; - info!( - "Successfully added new columns to table: {}.{}.{}", - catalog_name, schema_name, table_name - ); - - Ok(()) + match res { + Ok(_) => { + info!( + "Successfully added new columns to table: {}.{}.{}", + catalog_name, schema_name, table_name + ); + Ok(()) + } + Err(err) => { + error!( + "Failed to add new columns to table: {}.{}.{}: {}", + catalog_name, schema_name, table_name, err + ); + Err(err) + } + } } - async fn create_table(&self, req: &RowInsertRequest, ctx: &QueryContextRef) -> Result<()> { + async fn create_table( + &self, + req: &RowInsertRequest, + ctx: &QueryContextRef, + statement_executor: &StatementExecutor, + ) -> Result<()> { let table_ref = TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name); + let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); + let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?; info!( "Table {}.{}.{} does not exist, try create table", table_ref.catalog, table_ref.schema, table_ref.table, ); - let mut create_table_expr = self - .create_expr_factory - .create_table_expr_by_column_schemas(&table_ref, request_schema, default_engine())?; - // TODO(weny): multiple regions table. - self.statement_executor - .create_table_inner(&mut create_table_expr, None) - .await?; - - info!( - "Successfully created table on insertion: {}.{}.{}", - table_ref.catalog, table_ref.schema, table_ref.table, - ); - - Ok(()) + let res = statement_executor + .create_table_inner(create_table_expr, None) + .await; + + match res { + Ok(_) => { + info!( + "Successfully created table {}.{}.{}", + table_ref.catalog, table_ref.schema, table_ref.table, + ); + Ok(()) + } + Err(err) => { + error!( + "Failed to create table {}.{}.{}: {}", + table_ref.catalog, table_ref.schema, table_ref.table, err + ); + Err(err) + } + } } } -fn validate_row_count_match(requests: &RowInsertRequests) -> Result<()> { +fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> { for request in &requests.inserts { let rows = request.rows.as_ref().unwrap(); let column_count = rows.schema.len(); ensure!( rows.rows.iter().all(|r| r.values.len() == column_count), InvalidInsertRequestSnafu { - reason: "row count mismatch" + reason: "column count mismatch" } ) } @@ -243,6 +370,13 @@ fn validate_required_columns(request_schema: &[ColumnSchema], table_schema: &Sch Ok(()) } +fn build_create_table_expr( + table: &TableReference, + request_schema: &[ColumnSchema], +) -> Result { + CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, default_engine()) +} + #[cfg(test)] mod tests { use datatypes::prelude::{ConcreteDataType, Value as DtValue}; diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index fdb7daa18773..1d1e02fa18e2 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -25,14 +25,12 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Role; -use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use catalog::local::manager::SystemTableInitializer; use catalog::remote::CachedMetaKvBackend; use catalog::CatalogManagerRef; use client::client_manager::DatanodeClients; -use client::region_handler::RegionRequestHandlerRef; use common_base::Plugins; use common_config::KvStoreConfig; use common_error::ext::BoxedError; @@ -53,6 +51,7 @@ use common_telemetry::{error, timer}; use datanode::region_server::RegionServer; use log_store::raft_engine::RaftEngineBackend; use meta_client::client::{MetaClient, MetaClientBuilder}; +use partition::manager::PartitionRuleManager; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::plan::LogicalPlan; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; @@ -82,19 +81,18 @@ pub use standalone::StandaloneDatanodeManager; use table::engine::manager::MemoryTableEngineManager; use self::distributed::DistRegionRequestHandler; -use self::standalone::{StandaloneRegionRequestHandler, StandaloneTableMetadataCreator}; +use self::standalone::StandaloneTableMetadataCreator; use crate::catalog::FrontendCatalogManager; -use crate::delete::Deleter; +use crate::delete::{Deleter, DeleterRef}; use crate::error::{ self, CatalogSnafu, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, }; -use crate::expr_factory::CreateExprFactory; use crate::frontend::FrontendOptions; use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use crate::heartbeat::HeartbeatTask; -use crate::insert::Inserter; +use crate::insert::{Inserter, InserterRef}; use crate::metrics; use crate::script::ScriptExecutor; use crate::server::{start_server, ServerHandlers, Services}; @@ -127,13 +125,13 @@ pub struct Instance { script_executor: Arc, statement_executor: Arc, query_engine: QueryEngineRef, - region_request_handler: RegionRequestHandlerRef, - create_expr_factory: CreateExprFactory, /// plugins: this map holds extensions to customize query or auth /// behaviours. plugins: Arc, servers: Arc, heartbeat_task: Option, + inserter: InserterRef, + deleter: DeleterRef, } impl Instance { @@ -172,15 +170,27 @@ impl Instance { ) .query_engine(); - let region_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone()); + let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone())); + + let inserter = Arc::new(Inserter::new( + catalog_manager.clone(), + partition_manager.clone(), + datanode_clients.clone(), + )); + let deleter = Arc::new(Deleter::new( + catalog_manager.clone(), + partition_manager, + datanode_clients, + )); let statement_executor = Arc::new(StatementExecutor::new( catalog_manager.clone(), query_engine.clone(), - region_request_handler.clone(), meta_client.clone(), meta_backend.clone(), catalog_manager.clone(), + inserter.clone(), + deleter.clone(), )); plugins.insert::(statement_executor.clone()); @@ -194,25 +204,23 @@ impl Instance { ]); let heartbeat_task = Some(HeartbeatTask::new( - meta_client, + meta_client.clone(), opts.heartbeat.clone(), Arc::new(handlers_executor), )); common_telemetry::init_node_id(opts.node_id.clone()); - let create_expr_factory = CreateExprFactory; - Ok(Instance { catalog_manager, script_executor, - create_expr_factory, statement_executor, query_engine, - region_request_handler, plugins: plugins.clone(), servers: Arc::new(HashMap::new()), heartbeat_task, + inserter, + deleter, }) } @@ -293,40 +301,51 @@ impl Instance { let script_executor = Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); - let region_request_handler = StandaloneRegionRequestHandler::arc(region_server.clone()); - let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server)); let cache_invalidator = Arc::new(DummyCacheInvalidator); let ddl_executor = Arc::new(DdlManager::new( procedure_manager, - Arc::new(StandaloneDatanodeManager(region_server)), + datanode_manager.clone(), cache_invalidator.clone(), table_metadata_manager.clone(), Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())), )); + let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); + + let inserter = Arc::new(Inserter::new( + catalog_manager.clone(), + partition_manager.clone(), + datanode_manager.clone(), + )); + let deleter = Arc::new(Deleter::new( + catalog_manager.clone(), + partition_manager, + datanode_manager, + )); + let statement_executor = Arc::new(StatementExecutor::new( catalog_manager.clone(), query_engine.clone(), - region_request_handler.clone(), ddl_executor, kv_backend.clone(), cache_invalidator, + inserter.clone(), + deleter.clone(), )); - let create_expr_factory = CreateExprFactory; - Ok(Instance { catalog_manager: catalog_manager.clone(), script_executor, - create_expr_factory, statement_executor, query_engine, - region_request_handler, plugins: Default::default(), servers: Arc::new(HashMap::new()), heartbeat_task: None, + inserter, + deleter, }) } @@ -341,62 +360,6 @@ impl Instance { &self.catalog_manager } - // Handle batch inserts with row-format - pub async fn handle_row_inserts( - &self, - requests: RowInsertRequests, - ctx: QueryContextRef, - ) -> Result { - let inserter = Inserter::new( - self.catalog_manager.as_ref(), - &self.create_expr_factory, - &self.statement_executor, - self.region_request_handler.as_ref(), - ); - inserter.handle_row_inserts(requests, ctx).await - } - - /// Handle batch inserts - pub async fn handle_inserts( - &self, - requests: InsertRequests, - ctx: QueryContextRef, - ) -> Result { - let inserter = Inserter::new( - self.catalog_manager.as_ref(), - &self.create_expr_factory, - &self.statement_executor, - self.region_request_handler.as_ref(), - ); - inserter.handle_column_inserts(requests, ctx).await - } - - /// Handle batch deletes with row-format - pub async fn handle_row_deletes( - &self, - requests: RowDeleteRequests, - ctx: QueryContextRef, - ) -> Result { - let deleter = Deleter::new( - self.catalog_manager.as_ref(), - self.region_request_handler.as_ref(), - ); - deleter.handle_row_deletes(requests, ctx).await - } - - /// Handle batch deletes - pub async fn handle_deletes( - &self, - requests: DeleteRequests, - ctx: QueryContextRef, - ) -> Result { - let deleter = Deleter::new( - self.catalog_manager.as_ref(), - self.region_request_handler.as_ref(), - ); - deleter.handle_column_deletes(requests, ctx).await - } - pub fn set_plugins(&mut self, map: Arc) { self.plugins = map; } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index bff5d87520aa..4a99b9d2ef19 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -12,28 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod deleter; -pub(crate) mod inserter; - use std::sync::Arc; -use api::v1::region::{region_request, QueryRequest}; +use api::v1::region::QueryRequest; use async_trait::async_trait; use client::error::{HandleRequestSnafu, Result as ClientResult}; use client::region_handler::RegionRequestHandler; use common_error::ext::BoxedError; -use common_meta::datanode_manager::AffectedRows; use common_recordbatch::SendableRecordBatchStream; -use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use crate::catalog::FrontendCatalogManager; -use crate::error::{ - FindDatanodeSnafu, FindTableRouteSnafu, NotSupportedSnafu, RequestQuerySnafu, Result, -}; -use crate::instance::distributed::deleter::DistDeleter; -use crate::instance::distributed::inserter::DistInserter; +use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, RequestQuerySnafu, Result}; pub(crate) struct DistRegionRequestHandler { catalog_manager: Arc, @@ -47,17 +38,6 @@ impl DistRegionRequestHandler { #[async_trait] impl RegionRequestHandler for DistRegionRequestHandler { - async fn handle( - &self, - request: region_request::Body, - ctx: QueryContextRef, - ) -> ClientResult { - self.handle_inner(request, ctx) - .await - .map_err(BoxedError::new) - .context(HandleRequestSnafu) - } - async fn do_get(&self, request: QueryRequest) -> ClientResult { self.do_get_inner(request) .await @@ -67,52 +47,6 @@ impl RegionRequestHandler for DistRegionRequestHandler { } impl DistRegionRequestHandler { - async fn handle_inner( - &self, - request: region_request::Body, - ctx: QueryContextRef, - ) -> Result { - match request { - region_request::Body::Inserts(inserts) => { - let inserter = - DistInserter::new(&self.catalog_manager).with_trace_id(ctx.trace_id()); - inserter.insert(inserts).await - } - region_request::Body::Deletes(deletes) => { - let deleter = DistDeleter::new(&self.catalog_manager).with_trace_id(ctx.trace_id()); - deleter.delete(deletes).await - } - region_request::Body::Create(_) => NotSupportedSnafu { - feat: "region create", - } - .fail(), - region_request::Body::Drop(_) => NotSupportedSnafu { - feat: "region drop", - } - .fail(), - region_request::Body::Open(_) => NotSupportedSnafu { - feat: "region open", - } - .fail(), - region_request::Body::Close(_) => NotSupportedSnafu { - feat: "region close", - } - .fail(), - region_request::Body::Alter(_) => NotSupportedSnafu { - feat: "region alter", - } - .fail(), - region_request::Body::Flush(_) => NotSupportedSnafu { - feat: "region flush", - } - .fail(), - region_request::Body::Compact(_) => NotSupportedSnafu { - feat: "region compact", - } - .fail(), - } - } - async fn do_get_inner(&self, request: QueryRequest) -> Result { let region_id = RegionId::from_u64(request.region_id); diff --git a/src/frontend/src/instance/distributed/deleter.rs b/src/frontend/src/instance/distributed/deleter.rs deleted file mode 100644 index befcda45740a..000000000000 --- a/src/frontend/src/instance/distributed/deleter.rs +++ /dev/null @@ -1,284 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use api::v1::region::{region_request, DeleteRequests, RegionRequest, RegionRequestHeader}; -use common_meta::datanode_manager::AffectedRows; -use common_meta::peer::Peer; -use futures::future; -use metrics::counter; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; - -use crate::catalog::FrontendCatalogManager; -use crate::error::{ - FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDeletesSnafu, Result, - SplitDeleteSnafu, -}; - -/// A distributed deleter. It ingests gRPC [DeleteRequests]. -/// -/// Table data partitioning and Datanode requests batching are handled inside. -pub struct DistDeleter<'a> { - catalog_manager: &'a FrontendCatalogManager, - trace_id: Option, - span_id: Option, -} - -impl<'a> DistDeleter<'a> { - pub(crate) fn new(catalog_manager: &'a FrontendCatalogManager) -> Self { - Self { - catalog_manager, - trace_id: None, - span_id: None, - } - } - - pub fn with_trace_id(mut self, trace_id: u64) -> Self { - self.trace_id = Some(trace_id); - self - } - - #[allow(dead_code)] - pub fn with_span_id(mut self, span_id: u64) -> Self { - self.span_id = Some(span_id); - self - } - - pub(crate) async fn delete(&self, requests: DeleteRequests) -> Result { - let requests = self.split(requests).await?; - let trace_id = self.trace_id.unwrap_or_default(); - let span_id = self.span_id.unwrap_or_default(); - let results = future::try_join_all(requests.into_iter().map(|(peer, deletes)| { - let datanode_clients = self.catalog_manager.datanode_manager(); - common_runtime::spawn_write(async move { - let request = RegionRequest { - header: Some(RegionRequestHeader { trace_id, span_id }), - body: Some(region_request::Body::Deletes(deletes)), - }; - datanode_clients - .datanode(&peer) - .await - .handle(request) - .await - .context(RequestDeletesSnafu) - }) - })) - .await - .context(JoinTaskSnafu)?; - - let affected_rows = results.into_iter().sum::>()?; - counter!(crate::metrics::DIST_DELETE_ROW_COUNT, affected_rows); - Ok(affected_rows) - } - - /// Splits gRPC [DeleteRequests] into multiple gRPC [DeleteRequests]s, each of which - /// is grouped by the peer of Datanode, so we can batch them together when invoking gRPC write - /// method in Datanode. - async fn split(&self, requests: DeleteRequests) -> Result> { - let partition_manager = self.catalog_manager.partition_manager(); - let mut deletes: HashMap = HashMap::new(); - - for req in requests.requests { - let table_id = RegionId::from_u64(req.region_id).table_id(); - - let req_splits = partition_manager - .split_delete_request(table_id, req) - .await - .context(SplitDeleteSnafu)?; - let table_route = partition_manager - .find_table_route(table_id) - .await - .context(FindTableRouteSnafu { table_id })?; - - for (region_number, delete) in req_splits { - let peer = - table_route - .find_region_leader(region_number) - .context(FindDatanodeSnafu { - region: region_number, - })?; - deletes - .entry(peer.clone()) - .or_default() - .requests - .push(delete); - } - } - - Ok(deletes) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use api::helper::vectors_to_rows; - use api::v1::region::DeleteRequest; - use api::v1::value::ValueData; - use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; - use client::client_manager::DatanodeClients; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_meta::helper::{CatalogValue, SchemaValue}; - use common_meta::key::catalog_name::CatalogNameKey; - use common_meta::key::schema_name::SchemaNameKey; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::kv_backend::{KvBackend, KvBackendRef}; - use common_meta::rpc::store::PutRequest; - use datatypes::prelude::VectorRef; - use datatypes::vectors::Int32Vector; - - use super::*; - use crate::heartbeat::handler::tests::MockKvCacheInvalidator; - use crate::table::test::create_partition_rule_manager; - - async fn prepare_mocked_backend() -> KvBackendRef { - let backend = Arc::new(MemoryKvBackend::default()); - - let default_catalog = CatalogNameKey { - catalog: DEFAULT_CATALOG_NAME, - } - .to_string(); - let req = PutRequest::new() - .with_key(default_catalog.as_bytes()) - .with_value(CatalogValue.as_bytes().unwrap()); - backend.put(req).await.unwrap(); - - let default_schema = SchemaNameKey { - catalog: DEFAULT_CATALOG_NAME, - schema: DEFAULT_SCHEMA_NAME, - } - .to_string(); - let req = PutRequest::new() - .with_key(default_schema.as_bytes()) - .with_value(SchemaValue.as_bytes().unwrap()); - backend.put(req).await.unwrap(); - - backend - } - - #[tokio::test] - async fn test_split_deletes() { - let backend = prepare_mocked_backend().await; - create_partition_rule_manager(backend.clone()).await; - - let catalog_manager = Arc::new(FrontendCatalogManager::new( - backend, - Arc::new(MockKvCacheInvalidator::default()), - Arc::new(DatanodeClients::default()), - )); - - let new_delete_request = |vector: VectorRef| -> DeleteRequest { - let row_count = vector.len(); - DeleteRequest { - region_id: RegionId::new(1, 0).into(), - rows: Some(Rows { - schema: vec![ColumnSchema { - column_name: "a".to_string(), - datatype: ColumnDataType::Int32 as i32, - semantic_type: SemanticType::Tag as i32, - }], - rows: vectors_to_rows([vector].iter(), row_count), - }), - } - }; - let requests = DeleteRequests { - requests: vec![ - new_delete_request(Arc::new(Int32Vector::from(vec![ - Some(1), - Some(11), - Some(50), - ]))), - new_delete_request(Arc::new(Int32Vector::from(vec![ - Some(2), - Some(12), - Some(102), - ]))), - ], - }; - - let deleter = DistDeleter::new(&catalog_manager); - let mut deletes = deleter.split(requests).await.unwrap(); - - assert_eq!(deletes.len(), 3); - - let new_split_delete_request = - |rows: Vec>, region_id: RegionId| -> DeleteRequest { - DeleteRequest { - region_id: region_id.into(), - rows: Some(Rows { - schema: vec![ColumnSchema { - column_name: "a".to_string(), - datatype: ColumnDataType::Int32 as i32, - semantic_type: SemanticType::Tag as i32, - }], - rows: rows - .into_iter() - .map(|v| Row { - values: vec![Value { - value_data: v.map(ValueData::I32Value), - }], - }) - .collect(), - }), - } - }; - - // region to datanode placement: - // 1 -> 1 - // 2 -> 2 - // 3 -> 3 - // - // region value ranges: - // 1 -> [50, max) - // 2 -> [10, 50) - // 3 -> (min, 10) - - let datanode_deletes = deletes.remove(&Peer::new(1, "")).unwrap().requests; - assert_eq!(datanode_deletes.len(), 2); - - assert_eq!( - datanode_deletes[0], - new_split_delete_request(vec![Some(50)], RegionId::new(1, 1)) - ); - assert_eq!( - datanode_deletes[1], - new_split_delete_request(vec![Some(102)], RegionId::new(1, 1)) - ); - - let datanode_deletes = deletes.remove(&Peer::new(2, "")).unwrap().requests; - assert_eq!(datanode_deletes.len(), 2); - assert_eq!( - datanode_deletes[0], - new_split_delete_request(vec![Some(11)], RegionId::new(1, 2)) - ); - assert_eq!( - datanode_deletes[1], - new_split_delete_request(vec![Some(12)], RegionId::new(1, 2)) - ); - - let datanode_deletes = deletes.remove(&Peer::new(3, "")).unwrap().requests; - assert_eq!(datanode_deletes.len(), 2); - assert_eq!( - datanode_deletes[0], - new_split_delete_request(vec![Some(1)], RegionId::new(1, 3)) - ); - assert_eq!( - datanode_deletes[1], - new_split_delete_request(vec![Some(2)], RegionId::new(1, 3)) - ); - } -} diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs deleted file mode 100644 index 2be5b3fa9440..000000000000 --- a/src/frontend/src/instance/distributed/inserter.rs +++ /dev/null @@ -1,275 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use api::v1::region::{region_request, InsertRequests, RegionRequest, RegionRequestHeader}; -use common_meta::datanode_manager::AffectedRows; -use common_meta::peer::Peer; -use futures_util::future; -use metrics::counter; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; - -use crate::catalog::FrontendCatalogManager; -use crate::error::{ - FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, - SplitInsertSnafu, -}; - -/// A distributed inserter. It ingests gRPC [InsertRequests]. -/// -/// Table data partitioning and Datanode requests batching are handled inside. -pub struct DistInserter<'a> { - catalog_manager: &'a FrontendCatalogManager, - trace_id: Option, - span_id: Option, -} - -impl<'a> DistInserter<'a> { - pub fn new(catalog_manager: &'a FrontendCatalogManager) -> Self { - Self { - catalog_manager, - trace_id: None, - span_id: None, - } - } - - pub fn with_trace_id(mut self, trace_id: u64) -> Self { - self.trace_id = Some(trace_id); - self - } - - #[allow(dead_code)] - pub fn with_span_id(mut self, span_id: u64) -> Self { - self.span_id = Some(span_id); - self - } - - pub(crate) async fn insert(&self, requests: InsertRequests) -> Result { - let requests = self.split(requests).await?; - let trace_id = self.trace_id.unwrap_or_default(); - let span_id = self.span_id.unwrap_or_default(); - let results = future::try_join_all(requests.into_iter().map(|(peer, inserts)| { - let datanode_clients = self.catalog_manager.datanode_manager(); - common_runtime::spawn_write(async move { - let request = RegionRequest { - header: Some(RegionRequestHeader { trace_id, span_id }), - body: Some(region_request::Body::Inserts(inserts)), - }; - datanode_clients - .datanode(&peer) - .await - .handle(request) - .await - .context(RequestInsertsSnafu) - }) - })) - .await - .context(JoinTaskSnafu)?; - - let affected_rows = results.into_iter().sum::>()?; - counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows); - Ok(affected_rows) - } - - /// Splits gRPC [InsertRequests] into multiple gRPC [InsertRequests]s, each of which - /// is grouped by the peer of Datanode, so we can batch them together when invoking gRPC write - /// method in Datanode. - async fn split(&self, requests: InsertRequests) -> Result> { - let partition_manager = self.catalog_manager.partition_manager(); - let mut inserts: HashMap = HashMap::new(); - - for req in requests.requests { - let table_id = RegionId::from_u64(req.region_id).table_id(); - - let req_splits = partition_manager - .split_insert_request(table_id, req) - .await - .context(SplitInsertSnafu)?; - let table_route = partition_manager - .find_table_route(table_id) - .await - .context(FindTableRouteSnafu { table_id })?; - let region_map = table_route.region_map(); - - for (region_number, insert) in req_splits { - let peer = *region_map.get(®ion_number).context(FindDatanodeSnafu { - region: region_number, - })?; - inserts - .entry(peer.clone()) - .or_default() - .requests - .push(insert); - } - } - - Ok(inserts) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use api::helper::vectors_to_rows; - use api::v1::region::InsertRequest; - use api::v1::value::ValueData; - use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; - use client::client_manager::DatanodeClients; - use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey}; - use common_meta::key::schema_name::{SchemaManager, SchemaNameKey}; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::kv_backend::KvBackendRef; - use datatypes::prelude::VectorRef; - use datatypes::vectors::Int32Vector; - - use super::*; - use crate::heartbeat::handler::tests::MockKvCacheInvalidator; - use crate::table::test::create_partition_rule_manager; - - async fn prepare_mocked_backend() -> KvBackendRef { - let backend = Arc::new(MemoryKvBackend::default()); - - let catalog_manager = CatalogManager::new(backend.clone()); - let schema_manager = SchemaManager::new(backend.clone()); - - catalog_manager - .create(CatalogNameKey::default()) - .await - .unwrap(); - schema_manager - .create(SchemaNameKey::default(), None) - .await - .unwrap(); - - backend - } - - #[tokio::test] - async fn test_split_inserts() { - let backend = prepare_mocked_backend().await; - create_partition_rule_manager(backend.clone()).await; - - let catalog_manager = Arc::new(FrontendCatalogManager::new( - backend, - Arc::new(MockKvCacheInvalidator::default()), - Arc::new(DatanodeClients::default()), - )); - - let inserter = DistInserter::new(&catalog_manager); - - let new_insert_request = |vector: VectorRef| -> InsertRequest { - let row_count = vector.len(); - InsertRequest { - region_id: RegionId::new(1, 0).into(), - rows: Some(Rows { - schema: vec![ColumnSchema { - column_name: "a".to_string(), - datatype: ColumnDataType::Int32 as i32, - semantic_type: SemanticType::Field as i32, - }], - rows: vectors_to_rows([vector].iter(), row_count), - }), - } - }; - - let requests = InsertRequests { - requests: vec![ - new_insert_request(Arc::new(Int32Vector::from(vec![ - Some(1), - None, - Some(11), - Some(101), - ]))), - new_insert_request(Arc::new(Int32Vector::from(vec![ - Some(2), - Some(12), - None, - Some(102), - ]))), - ], - }; - - let mut inserts = inserter.split(requests).await.unwrap(); - - assert_eq!(inserts.len(), 3); - - let new_split_insert_request = - |rows: Vec>, region_id: RegionId| -> InsertRequest { - InsertRequest { - region_id: region_id.into(), - rows: Some(Rows { - schema: vec![ColumnSchema { - column_name: "a".to_string(), - datatype: ColumnDataType::Int32 as i32, - semantic_type: SemanticType::Field as i32, - }], - rows: rows - .into_iter() - .map(|v| Row { - values: vec![Value { - value_data: v.map(ValueData::I32Value), - }], - }) - .collect(), - }), - } - }; - - // region to datanode placement: - // 1 -> 1 - // 2 -> 2 - // 3 -> 3 - // - // region value ranges: - // 1 -> [50, max) - // 2 -> [10, 50) - // 3 -> (min, 10) - - let datanode_inserts = inserts.remove(&Peer::new(1, "")).unwrap().requests; - assert_eq!(datanode_inserts.len(), 2); - assert_eq!( - datanode_inserts[0], - new_split_insert_request(vec![Some(101)], RegionId::new(1, 1)) - ); - assert_eq!( - datanode_inserts[1], - new_split_insert_request(vec![Some(102)], RegionId::new(1, 1)) - ); - - let datanode_inserts = inserts.remove(&Peer::new(2, "")).unwrap().requests; - assert_eq!(datanode_inserts.len(), 2); - assert_eq!( - datanode_inserts[0], - new_split_insert_request(vec![Some(11)], RegionId::new(1, 2)) - ); - assert_eq!( - datanode_inserts[1], - new_split_insert_request(vec![Some(12)], RegionId::new(1, 2)) - ); - - let datanode_inserts = inserts.remove(&Peer::new(3, "")).unwrap().requests; - assert_eq!(datanode_inserts.len(), 2); - assert_eq!( - datanode_inserts[0], - new_split_insert_request(vec![Some(1), None], RegionId::new(1, 3)) - ); - assert_eq!( - datanode_inserts[1], - new_split_insert_request(vec![Some(2), None], RegionId::new(1, 3)) - ); - } -} diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 5dd76f7eab6c..d24598fbd1c2 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -15,6 +15,7 @@ use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::query_request::Query; +use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_meta::table_name::TableName; @@ -133,3 +134,41 @@ impl GrpcQueryHandler for Instance { Ok(output) } } + +impl Instance { + pub async fn handle_inserts( + &self, + requests: InsertRequests, + ctx: QueryContextRef, + ) -> Result { + self.inserter + .handle_column_inserts(requests, ctx, self.statement_executor.as_ref()) + .await + } + + pub async fn handle_row_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + ) -> Result { + self.inserter + .handle_row_inserts(requests, ctx, self.statement_executor.as_ref()) + .await + } + + pub async fn handle_deletes( + &self, + requests: DeleteRequests, + ctx: QueryContextRef, + ) -> Result { + self.deleter.handle_column_deletes(requests, ctx).await + } + + pub async fn handle_row_deletes( + &self, + requests: RowDeleteRequests, + ctx: QueryContextRef, + ) -> Result { + self.deleter.handle_row_deletes(requests, ctx).await + } +} diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index fb6b4a205349..9e3881ecf504 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::v1::meta::Partition; -use api::v1::region::{region_request, QueryRequest, RegionRequest}; +use api::v1::region::{QueryRequest, RegionRequest, RegionResponse}; use async_trait::async_trait; use client::error::{HandleRequestSnafu, Result as ClientResult}; use client::region::check_response_header; @@ -31,12 +31,11 @@ use common_meta::sequence::{Sequence, SequenceRef}; use common_recordbatch::SendableRecordBatchStream; use datanode::region_server::RegionServer; use servers::grpc::region_server::RegionServerHandler; -use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use store_api::storage::{RegionId, TableId}; use table::metadata::RawTableInfo; -use crate::error::InvokeRegionServerSnafu; +use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result}; const TABLE_ID_SEQ: &str = "table_id"; @@ -45,31 +44,24 @@ pub(crate) struct StandaloneRegionRequestHandler { } impl StandaloneRegionRequestHandler { - #[allow(dead_code)] pub fn arc(region_server: RegionServer) -> Arc { Arc::new(Self { region_server }) } -} -#[async_trait] -impl RegionRequestHandler for StandaloneRegionRequestHandler { - async fn handle( - &self, - request: region_request::Body, - _ctx: QueryContextRef, - ) -> ClientResult { - let response = self - .region_server - .handle(request) + async fn handle_inner(&self, request: RegionRequest) -> Result { + let body = request.body.with_context(|| InvalidRegionRequestSnafu { + reason: "body not found", + })?; + + self.region_server + .handle(body) .await .context(InvokeRegionServerSnafu) - .map_err(BoxedError::new) - .context(HandleRequestSnafu)?; - - check_response_header(response.header)?; - Ok(response.affected_rows) } +} +#[async_trait] +impl RegionRequestHandler for StandaloneRegionRequestHandler { async fn do_get(&self, request: QueryRequest) -> ClientResult { self.region_server .handle_read(request) @@ -79,26 +71,22 @@ impl RegionRequestHandler for StandaloneRegionRequestHandler { } } -pub(crate) struct StandaloneDatanode(pub(crate) RegionServer); - #[async_trait] -impl Datanode for StandaloneDatanode { +impl Datanode for StandaloneRegionRequestHandler { async fn handle(&self, request: RegionRequest) -> MetaResult { - let body = request.body.context(meta_error::UnexpectedSnafu { - err_msg: "body not found", - })?; - let resp = self - .0 - .handle(body) + let response = self + .handle_inner(request) .await .map_err(BoxedError::new) .context(meta_error::ExternalSnafu)?; - - Ok(resp.affected_rows) + check_response_header(response.header) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; + Ok(response.affected_rows) } async fn handle_query(&self, request: QueryRequest) -> MetaResult { - self.0 + self.region_server .handle_read(request) .await .map_err(BoxedError::new) @@ -111,7 +99,7 @@ pub struct StandaloneDatanodeManager(pub RegionServer); #[async_trait] impl DatanodeManager for StandaloneDatanodeManager { async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { - Arc::new(StandaloneDatanode(self.0.clone())) + StandaloneRegionRequestHandler::arc(self.0.clone()) } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 52d355549ad0..bfe341b615e8 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -24,6 +24,7 @@ pub mod heartbeat; pub(crate) mod insert; pub mod instance; pub(crate) mod metrics; +pub(crate) mod region_req_factory; pub(crate) mod req_convert; mod script; mod server; diff --git a/src/frontend/src/region_req_factory.rs b/src/frontend/src/region_req_factory.rs new file mode 100644 index 000000000000..d033216bb17e --- /dev/null +++ b/src/frontend/src/region_req_factory.rs @@ -0,0 +1,43 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::region::region_request::Body; +use api::v1::region::{ + DeleteRequests as RegionDeleteRequests, InsertRequests as RegionInsertRequests, RegionRequest, + RegionRequestHeader, +}; + +pub struct RegionRequestFactory { + header: RegionRequestHeader, +} + +impl RegionRequestFactory { + pub fn new(header: RegionRequestHeader) -> Self { + Self { header } + } + + pub fn build_insert(&self, requests: RegionInsertRequests) -> RegionRequest { + RegionRequest { + header: Some(self.header.clone()), + body: Some(Body::Inserts(requests)), + } + } + + pub fn build_delete(&self, requests: RegionDeleteRequests) -> RegionRequest { + RegionRequest { + header: Some(self.header.clone()), + body: Some(Body::Deletes(requests)), + } + } +} diff --git a/src/frontend/src/req_convert/common.rs b/src/frontend/src/req_convert/common.rs index 73e5c4eadcd2..00a07fdae473 100644 --- a/src/frontend/src/req_convert/common.rs +++ b/src/frontend/src/req_convert/common.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod partitioner; + use std::collections::HashMap; use api::helper::ColumnDataTypeWrapper; diff --git a/src/frontend/src/req_convert/common/partitioner.rs b/src/frontend/src/req_convert/common/partitioner.rs new file mode 100644 index 000000000000..b9f2117c7bab --- /dev/null +++ b/src/frontend/src/req_convert/common/partitioner.rs @@ -0,0 +1,69 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::region::{DeleteRequest, InsertRequest}; +use api::v1::Rows; +use partition::manager::PartitionRuleManager; +use snafu::ResultExt; +use store_api::storage::{RegionId, TableId}; + +use crate::error::{Result, SplitDeleteSnafu, SplitInsertSnafu}; + +pub struct Partitioner<'a> { + partition_manager: &'a PartitionRuleManager, +} + +impl<'a> Partitioner<'a> { + pub fn new(partition_manager: &'a PartitionRuleManager) -> Self { + Self { partition_manager } + } + + pub async fn partition_insert_requests( + &self, + table_id: TableId, + rows: Rows, + ) -> Result> { + let requests = self + .partition_manager + .split_rows(table_id, rows) + .await + .context(SplitInsertSnafu)? + .into_iter() + .map(|(region_number, rows)| InsertRequest { + region_id: RegionId::new(table_id, region_number).into(), + rows: Some(rows), + }) + .collect(); + Ok(requests) + } + + pub async fn partition_delete_requests( + &self, + table_id: TableId, + rows: Rows, + ) -> Result> { + let requests = self + .partition_manager + .split_rows(table_id, rows) + .await + .context(SplitDeleteSnafu)? + .into_iter() + .map(|(region_number, rows)| DeleteRequest { + region_id: RegionId::new(table_id, region_number).into(), + rows: Some(rows), + }) + .collect(); + Ok(requests) + } +} diff --git a/src/frontend/src/req_convert/delete/column_to_row.rs b/src/frontend/src/req_convert/delete/column_to_row.rs index 7e1cc3fda4b9..610d9be48e2f 100644 --- a/src/frontend/src/req_convert/delete/column_to_row.rs +++ b/src/frontend/src/req_convert/delete/column_to_row.rs @@ -35,6 +35,6 @@ fn request_column_to_row(request: DeleteRequest) -> Result { Ok(RowDeleteRequest { table_name: request.table_name, rows: Some(rows), - region_number: request.region_number, + region_number: 0, // FIXME(zhongzc): deprecated field }) } diff --git a/src/frontend/src/req_convert/delete/row_to_region.rs b/src/frontend/src/req_convert/delete/row_to_region.rs index 5d09beffdf79..e69442115206 100644 --- a/src/frontend/src/req_convert/delete/row_to_region.rs +++ b/src/frontend/src/req_convert/delete/row_to_region.rs @@ -12,27 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::region::{ - DeleteRequest as RegionDeleteRequest, DeleteRequests as RegionDeleteRequests, -}; +use api::v1::region::DeleteRequests as RegionDeleteRequests; use api::v1::RowDeleteRequests; use catalog::CatalogManager; +use partition::manager::PartitionRuleManager; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; use table::TableRef; use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu}; +use crate::req_convert::common::partitioner::Partitioner; pub struct RowToRegion<'a> { catalog_manager: &'a dyn CatalogManager, + partition_manager: &'a PartitionRuleManager, ctx: &'a QueryContext, } impl<'a> RowToRegion<'a> { - pub fn new(catalog_manager: &'a dyn CatalogManager, ctx: &'a QueryContext) -> Self { + pub fn new( + catalog_manager: &'a dyn CatalogManager, + partition_manager: &'a PartitionRuleManager, + ctx: &'a QueryContext, + ) -> Self { Self { catalog_manager, + partition_manager, ctx, } } @@ -41,13 +46,13 @@ impl<'a> RowToRegion<'a> { let mut region_request = Vec::with_capacity(requests.deletes.len()); for request in requests.deletes { let table = self.get_table(&request.table_name).await?; + let table_id = table.table_info().table_id(); - let region_id = RegionId::new(table.table_info().table_id(), request.region_number); - let insert_request = RegionDeleteRequest { - region_id: region_id.into(), - rows: request.rows, - }; - region_request.push(insert_request); + let requests = Partitioner::new(self.partition_manager) + .partition_delete_requests(table_id, request.rows.unwrap_or_default()) + .await?; + + region_request.extend(requests); } Ok(RegionDeleteRequests { @@ -63,7 +68,11 @@ impl<'a> RowToRegion<'a> { .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { - table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name), + table_name: common_catalog::format_full_table_name( + catalog_name, + schema_name, + table_name, + ), }) } } diff --git a/src/frontend/src/req_convert/delete/table_to_region.rs b/src/frontend/src/req_convert/delete/table_to_region.rs index ab51de97e083..fd5e983458f6 100644 --- a/src/frontend/src/req_convert/delete/table_to_region.rs +++ b/src/frontend/src/req_convert/delete/table_to_region.rs @@ -12,37 +12,39 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::region::{ - DeleteRequest as RegionDeleteRequest, DeleteRequests as RegionDeleteRequests, -}; +use api::v1::region::DeleteRequests as RegionDeleteRequests; use api::v1::Rows; -use store_api::storage::RegionId; +use partition::manager::PartitionRuleManager; use table::metadata::TableInfo; use table::requests::DeleteRequest as TableDeleteRequest; use crate::error::Result; +use crate::req_convert::common::partitioner::Partitioner; use crate::req_convert::common::{column_schema, row_count}; pub struct TableToRegion<'a> { table_info: &'a TableInfo, + partition_manager: &'a PartitionRuleManager, } impl<'a> TableToRegion<'a> { - pub fn new(table_info: &'a TableInfo) -> Self { - Self { table_info } + pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self { + Self { + table_info, + partition_manager, + } } - pub fn convert(&self, request: TableDeleteRequest) -> Result { - let region_id = RegionId::new(self.table_info.table_id(), 0).into(); + pub async fn convert(&self, request: TableDeleteRequest) -> Result { let row_count = row_count(&request.key_column_values)?; let schema = column_schema(self.table_info, &request.key_column_values)?; let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count); - Ok(RegionDeleteRequests { - requests: vec![RegionDeleteRequest { - region_id, - rows: Some(Rows { schema, rows }), - }], - }) + let rows = Rows { schema, rows }; + + let requests = Partitioner::new(self.partition_manager) + .partition_delete_requests(self.table_info.table_id(), rows) + .await?; + Ok(RegionDeleteRequests { requests }) } } @@ -51,114 +53,119 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; + use api::v1::region::DeleteRequest as RegionDeleteRequest; use api::v1::value::ValueData; - use api::v1::{ColumnDataType, SemanticType}; + use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use datatypes::prelude::ConcreteDataType; - use datatypes::scalars::ScalarVectorBuilder; - use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema}; - use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder}; - use table::metadata::{TableInfoBuilder, TableMetaBuilder}; + use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey}; + use common_meta::key::schema_name::{SchemaManager, SchemaNameKey}; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::KvBackendRef; + use datatypes::vectors::{Int32Vector, VectorRef}; + use store_api::storage::RegionId; use super::*; + use crate::table::test::{create_partition_rule_manager, new_test_table_info}; - #[test] - fn test_delete_request_table_to_region() { - let schema = Schema::new(vec![ - DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) - .with_time_index(true), - DtColumnSchema::new("id", ConcreteDataType::int16_datatype(), false), - DtColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ]); - - let table_meta = TableMetaBuilder::default() - .schema(Arc::new(schema)) - .primary_key_indices(vec![1, 2]) - .next_column_id(3) - .build() - .unwrap(); + async fn prepare_mocked_backend() -> KvBackendRef { + let backend = Arc::new(MemoryKvBackend::default()); - let table_info = Arc::new( - TableInfoBuilder::default() - .name("demo") - .meta(table_meta) - .table_id(1) - .build() - .unwrap(), - ); + let catalog_manager = CatalogManager::new(backend.clone()); + let schema_manager = SchemaManager::new(backend.clone()); - let delete_request = mock_delete_request(); - let mut request = TableToRegion::new(&table_info) - .convert(delete_request) + catalog_manager + .create(CatalogNameKey::default()) + .await + .unwrap(); + schema_manager + .create(SchemaNameKey::default(), None) + .await .unwrap(); - assert_eq!(request.requests.len(), 1); - verify_region_insert_request(request.requests.pop().unwrap()); + backend } - fn mock_delete_request() -> TableDeleteRequest { - let mut builder = StringVectorBuilder::with_capacity(3); - builder.push(Some("host1")); - builder.push(None); - builder.push(Some("host3")); - let host = builder.to_vector(); + #[tokio::test] + async fn test_delete_request_table_to_region() { + // region to datanode placement: + // 1 -> 1 + // 2 -> 2 + // 3 -> 3 + // + // region value ranges: + // 1 -> [50, max) + // 2 -> [10, 50) + // 3 -> (min, 10) + + let backend = prepare_mocked_backend().await; + let partition_manager = create_partition_rule_manager(backend.clone()).await; + let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter()); + + let converter = TableToRegion::new(&table_info, &partition_manager); + + let table_request = build_table_request(Arc::new(Int32Vector::from(vec![ + Some(1), + None, + Some(11), + Some(101), + ]))); + + let region_requests = converter.convert(table_request).await.unwrap(); + let mut region_id_to_region_requests = region_requests + .requests + .into_iter() + .map(|r| (r.region_id, r)) + .collect::>(); + + let region_id = RegionId::new(1, 1).as_u64(); + let region_request = region_id_to_region_requests.remove(®ion_id).unwrap(); + assert_eq!( + region_request, + build_region_request(vec![Some(101)], region_id) + ); - let mut builder = Int16VectorBuilder::with_capacity(3); - builder.push(Some(1_i16)); - builder.push(Some(2_i16)); - builder.push(Some(3_i16)); - let id = builder.to_vector(); + let region_id = RegionId::new(1, 2).as_u64(); + let region_request = region_id_to_region_requests.remove(®ion_id).unwrap(); + assert_eq!( + region_request, + build_region_request(vec![Some(11)], region_id) + ); - let key_column_values = HashMap::from([("host".to_string(), host), ("id".to_string(), id)]); + let region_id = RegionId::new(1, 3).as_u64(); + let region_request = region_id_to_region_requests.remove(®ion_id).unwrap(); + assert_eq!( + region_request, + build_region_request(vec![Some(1), None], region_id) + ); + } + fn build_table_request(vector: VectorRef) -> TableDeleteRequest { TableDeleteRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: "demo".to_string(), - key_column_values, + table_name: "table_1".to_string(), + key_column_values: HashMap::from([("a".to_string(), vector)]), } } - fn verify_region_insert_request(request: RegionDeleteRequest) { - assert_eq!(request.region_id, RegionId::new(1, 0).as_u64()); - - let rows = request.rows.unwrap(); - for (i, column) in rows.schema.iter().enumerate() { - let name = &column.column_name; - if name == "id" { - assert_eq!(ColumnDataType::Int16 as i32, column.datatype); - assert_eq!(SemanticType::Tag as i32, column.semantic_type); - let values = rows - .rows - .iter() - .map(|row| row.values[i].value_data.clone()) - .collect::>(); - assert_eq!( - vec![ - Some(ValueData::I16Value(1)), - Some(ValueData::I16Value(2)), - Some(ValueData::I16Value(3)) - ], - values - ); - } - if name == "host" { - assert_eq!(ColumnDataType::String as i32, column.datatype); - assert_eq!(SemanticType::Tag as i32, column.semantic_type); - let values = rows - .rows - .iter() - .map(|row| row.values[i].value_data.clone()) - .collect::>(); - assert_eq!( - vec![ - Some(ValueData::StringValue("host1".to_string())), - None, - Some(ValueData::StringValue("host3".to_string())) - ], - values - ); - } + fn build_region_request(rows: Vec>, region_id: u64) -> RegionDeleteRequest { + RegionDeleteRequest { + region_id, + rows: Some(Rows { + schema: vec![ColumnSchema { + column_name: "a".to_string(), + datatype: ColumnDataType::Int32 as i32, + semantic_type: SemanticType::Tag as i32, + }], + rows: rows + .into_iter() + .map(|v| Row { + values: vec![Value { + value_data: v.map(ValueData::I32Value), + }], + }) + .collect(), + }), } } } diff --git a/src/frontend/src/req_convert/insert/column_to_row.rs b/src/frontend/src/req_convert/insert/column_to_row.rs index f3d4c50b366b..adc129219666 100644 --- a/src/frontend/src/req_convert/insert/column_to_row.rs +++ b/src/frontend/src/req_convert/insert/column_to_row.rs @@ -35,6 +35,6 @@ fn request_column_to_row(request: InsertRequest) -> Result { Ok(RowInsertRequest { table_name: request.table_name, rows: Some(rows), - region_number: request.region_number, + region_number: 0, // FIXME(zhongzc): deprecated field }) } diff --git a/src/frontend/src/req_convert/insert/row_to_region.rs b/src/frontend/src/req_convert/insert/row_to_region.rs index 5cd82a4b6396..388b13e9addd 100644 --- a/src/frontend/src/req_convert/insert/row_to_region.rs +++ b/src/frontend/src/req_convert/insert/row_to_region.rs @@ -12,27 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::region::{ - InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests, -}; +use api::v1::region::InsertRequests as RegionInsertRequests; use api::v1::RowInsertRequests; use catalog::CatalogManager; +use partition::manager::PartitionRuleManager; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; use table::TableRef; use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu}; +use crate::req_convert::common::partitioner::Partitioner; pub struct RowToRegion<'a> { catalog_manager: &'a dyn CatalogManager, + partition_manager: &'a PartitionRuleManager, ctx: &'a QueryContext, } impl<'a> RowToRegion<'a> { - pub fn new(catalog_manager: &'a dyn CatalogManager, ctx: &'a QueryContext) -> Self { + pub fn new( + catalog_manager: &'a dyn CatalogManager, + partition_manager: &'a PartitionRuleManager, + ctx: &'a QueryContext, + ) -> Self { Self { catalog_manager, + partition_manager, ctx, } } @@ -41,13 +46,13 @@ impl<'a> RowToRegion<'a> { let mut region_request = Vec::with_capacity(requests.inserts.len()); for request in requests.inserts { let table = self.get_table(&request.table_name).await?; + let table_id = table.table_info().table_id(); - let region_id = RegionId::new(table.table_info().table_id(), request.region_number); - let insert_request = RegionInsertRequest { - region_id: region_id.into(), - rows: request.rows, - }; - region_request.push(insert_request); + let requests = Partitioner::new(self.partition_manager) + .partition_insert_requests(table_id, request.rows.unwrap_or_default()) + .await?; + + region_request.extend(requests); } Ok(RegionInsertRequests { @@ -63,7 +68,11 @@ impl<'a> RowToRegion<'a> { .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { - table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name), + table_name: common_catalog::format_full_table_name( + catalog_name, + schema_name, + table_name, + ), }) } } diff --git a/src/frontend/src/req_convert/insert/stmt_to_region.rs b/src/frontend/src/req_convert/insert/stmt_to_region.rs index 167e23fbe7ff..389640659294 100644 --- a/src/frontend/src/req_convert/insert/stmt_to_region.rs +++ b/src/frontend/src/req_convert/insert/stmt_to_region.rs @@ -13,18 +13,16 @@ // limitations under the License. use api::helper::value_to_grpc_value; -use api::v1::region::{ - InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests, -}; +use api::v1::region::InsertRequests as RegionInsertRequests; use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue}; use catalog::CatalogManager; use datatypes::schema::{ColumnSchema, SchemaRef}; +use partition::manager::PartitionRuleManager; use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; use sql::statements; use sql::statements::insert::Insert; use sqlparser::ast::{ObjectName, Value as SqlValue}; -use store_api::storage::RegionId; use table::TableRef; use super::{data_type, semantic_type}; @@ -32,18 +30,25 @@ use crate::error::{ CatalogSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu, ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result, TableNotFoundSnafu, }; +use crate::req_convert::common::partitioner::Partitioner; const DEFAULT_PLACEHOLDER_VALUE: &str = "default"; pub struct StatementToRegion<'a> { catalog_manager: &'a dyn CatalogManager, + partition_manager: &'a PartitionRuleManager, ctx: &'a QueryContext, } impl<'a> StatementToRegion<'a> { - pub fn new(catalog_manager: &'a dyn CatalogManager, ctx: &'a QueryContext) -> Self { + pub fn new( + catalog_manager: &'a dyn CatalogManager, + partition_manager: &'a PartitionRuleManager, + ctx: &'a QueryContext, + ) -> Self { Self { catalog_manager, + partition_manager, ctx, } } @@ -63,7 +68,7 @@ impl<'a> StatementToRegion<'a> { ensure!( sql_rows.iter().all(|row| row.len() == column_count), InvalidSqlSnafu { - err_msg: "The column count of the row is not the same as columns." + err_msg: "column count mismatch" } ); @@ -98,12 +103,10 @@ impl<'a> StatementToRegion<'a> { } } - Ok(RegionInsertRequests { - requests: vec![RegionInsertRequest { - region_id: RegionId::new(table_info.table_id(), 0).into(), - rows: Some(Rows { schema, rows }), - }], - }) + let requests = Partitioner::new(self.partition_manager) + .partition_insert_requests(table_info.table_id(), Rows { schema, rows }) + .await?; + Ok(RegionInsertRequests { requests }) } async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result { @@ -112,7 +115,7 @@ impl<'a> StatementToRegion<'a> { .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { - table_name: format!("{}.{}.{}", catalog, schema, table), + table_name: common_catalog::format_full_table_name(catalog, schema, table), }) } diff --git a/src/frontend/src/req_convert/insert/table_to_region.rs b/src/frontend/src/req_convert/insert/table_to_region.rs index 6ea09b3ad4e2..3160a58d0396 100644 --- a/src/frontend/src/req_convert/insert/table_to_region.rs +++ b/src/frontend/src/req_convert/insert/table_to_region.rs @@ -12,37 +12,39 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::region::{ - InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests, -}; +use api::v1::region::InsertRequests as RegionInsertRequests; use api::v1::Rows; -use store_api::storage::RegionId; +use partition::manager::PartitionRuleManager; use table::metadata::TableInfo; use table::requests::InsertRequest as TableInsertRequest; use crate::error::Result; +use crate::req_convert::common::partitioner::Partitioner; use crate::req_convert::common::{column_schema, row_count}; pub struct TableToRegion<'a> { table_info: &'a TableInfo, + partition_manager: &'a PartitionRuleManager, } impl<'a> TableToRegion<'a> { - pub fn new(table_info: &'a TableInfo) -> Self { - Self { table_info } + pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self { + Self { + table_info, + partition_manager, + } } - pub fn convert(&self, request: TableInsertRequest) -> Result { - let region_id = RegionId::new(self.table_info.table_id(), request.region_number).into(); + pub async fn convert(&self, request: TableInsertRequest) -> Result { let row_count = row_count(&request.columns_values)?; let schema = column_schema(self.table_info, &request.columns_values)?; let rows = api::helper::vectors_to_rows(request.columns_values.values(), row_count); - Ok(RegionInsertRequests { - requests: vec![RegionInsertRequest { - region_id, - rows: Some(Rows { schema, rows }), - }], - }) + + let rows = Rows { schema, rows }; + let requests = Partitioner::new(self.partition_manager) + .partition_insert_requests(self.table_info.table_id(), rows) + .await?; + Ok(RegionInsertRequests { requests }) } } @@ -51,115 +53,120 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; + use api::v1::region::InsertRequest as RegionInsertRequest; use api::v1::value::ValueData; - use api::v1::{ColumnDataType, SemanticType}; + use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use datatypes::prelude::ConcreteDataType; - use datatypes::scalars::ScalarVectorBuilder; - use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema}; - use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder}; - use table::metadata::{TableInfoBuilder, TableMetaBuilder}; + use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey}; + use common_meta::key::schema_name::{SchemaManager, SchemaNameKey}; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::KvBackendRef; + use datatypes::vectors::{Int32Vector, VectorRef}; + use store_api::storage::RegionId; use super::*; + use crate::table::test::{create_partition_rule_manager, new_test_table_info}; - #[test] - fn test_insert_request_table_to_region() { - let schema = Schema::new(vec![ - DtColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) - .with_time_index(true), - DtColumnSchema::new("id", ConcreteDataType::int16_datatype(), false), - DtColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ]); - - let table_meta = TableMetaBuilder::default() - .schema(Arc::new(schema)) - .primary_key_indices(vec![2]) - .next_column_id(3) - .build() - .unwrap(); + async fn prepare_mocked_backend() -> KvBackendRef { + let backend = Arc::new(MemoryKvBackend::default()); - let table_info = Arc::new( - TableInfoBuilder::default() - .name("demo") - .meta(table_meta) - .table_id(1) - .build() - .unwrap(), - ); + let catalog_manager = CatalogManager::new(backend.clone()); + let schema_manager = SchemaManager::new(backend.clone()); - let insert_request = mock_insert_request(); - let mut request = TableToRegion::new(&table_info) - .convert(insert_request) + catalog_manager + .create(CatalogNameKey::default()) + .await + .unwrap(); + schema_manager + .create(SchemaNameKey::default(), None) + .await .unwrap(); - assert_eq!(request.requests.len(), 1); - verify_region_insert_request(request.requests.pop().unwrap()); + backend } - fn mock_insert_request() -> TableInsertRequest { - let mut builder = StringVectorBuilder::with_capacity(3); - builder.push(Some("host1")); - builder.push(None); - builder.push(Some("host3")); - let host = builder.to_vector(); + #[tokio::test] + async fn test_insert_request_table_to_region() { + // region to datanode placement: + // 1 -> 1 + // 2 -> 2 + // 3 -> 3 + // + // region value ranges: + // 1 -> [50, max) + // 2 -> [10, 50) + // 3 -> (min, 10) + + let backend = prepare_mocked_backend().await; + let partition_manager = create_partition_rule_manager(backend.clone()).await; + let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter()); + + let converter = TableToRegion::new(&table_info, &partition_manager); + + let table_request = build_table_request(Arc::new(Int32Vector::from(vec![ + Some(1), + None, + Some(11), + Some(101), + ]))); + + let region_requests = converter.convert(table_request).await.unwrap(); + let mut region_id_to_region_requests = region_requests + .requests + .into_iter() + .map(|r| (r.region_id, r)) + .collect::>(); + + let region_id = RegionId::new(1, 1).as_u64(); + let region_request = region_id_to_region_requests.remove(®ion_id).unwrap(); + assert_eq!( + region_request, + build_region_request(vec![Some(101)], region_id) + ); - let mut builder = Int16VectorBuilder::with_capacity(3); - builder.push(Some(1_i16)); - builder.push(Some(2_i16)); - builder.push(Some(3_i16)); - let id = builder.to_vector(); + let region_id = RegionId::new(1, 2).as_u64(); + let region_request = region_id_to_region_requests.remove(®ion_id).unwrap(); + assert_eq!( + region_request, + build_region_request(vec![Some(11)], region_id) + ); - let columns_values = HashMap::from([("host".to_string(), host), ("id".to_string(), id)]); + let region_id = RegionId::new(1, 3).as_u64(); + let region_request = region_id_to_region_requests.remove(®ion_id).unwrap(); + assert_eq!( + region_request, + build_region_request(vec![Some(1), None], region_id) + ); + } + fn build_table_request(vector: VectorRef) -> TableInsertRequest { TableInsertRequest { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: "demo".to_string(), - columns_values, + table_name: "table_1".to_string(), + columns_values: HashMap::from([("a".to_string(), vector)]), region_number: 0, } } - fn verify_region_insert_request(request: RegionInsertRequest) { - assert_eq!(request.region_id, RegionId::new(1, 0).as_u64()); - - let rows = request.rows.unwrap(); - for (i, column) in rows.schema.iter().enumerate() { - let name = &column.column_name; - if name == "id" { - assert_eq!(ColumnDataType::Int16 as i32, column.datatype); - assert_eq!(SemanticType::Field as i32, column.semantic_type); - let values = rows - .rows - .iter() - .map(|row| row.values[i].value_data.clone()) - .collect::>(); - assert_eq!( - vec![ - Some(ValueData::I16Value(1)), - Some(ValueData::I16Value(2)), - Some(ValueData::I16Value(3)) - ], - values - ); - } - if name == "host" { - assert_eq!(ColumnDataType::String as i32, column.datatype); - assert_eq!(SemanticType::Tag as i32, column.semantic_type); - let values = rows - .rows - .iter() - .map(|row| row.values[i].value_data.clone()) - .collect::>(); - assert_eq!( - vec![ - Some(ValueData::StringValue("host1".to_string())), - None, - Some(ValueData::StringValue("host3".to_string())) - ], - values - ); - } + fn build_region_request(rows: Vec>, region_id: u64) -> RegionInsertRequest { + RegionInsertRequest { + region_id, + rows: Some(Rows { + schema: vec![ColumnSchema { + column_name: "a".to_string(), + datatype: ColumnDataType::Int32 as i32, + semantic_type: SemanticType::Tag as i32, + }], + rows: rows + .into_iter() + .map(|v| Row { + values: vec![Value { + value_data: v.map(ValueData::I32Value), + }], + }) + .collect(), + }), } } } diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs index 7ff2228c7a25..ae71a6b31a1f 100644 --- a/src/frontend/src/statement.rs +++ b/src/frontend/src/statement.rs @@ -25,9 +25,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; -use api::v1::region::region_request; use catalog::CatalogManagerRef; -use client::region_handler::RegionRequestHandlerRef; use common_error::ext::BoxedError; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::DdlTaskExecutorRef; @@ -47,16 +45,15 @@ use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument}; use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; use table::engine::TableReference; -use table::requests::{ - CopyDatabaseRequest, CopyDirection, CopyTableRequest, DeleteRequest, InsertRequest, -}; +use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; use table::TableRef; +use crate::delete::DeleterRef; use crate::error::{ self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu, - RequestDatanodeSnafu, Result, TableNotFoundSnafu, + Result, TableNotFoundSnafu, }; -use crate::req_convert::{delete, insert}; +use crate::insert::InserterRef; use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; use crate::table::table_idents_to_full_name; @@ -64,30 +61,33 @@ use crate::table::table_idents_to_full_name; pub struct StatementExecutor { catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, - region_request_handler: RegionRequestHandlerRef, ddl_executor: DdlTaskExecutorRef, table_metadata_manager: TableMetadataManagerRef, partition_manager: PartitionRuleManagerRef, cache_invalidator: CacheInvalidatorRef, + inserter: InserterRef, + deleter: DeleterRef, } impl StatementExecutor { pub(crate) fn new( catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, - region_request_handler: RegionRequestHandlerRef, ddl_task_executor: DdlTaskExecutorRef, kv_backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef, + inserter: InserterRef, + deleter: DeleterRef, ) -> Self { Self { catalog_manager, query_engine, - region_request_handler, ddl_executor: ddl_task_executor, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)), cache_invalidator, + inserter, + deleter, } } @@ -224,50 +224,6 @@ impl StatementExecutor { table_name: table_ref.to_string(), }) } - - async fn handle_table_insert_request( - &self, - request: InsertRequest, - query_ctx: QueryContextRef, - ) -> Result { - let table_ref = TableReference::full( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ); - let table = self.get_table(&table_ref).await?; - let table_info = table.table_info(); - - let request = insert::TableToRegion::new(&table_info).convert(request)?; - let affected_rows = self - .region_request_handler - .handle(region_request::Body::Inserts(request), query_ctx) - .await - .context(RequestDatanodeSnafu)?; - Ok(affected_rows as _) - } - - async fn handle_table_delete_request( - &self, - request: DeleteRequest, - query_ctx: QueryContextRef, - ) -> Result { - let table_ref = TableReference::full( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ); - let table = self.get_table(&table_ref).await?; - let table_info = table.table_info(); - - let request = delete::TableToRegion::new(&table_info).convert(request)?; - let affected_rows = self - .region_request_handler - .handle(region_request::Body::Deletes(request), query_ctx) - .await - .context(RequestDatanodeSnafu)?; - Ok(affected_rows as _) - } } fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result { diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs index 034443f0ec1d..cb36d41823d3 100644 --- a/src/frontend/src/statement/copy_table_from.rs +++ b/src/frontend/src/statement/copy_table_from.rs @@ -327,13 +327,12 @@ impl StatementExecutor { .zip(vectors) .collect::>(); - pending.push(self.handle_table_insert_request( + pending.push(self.inserter.handle_table_insert( InsertRequest { catalog_name: req.catalog_name.to_string(), schema_name: req.schema_name.to_string(), table_name: req.table_name.to_string(), columns_values, - // TODO: support multi-regions region_number: 0, }, query_ctx.clone(), diff --git a/src/frontend/src/statement/dml.rs b/src/frontend/src/statement/dml.rs index d5730fda32e1..f127dda4d48b 100644 --- a/src/frontend/src/statement/dml.rs +++ b/src/frontend/src/statement/dml.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; -use api::v1::region::region_request; use common_query::Output; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp}; @@ -35,23 +34,16 @@ use table::TableRef; use super::StatementExecutor; use crate::error::{ BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, MissingTimeIndexColumnSnafu, - ReadRecordBatchSnafu, RequestDatanodeSnafu, Result, UnexpectedSnafu, + ReadRecordBatchSnafu, Result, UnexpectedSnafu, }; -use crate::req_convert::insert::StatementToRegion; impl StatementExecutor { pub async fn insert(&self, insert: Box, query_ctx: QueryContextRef) -> Result { if insert.can_extract_values() { // Fast path: plain insert ("insert with literal values") is executed directly - let request = StatementToRegion::new(self.catalog_manager.as_ref(), &query_ctx) - .convert(&insert) - .await?; - let affected_rows = self - .region_request_handler - .handle(region_request::Body::Inserts(request), query_ctx) + self.inserter + .handle_statement_insert(insert.as_ref(), &query_ctx) .await - .context(RequestDatanodeSnafu)?; - Ok(Output::AffectedRows(affected_rows as _)) } else { // Slow path: insert with subquery. Execute the subquery first, via query engine. Then // insert the results by sending insert requests. @@ -82,7 +74,8 @@ impl StatementExecutor { let insert_request = build_insert_request(record_batch, table.schema(), &table_info)?; affected_rows += self - .handle_table_insert_request(insert_request, query_ctx.clone()) + .inserter + .handle_table_insert(insert_request, query_ctx.clone()) .await?; } @@ -114,13 +107,14 @@ impl StatementExecutor { let table_info = table.table_info(); while let Some(batch) = stream.next().await { let record_batch = batch.context(ReadRecordBatchSnafu)?; - let delete_request = build_delete_request(record_batch, table.schema(), &table_info)?; + let request = build_delete_request(record_batch, table.schema(), &table_info)?; affected_rows += self - .handle_table_delete_request(delete_request, query_ctx.clone()) + .deleter + .handle_table_delete(request, query_ctx.clone()) .await?; } - Ok(Output::AffectedRows(affected_rows)) + Ok(Output::AffectedRows(affected_rows as _)) } async fn execute_dml_subquery( diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 8a19e3627d45..19e33ace2345 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -116,20 +116,20 @@ pub(crate) mod test { use super::*; - fn new_test_table_info( + pub fn new_test_table_info( table_id: u32, table_name: &str, region_numbers: impl Iterator, ) -> TableInfo { let column_schemas = vec![ - ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), ColumnSchema::new( "ts", ConcreteDataType::timestamp_millisecond_datatype(), false, ) .with_time_index(true), - ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new("b", ConcreteDataType::int32_datatype(), true), ]; let schema = SchemaBuilder::try_from(column_schemas) .unwrap() diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index a340fb205bf2..4e5b26e1f532 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -34,7 +34,7 @@ pub mod procedure; pub mod pubsub; pub mod selector; pub mod service; -pub mod table_creator; +pub mod table_meta_alloc; pub mod table_routes; pub use crate::error::Result; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ae308a8c7607..1c630a30ab59 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -54,7 +54,7 @@ use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef}; use crate::service::store::memory::MemStore; -use crate::table_creator::MetaSrvTableMetadataAllocator; +use crate::table_meta_alloc::MetaSrvTableMetadataAllocator; // TODO(fys): try use derive_builder macro pub struct MetaSrvBuilder { @@ -366,7 +366,7 @@ fn build_ddl_manager( }, )); - let table_creator = Arc::new(MetaSrvTableMetadataAllocator::new( + let table_meta_allocator = Arc::new(MetaSrvTableMetadataAllocator::new( selector_ctx.clone(), selector.clone(), table_id_sequence.clone(), @@ -377,7 +377,7 @@ fn build_ddl_manager( datanode_clients, cache_invalidator, table_metadata_manager.clone(), - table_creator, + table_meta_allocator, )) } diff --git a/src/meta-srv/src/table_creator.rs b/src/meta-srv/src/table_meta_alloc.rs similarity index 100% rename from src/meta-srv/src/table_creator.rs rename to src/meta-srv/src/table_meta_alloc.rs diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 297447f7942b..cb02db0d22ee 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -15,7 +15,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use api::v1::region::{DeleteRequest, InsertRequest}; +use api::v1::Rows; use common_meta::key::table_route::TableRouteManager; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; @@ -31,7 +31,7 @@ use crate::columns::RangeColumnsPartitionRule; use crate::error::{FindLeaderSnafu, Result}; use crate::partition::{PartitionBound, PartitionDef, PartitionExpr}; use crate::range::RangePartitionRule; -use crate::splitter::{DeleteRequestSplits, InsertRequestSplits, RowSplitter}; +use crate::splitter::RowSplitter; use crate::{error, PartitionRuleRef}; #[async_trait::async_trait] @@ -247,26 +247,24 @@ impl PartitionRuleManager { Ok(regions) } - /// Split [InsertRequest] into [InsertRequestSplits] according to the partition rule - /// of given table. - pub async fn split_insert_request( - &self, - table: TableId, - req: InsertRequest, - ) -> Result { - let partition_rule = self.find_table_partition_rule(table).await?; - RowSplitter::new(partition_rule).split_insert(req) + pub async fn find_region_leader(&self, region_id: RegionId) -> Result { + let table_route = self.find_table_route(region_id.table_id()).await?; + let peer = table_route + .find_region_leader(region_id.region_number()) + .with_context(|| FindLeaderSnafu { + region_id, + table_id: region_id.table_id(), + })?; + Ok(peer.clone()) } - /// Split [DeleteRequest] into [DeleteRequestSplits] according to the partition rule - /// of given table. - pub async fn split_delete_request( + pub async fn split_rows( &self, - table: TableId, - req: DeleteRequest, - ) -> Result { - let partition_rule = self.find_table_partition_rule(table).await?; - RowSplitter::new(partition_rule).split_delete(req) + table_id: TableId, + rows: Rows, + ) -> Result> { + let partition_rule = self.find_table_partition_rule(table_id).await?; + RowSplitter::new(partition_rule).split(rows) } } diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index 2bf2b0ce1b17..f7c64d8ba117 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -15,17 +15,13 @@ use std::collections::HashMap; use api::helper; -use api::v1::region::{DeleteRequest, InsertRequest}; use api::v1::{ColumnSchema, Row, Rows}; use datatypes::value::Value; -use store_api::storage::{RegionId, RegionNumber}; +use store_api::storage::RegionNumber; use crate::error::Result; use crate::PartitionRuleRef; -pub type InsertRequestSplits = HashMap; -pub type DeleteRequestSplits = HashMap; - pub struct RowSplitter { partition_rule: PartitionRuleRef, } @@ -35,43 +31,8 @@ impl RowSplitter { Self { partition_rule } } - pub fn split_insert(&self, req: InsertRequest) -> Result { - let table_id = RegionId::from_u64(req.region_id).table_id(); - Ok(self - .split(req.rows)? - .into_iter() - .map(|(region_number, rows)| { - let region_id = RegionId::new(table_id, region_number); - let req = InsertRequest { - rows: Some(rows), - region_id: region_id.into(), - }; - (region_number, req) - }) - .collect()) - } - - pub fn split_delete(&self, req: DeleteRequest) -> Result { - let table_id = RegionId::from_u64(req.region_id).table_id(); - Ok(self - .split(req.rows)? - .into_iter() - .map(|(region_number, rows)| { - let region_id = RegionId::new(table_id, region_number); - let req = DeleteRequest { - rows: Some(rows), - region_id: region_id.into(), - }; - (region_number, req) - }) - .collect()) - } - - fn split(&self, rows: Option) -> Result> { + pub fn split(&self, rows: Rows) -> Result> { // No data - let Some(rows) = rows else { - return Ok(HashMap::new()); - }; if rows.rows.is_empty() { return Ok(HashMap::new()); } @@ -177,7 +138,7 @@ mod tests { use crate::partition::PartitionExpr; use crate::PartitionRule; - fn mock_insert_request() -> InsertRequest { + fn mock_rows() -> Rows { let schema = vec![ ColumnSchema { column_name: "id".to_string(), @@ -218,10 +179,7 @@ mod tests { ], }, ]; - InsertRequest { - rows: Some(Rows { schema, rows }), - region_id: 0, - } + Rows { schema, rows } } #[derive(Debug, Serialize, Deserialize)] @@ -301,53 +259,42 @@ mod tests { #[test] fn test_writer_splitter() { - let insert_request = mock_insert_request(); + let rows = mock_rows(); let rule = Arc::new(MockPartitionRule) as PartitionRuleRef; let splitter = RowSplitter::new(rule); - let splits = splitter.split_insert(insert_request).unwrap(); + let mut splits = splitter.split(rows).unwrap(); assert_eq!(splits.len(), 2); - let req0 = &splits[&0]; - let req1 = &splits[&1]; - assert_eq!(req0.region_id, 0); - assert_eq!(req1.region_id, 1); - - let rows0 = req0.rows.as_ref().unwrap(); - let rows1 = req1.rows.as_ref().unwrap(); - assert_eq!(rows0.rows.len(), 1); - assert_eq!(rows1.rows.len(), 2); + let rows0 = splits.remove(&0).unwrap().rows; + let rows1 = splits.remove(&1).unwrap().rows; + assert_eq!(rows0.len(), 1); + assert_eq!(rows1.len(), 2); } #[test] fn test_missed_col_writer_splitter() { - let insert_request = mock_insert_request(); + let rows = mock_rows(); let rule = Arc::new(MockMissedColPartitionRule) as PartitionRuleRef; - let splitter = RowSplitter::new(rule); - let splits = splitter.split_insert(insert_request).unwrap(); + let splitter = RowSplitter::new(rule); + let mut splits = splitter.split(rows).unwrap(); assert_eq!(splits.len(), 1); - let req = &splits[&1]; - assert_eq!(req.region_id, 1); - - let rows = req.rows.as_ref().unwrap(); - assert_eq!(rows.rows.len(), 3); + let rows = splits.remove(&1).unwrap().rows; + assert_eq!(rows.len(), 3); } #[test] fn test_empty_partition_rule_writer_splitter() { - let insert_request = mock_insert_request(); + let rows = mock_rows(); let rule = Arc::new(EmptyPartitionRule) as PartitionRuleRef; let splitter = RowSplitter::new(rule); - let splits = splitter.split_insert(insert_request).unwrap(); + let mut splits = splitter.split(rows).unwrap(); assert_eq!(splits.len(), 1); - let req = &splits[&0]; - assert_eq!(req.region_id, 0); - - let rows = req.rows.as_ref().unwrap(); - assert_eq!(rows.rows.len(), 3); + let rows = splits.remove(&0).unwrap().rows; + assert_eq!(rows.len(), 3); } }