diff --git a/Cargo.lock b/Cargo.lock index 519b7af08e94..689d6b030d6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1768,6 +1768,7 @@ dependencies = [ "arc-swap", "async-trait", "chrono-tz 0.6.3", + "common-base", "common-catalog", "common-error", "common-macro", @@ -1900,6 +1901,7 @@ dependencies = [ "base64 0.21.5", "bytes", "chrono", + "common-base", "common-catalog", "common-error", "common-grpc-expr", diff --git a/src/client/src/region.rs b/src/client/src/region.rs index a9a337808e37..819c4453f9a2 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -197,7 +197,7 @@ impl RegionRequester { check_response_header(header)?; - Ok(affected_rows) + Ok(affected_rows as _) } pub async fn handle(&self, request: RegionRequest) -> Result { diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index a111bf3c7376..506c273c1e2d 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -21,6 +21,8 @@ pub mod readable_size; use core::any::Any; use std::sync::{Arc, Mutex, MutexGuard}; +pub type AffectedRows = usize; + pub use bit_vec::BitVec; /// [`Plugins`] is a wrapper of Arc contents. 
diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index 93fef8593cc1..e2714bf99c0c 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -12,6 +12,7 @@ api.workspace = true arc-swap = "1.0" async-trait.workspace = true chrono-tz = "0.6" +common-base.workspace = true common-catalog.workspace = true common-error.workspace = true common-macro.workspace = true diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index 629f55e32235..61b501be7019 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -15,12 +15,11 @@ use std::sync::Arc; use async_trait::async_trait; +use common_base::AffectedRows; use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse}; use common_query::error::Result; use session::context::QueryContextRef; -use table::requests::{DeleteRequest, InsertRequest}; - -pub type AffectedRows = usize; +use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest}; /// A trait for handling table mutations in `QueryEngine`. #[async_trait] @@ -30,6 +29,17 @@ pub trait TableMutationHandler: Send + Sync { /// Delete rows from the table. async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>; + + /// Trigger a flush task for table. + async fn flush(&self, request: FlushTableRequest, ctx: QueryContextRef) + -> Result<AffectedRows>; + + /// Trigger a compaction task for table. + async fn compact( + &self, + request: CompactTableRequest, + ctx: QueryContextRef, + ) -> Result<AffectedRows>; } /// A trait for handling procedure service requests in `QueryEngine`. 
diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 554b0d6d795d..fdab5eae0371 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -18,6 +18,7 @@ async-trait.workspace = true base64.workspace = true bytes.workspace = true chrono.workspace = true +common-base.workspace = true common-catalog.workspace = true common-error.workspace = true common-grpc-expr.workspace = true diff --git a/src/common/meta/src/datanode_manager.rs b/src/common/meta/src/datanode_manager.rs index 198ce6911c35..4795512f25ec 100644 --- a/src/common/meta/src/datanode_manager.rs +++ b/src/common/meta/src/datanode_manager.rs @@ -15,23 +15,25 @@ use std::sync::Arc; use api::v1::region::{QueryRequest, RegionRequest}; +pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; use crate::error::Result; use crate::peer::Peer; -pub type AffectedRows = u64; - +/// The trait for handling requests to datanode. #[async_trait::async_trait] pub trait Datanode: Send + Sync { /// Handles DML, and DDL requests. async fn handle(&self, request: RegionRequest) -> Result<AffectedRows>; + /// Handles query requests async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>; } pub type DatanodeRef = Arc<dyn Datanode>; + +/// Datanode manager #[async_trait::async_trait] pub trait DatanodeManager: Send + Sync { /// Retrieves a target `datanode`. 
diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 20a006ebce49..605be9dfe320 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use api::v1::region::{QueryRequest, RegionRequest}; +pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; @@ -29,8 +30,6 @@ use crate::region_keeper::MemoryRegionKeeper; use crate::sequence::SequenceBuilder; use crate::wal_options_allocator::WalOptionsAllocator; -pub type AffectedRows = u64; - #[async_trait::async_trait] pub trait MockDatanodeHandler: Sync + Send + Clone { async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result; diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index c890eeba71fa..c682efa5b962 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -25,6 +25,7 @@ use common_meta::kv_backend::KvBackendRef; use operator::delete::Deleter; use operator::insert::Inserter; use operator::procedure::ProcedureServiceOperator; +use operator::request::Requester; use operator::statement::StatementExecutor; use operator::table::TableMutationOperator; use partition::manager::PartitionRuleManager; @@ -105,6 +106,11 @@ impl FrontendBuilder { datanode_manager.clone(), )); let deleter = Arc::new(Deleter::new( + catalog_manager.clone(), + partition_manager.clone(), + datanode_manager.clone(), + )); + let requester = Arc::new(Requester::new( catalog_manager.clone(), partition_manager, datanode_manager.clone(), @@ -112,6 +118,7 @@ impl FrontendBuilder { let table_mutation_handler = Arc::new(TableMutationOperator::new( inserter.clone(), deleter.clone(), + requester, )); let procedure_service_handler = Arc::new(ProcedureServiceOperator::new( diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 
4ff928fe6d4d..28758504257b 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -79,7 +79,7 @@ impl Datanode for RegionInvoker { check_response_header(response.header) .map_err(BoxedError::new) .context(meta_error::ExternalSnafu)?; - Ok(response.affected_rows) + Ok(response.affected_rows as _) } async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> { diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index 4b4feb10b0ea..f4e0e20e8c95 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -98,7 +98,7 @@ impl Deleter { &self, request: TableDeleteRequest, ctx: QueryContextRef, - ) -> Result<u64> { + ) -> Result<AffectedRows> { let catalog = request.catalog_name.as_str(); let schema = request.schema_name.as_str(); let table = request.table_name.as_str(); @@ -143,8 +143,8 @@ impl Deleter { }); let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?; - let affected_rows = results.into_iter().sum::<Result<u64>>()?; - crate::metrics::DIST_DELETE_ROW_COUNT.inc_by(affected_rows); + let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?; + crate::metrics::DIST_DELETE_ROW_COUNT.inc_by(affected_rows as u64); Ok(affected_rows) } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 9256d76f74cf..7aae77df9593 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -156,7 +156,7 @@ impl Inserter { &self, request: TableInsertRequest, ctx: QueryContextRef, - ) -> Result<u64> { + ) -> Result<AffectedRows> { let catalog = request.catalog_name.as_str(); let schema = request.schema_name.as_str(); let table_name = request.table_name.as_str(); @@ -219,8 +219,8 @@ impl Inserter { }); let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?; - let affected_rows = results.into_iter().sum::<Result<u64>>()?; - crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows); + let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?; + crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64); 
Ok(affected_rows) } diff --git a/src/operator/src/lib.rs b/src/operator/src/lib.rs index e672b488a9ac..6634bc530401 100644 --- a/src/operator/src/lib.rs +++ b/src/operator/src/lib.rs @@ -20,6 +20,7 @@ pub mod metrics; pub mod procedure; pub mod region_req_factory; pub mod req_convert; +pub mod request; pub mod statement; pub mod table; #[cfg(test)] diff --git a/src/operator/src/region_req_factory.rs b/src/operator/src/region_req_factory.rs index d033216bb17e..21cc90dbb67a 100644 --- a/src/operator/src/region_req_factory.rs +++ b/src/operator/src/region_req_factory.rs @@ -40,4 +40,11 @@ impl RegionRequestFactory { body: Some(Body::Deletes(requests)), } } + + pub fn build_request(&self, body: Body) -> RegionRequest { + RegionRequest { + header: Some(self.header.clone()), + body: Some(body), + } + } } diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs new file mode 100644 index 000000000000..641dd570f0dc --- /dev/null +++ b/src/operator/src/request.rs @@ -0,0 +1,233 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::region::region_request::Body as RegionRequestBody; +use api::v1::region::{CompactRequest, FlushRequest, RegionRequestHeader}; +use catalog::CatalogManagerRef; +use common_catalog::build_db_string; +use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; +use common_meta::peer::Peer; +use common_telemetry::tracing_context::TracingContext; +use futures_util::future; +use partition::manager::{PartitionInfo, PartitionRuleManagerRef}; +use session::context::QueryContextRef; +use snafu::prelude::*; +use store_api::storage::RegionId; +use table::requests::{CompactTableRequest, FlushTableRequest}; + +use crate::error::{ + CatalogSnafu, FindRegionLeaderSnafu, FindTablePartitionRuleSnafu, JoinTaskSnafu, + RequestInsertsSnafu, Result, TableNotFoundSnafu, +}; +use crate::region_req_factory::RegionRequestFactory; + +/// Region requester which processes flush, compact requests etc. +pub struct Requester { + catalog_manager: CatalogManagerRef, + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, +} + +pub type RequesterRef = Arc; + +impl Requester { + pub fn new( + catalog_manager: CatalogManagerRef, + partition_manager: PartitionRuleManagerRef, + datanode_manager: DatanodeManagerRef, + ) -> Self { + Self { + catalog_manager, + partition_manager, + datanode_manager, + } + } + + /// Handle the request to flush table. 
+ pub async fn handle_table_flush( + &self, + request: FlushTableRequest, + ctx: QueryContextRef, + ) -> Result<AffectedRows> { + let partitions = self + .get_table_partitions( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ) + .await?; + + let requests = partitions + .into_iter() + .map(|partition| { + RegionRequestBody::Flush(FlushRequest { + region_id: partition.id.into(), + }) + }) + .collect(); + + self.do_request( + requests, + Some(build_db_string(&request.catalog_name, &request.schema_name)), + &ctx, + ) + .await + } + + /// Handle the request to compact table. + pub async fn handle_table_compaction( + &self, + request: CompactTableRequest, + ctx: QueryContextRef, + ) -> Result<AffectedRows> { + let partitions = self + .get_table_partitions( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ) + .await?; + + let requests = partitions + .into_iter() + .map(|partition| { + RegionRequestBody::Compact(CompactRequest { + region_id: partition.id.into(), + }) + }) + .collect(); + + self.do_request( + requests, + Some(build_db_string(&request.catalog_name, &request.schema_name)), + &ctx, + ) + .await + } + + /// Handle the request to flush the region. + pub async fn handle_region_flush( + &self, + region_id: RegionId, + ctx: QueryContextRef, + ) -> Result<AffectedRows> { + let request = RegionRequestBody::Flush(FlushRequest { + region_id: region_id.into(), + }); + + self.do_request(vec![request], None, &ctx).await + } + + /// Handle the request to compact the region. 
+ pub async fn handle_region_compaction( + &self, + region_id: RegionId, + ctx: QueryContextRef, + ) -> Result<AffectedRows> { + let request = RegionRequestBody::Compact(CompactRequest { + region_id: region_id.into(), + }); + + self.do_request(vec![request], None, &ctx).await + } +} + +impl Requester { + async fn do_request( + &self, + requests: Vec<RegionRequestBody>, + db_string: Option<String>, + ctx: &QueryContextRef, + ) -> Result<AffectedRows> { + let request_factory = RegionRequestFactory::new(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + dbname: db_string.unwrap_or_else(|| ctx.get_db_string()), + }); + + let tasks = self + .group_requests_by_peer(requests) + .await? + .into_iter() + .map(|(peer, body)| { + let request = request_factory.build_request(body); + let datanode_manager = self.datanode_manager.clone(); + common_runtime::spawn_write(async move { + datanode_manager + .datanode(&peer) + .await + .handle(request) + .await + .context(RequestInsertsSnafu) + }) + }); + let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?; + + let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?; + + Ok(affected_rows) + } + + async fn group_requests_by_peer( + &self, + requests: Vec<RegionRequestBody>, + ) -> Result<Vec<(Peer, RegionRequestBody)>> { + let mut grouped: Vec<(Peer, RegionRequestBody)> = Vec::new(); + + for req in requests { + let region_id = match &req { + RegionRequestBody::Flush(req) => req.region_id, + RegionRequestBody::Compact(req) => req.region_id, + _ => unreachable!("only flush and compact requests are built by this module"), + }; + + let peer = self + .partition_manager + .find_region_leader(region_id.into()) + .await + .context(FindRegionLeaderSnafu)?; + + grouped.push((peer, req)); + } + + Ok(grouped) + } + + async fn get_table_partitions( + &self, + catalog: &str, + schema: &str, + table_name: &str, + ) -> Result<Vec<PartitionInfo>> { + let table = self + .catalog_manager + .table(catalog, schema, table_name) + .await + .context(CatalogSnafu)?; + + let table = table.with_context(|| TableNotFoundSnafu { + table_name: common_catalog::format_full_table_name(catalog, schema, table_name), + })?; + 
let table_info = table.table_info(); + + self.partition_manager + .find_table_partitions(table_info.ident.table_id) + .await + .with_context(|_| FindTablePartitionRuleSnafu { + table_name: common_catalog::format_full_table_name(catalog, schema, table_name), + }) + } +} diff --git a/src/operator/src/table.rs b/src/operator/src/table.rs index 38271abb87af..60d96b776360 100644 --- a/src/operator/src/table.rs +++ b/src/operator/src/table.rs @@ -13,18 +13,23 @@ // limitations under the License. use async_trait::async_trait; +use common_base::AffectedRows; use common_error::ext::BoxedError; -use common_function::handlers::{AffectedRows, TableMutationHandler}; +use common_function::handlers::TableMutationHandler; use common_query::error as query_error; use common_query::error::Result as QueryResult; use session::context::QueryContextRef; use snafu::ResultExt; use sqlparser::ast::ObjectName; -use table::requests::{DeleteRequest as TableDeleteRequest, InsertRequest as TableInsertRequest}; +use table::requests::{ + CompactTableRequest, DeleteRequest as TableDeleteRequest, FlushTableRequest, + InsertRequest as TableInsertRequest, +}; use crate::delete::DeleterRef; use crate::error::{InvalidSqlSnafu, Result}; use crate::insert::InserterRef; +use crate::request::RequesterRef; // TODO(LFC): Refactor consideration: move this function to some helper mod, // could be done together or after `TableReference`'s refactoring, when issue #559 is resolved. 
@@ -60,11 +65,16 @@ pub fn table_idents_to_full_name( pub struct TableMutationOperator { inserter: InserterRef, deleter: DeleterRef, + requester: RequesterRef, } impl TableMutationOperator { - pub fn new(inserter: InserterRef, deleter: DeleterRef) -> Self { - Self { inserter, deleter } + pub fn new(inserter: InserterRef, deleter: DeleterRef, requester: RequesterRef) -> Self { + Self { + inserter, + deleter, + requester, + } + } } @@ -93,4 +103,28 @@ impl TableMutationHandler for TableMutationOperator { .map_err(BoxedError::new) .context(query_error::TableMutationSnafu) } + + async fn flush( + &self, + request: FlushTableRequest, + ctx: QueryContextRef, + ) -> QueryResult<AffectedRows> { + self.requester + .handle_table_flush(request, ctx) + .await + .map_err(BoxedError::new) + .context(query_error::TableMutationSnafu) + } + + async fn compact( + &self, + request: CompactTableRequest, + ctx: QueryContextRef, + ) -> QueryResult<AffectedRows> { + self.requester + .handle_table_compaction(request, ctx) + .await + .map_err(BoxedError::new) + .context(query_error::TableMutationSnafu) + } } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index d41b885f9465..ce22a29027c7 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -22,6 +22,7 @@ use api::v1::region::{ InsertRequests, OpenRequest, TruncateRequest, }; use api::v1::{self, Rows, SemanticType}; +pub use common_base::AffectedRows; use snafu::{ensure, OptionExt}; use strum::IntoStaticStr; @@ -33,8 +34,6 @@ use crate::metadata::{ use crate::path_utils::region_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; -pub type AffectedRows = usize; - #[derive(Debug, IntoStaticStr)] pub enum RegionRequest { Put(RegionPutRequest), diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 7b0e8a625f1f..bdd40a7282c8 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -286,20 +286,14 @@ pub struct CopyTableRequest { pub struct 
FlushTableRequest { pub catalog_name: String, pub schema_name: String, - pub table_name: Option, - pub region_number: Option, - /// Wait until the flush is done. - pub wait: Option, + pub table_name: String, } #[derive(Debug, Clone, Default)] pub struct CompactTableRequest { pub catalog_name: String, pub schema_name: String, - pub table_name: Option, - pub region_number: Option, - /// Wait until the compaction is done. - pub wait: Option, + pub table_name: String, } /// Truncate table request