diff --git a/Cargo.lock b/Cargo.lock index f299ae45fd7d..0d9dc91ce04b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4084,7 +4084,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/DevilExileSu/greptime-proto.git?rev=637f06ca7fe097ab22277f473d33840d92f4190c#637f06ca7fe097ab22277f473d33840d92f4190c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=637f06ca7fe097ab22277f473d33840d92f4190c#637f06ca7fe097ab22277f473d33840d92f4190c" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index 8f067c844988..7eb324198f10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,8 +79,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2" } -greptime-proto = {git = "https://github.com/DevilExileSu/greptime-proto.git", rev = "637f06ca7fe097ab22277f473d33840d92f4190c"} +greptime-proto = {git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "637f06ca7fe097ab22277f473d33840d92f4190c"} humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index 3f46ff241279..81cd685762d7 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -20,22 +20,24 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ - Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, - Result as ProcedureResult, Status, + Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, }; use common_telemetry::debug; use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::storage::RegionId; +use strum::AsRefStr; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId}; +use super::utils::handle_retry_error; use crate::ddl::utils::handle_operate_region_error; use crate::ddl::DdlContext; -use crate::error::{self, Result, TableNotFoundSnafu}; +use crate::error::{Result, TableNotFoundSnafu}; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; +use crate::metrics; use crate::rpc::ddl::TruncateTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; use crate::table_name::TableName; @@ -52,18 +54,20 @@ impl Procedure for TruncateTableProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let error_handler = |e| { - if matches!(e, error::Error::RetryLater { .. }) { - ProcedureError::retry_later(e) - } else { - ProcedureError::external(e) - } - }; + let state = &self.data.state; + + let _timer = common_telemetry::timer!( + metrics::METRIC_META_PROCEDURE_TRUNCATE_TABLE, + &[("step", state.as_ref().to_string())] + ); + match self.data.state { TruncateTableState::Prepare => self.on_prepare().await, - TruncateTableState::DatanodeTruncateTable => self.on_datanode_truncate_table().await, + TruncateTableState::DatanodeTruncateRegions => { + self.on_datanode_truncate_regions().await + } } - .map_err(error_handler) + .map_err(handle_retry_error) } fn dump(&self) -> ProcedureResult { @@ -83,7 +87,7 @@ impl Procedure for TruncateTableProcedure { } impl TruncateTableProcedure { - pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTableProcedure"; + pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTable"; pub(crate) fn new( cluster_id: u64, @@ -125,12 +129,12 @@ impl TruncateTableProcedure { } ); - self.data.state = TruncateTableState::DatanodeTruncateTable; + self.data.state = TruncateTableState::DatanodeTruncateRegions; Ok(Status::executing(true)) } - async fn on_datanode_truncate_table(&mut self) -> Result { + async fn on_datanode_truncate_regions(&mut self) -> Result { let table_id = self.data.table_id(); let region_routes = &self.data.region_routes; @@ -195,7 +199,7 @@ impl TruncateTableData { region_routes: Vec, ) -> Self { Self { - state: TruncateTableState::DatanodeTruncateTable, + state: TruncateTableState::Prepare, cluster_id, task, table_info_value, @@ -220,10 +224,10 @@ impl TruncateTableData { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, AsRefStr)] enum TruncateTableState { /// Prepares to truncate the table Prepare, - /// Datanode truncates the table - DatanodeTruncateTable, + /// Truncates regions on Datanode + DatanodeTruncateRegions, } diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 49535607af72..815c8aacb1cf 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -19,3 +19,4 @@ pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema"; pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table"; pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table"; pub(crate) const METRIC_META_PROCEDURE_ALTER_TABLE: &str = "meta.procedure.alter_table"; +pub(crate) const METRIC_META_PROCEDURE_TRUNCATE_TABLE: &str = "meta.procedure.truncate_table"; diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 27529b89b1b4..684012508c0d 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -417,6 +417,7 @@ pub struct RegionFlushRequest {} #[derive(Debug)] pub struct RegionCompactRequest {} +/// Truncate region request. #[derive(Debug)] pub struct RegionTruncateRequest {}