diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 72a792891465..f63867cd04e8 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -513,7 +513,7 @@ pub enum Error { }, #[snafu(display("Failed to read record batch, source: {}", source))] - ReadRecordBatch { + ReadDfRecordBatch { source: datafusion::error::DataFusionError, location: Location, }, @@ -600,6 +600,18 @@ pub enum Error { #[snafu(display("Empty data: {}", msg))] EmptyData { msg: String, location: Location }, + + #[snafu(display("Failed to read record batch, source: {}", source))] + ReadRecordBatch { + source: common_recordbatch::error::Error, + location: Location, + }, + + #[snafu(display("Failed to build column vectors, source: {}", source))] + BuildColumnVectors { + source: common_recordbatch::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -678,7 +690,7 @@ impl ErrorExt for Error { Error::JoinTask { .. } | Error::BuildParquetRecordBatchStream { .. } - | Error::ReadRecordBatch { .. } + | Error::ReadDfRecordBatch { .. } | Error::BuildFileStream { .. } | Error::WriteStreamToFile { .. } | Error::Unexpected { .. } => StatusCode::Unexpected, @@ -731,6 +743,10 @@ impl ErrorExt for Error { Error::WriteParquet { source, .. } => source.status_code(), Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments, + + Error::ReadRecordBatch { source, .. } | Error::BuildColumnVectors { source, .. } => { + source.status_code() + } } } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index c2ccbb3cb0eb..0e8314da5efb 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -57,6 +57,7 @@ use sql::statements::create::{PartitionEntry, Partitions}; use sql::statements::statement::Statement; use sql::statements::{self, sql_value_to_value}; use store_api::storage::RegionNumber; +use table::error::TableOperationSnafu; use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType}; use table::requests::{AlterTableRequest, TableOptions}; use table::TableRef; @@ -373,26 +374,24 @@ impl DistInstance { self.drop_table(table_name).await } Statement::Insert(insert) => { - let (catalog, schema, table) = + let (catalog, schema, _) = table_idents_to_full_name(insert.table_name(), query_ctx.clone()) .map_err(BoxedError::new) .context(error::ExternalSnafu)?; - let table = self - .catalog_manager - .table(&catalog, &schema, &table) - .await - .context(CatalogSnafu)? - .context(TableNotFoundSnafu { table_name: table })?; - let insert_request = SqlHandler::insert_to_request(self.catalog_manager.clone(), &insert, query_ctx) .await .context(InvokeDatanodeSnafu)?; - Ok(Output::AffectedRows( - table.insert(insert_request).await.context(TableSnafu)?, - )) + let inserter = DistInserter::new(catalog, schema, self.catalog_manager.clone()); + let affected_rows = inserter + .insert(vec![insert_request]) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu) + .context(TableSnafu)?; + Ok(Output::AffectedRows(affected_rows as usize)) } Statement::ShowCreateTable(show) => { let (catalog, schema, table) = diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs index 56817a501aae..b44a1f2ad37a 100644 --- a/src/frontend/src/statement.rs +++ b/src/frontend/src/statement.rs @@ -16,11 +16,13 @@ mod backup; mod copy_table_from; mod copy_table_to; mod describe; +mod insert; mod show; mod tql; use std::collections::HashMap; use std::str::FromStr; +use std::sync::Arc; use catalog::CatalogManagerRef; use common_error::ext::BoxedError; @@ -36,14 +38,16 @@ use snafu::{OptionExt, ResultExt}; use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument}; use sql::statements::statement::Statement; use table::engine::TableReference; -use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; +use table::error::TableOperationSnafu; +use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest, InsertRequest}; use table::TableRef; -use crate::error; +use crate::catalog::FrontendCatalogManager; use crate::error::{ - CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, PlanStatementSnafu, - Result, TableNotFoundSnafu, + self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, InsertSnafu, + PlanStatementSnafu, Result, TableNotFoundSnafu, }; +use crate::instance::distributed::inserter::DistInserter; use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; #[derive(Clone)] @@ -83,19 +87,7 @@ impl StatementExecutor { self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await } - // For performance consideration, only requests that can't extract values is executed by query engine. - // Plain insert ("insert with literal values") is still executed directly in statement. - Statement::Insert(insert) => { - if insert.can_extract_values() { - self.sql_stmt_executor - .execute_sql(Statement::Insert(insert), query_ctx) - .await - .context(ExecuteStatementSnafu) - } else { - self.plan_exec(QueryStatement::Sql(Statement::Insert(insert)), query_ctx) - .await - } - } + Statement::Insert(insert) => self.insert(insert, query_ctx).await, Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await, @@ -162,6 +154,47 @@ impl StatementExecutor { table_name: table_ref.to_string(), }) } + + // TODO(zhongzc): A middle state that eliminates calls to table.insert, + // For DistTable, its insert is not invoked; for MitoTable, it is still called but eventually eliminated. + async fn send_insert_request(&self, request: InsertRequest) -> Result { + let frontend_catalog_manager = self + .catalog_manager + .as_any() + .downcast_ref::(); + + let table_name = request.table_name.clone(); + match frontend_catalog_manager { + Some(frontend_catalog_manager) => { + let inserter = DistInserter::new( + request.catalog_name.clone(), + request.schema_name.clone(), + Arc::new(frontend_catalog_manager.clone()), + ); + let affected_rows = inserter + .insert(vec![request]) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu) + .context(InsertSnafu { table_name })?; + Ok(affected_rows as usize) + } + None => { + let table_ref = TableReference::full( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ); + let affected_rows = self + .get_table(&table_ref) + .await? + .insert(request) + .await + .context(InsertSnafu { table_name })?; + Ok(affected_rows) + } + } + } } fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result { diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs index 17dbc28934db..30548d737f38 100644 --- a/src/frontend/src/statement/copy_table_from.rs +++ b/src/frontend/src/statement/copy_table_from.rs @@ -310,7 +310,7 @@ impl StatementExecutor { let mut pending = vec![]; while let Some(r) = stream.next().await { - let record_batch = r.context(error::ReadRecordBatchSnafu)?; + let record_batch = r.context(error::ReadDfRecordBatchSnafu)?; let vectors = Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?; @@ -322,7 +322,7 @@ impl StatementExecutor { .zip(vectors) .collect::>(); - pending.push(table.insert(InsertRequest { + pending.push(self.send_insert_request(InsertRequest { catalog_name: req.catalog_name.to_string(), schema_name: req.schema_name.to_string(), table_name: req.table_name.to_string(), @@ -332,14 +332,12 @@ impl StatementExecutor { })); if pending_mem_size as u64 >= pending_mem_threshold { - rows_inserted += - batch_insert(&mut pending, &mut pending_mem_size, &req.table_name).await?; + rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?; } } if !pending.is_empty() { - rows_inserted += - batch_insert(&mut pending, &mut pending_mem_size, &req.table_name).await?; + rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?; } } @@ -349,16 +347,11 @@ impl StatementExecutor { /// Executes all pending inserts all at once, drain pending requests and reset pending bytes. async fn batch_insert( - pending: &mut Vec>>, + pending: &mut Vec>>, pending_bytes: &mut usize, - table_name: &str, ) -> Result { let batch = pending.drain(..); - let res: usize = futures::future::try_join_all(batch) - .await - .context(error::InsertSnafu { table_name })? - .iter() - .sum(); + let res: usize = futures::future::try_join_all(batch).await?.iter().sum(); *pending_bytes = 0; Ok(res) } diff --git a/src/frontend/src/statement/insert.rs b/src/frontend/src/statement/insert.rs new file mode 100644 index 000000000000..be88ed3c6acc --- /dev/null +++ b/src/frontend/src/statement/insert.rs @@ -0,0 +1,116 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::ext::BoxedError; +use common_query::Output; +use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp}; +use datanode::instance::sql::table_idents_to_full_name; +use futures_util::StreamExt; +use query::parser::QueryStatement; +use query::plan::LogicalPlan; +use session::context::QueryContextRef; +use snafu::ResultExt; +use sql::statements::insert::Insert; +use sql::statements::statement::Statement; +use table::engine::TableReference; +use table::requests::InsertRequest; + +use super::StatementExecutor; +use crate::error::{ + BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, + PlanStatementSnafu, ReadRecordBatchSnafu, Result, UnexpectedSnafu, +}; + +impl StatementExecutor { + pub async fn insert(&self, insert: Box, query_ctx: QueryContextRef) -> Result { + if insert.can_extract_values() { + // Fast path: plain insert ("insert with literal values") is executed directly + self.sql_stmt_executor + .execute_sql(Statement::Insert(insert), query_ctx) + .await + .context(ExecuteStatementSnafu) + } else { + // Slow path: insert with subquery. Execute the subquery first, via query engine. Then + // insert the results by sending insert requests. + + let (catalog_name, schema_name, table_name) = + table_idents_to_full_name(insert.table_name(), query_ctx.clone()) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + // 1. Plan the whole insert statement into a logical plan, then a wrong insert statement + // will be caught and a plan error will be returned. + let logical_plan = self + .query_engine + .planner() + .plan( + QueryStatement::Sql(Statement::Insert(insert)), + query_ctx.clone(), + ) + .await + .context(PlanStatementSnafu)?; + + // 2. Execute the subquery, get the results as a record batch stream. + let subquery_plan = extract_subquery_plan_from_dml(logical_plan)?; + let output = self + .query_engine + .execute(subquery_plan, query_ctx) + .await + .context(ExecLogicalPlanSnafu)?; + let Output::Stream(mut stream) = output else { + return UnexpectedSnafu { + violated: "expected a stream", + } + .fail(); + }; + + // 3. Send insert requests. + let mut affected_rows = 0; + let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name); + let table = self.get_table(&table_ref).await?; + while let Some(batch) = stream.next().await { + let record_batch = batch.context(ReadRecordBatchSnafu)?; + let columns_values = record_batch + .column_vectors(&table_name, table.schema()) + .context(BuildColumnVectorsSnafu)?; + + let insert_request = InsertRequest { + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + columns_values, + region_number: 0, + }; + affected_rows += self.send_insert_request(insert_request).await?; + } + + Ok(Output::AffectedRows(affected_rows)) + } + } +} + +fn extract_subquery_plan_from_dml(logical_plan: LogicalPlan) -> Result { + let LogicalPlan::DfPlan(df_plan) = logical_plan; + match df_plan { + DfLogicalPlan::Dml(DmlStatement { + op: WriteOp::Insert, + input, + .. + }) => Ok(LogicalPlan::from(input.as_ref().clone())), + _ => UnexpectedSnafu { + violated: "expected a plan of insert dml", + } + .fail(), + } +} diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index f6cad01041d7..663f31178047 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -36,12 +36,11 @@ use snafu::prelude::*; use store_api::storage::ScanRequest; use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfoRef, TableType}; -use table::requests::{DeleteRequest, InsertRequest}; +use table::requests::DeleteRequest; use table::Table; use crate::catalog::FrontendCatalogManager; use crate::instance::distributed::deleter::DistDeleter; -use crate::instance::distributed::inserter::DistInserter; use crate::table::scan::{DatanodeInstance, TableScanPlan}; pub mod delete; @@ -73,20 +72,6 @@ impl Table for DistTable { self.table_info.table_type } - async fn insert(&self, request: InsertRequest) -> table::Result { - let inserter = DistInserter::new( - request.catalog_name.clone(), - request.schema_name.clone(), - self.catalog_manager.clone(), - ); - let affected_rows = inserter - .insert(vec![request]) - .await - .map_err(BoxedError::new) - .context(TableOperationSnafu)?; - Ok(affected_rows as usize) - } - // TODO(ruihang): DistTable should not call this method directly async fn scan_to_stream( &self, @@ -291,6 +276,7 @@ pub(crate) mod test { use partition::PartitionRuleRef; use store_api::storage::RegionNumber; use table::meter_insert_request; + use table::requests::InsertRequest; use super::*;