Skip to content

Commit

Permalink
refactor(table): eliminate calls to DistTable.delete (#2225)
Browse files Browse the repository at this point in the history
* refactor(table): eliminate calls to DistTable.delete

Signed-off-by: Zhenchi <[email protected]>

* fix: format

Signed-off-by: Zhenchi <[email protected]>

* fix: clippy

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Aug 23, 2023
1 parent d81ddd8 commit af95e46
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 140 deletions.
67 changes: 59 additions & 8 deletions src/frontend/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod backup;
mod copy_table_from;
mod copy_table_to;
mod describe;
mod insert;
mod dml;
mod show;
mod tql;

Expand All @@ -31,6 +31,7 @@ use common_time::range::TimestampRange;
use common_time::Timestamp;
use datanode::instance::sql::{idents_to_full_database_name, table_idents_to_full_name};
use query::parser::QueryStatement;
use query::plan::LogicalPlan;
use query::query_engine::SqlStatementExecutorRef;
use query::QueryEngineRef;
use session::context::QueryContextRef;
Expand All @@ -39,14 +40,17 @@ use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::error::TableOperationSnafu;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest, InsertRequest};
use table::requests::{
CopyDatabaseRequest, CopyDirection, CopyTableRequest, DeleteRequest, InsertRequest,
};
use table::TableRef;

use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, InsertSnafu,
PlanStatementSnafu, Result, TableNotFoundSnafu,
};
use crate::instance::distributed::deleter::DistDeleter;
use crate::instance::distributed::inserter::DistInserter;
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};

Expand Down Expand Up @@ -83,12 +87,14 @@ impl StatementExecutor {

pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
Statement::Query(_) | Statement::Explain(_) => {
self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
}

Statement::Insert(insert) => self.insert(insert, query_ctx).await,

Statement::Delete(delete) => self.delete(delete, query_ctx).await,

Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,

Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
Expand Down Expand Up @@ -128,12 +134,16 @@ impl StatementExecutor {
}
}

async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
let planner = self.query_engine.planner();
let plan = planner
.plan(stmt, query_ctx.clone())
async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan> {
self.query_engine
.planner()
.plan(stmt, query_ctx)
.await
.context(PlanStatementSnafu)?;
.context(PlanStatementSnafu)
}

async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan(stmt, query_ctx.clone()).await?;
self.query_engine
.execute(plan, query_ctx)
.await
Expand Down Expand Up @@ -195,6 +205,47 @@ impl StatementExecutor {
}
}
}

// TODO(zhongzc): A middle state that eliminates calls to table.delete,
// For DistTable, its delete is not invoked; for MitoTable, it is still called but eventually eliminated.
/// Sends one delete request and returns the number of affected rows.
///
/// When the catalog manager downcasts to `FrontendCatalogManager`, the request is handed to a
/// `DistDeleter`; otherwise the target table is looked up and its `delete` is called directly.
async fn send_delete_request(&self, request: DeleteRequest) -> Result<usize> {
    // `request` is moved into the delete call below, so copy the name for the error context first.
    let table_name = request.table_name.clone();

    let dist_catalog_manager = self
        .catalog_manager
        .as_any()
        .downcast_ref::<FrontendCatalogManager>();

    if let Some(dist_catalog_manager) = dist_catalog_manager {
        let deleter = DistDeleter::new(
            request.catalog_name.clone(),
            request.schema_name.clone(),
            Arc::new(dist_catalog_manager.clone()),
        );
        // NOTE(review): delete failures are reported through `InsertSnafu`; confirm whether a
        // delete-specific error variant should be used instead.
        return deleter
            .delete(vec![request])
            .await
            .map_err(BoxedError::new)
            .context(TableOperationSnafu)
            .context(InsertSnafu { table_name });
    }

    // Not a frontend catalog manager: call the table's own `delete`.
    let table_ref = TableReference::full(
        &request.catalog_name,
        &request.schema_name,
        &request.table_name,
    );
    let table = self.get_table(&table_ref).await?;
    table
        .delete(request)
        .await
        .context(InsertSnafu { table_name })
}
}

fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
Expand Down
217 changes: 217 additions & 0 deletions src/frontend/src/statement/dml.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use common_query::Output;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use query::parser::QueryStatement;
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::delete::Delete;
use sql::statements::insert::Insert;
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::metadata::TableInfoRef;
use table::requests::{DeleteRequest, InsertRequest};
use table::TableRef;

use super::StatementExecutor;
use crate::error::{
BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu,
MissingTimeIndexColumnSnafu, ReadRecordBatchSnafu, Result, UnexpectedSnafu,
};

impl StatementExecutor {
    /// Executes an `INSERT` statement and returns the number of affected rows.
    ///
    /// Statements for which `can_extract_values()` is true are forwarded unchanged to the SQL
    /// statement executor. Every other insert is planned, its input plan is executed, and each
    /// resulting record batch is turned into an insert request.
    pub async fn insert(&self, insert: Box<Insert>, query_ctx: QueryContextRef) -> Result<Output> {
        // Fast path: plain insert ("insert with literal values") is executed directly.
        if insert.can_extract_values() {
            return self
                .sql_stmt_executor
                .execute_sql(Statement::Insert(insert), query_ctx)
                .await
                .context(ExecuteStatementSnafu);
        }

        // Slow path: insert with subquery.
        //
        // Planning the whole statement first means a malformed insert surfaces as a plan error
        // before anything is executed.
        let plan = self
            .plan(
                QueryStatement::Sql(Statement::Insert(insert)),
                query_ctx.clone(),
            )
            .await?;

        // Run the input plan of the DML node and obtain its rows as a stream.
        let dml = extract_dml_statement(plan)?;
        ensure!(
            dml.op == WriteOp::Insert,
            UnexpectedSnafu {
                violated: "expected an INSERT plan"
            }
        );
        let mut batches = self.execute_dml_subquery(&dml, query_ctx.clone()).await?;

        // Turn every batch into an insert request against the target table.
        let target = self.get_table_from_dml(dml, &query_ctx).await?;
        let target_info = target.table_info();
        let mut total_rows = 0;
        while let Some(next) = batches.next().await {
            let request = build_insert_request(
                next.context(ReadRecordBatchSnafu)?,
                target.schema(),
                &target_info,
            )?;
            total_rows += self.send_insert_request(request).await?;
        }

        Ok(Output::AffectedRows(total_rows))
    }

    /// Executes a `DELETE` statement and returns the number of affected rows.
    ///
    /// The statement is planned, the input plan of the resulting DML node is executed, and each
    /// resulting record batch is turned into a delete request.
    pub async fn delete(&self, delete: Box<Delete>, query_ctx: QueryContextRef) -> Result<Output> {
        // Planning the whole statement first means a malformed delete surfaces as a plan error
        // before anything is executed.
        let plan = self
            .plan(
                QueryStatement::Sql(Statement::Delete(delete)),
                query_ctx.clone(),
            )
            .await?;

        // Run the input plan of the DML node and obtain its rows as a stream.
        let dml = extract_dml_statement(plan)?;
        ensure!(
            dml.op == WriteOp::Delete,
            UnexpectedSnafu {
                violated: "expected a DELETE plan"
            }
        );
        let mut batches = self.execute_dml_subquery(&dml, query_ctx.clone()).await?;

        // Turn every batch into a delete request against the target table.
        let target = self.get_table_from_dml(dml, &query_ctx).await?;
        let target_info = target.table_info();
        let mut total_rows = 0;
        while let Some(next) = batches.next().await {
            let request = build_delete_request(
                next.context(ReadRecordBatchSnafu)?,
                target.schema(),
                &target_info,
            )?;
            total_rows += self.send_delete_request(request).await?;
        }

        Ok(Output::AffectedRows(total_rows))
    }

    /// Executes the input plan of `dml_statement` with the query engine and returns its result
    /// as a record batch stream.
    ///
    /// Fails with an "unexpected" error when the engine output is neither a stream nor record
    /// batches.
    async fn execute_dml_subquery(
        &self,
        dml_statement: &DmlStatement,
        query_ctx: QueryContextRef,
    ) -> Result<SendableRecordBatchStream> {
        // The DML node only borrows its input, so clone the plan before wrapping it.
        let input_plan = dml_statement.input.as_ref().clone();
        let engine_output = self
            .query_engine
            .execute(LogicalPlan::from(input_plan), query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)?;

        match engine_output {
            Output::RecordBatches(record_batches) => Ok(record_batches.as_stream()),
            Output::Stream(stream) => Ok(stream),
            _ => UnexpectedSnafu {
                violated: "expected a stream",
            }
            .fail(),
        }
    }

    /// Looks up the table targeted by `dml_statement`, resolving its table name against the
    /// current catalog and schema of `query_ctx`.
    async fn get_table_from_dml(
        &self,
        dml_statement: DmlStatement,
        query_ctx: &QueryContextRef,
    ) -> Result<TableRef> {
        let catalog = query_ctx.current_catalog().to_owned();
        let schema = query_ctx.current_schema().to_owned();
        let resolved = dml_statement.table_name.resolve(&catalog, &schema);

        let table_ref =
            TableReference::full(&resolved.catalog, &resolved.schema, &resolved.table);
        self.get_table(&table_ref).await
    }
}

/// Unwraps `logical_plan` into its DataFusion DML statement.
///
/// Fails with an "unexpected" error when the wrapped plan is not a DML node.
fn extract_dml_statement(logical_plan: LogicalPlan) -> Result<DmlStatement> {
    // Irrefutable pattern: used here as the only form of `LogicalPlan`.
    let LogicalPlan::DfPlan(inner_plan) = logical_plan;

    if let DfLogicalPlan::Dml(dml_statement) = inner_plan {
        Ok(dml_statement)
    } else {
        UnexpectedSnafu {
            violated: "expected a DML plan",
        }
        .fail()
    }
}

/// Builds an insert request for the table described by `table_info` from one record batch.
///
/// The batch is converted into column vectors using the table name and `table_schema`; the
/// request's region number is always set to 0.
fn build_insert_request(
    record_batch: RecordBatch,
    table_schema: SchemaRef,
    table_info: &TableInfoRef,
) -> Result<InsertRequest> {
    let catalog_name = table_info.catalog_name.clone();
    let schema_name = table_info.schema_name.clone();
    let table_name = table_info.name.clone();

    let columns_values = record_batch
        .column_vectors(&table_name, table_schema)
        .context(BuildColumnVectorsSnafu)?;

    Ok(InsertRequest {
        catalog_name,
        schema_name,
        table_name,
        columns_values,
        region_number: 0,
    })
}

/// Builds a delete request for the table described by `table_info` from one record batch.
///
/// Only the timestamp column of `table_schema` and the table's row key columns are kept in the
/// request. Fails when the schema has no timestamp column or when the batch cannot be
/// converted into column vectors.
fn build_delete_request(
    record_batch: RecordBatch,
    table_schema: SchemaRef,
    table_info: &TableInfoRef,
) -> Result<DeleteRequest> {
    // Resolve the time index name before `table_schema` is moved into `column_vectors`.
    let time_index = table_schema
        .timestamp_column()
        .map(|column| column.name.clone())
        .with_context(|| table::error::MissingTimeIndexColumnSnafu {
            table_name: table_info.name.clone(),
        })
        .context(MissingTimeIndexColumnSnafu)?;

    let all_columns = record_batch
        .column_vectors(&table_info.name, table_schema)
        .context(BuildColumnVectorsSnafu)?;

    let row_keys: Vec<&String> = table_info.meta.row_key_column_names().collect();

    // Keep only the columns that identify a row: the time index plus the row key columns.
    let key_column_values = all_columns
        .into_iter()
        .filter(|(name, _)| *name == time_index || row_keys.contains(&name))
        .collect::<HashMap<_, _>>();

    Ok(DeleteRequest {
        catalog_name: table_info.catalog_name.clone(),
        schema_name: table_info.schema_name.clone(),
        table_name: table_info.name.clone(),
        key_column_values,
    })
}
Loading

0 comments on commit af95e46

Please sign in to comment.