Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: re-support query engine execute dml #2484

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let state = Arc::new(QueryEngineState::new(
catalog_list,
None,
None,
false,
plugins.clone(),
));
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ impl DatanodeBuilder {
// query engine in datanode only executes plan with resolved table source.
MemoryCatalogManager::with_default_setup(),
None,
None,
false,
plugins,
);
Expand Down
57 changes: 32 additions & 25 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use operator::delete::{Deleter, DeleterRef};
use operator::insert::{Inserter, InserterRef};
use operator::statement::StatementExecutor;
use operator::table::table_idents_to_full_name;
use operator::table::{table_idents_to_full_name, TableMutationOperator};
use partition::manager::PartitionRuleManager;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
Expand Down Expand Up @@ -163,14 +163,6 @@ impl Instance {
catalog_manager.datanode_manager().clone(),
);

let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_query_handler.clone()),
true,
plugins.clone(),
)
.query_engine();

let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
Expand All @@ -182,14 +174,27 @@ impl Instance {
datanode_clients,
));

let table_mutation_handler = Arc::new(TableMutationOperator::new(
inserter.clone(),
deleter.clone(),
));

let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_query_handler.clone()),
Some(table_mutation_handler),
true,
plugins.clone(),
)
.query_engine();

let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
meta_client.clone(),
meta_backend.clone(),
catalog_manager.clone(),
inserter.clone(),
deleter.clone(),
));

plugins.insert::<StatementExecutorRef>(statement_executor.clone());
Expand Down Expand Up @@ -301,9 +306,25 @@ impl Instance {
let region_query_handler =
FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone());

let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
datanode_manager.clone(),
));
let deleter = Arc::new(Deleter::new(
catalog_manager.clone(),
partition_manager,
datanode_manager.clone(),
));
let table_mutation_handler = Arc::new(TableMutationOperator::new(
inserter.clone(),
deleter.clone(),
));

let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_query_handler),
Some(table_mutation_handler),
true,
plugins.clone(),
)
Expand All @@ -317,33 +338,19 @@ impl Instance {
let cache_invalidator = Arc::new(DummyCacheInvalidator);
let ddl_executor = Arc::new(DdlManager::new(
procedure_manager,
datanode_manager.clone(),
datanode_manager,
cache_invalidator.clone(),
table_metadata_manager.clone(),
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
));

let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));

let inserter = Arc::new(Inserter::new(
catalog_manager.clone(),
partition_manager.clone(),
datanode_manager.clone(),
));
let deleter = Arc::new(Deleter::new(
catalog_manager.clone(),
partition_manager,
datanode_manager,
));

let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
ddl_executor,
kv_backend.clone(),
cache_invalidator,
inserter.clone(),
deleter.clone(),
));

Ok(Instance {
Expand Down
6 changes: 4 additions & 2 deletions src/operator/src/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl Deleter {
&self,
request: TableDeleteRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows> {
) -> Result<usize> {
let catalog = request.catalog_name.as_str();
let schema = request.schema_name.as_str();
let table = request.table_name.as_str();
Expand All @@ -108,7 +108,9 @@ impl Deleter {
let deletes = TableToRegion::new(&table_info, &self.partition_manager)
.convert(request)
.await?;
self.do_request(deletes, ctx.trace_id(), 0).await

let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?;
Ok(affected_rows as _)
}
}

Expand Down
1 change: 0 additions & 1 deletion src/operator/src/req_convert/insert/table_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ mod tests {
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table_1".to_string(),
columns_values: HashMap::from([("a".to_string(), vector)]),
region_number: 0,
}
}

Expand Down
8 changes: 1 addition & 7 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use table::engine::TableReference;
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::TableRef;

use crate::delete::DeleterRef;
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu,
Result, TableNotFoundSnafu,
Expand All @@ -66,7 +65,6 @@ pub struct StatementExecutor {
partition_manager: PartitionRuleManagerRef,
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
deleter: DeleterRef,
}

impl StatementExecutor {
Expand All @@ -77,7 +75,6 @@ impl StatementExecutor {
kv_backend: KvBackendRef,
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
deleter: DeleterRef,
) -> Self {
Self {
catalog_manager,
Expand All @@ -87,7 +84,6 @@ impl StatementExecutor {
partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)),
cache_invalidator,
inserter,
deleter,
}
}

Expand All @@ -104,14 +100,12 @@ impl StatementExecutor {

pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
match stmt {
Statement::Query(_) | Statement::Explain(_) => {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
}

Statement::Insert(insert) => self.insert(insert, query_ctx).await,
killme2008 marked this conversation as resolved.
Show resolved Hide resolved

Statement::Delete(delete) => self.delete(delete, query_ctx).await,

Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,

Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
Expand Down
1 change: 0 additions & 1 deletion src/operator/src/statement/copy_table_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ impl StatementExecutor {
schema_name: req.schema_name.to_string(),
table_name: req.table_name.to_string(),
columns_values,
region_number: 0,
},
query_ctx.clone(),
));
Expand Down
Loading