Skip to content

Commit

Permalink
refactor(table): eliminate calls to DistTable.insert (#2219)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Aug 22, 2023
1 parent b3b43fe commit cb3561f
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 59 deletions.
20 changes: 18 additions & 2 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ pub enum Error {
},

#[snafu(display("Failed to read record batch, source: {}", source))]
ReadRecordBatch {
ReadDfRecordBatch {
source: datafusion::error::DataFusionError,
location: Location,
},
Expand Down Expand Up @@ -600,6 +600,18 @@ pub enum Error {

#[snafu(display("Empty data: {}", msg))]
EmptyData { msg: String, location: Location },

#[snafu(display("Failed to read record batch, source: {}", source))]
ReadRecordBatch {
source: common_recordbatch::error::Error,
location: Location,
},

#[snafu(display("Failed to build column vectors, source: {}", source))]
BuildColumnVectors {
source: common_recordbatch::error::Error,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -678,7 +690,7 @@ impl ErrorExt for Error {

Error::JoinTask { .. }
| Error::BuildParquetRecordBatchStream { .. }
| Error::ReadRecordBatch { .. }
| Error::ReadDfRecordBatch { .. }
| Error::BuildFileStream { .. }
| Error::WriteStreamToFile { .. }
| Error::Unexpected { .. } => StatusCode::Unexpected,
Expand Down Expand Up @@ -731,6 +743,10 @@ impl ErrorExt for Error {

Error::WriteParquet { source, .. } => source.status_code(),
Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments,

Error::ReadRecordBatch { source, .. } | Error::BuildColumnVectors { source, .. } => {
source.status_code()
}
}
}

Expand Down
21 changes: 10 additions & 11 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::statement::Statement;
use sql::statements::{self, sql_value_to_value};
use store_api::storage::RegionNumber;
use table::error::TableOperationSnafu;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType};
use table::requests::{AlterTableRequest, TableOptions};
use table::TableRef;
Expand Down Expand Up @@ -373,26 +374,24 @@ impl DistInstance {
self.drop_table(table_name).await
}
Statement::Insert(insert) => {
let (catalog, schema, table) =
let (catalog, schema, _) =
table_idents_to_full_name(insert.table_name(), query_ctx.clone())
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;

let table = self
.catalog_manager
.table(&catalog, &schema, &table)
.await
.context(CatalogSnafu)?
.context(TableNotFoundSnafu { table_name: table })?;

let insert_request =
SqlHandler::insert_to_request(self.catalog_manager.clone(), &insert, query_ctx)
.await
.context(InvokeDatanodeSnafu)?;

Ok(Output::AffectedRows(
table.insert(insert_request).await.context(TableSnafu)?,
))
let inserter = DistInserter::new(catalog, schema, self.catalog_manager.clone());
let affected_rows = inserter
.insert(vec![insert_request])
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)
.context(TableSnafu)?;
Ok(Output::AffectedRows(affected_rows as usize))
}
Statement::ShowCreateTable(show) => {
let (catalog, schema, table) =
Expand Down
67 changes: 50 additions & 17 deletions src/frontend/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ mod backup;
mod copy_table_from;
mod copy_table_to;
mod describe;
mod insert;
mod show;
mod tql;

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
Expand All @@ -36,14 +38,16 @@ use snafu::{OptionExt, ResultExt};
use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::error::TableOperationSnafu;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest, InsertRequest};
use table::TableRef;

use crate::error;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, PlanStatementSnafu,
Result, TableNotFoundSnafu,
self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, InsertSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
};
use crate::instance::distributed::inserter::DistInserter;
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};

#[derive(Clone)]
Expand Down Expand Up @@ -83,19 +87,7 @@ impl StatementExecutor {
self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
}

// For performance consideration, only requests that can't extract values is executed by query engine.
// Plain insert ("insert with literal values") is still executed directly in statement.
Statement::Insert(insert) => {
if insert.can_extract_values() {
self.sql_stmt_executor
.execute_sql(Statement::Insert(insert), query_ctx)
.await
.context(ExecuteStatementSnafu)
} else {
self.plan_exec(QueryStatement::Sql(Statement::Insert(insert)), query_ctx)
.await
}
}
Statement::Insert(insert) => self.insert(insert, query_ctx).await,

Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,

Expand Down Expand Up @@ -162,6 +154,47 @@ impl StatementExecutor {
table_name: table_ref.to_string(),
})
}

// TODO(zhongzc): A middle state that eliminates calls to table.insert,
// For DistTable, its insert is not invoked; for MitoTable, it is still called but eventually eliminated.
async fn send_insert_request(&self, request: InsertRequest) -> Result<usize> {
let frontend_catalog_manager = self
.catalog_manager
.as_any()
.downcast_ref::<FrontendCatalogManager>();

let table_name = request.table_name.clone();
match frontend_catalog_manager {
Some(frontend_catalog_manager) => {
let inserter = DistInserter::new(
request.catalog_name.clone(),
request.schema_name.clone(),
Arc::new(frontend_catalog_manager.clone()),
);
let affected_rows = inserter
.insert(vec![request])
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)
.context(InsertSnafu { table_name })?;
Ok(affected_rows as usize)
}
None => {
let table_ref = TableReference::full(
&request.catalog_name,
&request.schema_name,
&request.table_name,
);
let affected_rows = self
.get_table(&table_ref)
.await?
.insert(request)
.await
.context(InsertSnafu { table_name })?;
Ok(affected_rows)
}
}
}
}

fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
Expand Down
19 changes: 6 additions & 13 deletions src/frontend/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl StatementExecutor {
let mut pending = vec![];

while let Some(r) = stream.next().await {
let record_batch = r.context(error::ReadRecordBatchSnafu)?;
let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
let vectors =
Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

Expand All @@ -322,7 +322,7 @@ impl StatementExecutor {
.zip(vectors)
.collect::<HashMap<_, _>>();

pending.push(table.insert(InsertRequest {
pending.push(self.send_insert_request(InsertRequest {
catalog_name: req.catalog_name.to_string(),
schema_name: req.schema_name.to_string(),
table_name: req.table_name.to_string(),
Expand All @@ -332,14 +332,12 @@ impl StatementExecutor {
}));

if pending_mem_size as u64 >= pending_mem_threshold {
rows_inserted +=
batch_insert(&mut pending, &mut pending_mem_size, &req.table_name).await?;
rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
}
}

if !pending.is_empty() {
rows_inserted +=
batch_insert(&mut pending, &mut pending_mem_size, &req.table_name).await?;
rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
}
}

Expand All @@ -349,16 +347,11 @@ impl StatementExecutor {

/// Executes all pending inserts all at once, drain pending requests and reset pending bytes.
async fn batch_insert(
pending: &mut Vec<impl Future<Output = table::error::Result<usize>>>,
pending: &mut Vec<impl Future<Output = Result<usize>>>,
pending_bytes: &mut usize,
table_name: &str,
) -> Result<usize> {
let batch = pending.drain(..);
let res: usize = futures::future::try_join_all(batch)
.await
.context(error::InsertSnafu { table_name })?
.iter()
.sum();
let res: usize = futures::future::try_join_all(batch).await?.iter().sum();
*pending_bytes = 0;
Ok(res)
}
Expand Down
116 changes: 116 additions & 0 deletions src/frontend/src/statement/insert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::BoxedError;
use common_query::Output;
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
use datanode::instance::sql::table_idents_to_full_name;
use futures_util::StreamExt;
use query::parser::QueryStatement;
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::statements::insert::Insert;
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::requests::InsertRequest;

use super::StatementExecutor;
use crate::error::{
BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
PlanStatementSnafu, ReadRecordBatchSnafu, Result, UnexpectedSnafu,
};

impl StatementExecutor {
pub async fn insert(&self, insert: Box<Insert>, query_ctx: QueryContextRef) -> Result<Output> {
if insert.can_extract_values() {
// Fast path: plain insert ("insert with literal values") is executed directly
self.sql_stmt_executor
.execute_sql(Statement::Insert(insert), query_ctx)
.await
.context(ExecuteStatementSnafu)
} else {
// Slow path: insert with subquery. Execute the subquery first, via query engine. Then
// insert the results by sending insert requests.

let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(insert.table_name(), query_ctx.clone())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

// 1. Plan the whole insert statement into a logical plan, then a wrong insert statement
// will be caught and a plan error will be returned.
let logical_plan = self
.query_engine
.planner()
.plan(
QueryStatement::Sql(Statement::Insert(insert)),
query_ctx.clone(),
)
.await
.context(PlanStatementSnafu)?;

// 2. Execute the subquery, get the results as a record batch stream.
let subquery_plan = extract_subquery_plan_from_dml(logical_plan)?;
let output = self
.query_engine
.execute(subquery_plan, query_ctx)
.await
.context(ExecLogicalPlanSnafu)?;
let Output::Stream(mut stream) = output else {
return UnexpectedSnafu {
violated: "expected a stream",
}
.fail();
};

// 3. Send insert requests.
let mut affected_rows = 0;
let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
let table = self.get_table(&table_ref).await?;
while let Some(batch) = stream.next().await {
let record_batch = batch.context(ReadRecordBatchSnafu)?;
let columns_values = record_batch
.column_vectors(&table_name, table.schema())
.context(BuildColumnVectorsSnafu)?;

let insert_request = InsertRequest {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
columns_values,
region_number: 0,
};
affected_rows += self.send_insert_request(insert_request).await?;
}

Ok(Output::AffectedRows(affected_rows))
}
}
}

fn extract_subquery_plan_from_dml(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
let LogicalPlan::DfPlan(df_plan) = logical_plan;
match df_plan {
DfLogicalPlan::Dml(DmlStatement {
op: WriteOp::Insert,
input,
..
}) => Ok(LogicalPlan::from(input.as_ref().clone())),
_ => UnexpectedSnafu {
violated: "expected a plan of insert dml",
}
.fail(),
}
}
Loading

0 comments on commit cb3561f

Please sign in to comment.