diff --git a/Cargo.lock b/Cargo.lock index ead7019e0976..f299ae45fd7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4084,7 +4084,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e81a60e817a348ee5b7dfbd991f86d35cd068ce5#e81a60e817a348ee5b7dfbd991f86d35cd068ce5" +source = "git+https://github.com/DevilExileSu/greptime-proto.git?rev=637f06ca7fe097ab22277f473d33840d92f4190c#637f06ca7fe097ab22277f473d33840d92f4190c" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index 2d6832e1e036..8f067c844988 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,8 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e81a60e817a348ee5b7dfbd991f86d35cd068ce5" } +# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2" } +greptime-proto = {git = "https://github.com/DevilExileSu/greptime-proto.git", rev = "637f06ca7fe097ab22277f473d33840d92f4190c"} humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index f84825252435..4ec389199f91 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -28,6 +28,7 @@ use crate::rpc::router::RegionRoute; pub mod alter_table; pub mod create_table; pub mod drop_table; +pub mod truncate_table; pub mod utils; #[derive(Debug, Default)] diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs new file mode 100644 index 000000000000..3f46ff241279 --- /dev/null +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -0,0 +1,229 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::region::{ + region_request, RegionRequest, RegionRequestHeader, TruncateRequest as PbTruncateRegionRequest, +}; +use async_trait::async_trait; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, + Result as ProcedureResult, Status, +}; +use common_telemetry::debug; +use futures::future::join_all; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use store_api::storage::RegionId; +use table::engine::TableReference; +use table::metadata::{RawTableInfo, TableId}; + +use crate::ddl::utils::handle_operate_region_error; +use crate::ddl::DdlContext; +use crate::error::{self, Result, TableNotFoundSnafu}; +use crate::key::table_info::TableInfoValue; +use crate::key::table_name::TableNameKey; +use crate::rpc::ddl::TruncateTableTask; +use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; +use crate::table_name::TableName; + +pub struct TruncateTableProcedure { + context: DdlContext, + data: TruncateTableData, +} + +#[async_trait] +impl Procedure for TruncateTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let error_handler = |e| { + if matches!(e, error::Error::RetryLater { .. }) { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } + }; + match self.data.state { + TruncateTableState::Prepare => self.on_prepare().await, + TruncateTableState::DatanodeTruncateTable => self.on_datanode_truncate_table().await, + } + .map_err(error_handler) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let table_ref = &self.data.table_ref(); + let key = common_catalog::format_full_table_name( + table_ref.catalog, + table_ref.schema, + table_ref.table, + ); + + LockKey::single(key) + } +} + +impl TruncateTableProcedure { + pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTableProcedure"; + + pub(crate) fn new( + cluster_id: u64, + task: TruncateTableTask, + table_info_value: TableInfoValue, + region_routes: Vec, + context: DdlContext, + ) -> Self { + Self { + context, + data: TruncateTableData::new(cluster_id, task, table_info_value, region_routes), + } + } + + pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(Self { context, data }) + } + + // Checks whether the table exists. + async fn on_prepare(&mut self) -> Result { + let table_ref = &self.data.table_ref(); + + let manager = &self.context.table_metadata_manager; + + let exist = manager + .table_name_manager() + .exists(TableNameKey::new( + table_ref.catalog, + table_ref.schema, + table_ref.table, + )) + .await?; + + ensure!( + exist, + TableNotFoundSnafu { + table_name: table_ref.to_string() + } + ); + + self.data.state = TruncateTableState::DatanodeTruncateTable; + + Ok(Status::executing(true)) + } + + async fn on_datanode_truncate_table(&mut self) -> Result { + let table_id = self.data.table_id(); + + let region_routes = &self.data.region_routes; + let leaders = find_leaders(region_routes); + let mut truncate_region_tasks = Vec::with_capacity(leaders.len()); + + for datanode in leaders { + let clients = self.context.datanode_manager.clone(); + let regions = find_leader_regions(region_routes, &datanode); + let region_ids = regions + .iter() + .map(|region_number| RegionId::new(table_id, *region_number)) + .collect::>(); + + truncate_region_tasks.push(async move { + for region_id in region_ids { + debug!("Truncating region {region_id} on Datanode {datanode:?}"); + + let request = RegionRequest { + header: Some(RegionRequestHeader { + trace_id: 0, + span_id: 0, + }), + body: Some(region_request::Body::Truncate(PbTruncateRegionRequest { + region_id: region_id.as_u64(), + })), + }; + + if let Err(err) = clients.datanode(&datanode).await.handle(request).await { + if err.status_code() != StatusCode::RegionNotFound { + return Err(handle_operate_region_error(datanode)(err)); + } + } + } + Ok(()) + }); + } + + join_all(truncate_region_tasks) + .await + .into_iter() + .collect::>>()?; + + Ok(Status::Done) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TruncateTableData { + state: TruncateTableState, + cluster_id: u64, + task: TruncateTableTask, + table_info_value: TableInfoValue, + region_routes: Vec, +} + +impl TruncateTableData { + pub fn new( + cluster_id: u64, + task: TruncateTableTask, + table_info_value: TableInfoValue, + region_routes: Vec, + ) -> Self { + Self { + state: TruncateTableState::DatanodeTruncateTable, + cluster_id, + task, + table_info_value, + region_routes, + } + } + + pub fn table_ref(&self) -> TableReference { + self.task.table_ref() + } + + pub fn table_name(&self) -> TableName { + self.task.table_name() + } + + fn table_info(&self) -> &RawTableInfo { + &self.table_info_value.table_info + } + + fn table_id(&self) -> TableId { + self.table_info().ident.table_id + } +} + +#[derive(Debug, Serialize, Deserialize)] +enum TruncateTableState { + /// Prepares to truncate the table + Prepare, + /// Datanode truncates the table + DatanodeTruncateTable, +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index aaa1e1c52104..62af640a3c57 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; -use common_telemetry::{error, info}; +use common_telemetry::info; use snafu::{OptionExt, ResultExt}; use crate::cache_invalidator::CacheInvalidatorRef; @@ -23,13 +23,14 @@ use crate::datanode_manager::DatanodeManagerRef; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; +use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{ DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadataAllocatorContext, TableMetadataAllocatorRef, }; use crate::error::{ self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu, - UnsupportedSnafu, WaitProcedureSnafu, + WaitProcedureSnafu, }; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; @@ -122,6 +123,20 @@ impl DdlManager { ) .context(RegisterProcedureLoaderSnafu { type_name: AlterTableProcedure::TYPE_NAME, + })?; + + let context = self.create_context(); + + self.procedure_manager + .register_loader( + TruncateTableProcedure::TYPE_NAME, + Box::new(move |json| { + let context = context.clone(); + TruncateTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }), + ) + .context(RegisterProcedureLoaderSnafu { + type_name: TruncateTableProcedure::TYPE_NAME, }) } @@ -183,15 +198,21 @@ impl DdlManager { &self, cluster_id: u64, truncate_table_task: TruncateTableTask, + table_info_value: TableInfoValue, region_routes: Vec, ) -> Result { - error!("Truncate table procedure is not supported, cluster_id = {}, truncate_table_task = {:?}, region_routes = {:?}", - cluster_id, truncate_table_task, region_routes); + let context = self.create_context(); + let procedure = TruncateTableProcedure::new( + cluster_id, + truncate_table_task, + table_info_value, + region_routes, + context, + ); - UnsupportedSnafu { - operation: "TRUNCATE TABLE", - } - .fail() + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await } async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result { @@ -216,32 +237,34 @@ async fn handle_truncate_table_task( cluster_id: u64, truncate_table_task: TruncateTableTask, ) -> Result { - let truncate_table = &truncate_table_task.truncate_table; - let table_id = truncate_table - .table_id - .as_ref() - .context(error::UnexpectedSnafu { - err_msg: "expected table id ", - })? - .id; - + let table_id = truncate_table_task.table_id; + let table_metadata_manager = &ddl_manager.table_metadata_manager(); let table_ref = truncate_table_task.table_ref(); - let table_route_value = ddl_manager - .table_metadata_manager() - .table_route_manager() - .get(table_id) - .await? - .with_context(|| error::TableRouteNotFoundSnafu { - table_name: table_ref.to_string(), - })?; + let (table_info_value, table_route_value) = + table_metadata_manager.get_full_table_info(table_id).await?; + + let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu { + table_name: table_ref.to_string(), + })?; let table_route = table_route_value.region_routes; let id = ddl_manager - .submit_truncate_table_task(cluster_id, truncate_table_task, table_route) + .submit_truncate_table_task( + cluster_id, + truncate_table_task, + table_info_value, + table_route, + ) .await?; + info!("Table: {table_id} is truncated via procedure_id {id:?}"); + Ok(SubmitDdlTaskResponse { key: id.to_string().into(), ..Default::default() diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index d1ca0dea94f7..aed5fa1131b3 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -65,8 +65,18 @@ impl DdlTask { DdlTask::AlterTable(AlterTableTask { alter_table }) } - pub fn new_truncate_table(truncate_table: TruncateTableExpr) -> Self { - DdlTask::TruncateTable(TruncateTableTask { truncate_table }) + pub fn new_truncate_table( + catalog: String, + schema: String, + table: String, + table_id: TableId, + ) -> Self { + DdlTask::TruncateTable(TruncateTableTask { + catalog, + schema, + table, + table_id, + }) } } @@ -112,7 +122,12 @@ impl TryFrom for PbSubmitDdlTaskRequest { alter_table: Some(task.alter_table), }), DdlTask::TruncateTable(task) => Task::TruncateTableTask(PbTruncateTableTask { - truncate_table: Some(task.truncate_table), + truncate_table: Some(TruncateTableExpr { + catalog_name: task.catalog, + schema_name: task.schema, + table_name: task.table, + table_id: Some(api::v1::TableId { id: task.table_id }), + }), }), }; @@ -358,27 +373,28 @@ impl<'de> Deserialize<'de> for AlterTableTask { } } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct TruncateTableTask { - pub truncate_table: TruncateTableExpr, + pub catalog: String, + pub schema: String, + pub table: String, + pub table_id: TableId, } impl TruncateTableTask { pub fn table_ref(&self) -> TableReference { TableReference { - catalog: &self.truncate_table.catalog_name, - schema: &self.truncate_table.schema_name, - table: &self.truncate_table.table_name, + catalog: &self.catalog, + schema: &self.schema, + table: &self.table, } } pub fn table_name(&self) -> TableName { - let table = &self.truncate_table; - TableName { - catalog_name: table.catalog_name.to_string(), - schema_name: table.schema_name.to_string(), - table_name: table.table_name.to_string(), + catalog_name: self.catalog.to_string(), + schema_name: self.schema.to_string(), + table_name: self.table.to_string(), } } } @@ -388,39 +404,20 @@ impl TryFrom for TruncateTableTask { fn try_from(pb: PbTruncateTableTask) -> Result { let truncate_table = pb.truncate_table.context(error::InvalidProtoMsgSnafu { - err_msg: "expected truncate_table", + err_msg: "expected drop table", })?; - Ok(TruncateTableTask { truncate_table }) - } -} - -impl Serialize for TruncateTableTask { - fn serialize(&self, serializer: S) -> result::Result - where - S: serde::Serializer, - { - let pb = PbTruncateTableTask { - truncate_table: Some(self.truncate_table.clone()), - }; - let buf = pb.encode_to_vec(); - serializer.serialize_bytes(&buf) - } -} - -impl<'de> Deserialize<'de> for TruncateTableTask { - fn deserialize(deserializer: D) -> result::Result - where - D: serde::Deserializer<'de>, - { - let buf = Vec::::deserialize(deserializer)?; - let task: PbTruncateTableTask = PbTruncateTableTask::decode(&*buf) - .map_err(|err| serde::de::Error::custom(err.to_string()))?; - - let task = TruncateTableTask::try_from(task) - .map_err(|err| serde::de::Error::custom(err.to_string()))?; - - Ok(task) + Ok(Self { + catalog: truncate_table.catalog_name, + schema: truncate_table.schema_name, + table: truncate_table.table_name, + table_id: truncate_table + .table_id + .context(error::InvalidProtoMsgSnafu { + err_msg: "expected table_id", + })? + .id, + }) } } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 460f971df53f..d81ffd5d2716 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::{column_def, AlterExpr, CreateTableExpr, TruncateTableExpr}; +use api::v1::{column_def, AlterExpr, CreateTableExpr}; use catalog::CatalogManagerRef; use chrono::DateTime; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -170,15 +170,8 @@ impl StatementExecutor { .with_context(|| TableNotFoundSnafu { table_name: table_name.to_string(), })?; - let table_id = table.table_info().ident.table_id; - - let expr = TruncateTableExpr { - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - table_id: Some(api::v1::TableId { id: table_id }), - }; - self.truncate_table_procedure(&expr).await?; + let table_id = table.table_info().table_id(); + self.truncate_table_procedure(&table_name, table_id).await?; Ok(Output::AffectedRows(0)) } @@ -305,10 +298,16 @@ impl StatementExecutor { async fn truncate_table_procedure( &self, - truncate_table: &TruncateTableExpr, + table_name: &TableName, + table_id: TableId, ) -> Result { let request = SubmitDdlTaskRequest { - task: DdlTask::new_truncate_table(truncate_table.clone()), + task: DdlTask::new_truncate_table( + table_name.catalog_name.to_string(), + table_name.schema_name.to_string(), + table_name.table_name.to_string(), + table_id, + ), }; self.ddl_executor diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 4964a665861e..27529b89b1b4 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -116,6 +116,10 @@ impl RegionRequest { compact.region_id.into(), Self::Compact(RegionCompactRequest {}), )]), + region_request::Body::Truncate(truncate) => Ok(vec![( + truncate.region_id.into(), + Self::Truncate(RegionTruncateRequest {}), + )]), } } }