diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs
index b44a1f2ad37a..26ee518f689b 100644
--- a/src/frontend/src/statement.rs
+++ b/src/frontend/src/statement.rs
@@ -16,7 +16,7 @@
 mod backup;
 mod copy_table_from;
 mod copy_table_to;
 mod describe;
-mod insert;
+mod dml;
 mod show;
 mod tql;
@@ -31,6 +31,7 @@ use common_time::range::TimestampRange;
 use common_time::Timestamp;
 use datanode::instance::sql::{idents_to_full_database_name, table_idents_to_full_name};
 use query::parser::QueryStatement;
+use query::plan::LogicalPlan;
 use query::query_engine::SqlStatementExecutorRef;
 use query::QueryEngineRef;
 use session::context::QueryContextRef;
@@ -39,7 +40,9 @@ use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
 use sql::statements::statement::Statement;
 use table::engine::TableReference;
 use table::error::TableOperationSnafu;
-use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest, InsertRequest};
+use table::requests::{
+    CopyDatabaseRequest, CopyDirection, CopyTableRequest, DeleteRequest, InsertRequest,
+};
 use table::TableRef;
 
 use crate::catalog::FrontendCatalogManager;
@@ -47,6 +50,7 @@ use crate::error::{
     self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, InsertSnafu,
     PlanStatementSnafu, Result, TableNotFoundSnafu,
 };
+use crate::instance::distributed::deleter::DistDeleter;
 use crate::instance::distributed::inserter::DistInserter;
 use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
@@ -83,12 +87,14 @@ impl StatementExecutor {
     pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
         match stmt {
-            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
+            Statement::Query(_) | Statement::Explain(_) => {
                 self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
             }
             Statement::Insert(insert) => self.insert(insert, query_ctx).await,
+            Statement::Delete(delete) => self.delete(delete, query_ctx).await,
+
             Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
 
             Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
@@ -128,12 +134,16 @@ impl StatementExecutor {
         }
     }
 
-    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
-        let planner = self.query_engine.planner();
-        let plan = planner
-            .plan(stmt, query_ctx.clone())
+    async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
+        self.query_engine
+            .planner()
+            .plan(stmt, query_ctx)
             .await
-            .context(PlanStatementSnafu)?;
+            .context(PlanStatementSnafu)
+    }
+
+    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
+        let plan = self.plan(stmt, query_ctx.clone()).await?;
         self.query_engine
             .execute(plan, query_ctx)
             .await
@@ -195,6 +205,47 @@ impl StatementExecutor {
             }
         }
     }
+
+    // TODO(zhongzc): a transitional state that works toward removing calls to `table.delete`:
+    // DistTable's `delete` is no longer invoked; MitoTable's is still called, but will
+    // eventually be eliminated as well.
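+    /// Sends a delete request downstream. In distributed mode (the catalog manager
+    /// downcasts to `FrontendCatalogManager`) the request goes through `DistDeleter`;
+    /// in standalone mode it is routed to the table's own `delete` implementation.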
+    async fn send_delete_request(&self, request: DeleteRequest) -> Result<usize> {
+        let frontend_catalog_manager = self
+            .catalog_manager
+            .as_any()
+            .downcast_ref::<FrontendCatalogManager>();
+
+        let table_name = request.table_name.clone();
+        match frontend_catalog_manager {
+            Some(frontend_catalog_manager) => {
+                let deleter = DistDeleter::new(
+                    request.catalog_name.clone(),
+                    request.schema_name.clone(),
+                    Arc::new(frontend_catalog_manager.clone()),
+                );
+                let affected_rows = deleter
+                    .delete(vec![request])
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(TableOperationSnafu)
+                    .context(InsertSnafu { table_name })?;
+                Ok(affected_rows)
+            }
+            None => {
+                let table_ref = TableReference::full(
+                    &request.catalog_name,
+                    &request.schema_name,
+                    &request.table_name,
+                );
+                let affected_rows = self
+                    .get_table(&table_ref)
+                    .await?
+                    .delete(request)
+                    .await
+                    .context(InsertSnafu { table_name })?;
+                Ok(affected_rows)
+            }
+        }
+    }
 }
 
 fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
diff --git a/src/frontend/src/statement/dml.rs b/src/frontend/src/statement/dml.rs
new file mode 100644
index 000000000000..9d6deabbc501
--- /dev/null
+++ b/src/frontend/src/statement/dml.rs
@@ -0,0 +1,217 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use common_query::Output;
+use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
+use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
+use datatypes::schema::SchemaRef;
+use futures_util::StreamExt;
+use query::parser::QueryStatement;
+use query::plan::LogicalPlan;
+use session::context::QueryContextRef;
+use snafu::{ensure, OptionExt, ResultExt};
+use sql::statements::delete::Delete;
+use sql::statements::insert::Insert;
+use sql::statements::statement::Statement;
+use table::engine::TableReference;
+use table::metadata::TableInfoRef;
+use table::requests::{DeleteRequest, InsertRequest};
+use table::TableRef;
+
+use super::StatementExecutor;
+use crate::error::{
+    BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu,
+    MissingTimeIndexColumnSnafu, ReadRecordBatchSnafu, Result, UnexpectedSnafu,
+};
+
+impl StatementExecutor {
+    pub async fn insert(&self, insert: Box<Insert>, query_ctx: QueryContextRef) -> Result<Output> {
+        if insert.can_extract_values() {
+            // Fast path: a plain insert (with literal values) is executed directly.
+            self.sql_stmt_executor
+                .execute_sql(Statement::Insert(insert), query_ctx)
+                .await
+                .context(ExecuteStatementSnafu)
+        } else {
+            // Slow path: an insert with a subquery. Execute the subquery via the query
+            // engine first, then write the results out as insert requests.
+
+            // 1. Plan the whole insert statement into a logical plan, so that a malformed
+            //    insert statement is caught early and returned as a plan error.
+            let statement = QueryStatement::Sql(Statement::Insert(insert));
+            let logical_plan = self.plan(statement, query_ctx.clone()).await?;
+
+            // 2. Execute the subquery and get the results as a record batch stream.
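+            // (The planner wraps the INSERT in a DataFusion `Dml` node; `extract_dml_statement`
+            // below peels it off to reach the write op and its input, i.e. the subquery to run.)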
+            let dml_statement = extract_dml_statement(logical_plan)?;
+            ensure!(
+                dml_statement.op == WriteOp::Insert,
+                UnexpectedSnafu {
+                    violated: "expected an INSERT plan"
+                }
+            );
+            let mut stream = self
+                .execute_dml_subquery(&dml_statement, query_ctx.clone())
+                .await?;
+
+            // 3. Send insert requests.
+            let mut affected_rows = 0;
+            let table = self.get_table_from_dml(dml_statement, &query_ctx).await?;
+            let table_info = table.table_info();
+            while let Some(batch) = stream.next().await {
+                let record_batch = batch.context(ReadRecordBatchSnafu)?;
+                let insert_request =
+                    build_insert_request(record_batch, table.schema(), &table_info)?;
+                affected_rows += self.send_insert_request(insert_request).await?;
+            }
+
+            Ok(Output::AffectedRows(affected_rows))
+        }
+    }
+
+    pub async fn delete(&self, delete: Box<Delete>, query_ctx: QueryContextRef) -> Result<Output> {
+        // 1. Plan the whole delete statement into a logical plan, so that a malformed
+        //    delete statement is caught early and returned as a plan error.
+        let statement = QueryStatement::Sql(Statement::Delete(delete));
+        let logical_plan = self.plan(statement, query_ctx.clone()).await?;
+
+        // 2. Execute the subquery and get the results as a record batch stream.
+        let dml_statement = extract_dml_statement(logical_plan)?;
+        ensure!(
+            dml_statement.op == WriteOp::Delete,
+            UnexpectedSnafu {
+                violated: "expected a DELETE plan"
+            }
+        );
+        let mut stream = self
+            .execute_dml_subquery(&dml_statement, query_ctx.clone())
+            .await?;
+
+        // 3. Send delete requests.
+        let mut affected_rows = 0;
+        let table = self.get_table_from_dml(dml_statement, &query_ctx).await?;
+        let table_info = table.table_info();
+        while let Some(batch) = stream.next().await {
+            let record_batch = batch.context(ReadRecordBatchSnafu)?;
+            let delete_request = build_delete_request(record_batch, table.schema(), &table_info)?;
+            affected_rows += self.send_delete_request(delete_request).await?;
+        }
+
+        Ok(Output::AffectedRows(affected_rows))
+    }
+
+    async fn execute_dml_subquery(
+        &self,
+        dml_statement: &DmlStatement,
+        query_ctx: QueryContextRef,
+    ) -> Result<SendableRecordBatchStream> {
+        let subquery_plan = LogicalPlan::from(dml_statement.input.as_ref().clone());
+        let output = self
+            .query_engine
+            .execute(subquery_plan, query_ctx)
+            .await
+            .context(ExecLogicalPlanSnafu)?;
+        match output {
+            Output::Stream(stream) => Ok(stream),
+            Output::RecordBatches(record_batches) => Ok(record_batches.as_stream()),
+            _ => UnexpectedSnafu {
+                violated: "expected a stream",
+            }
+            .fail(),
+        }
+    }
+
+    async fn get_table_from_dml(
+        &self,
+        dml_statement: DmlStatement,
+        query_ctx: &QueryContextRef,
+    ) -> Result<TableRef> {
+        let default_catalog = query_ctx.current_catalog().to_owned();
+        let default_schema = query_ctx.current_schema().to_owned();
+        let resolved_table_ref = dml_statement
+            .table_name
+            .resolve(&default_catalog, &default_schema);
+        let table_ref = TableReference::full(
+            &resolved_table_ref.catalog,
+            &resolved_table_ref.schema,
+            &resolved_table_ref.table,
+        );
+        self.get_table(&table_ref).await
+    }
+}
+
+fn extract_dml_statement(logical_plan: LogicalPlan) -> Result<DmlStatement> {
+    let LogicalPlan::DfPlan(df_plan) = logical_plan;
+    match df_plan {
+        DfLogicalPlan::Dml(dml) => Ok(dml),
+        _ => UnexpectedSnafu {
+            violated: "expected a DML plan",
+        }
+        .fail(),
+    }
+}
+
+fn build_insert_request(
+    record_batch: RecordBatch,
+    table_schema: SchemaRef,
+    table_info: &TableInfoRef,
+) -> Result<InsertRequest> {
+    let columns_values = record_batch
+        .column_vectors(&table_info.name, table_schema)
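+        // `column_vectors` pairs every column name in the record batch with its data
+        // vector; the result feeds `InsertRequest::columns_values` below.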
+        .context(BuildColumnVectorsSnafu)?;
+
+    Ok(InsertRequest {
+        catalog_name: table_info.catalog_name.clone(),
+        schema_name: table_info.schema_name.clone(),
+        table_name: table_info.name.clone(),
+        columns_values,
+        region_number: 0,
+    })
+}
+
+fn build_delete_request(
+    record_batch: RecordBatch,
+    table_schema: SchemaRef,
+    table_info: &TableInfoRef,
+) -> Result<DeleteRequest> {
+    let ts_column = table_schema
+        .timestamp_column()
+        .map(|x| x.name.clone())
+        .with_context(|| table::error::MissingTimeIndexColumnSnafu {
+            table_name: table_info.name.clone(),
+        })
+        .context(MissingTimeIndexColumnSnafu)?;
+
+    let column_vectors = record_batch
+        .column_vectors(&table_info.name, table_schema)
+        .context(BuildColumnVectorsSnafu)?;
+
+    let rowkey_columns = table_info
+        .meta
+        .row_key_column_names()
+        .collect::<Vec<_>>();
+
+    let key_column_values = column_vectors
+        .into_iter()
+        .filter(|x| x.0 == ts_column || rowkey_columns.contains(&&x.0))
+        .collect::<HashMap<_, _>>();
+
+    Ok(DeleteRequest {
+        catalog_name: table_info.catalog_name.clone(),
+        schema_name: table_info.schema_name.clone(),
+        table_name: table_info.name.clone(),
+        key_column_values,
+    })
+}
diff --git a/src/frontend/src/statement/insert.rs b/src/frontend/src/statement/insert.rs
deleted file mode 100644
index be88ed3c6acc..000000000000
--- a/src/frontend/src/statement/insert.rs
+++ /dev/null
@@ -1,116 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use common_error::ext::BoxedError;
-use common_query::Output;
-use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
-use datanode::instance::sql::table_idents_to_full_name;
-use futures_util::StreamExt;
-use query::parser::QueryStatement;
-use query::plan::LogicalPlan;
-use session::context::QueryContextRef;
-use snafu::ResultExt;
-use sql::statements::insert::Insert;
-use sql::statements::statement::Statement;
-use table::engine::TableReference;
-use table::requests::InsertRequest;
-
-use super::StatementExecutor;
-use crate::error::{
-    BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
-    PlanStatementSnafu, ReadRecordBatchSnafu, Result, UnexpectedSnafu,
-};
-
-impl StatementExecutor {
-    pub async fn insert(&self, insert: Box<Insert>, query_ctx: QueryContextRef) -> Result<Output> {
-        if insert.can_extract_values() {
-            // Fast path: plain insert ("insert with literal values") is executed directly
-            self.sql_stmt_executor
-                .execute_sql(Statement::Insert(insert), query_ctx)
-                .await
-                .context(ExecuteStatementSnafu)
-        } else {
-            // Slow path: insert with subquery. Execute the subquery first, via query engine. Then
-            // insert the results by sending insert requests.
-
-            let (catalog_name, schema_name, table_name) =
-                table_idents_to_full_name(insert.table_name(), query_ctx.clone())
-                    .map_err(BoxedError::new)
-                    .context(ExternalSnafu)?;
-
-            // 1. Plan the whole insert statement into a logical plan, then a wrong insert statement
-            // will be caught and a plan error will be returned.
-            let logical_plan = self
-                .query_engine
-                .planner()
-                .plan(
-                    QueryStatement::Sql(Statement::Insert(insert)),
-                    query_ctx.clone(),
-                )
-                .await
-                .context(PlanStatementSnafu)?;
-
-            // 2. Execute the subquery, get the results as a record batch stream.
-            let subquery_plan = extract_subquery_plan_from_dml(logical_plan)?;
-            let output = self
-                .query_engine
-                .execute(subquery_plan, query_ctx)
-                .await
-                .context(ExecLogicalPlanSnafu)?;
-            let Output::Stream(mut stream) = output else {
-                return UnexpectedSnafu {
-                    violated: "expected a stream",
-                }
-                .fail();
-            };
-
-            // 3. Send insert requests.
-            let mut affected_rows = 0;
-            let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
-            let table = self.get_table(&table_ref).await?;
-            while let Some(batch) = stream.next().await {
-                let record_batch = batch.context(ReadRecordBatchSnafu)?;
-                let columns_values = record_batch
-                    .column_vectors(&table_name, table.schema())
-                    .context(BuildColumnVectorsSnafu)?;
-
-                let insert_request = InsertRequest {
-                    catalog_name: catalog_name.clone(),
-                    schema_name: schema_name.clone(),
-                    table_name: table_name.clone(),
-                    columns_values,
-                    region_number: 0,
-                };
-                affected_rows += self.send_insert_request(insert_request).await?;
-            }
-
-            Ok(Output::AffectedRows(affected_rows))
-        }
-    }
-}
-
-fn extract_subquery_plan_from_dml(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
-    let LogicalPlan::DfPlan(df_plan) = logical_plan;
-    match df_plan {
-        DfLogicalPlan::Dml(DmlStatement {
-            op: WriteOp::Insert,
-            input,
-            ..
-        }) => Ok(LogicalPlan::from(input.as_ref().clone())),
-        _ => UnexpectedSnafu {
-            violated: "expected a plan of insert dml",
-        }
-        .fail(),
-    }
-}
diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs
index 663f31178047..46f4b78497f6 100644
--- a/src/frontend/src/table.rs
+++ b/src/frontend/src/table.rs
@@ -36,11 +36,9 @@
 use snafu::prelude::*;
 use store_api::storage::ScanRequest;
 use table::error::TableOperationSnafu;
 use table::metadata::{FilterPushDownType, TableInfoRef, TableType};
-use table::requests::DeleteRequest;
 use table::Table;
 
 use crate::catalog::FrontendCatalogManager;
-use crate::instance::distributed::deleter::DistDeleter;
 use crate::table::scan::{DatanodeInstance, TableScanPlan};
 
 pub mod delete;
@@ -138,20 +136,6 @@ impl Table for DistTable {
     ) -> table::Result<Vec<FilterPushDownType>> {
         Ok(vec![FilterPushDownType::Inexact; filters.len()])
     }
-
-    async fn delete(&self, request: DeleteRequest) -> table::Result<usize> {
-        let deleter = DistDeleter::new(
-            request.catalog_name.clone(),
-            request.schema_name.clone(),
-            self.catalog_manager.clone(),
-        );
-        let affected_rows = deleter
-            .delete(vec![request])
-            .await
-            .map_err(BoxedError::new)
-            .context(TableOperationSnafu)?;
-        Ok(affected_rows)
-    }
 }
 
 impl DistTable {
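A minimal, self-contained sketch (not part of the patch) of the column-filtering rule
that `build_delete_request` applies above: of all the columns a DELETE's subquery
yields, only the row-key columns and the time-index column are kept, since together
they identify the rows to delete. `String` stands in for the real vector type, and
the column names are hypothetical:

use std::collections::HashMap;

// Keep only the key columns (row keys + time index) out of a batch's columns.
fn key_column_values(
    column_vectors: HashMap<String, String>, // column name -> vector (stand-in)
    rowkey_columns: &[&str],
    ts_column: &str,
) -> HashMap<String, String> {
    column_vectors
        .into_iter()
        .filter(|(name, _)| name == ts_column || rowkey_columns.contains(&name.as_str()))
        .collect()
}

fn main() {
    let batch: HashMap<String, String> = [
        ("host".to_string(), "<host vector>".to_string()),
        ("ts".to_string(), "<ts vector>".to_string()),
        ("cpu".to_string(), "<cpu vector>".to_string()),
    ]
    .into_iter()
    .collect();

    // With row key `host` and time index `ts`, the value column `cpu` is dropped.
    let keys = key_column_values(batch, &["host"], "ts");
    assert!(keys.contains_key("host") && keys.contains_key("ts"));
    assert!(!keys.contains_key("cpu"));
}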