diff --git a/Cargo.lock b/Cargo.lock index 9edc6dcc7045..5e71b5c64fe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2676,7 +2676,6 @@ dependencies = [ "store-api", "substrait 0.4.0-nightly", "table", - "table-procedure", "tokio", "tokio-stream", "toml 0.7.6", @@ -9515,30 +9514,6 @@ dependencies = [ "tokio-util", ] -[[package]] -name = "table-procedure" -version = "0.4.0-nightly" -dependencies = [ - "async-trait", - "catalog", - "common-catalog", - "common-error", - "common-procedure", - "common-procedure-test", - "common-telemetry", - "common-test-util", - "datatypes", - "log-store", - "mito", - "object-store", - "serde", - "serde_json", - "snafu", - "storage", - "table", - "tokio", -] - [[package]] name = "tagptr" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index c26db0e06854..647df95383f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,6 @@ members = [ "src/storage", "src/store-api", "src/table", - "src/table-procedure", # TODO: add this back once the region server is available # "tests-integration", "tests/runner", diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 7fc1d0eec581..e0d1c495cac6 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -66,7 +66,6 @@ storage = { workspace = true } store-api = { workspace = true } substrait = { workspace = true } table = { workspace = true } -table-procedure = { workspace = true } tokio-stream = { version = "0.1", features = ["net"] } tokio.workspace = true toml.workspace = true diff --git a/src/table-procedure/Cargo.toml b/src/table-procedure/Cargo.toml deleted file mode 100644 index 1c1ce73c3710..000000000000 --- a/src/table-procedure/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "table-procedure" -version.workspace = true -edition.workspace = true -license.workspace = true - -[dependencies] -async-trait.workspace = true -catalog = { workspace = true } -common-error = { workspace = true } -common-procedure = { workspace = true } -common-telemetry = { workspace = true } -datatypes = { workspace = true } -serde.workspace = true -serde_json.workspace = true -snafu.workspace = true -table = { workspace = true } - -[dev-dependencies] -common-catalog = { workspace = true } -common-procedure-test = { workspace = true } -common-test-util = { workspace = true } -log-store = { workspace = true } -mito = { workspace = true } -object-store = { workspace = true } -storage = { workspace = true } -tokio.workspace = true diff --git a/src/table-procedure/src/alter.rs b/src/table-procedure/src/alter.rs deleted file mode 100644 index 72f0ac2ab9b0..000000000000 --- a/src/table-procedure/src/alter.rs +++ /dev/null @@ -1,266 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Procedure to alter a table. - -use async_trait::async_trait; -use catalog::CatalogManagerRef; -use common_procedure::error::SubprocedureFailedSnafu; -use common_procedure::{ - Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, - ProcedureWithId, Result, Status, -}; -use common_telemetry::logging; -use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; -use table::engine::{EngineContext, TableEngineProcedureRef, TableReference}; -use table::metadata::TableId; -use table::requests::{AlterKind, AlterTableRequest}; - -use crate::error::{ - AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableExistsSnafu, - TableNotFoundSnafu, -}; - -/// Procedure to alter a table. -pub struct AlterTableProcedure { - data: AlterTableData, - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, -} - -#[async_trait] -impl Procedure for AlterTableProcedure { - fn type_name(&self) -> &str { - Self::TYPE_NAME - } - - async fn execute(&mut self, ctx: &Context) -> Result { - match self.data.state { - AlterTableState::Prepare => self.on_prepare().await, - AlterTableState::EngineAlterTable => self.on_engine_alter_table(ctx).await, - // No more need to "rename table in catalog", because the table metadata is now stored - // in kv backend, and updated by the unified DDL procedure soon. For ordinary tables, - // catalog manager will be a readonly proxy. - } - } - - fn dump(&self) -> Result { - let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?; - Ok(json) - } - - fn lock_key(&self) -> LockKey { - // We lock the whole table. - let table_name = self.data.table_ref().to_string(); - // If alter kind is rename, we also need to lock the renamed table. - if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { - let new_table_name = TableReference { - catalog: &self.data.request.catalog_name, - schema: &self.data.request.schema_name, - table: new_table_name, - } - .to_string(); - LockKey::new([table_name, new_table_name]) - } else { - LockKey::single(table_name) - } - } -} - -impl AlterTableProcedure { - const TYPE_NAME: &str = "table-procedure:AlterTableProcedure"; - - /// Returns a new [AlterTableProcedure]. - pub fn new( - request: AlterTableRequest, - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - ) -> AlterTableProcedure { - AlterTableProcedure { - data: AlterTableData { - state: AlterTableState::Prepare, - request, - table_id: None, - subprocedure_id: None, - }, - catalog_manager, - engine_procedure, - } - } - - /// Register the loader of this procedure to the `procedure_manager`. - /// - /// # Panics - /// Panics on error. - pub fn register_loader( - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - procedure_manager: &dyn ProcedureManager, - ) { - procedure_manager - .register_loader( - Self::TYPE_NAME, - Box::new(move |data| { - Self::from_json(data, catalog_manager.clone(), engine_procedure.clone()) - .map(|p| Box::new(p) as _) - }), - ) - .unwrap() - } - - /// Recover the procedure from json. - fn from_json( - json: &str, - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - ) -> Result { - let data: AlterTableData = serde_json::from_str(json).context(DeserializeProcedureSnafu)?; - - Ok(AlterTableProcedure { - data, - catalog_manager, - engine_procedure, - }) - } - - async fn on_prepare(&mut self) -> Result { - let request = &self.data.request; - let table = self - .catalog_manager - .table( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ) - .await - .context(AccessCatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - name: format!( - "{}.{}.{}", - request.catalog_name, request.schema_name, request.table_name - ), - })?; - - if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { - ensure!( - self.catalog_manager - .table(&request.catalog_name, &request.schema_name, new_table_name) - .await - .context(AccessCatalogSnafu)? - .is_none(), - TableExistsSnafu { - name: format!( - "{}.{}.{}", - request.catalog_name, request.schema_name, new_table_name - ), - } - ); - } - - self.data.state = AlterTableState::EngineAlterTable; - // Assign procedure id to the subprocedure. - self.data.subprocedure_id = Some(ProcedureId::random()); - // Set the table id. - self.data.table_id = Some(table.table_info().ident.table_id); - - Ok(Status::executing(true)) - } - - async fn on_engine_alter_table(&mut self, ctx: &Context) -> Result { - // Safety: subprocedure id is always set in this state. - let sub_id = self.data.subprocedure_id.unwrap(); - - // Query subprocedure state. - let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else { - logging::info!( - "On engine alter table {}, subprocedure not found, sub_id: {}", - self.data.request.table_name, - sub_id - ); - - // If the subprocedure is not found, we create a new subprocedure with the same id. - let engine_ctx = EngineContext::default(); - let procedure = self - .engine_procedure - .alter_table_procedure(&engine_ctx, self.data.request.clone()) - .map_err(Error::from_error_ext)?; - return Ok(Status::Suspended { - subprocedures: vec![ProcedureWithId { - id: sub_id, - procedure, - }], - persist: true, - }); - }; - - match sub_state { - ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended { - subprocedures: Vec::new(), - persist: false, - }), - ProcedureState::Done => { - logging::info!( - "On engine alter table {}, done, sub_id: {}", - self.data.request.table_name, - sub_id - ); - Ok(Status::Done) - } - ProcedureState::Failed { error } => { - // Return error if the subprocedure is failed. - Err(error).context(SubprocedureFailedSnafu { - subprocedure_id: sub_id, - })? - } - } - } -} - -/// Represents each step while altering a table in the datanode. -#[derive(Debug, Serialize, Deserialize)] -enum AlterTableState { - /// Validate request and prepare to alter table. - Prepare, - /// Alter table in the table engine. - EngineAlterTable, -} - -/// Serializable data of [AlterTableProcedure]. -#[derive(Debug, Serialize, Deserialize)] -struct AlterTableData { - /// Current state. - state: AlterTableState, - /// Request to alter this table. - request: AlterTableRequest, - /// Id of the table. - /// - /// Available after [AlterTableState::Prepare] state. - table_id: Option, - /// Id of the subprocedure to alter this table in the engine. - /// - /// This id is `Some` while the procedure is in [AlterTableState::EngineAlterTable] - /// state. - subprocedure_id: Option, -} - -impl AlterTableData { - fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.request.catalog_name, - schema: &self.request.schema_name, - table: &self.request.table_name, - } - } -} diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs deleted file mode 100644 index d3a17ce89bec..000000000000 --- a/src/table-procedure/src/create.rs +++ /dev/null @@ -1,416 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Procedure to create a table. - -use async_trait::async_trait; -use catalog::CatalogManagerRef; -use common_procedure::error::SubprocedureFailedSnafu; -use common_procedure::{ - Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, - ProcedureWithId, Result, Status, -}; -use common_telemetry::logging; -use serde::{Deserialize, Serialize}; -use snafu::ResultExt; -use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference}; -use table::requests::{CreateTableRequest, OpenTableRequest}; - -use crate::error::{ - AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableExistsSnafu, -}; - -/// Procedure to create a table. -pub struct CreateTableProcedure { - data: CreateTableData, - catalog_manager: CatalogManagerRef, - table_engine: TableEngineRef, - engine_procedure: TableEngineProcedureRef, -} - -#[async_trait] -impl Procedure for CreateTableProcedure { - fn type_name(&self) -> &str { - Self::TYPE_NAME - } - - async fn execute(&mut self, ctx: &Context) -> Result { - match self.data.state { - CreateTableState::Prepare => self.on_prepare().await, - CreateTableState::EngineCreateTable => self.on_engine_create_table(ctx).await, - CreateTableState::RegisterCatalog => self.on_register_catalog().await, - } - } - - fn dump(&self) -> Result { - let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?; - Ok(json) - } - - fn lock_key(&self) -> LockKey { - // We lock the whole table. - let table_name = self.data.table_ref().to_string(); - LockKey::single(table_name) - } -} - -impl CreateTableProcedure { - const TYPE_NAME: &str = "table-procedure::CreateTableProcedure"; - - /// Returns a new [CreateTableProcedure]. - pub fn new( - request: CreateTableRequest, - catalog_manager: CatalogManagerRef, - table_engine: TableEngineRef, - engine_procedure: TableEngineProcedureRef, - ) -> CreateTableProcedure { - CreateTableProcedure { - data: CreateTableData { - state: CreateTableState::Prepare, - request, - subprocedure_id: None, - }, - catalog_manager, - table_engine, - engine_procedure, - } - } - - /// Register the loader of this procedure to the `procedure_manager`. - /// - /// # Panics - /// Panics on error. - pub fn register_loader( - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - table_engine: TableEngineRef, - procedure_manager: &dyn ProcedureManager, - ) { - procedure_manager - .register_loader( - Self::TYPE_NAME, - Box::new(move |data| { - Self::from_json( - data, - catalog_manager.clone(), - table_engine.clone(), - engine_procedure.clone(), - ) - .map(|p| Box::new(p) as _) - }), - ) - .unwrap() - } - - /// Recover the procedure from json. - fn from_json( - json: &str, - catalog_manager: CatalogManagerRef, - table_engine: TableEngineRef, - engine_procedure: TableEngineProcedureRef, - ) -> Result { - let data: CreateTableData = - serde_json::from_str(json).context(DeserializeProcedureSnafu)?; - - Ok(CreateTableProcedure { - data, - catalog_manager, - table_engine, - engine_procedure, - }) - } - - async fn on_prepare(&mut self) -> Result { - let table_exists = self - .catalog_manager - .table_exist( - &self.data.request.catalog_name, - &self.data.request.schema_name, - &self.data.request.table_name, - ) - .await - .context(AccessCatalogSnafu)?; - if table_exists { - return if self.data.request.create_if_not_exists { - Ok(Status::Done) - } else { - TableExistsSnafu { - name: &self.data.request.table_name, - } - .fail()? - }; - } - - self.data.state = CreateTableState::EngineCreateTable; - // Assign procedure id to the subprocedure. - self.data.subprocedure_id = Some(ProcedureId::random()); - - Ok(Status::executing(true)) - } - - async fn on_engine_create_table(&mut self, ctx: &Context) -> Result { - // Safety: subprocedure id is always set in this state. - let sub_id = self.data.subprocedure_id.unwrap(); - - // Query subprocedure state. - let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else { - // We need to submit the subprocedure if it doesn't exist. We always need to - // do this check as we might not submitted the subprocedure yet when the manager - // recover this procedure from procedure store. - logging::info!( - "On engine create table {}, table_id: {}, subprocedure not found, sub_id: {}", - self.data.request.table_name, - self.data.request.id, - sub_id - ); - - // If the sub procedure is not found, we create a new sub procedure with the same id. - let engine_ctx = EngineContext::default(); - let procedure = self - .engine_procedure - .create_table_procedure(&engine_ctx, self.data.request.clone()) - .map_err(Error::from_error_ext)?; - return Ok(Status::Suspended { - subprocedures: vec![ProcedureWithId { - id: sub_id, - procedure, - }], - persist: true, - }); - }; - - match sub_state { - ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended { - subprocedures: Vec::new(), - persist: false, - }), - ProcedureState::Done => { - logging::info!( - "On engine create table {}, table_id: {}, done, sub_id: {}", - self.data.request.table_name, - self.data.request.id, - sub_id - ); - // The sub procedure is done, we can execute next step. - self.data.state = CreateTableState::RegisterCatalog; - Ok(Status::executing(true)) - } - ProcedureState::Failed { error } => { - // Return error if the subprocedure is failed. - Err(error).context(SubprocedureFailedSnafu { - subprocedure_id: sub_id, - })? - } - } - } - - async fn on_register_catalog(&mut self) -> Result { - if self - .catalog_manager - .table_exist( - &self.data.request.catalog_name, - &self.data.request.schema_name, - &self.data.request.table_name, - ) - .await - .map_err(Error::from_error_ext)? - { - return Ok(Status::Done); - } - - // If we recover the procedure from json, then the table engine hasn't open this table yet, - // so we need to use `open_table()` instead of `get_table()`. - let engine_ctx = EngineContext::default(); - let open_req = OpenTableRequest { - catalog_name: self.data.request.catalog_name.clone(), - schema_name: self.data.request.schema_name.clone(), - table_name: self.data.request.table_name.clone(), - table_id: self.data.request.id, - region_numbers: self.data.request.region_numbers.clone(), - }; - // Safety: The table is already created. - let _ = self - .table_engine - .open_table(&engine_ctx, open_req) - .await - .map_err(Error::from_error_ext)? - .unwrap(); - - Ok(Status::Done) - } -} - -/// Represents each step while creating a table in the datanode. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] -enum CreateTableState { - /// Validate request and prepare to create table. - Prepare, - /// Create table in the table engine. - EngineCreateTable, - /// Register the table to the catalog. - RegisterCatalog, -} - -/// Serializable data of [CreateTableProcedure]. -#[derive(Debug, Serialize, Deserialize)] -struct CreateTableData { - /// Current state. - state: CreateTableState, - /// Request to create this table. - request: CreateTableRequest, - /// Id of the subprocedure to create this table in the engine. - /// - /// This id is `Some` while the procedure is in [CreateTableState::EngineCreateTable] - /// state. - subprocedure_id: Option, -} - -impl CreateTableData { - fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.request.catalog_name, - schema: &self.request.schema_name, - table: &self.request.table_name, - } - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use common_procedure_test::{ - execute_procedure_once, execute_procedure_until_done, execute_until_suspended_or_done, - MockContextProvider, - }; - use table::engine::{EngineContext, TableEngine}; - - use super::*; - use crate::test_util::{self, TestEnv}; - - #[tokio::test] - async fn test_create_table_procedure() { - let TestEnv { - dir: _dir, - table_engine, - procedure_manager, - catalog_manager, - } = TestEnv::new("create"); - - let table_name = "test_create"; - let request = test_util::new_create_request(table_name); - let procedure = CreateTableProcedure::new( - request.clone(), - catalog_manager, - table_engine.clone(), - table_engine.clone(), - ); - - let engine_ctx = EngineContext::default(); - assert!(table_engine - .get_table(&engine_ctx, request.id) - .unwrap() - .is_none()); - - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap(); - watcher.changed().await.unwrap(); - - assert!(table_engine - .get_table(&engine_ctx, request.id) - .unwrap() - .is_some()); - } - - #[tokio::test] - async fn test_recover_register_catalog() { - common_telemetry::init_default_ut_logging(); - - let TestEnv { - dir, - table_engine, - procedure_manager: _, - catalog_manager, - } = TestEnv::new("create"); - - let table_name = "test_create"; - let request = test_util::new_create_request(table_name); - let procedure = CreateTableProcedure::new( - request.clone(), - catalog_manager, - table_engine.clone(), - table_engine.clone(), - ); - - let engine_ctx = EngineContext::default(); - assert!(table_engine - .get_table(&engine_ctx, request.id) - .unwrap() - .is_none()); - - let procedure_id = ProcedureId::random(); - let mut procedure = Box::new(procedure); - // Execute until suspended. We use an empty provider so the parent can submit - // a new subprocedure as the it can't find the subprocedure. - let mut subprocedures = execute_until_suspended_or_done( - procedure_id, - MockContextProvider::default(), - &mut procedure, - ) - .await - .unwrap(); - assert_eq!(1, subprocedures.len()); - // Execute the subprocedure. - let mut subprocedure = subprocedures.pop().unwrap(); - execute_procedure_until_done(&mut subprocedure.procedure).await; - let states = HashMap::from([(subprocedure.id, ProcedureState::Done)]); - // Execute the parent procedure once. - let _ = execute_procedure_once( - procedure_id, - MockContextProvider::new(states), - &mut procedure, - ) - .await; - assert_eq!(CreateTableState::RegisterCatalog, procedure.data.state); - - // Close the table engine and reopen the TestEnv. - table_engine.close().await.unwrap(); - let TestEnv { - dir: _dir, - table_engine, - procedure_manager: _, - catalog_manager, - } = TestEnv::from_temp_dir(dir); - - // Recover the procedure - let json = procedure.dump().unwrap(); - let procedure = CreateTableProcedure::from_json( - &json, - catalog_manager, - table_engine.clone(), - table_engine.clone(), - ) - .unwrap(); - let mut procedure = Box::new(procedure); - assert_eq!(CreateTableState::RegisterCatalog, procedure.data.state); - // Execute until done. - execute_procedure_until_done(&mut procedure).await; - - // The table is created. - assert!(table_engine - .get_table(&engine_ctx, request.id) - .unwrap() - .is_some()); - } -} diff --git a/src/table-procedure/src/drop.rs b/src/table-procedure/src/drop.rs deleted file mode 100644 index 7dc1bafd7a7b..000000000000 --- a/src/table-procedure/src/drop.rs +++ /dev/null @@ -1,270 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Procedure to drop a table. - -use async_trait::async_trait; -use catalog::CatalogManagerRef; -use common_procedure::error::SubprocedureFailedSnafu; -use common_procedure::{ - Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, - ProcedureWithId, Result, Status, -}; -use common_telemetry::logging; -use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; -use table::engine::{EngineContext, TableEngineProcedureRef, TableReference}; -use table::requests::DropTableRequest; - -use crate::error::{ - AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableNotFoundSnafu, -}; - -/// Procedure to drop a table. -pub struct DropTableProcedure { - data: DropTableData, - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, -} - -#[async_trait] -impl Procedure for DropTableProcedure { - fn type_name(&self) -> &str { - Self::TYPE_NAME - } - - async fn execute(&mut self, ctx: &Context) -> Result { - match self.data.state { - DropTableState::Prepare => self.on_prepare().await, - DropTableState::RemoveFromCatalog => self.on_remove_from_catalog().await, - DropTableState::EngineDropTable => self.on_engine_drop_table(ctx).await, - } - } - - fn dump(&self) -> Result { - let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?; - Ok(json) - } - - fn lock_key(&self) -> LockKey { - // We lock the whole table. - let table_name = self.data.table_ref().to_string(); - LockKey::single(table_name) - } -} - -impl DropTableProcedure { - const TYPE_NAME: &str = "table-procedure::DropTableProcedure"; - - /// Returns a new [DropTableProcedure]. - pub fn new( - request: DropTableRequest, - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - ) -> DropTableProcedure { - DropTableProcedure { - data: DropTableData { - state: DropTableState::Prepare, - request, - subprocedure_id: None, - }, - catalog_manager, - engine_procedure, - } - } - - /// Register the loader of this procedure to the `procedure_manager`. - /// - /// # Panics - /// Panics on error. - pub fn register_loader( - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - procedure_manager: &dyn ProcedureManager, - ) { - procedure_manager - .register_loader( - Self::TYPE_NAME, - Box::new(move |data| { - Self::from_json(data, catalog_manager.clone(), engine_procedure.clone()) - .map(|p| Box::new(p) as _) - }), - ) - .unwrap() - } - - /// Recover the procedure from json. - fn from_json( - json: &str, - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - ) -> Result { - let data: DropTableData = serde_json::from_str(json).context(DeserializeProcedureSnafu)?; - - Ok(DropTableProcedure { - data, - catalog_manager, - engine_procedure, - }) - } - - async fn on_prepare(&mut self) -> Result { - let request = &self.data.request; - // Ensure the table exists. - let table_exists = self - .catalog_manager - .table_exist( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ) - .await - .context(AccessCatalogSnafu)?; - ensure!( - table_exists, - TableNotFoundSnafu { - name: &request.table_name, - } - ); - - self.data.state = DropTableState::RemoveFromCatalog; - - Ok(Status::executing(true)) - } - - async fn on_remove_from_catalog(&mut self) -> Result { - self.data.state = DropTableState::EngineDropTable; - // Assign procedure id to the subprocedure. - self.data.subprocedure_id = Some(ProcedureId::random()); - - Ok(Status::executing(true)) - } - - async fn on_engine_drop_table(&mut self, ctx: &Context) -> Result { - // Safety: subprocedure id is always set in this state. - let sub_id = self.data.subprocedure_id.unwrap(); - - // Query subprocedure state. - let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else { - logging::info!( - "On engine drop table {}, subprocedure not found, sub_id: {}", - self.data.request.table_name, - sub_id - ); - - // If the subprocedure is not found, we create a new subprocedure with the same id. - let engine_ctx = EngineContext::default(); - - let procedure = self - .engine_procedure - .drop_table_procedure(&engine_ctx, self.data.request.clone()) - .map_err(Error::from_error_ext)?; - return Ok(Status::Suspended { - subprocedures: vec![ProcedureWithId { - id: sub_id, - procedure, - }], - persist: true, - }); - }; - - match sub_state { - ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended { - subprocedures: Vec::new(), - persist: false, - }), - ProcedureState::Done => { - logging::info!( - "On engine drop table {}, done, sub_id: {}", - self.data.request.table_name, - sub_id - ); - - Ok(Status::Done) - } - ProcedureState::Failed { error } => { - // Return error if the subprocedure is failed. - Err(error).context(SubprocedureFailedSnafu { - subprocedure_id: sub_id, - })? - } - } - } -} - -/// Represents each step while dropping a table in the datanode. -#[derive(Debug, Serialize, Deserialize)] -enum DropTableState { - /// Validate request and prepare to drop table. - Prepare, - /// Remove the table from the catalog. - RemoveFromCatalog, - /// Drop table in the table engine. - EngineDropTable, -} - -/// Serializable data of [DropTableProcedure]. -#[derive(Debug, Serialize, Deserialize)] -struct DropTableData { - /// Current state. - state: DropTableState, - /// Request to drop this table. - request: DropTableRequest, - /// Id of the subprocedure to drop this table from the engine. - /// - /// This id is `Some` while the procedure is in [DropTableState::EngineDropTable] - /// state. - subprocedure_id: Option, -} - -impl DropTableData { - fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.request.catalog_name, - schema: &self.request.schema_name, - table: &self.request.table_name, - } - } -} - -#[cfg(test)] -mod tests { - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - - use super::*; - use crate::test_util::TestEnv; - - #[tokio::test] - async fn test_drop_not_exists_table() { - common_telemetry::init_default_ut_logging(); - let TestEnv { - dir: _, - table_engine, - procedure_manager: _, - catalog_manager, - } = TestEnv::new("drop"); - let table_name = "test_drop"; - - let request = DropTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - table_id: 0, - }; - - let mut procedure = - DropTableProcedure::new(request, catalog_manager.clone(), table_engine.clone()); - assert!(procedure.on_prepare().await.is_err()); - } -} diff --git a/src/table-procedure/src/error.rs b/src/table-procedure/src/error.rs deleted file mode 100644 index a6aa0a441ab1..000000000000 --- a/src/table-procedure/src/error.rs +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; - -use common_error::ext::ErrorExt; -use common_error::status_code::StatusCode; -use snafu::{Location, Snafu}; - -#[derive(Debug, Snafu)] -#[snafu(visibility(pub(crate)))] -pub enum Error { - #[snafu(display("Failed to serialize procedure to json, source: {}", source))] - SerializeProcedure { - source: serde_json::Error, - location: Location, - }, - - #[snafu(display("Failed to deserialize procedure from json, source: {}", source))] - DeserializeProcedure { - source: serde_json::Error, - location: Location, - }, - - #[snafu(display("Invalid raw schema, source: {}", source))] - InvalidRawSchema { - location: Location, - source: datatypes::error::Error, - }, - - #[snafu(display("Failed to access catalog, source: {}", source))] - AccessCatalog { - location: Location, - source: catalog::error::Error, - }, - - #[snafu(display("Table {} not found", name))] - TableNotFound { name: String }, - - #[snafu(display("Table already exists: {}", name))] - TableExists { name: String }, - - #[snafu(display("Failed to deregister table: {}", name))] - DeregisterTable { name: String }, -} - -pub type Result = std::result::Result; - -impl ErrorExt for Error { - fn status_code(&self) -> StatusCode { - use Error::*; - - match self { - DeregisterTable { .. } | SerializeProcedure { .. } | DeserializeProcedure { .. } => { - StatusCode::Internal - } - InvalidRawSchema { source, .. } => source.status_code(), - AccessCatalog { source, .. } => source.status_code(), - TableNotFound { .. } => StatusCode::TableNotFound, - TableExists { .. } => StatusCode::TableAlreadyExists, - } - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -impl From for common_procedure::Error { - fn from(e: Error) -> common_procedure::Error { - common_procedure::Error::from_error_ext(e) - } -} diff --git a/src/table-procedure/src/lib.rs b/src/table-procedure/src/lib.rs deleted file mode 100644 index 0963adb474d2..000000000000 --- a/src/table-procedure/src/lib.rs +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Procedures for table operations. - -mod alter; -mod create; -mod drop; -pub mod error; -mod truncate; - -pub use alter::AlterTableProcedure; -use catalog::CatalogManagerRef; -use common_procedure::ProcedureManager; -pub use create::CreateTableProcedure; -pub use drop::DropTableProcedure; -use table::engine::{TableEngineProcedureRef, TableEngineRef}; -pub use truncate::TruncateTableProcedure; - -/// Register all procedure loaders to the procedure manager. -/// -/// # Panics -/// Panics on error. -#[allow(clippy::items_after_test_module)] -pub fn register_procedure_loaders( - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - table_engine: TableEngineRef, - procedure_manager: &dyn ProcedureManager, -) { - CreateTableProcedure::register_loader( - catalog_manager.clone(), - engine_procedure.clone(), - table_engine, - procedure_manager, - ); - AlterTableProcedure::register_loader( - catalog_manager.clone(), - engine_procedure.clone(), - procedure_manager, - ); - DropTableProcedure::register_loader( - catalog_manager.clone(), - engine_procedure.clone(), - procedure_manager, - ); - TruncateTableProcedure::register_loader(catalog_manager, engine_procedure, procedure_manager) -} - -#[cfg(test)] -mod test_util; diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs deleted file mode 100644 index c986531fcdb4..000000000000 --- a/src/table-procedure/src/test_util.rs +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; -use std::time::Duration; - -use catalog::local::MemoryCatalogManager; -use catalog::CatalogManagerRef; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_procedure::local::{LocalManager, ManagerConfig}; -use common_procedure::store::state_store::ObjectStateStore; -use common_procedure::ProcedureManagerRef; -use common_test_util::temp_dir::{create_temp_dir, TempDir}; -use datatypes::prelude::ConcreteDataType; -use datatypes::schema::{ColumnSchema, RawSchema}; -use log_store::NoopLogStore; -use mito::config::EngineConfig; -use mito::engine::{MitoEngine, MITO_ENGINE}; -use object_store::services::Fs; -use object_store::ObjectStore; -use storage::compaction::noop::NoopCompactionScheduler; -use storage::config::EngineConfig as StorageEngineConfig; -use storage::EngineImpl; -use table::requests::CreateTableRequest; - -pub struct TestEnv { - pub dir: TempDir, - pub table_engine: Arc>>, - pub procedure_manager: ProcedureManagerRef, - pub catalog_manager: CatalogManagerRef, -} - -impl TestEnv { - pub fn new(prefix: &str) -> TestEnv { - let dir = create_temp_dir(prefix); - TestEnv::from_temp_dir(dir) - } - - pub fn from_temp_dir(dir: TempDir) -> TestEnv { - let store_dir = format!("{}/db", dir.path().to_string_lossy()); - let mut builder = Fs::default(); - let _ = builder.root(&store_dir); - let object_store = ObjectStore::new(builder).unwrap().finish(); - - let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); - let storage_engine = EngineImpl::new( - StorageEngineConfig::default(), - Arc::new(NoopLogStore), - object_store.clone(), - compaction_scheduler, - ) - .unwrap(); - let table_engine = Arc::new(MitoEngine::new( - EngineConfig::default(), - storage_engine, - object_store, - )); - - let procedure_dir = format!("{}/procedure", dir.path().to_string_lossy()); - let mut builder = Fs::default(); - let _ = builder.root(&procedure_dir); - let object_store = ObjectStore::new(builder).unwrap().finish(); - - let config = ManagerConfig { - max_retry_times: 3, - retry_delay: Duration::from_secs(500), - ..Default::default() - }; - let state_store = Arc::new(ObjectStateStore::new(object_store)); - let procedure_manager = Arc::new(LocalManager::new(config, state_store)); - - let catalog_manager = MemoryCatalogManager::with_default_setup(); - - TestEnv { - dir, - table_engine, - procedure_manager, - catalog_manager, - } - } -} - -pub fn schema_for_test() -> RawSchema { - let column_schemas = vec![ - // Key - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - // Nullable value column: cpu - ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), - // Non-null value column: memory - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), false), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true), - ]; - - RawSchema::new(column_schemas) -} - -pub fn new_create_request(table_name: &str) -> CreateTableRequest { - CreateTableRequest { - id: 1, - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - desc: Some("a test table".to_string()), - schema: schema_for_test(), - region_numbers: vec![0, 1], - create_if_not_exists: true, - primary_key_indices: vec![0], - table_options: Default::default(), - engine: MITO_ENGINE.to_string(), - } -} diff --git a/src/table-procedure/src/truncate.rs b/src/table-procedure/src/truncate.rs deleted file mode 100644 index 63bc07cc4dfb..000000000000 --- a/src/table-procedure/src/truncate.rs +++ /dev/null @@ -1,263 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Procedure to truncate a table. - -use async_trait::async_trait; -use catalog::CatalogManagerRef; -use common_procedure::error::SubprocedureFailedSnafu; -use common_procedure::{ - Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, - ProcedureWithId, Result, Status, -}; -use common_telemetry::logging; -use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; -use table::engine::{EngineContext, TableEngineProcedureRef, TableReference}; -use table::requests::TruncateTableRequest; - -use crate::error::{ - AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableNotFoundSnafu, -}; - -/// Procedure to truncate a table. -#[allow(dead_code)] -pub struct TruncateTableProcedure { - data: TruncateTableData, - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, -} - -#[async_trait] -impl Procedure for TruncateTableProcedure { - fn type_name(&self) -> &str { - Self::TYPE_NAME - } - - async fn execute(&mut self, ctx: &Context) -> Result { - match self.data.state { - TruncateTableState::Prepare => self.on_prepare().await, - TruncateTableState::EngineTruncateTable => self.on_engine_truncate_table(ctx).await, - } - } - - fn dump(&self) -> Result { - let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?; - Ok(json) - } - - fn lock_key(&self) -> LockKey { - // We lock the whole table. - let table_name = self.data.table_ref().to_string(); - LockKey::single(table_name) - } -} - -impl TruncateTableProcedure { - const TYPE_NAME: &str = "table-procedure::TruncateTableProcedure"; - - /// Returns a new [TruncateTableProcedure]. - pub fn new( - request: TruncateTableRequest, - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - ) -> TruncateTableProcedure { - TruncateTableProcedure { - data: TruncateTableData { - state: TruncateTableState::Prepare, - request, - subprocedure_id: None, - }, - catalog_manager, - engine_procedure, - } - } - - /// Register the loader of this procedure to the `procedure_manager`. - /// - /// # Panics - /// Panics on error. - pub fn register_loader( - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - procedure_manager: &dyn ProcedureManager, - ) { - procedure_manager - .register_loader( - Self::TYPE_NAME, - Box::new(move |data| { - Self::from_json(data, catalog_manager.clone(), engine_procedure.clone()) - .map(|p| Box::new(p) as _) - }), - ) - .unwrap() - } - - /// Recover the procedure from json. - fn from_json( - json: &str, - catalog_manager: CatalogManagerRef, - engine_procedure: TableEngineProcedureRef, - ) -> Result { - let data: TruncateTableData = - serde_json::from_str(json).context(DeserializeProcedureSnafu)?; - - Ok(TruncateTableProcedure { - data, - catalog_manager, - engine_procedure, - }) - } - - async fn on_prepare(&mut self) -> Result { - let request = &self.data.request; - // Ensure the table exists. - let table_exists = self - .catalog_manager - .table_exist( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ) - .await - .context(AccessCatalogSnafu)?; - ensure!( - table_exists, - TableNotFoundSnafu { - name: &request.table_name, - } - ); - - self.data.state = TruncateTableState::EngineTruncateTable; - // Assign procedure id to the subprocedure. - self.data.subprocedure_id = Some(ProcedureId::random()); - - Ok(Status::executing(true)) - } - - async fn on_engine_truncate_table(&mut self, ctx: &Context) -> Result { - // Safety: subprocedure id is always set in this state. - let sub_id = self.data.subprocedure_id.unwrap(); - - // Query subprocedure state. - let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else { - logging::info!( - "On engine truncate table {}, subprocedure not found, sub_id: {}", - self.data.request.table_name, - sub_id, - ); - // If the subprocedure is not found, we create a new subprocedure with the same id. - let engine_ctx = EngineContext::default(); - - let procedure = self - .engine_procedure - .truncate_table_procedure(&engine_ctx, self.data.request.clone()) - .map_err(Error::from_error_ext)?; - - return Ok(Status::Suspended { - subprocedures: vec![ProcedureWithId { - id: sub_id, - procedure, - }], - persist: true, - }); - }; - - match sub_state { - ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended { - subprocedures: Vec::new(), - persist: false, - }), - ProcedureState::Done => { - logging::info!( - "On engine truncate table {}, done, sub_id: {}", - self.data.request.table_name, - sub_id - ); - - Ok(Status::Done) - } - ProcedureState::Failed { error } => { - // Return error if the subprocedure is failed. - Err(error).context(SubprocedureFailedSnafu { - subprocedure_id: sub_id, - })? - } - } - } -} - -/// Represents each step while truncating a table in the datanode. -#[derive(Debug, Serialize, Deserialize)] -enum TruncateTableState { - /// Validate request and prepare to drop table. - Prepare, - /// Truncate table in the table engine. - EngineTruncateTable, -} - -/// Serializable data of [TruncateTableProcedure]. -#[derive(Debug, Serialize, Deserialize)] -struct TruncateTableData { - /// Current state. - state: TruncateTableState, - /// Request to truncate this table. - request: TruncateTableRequest, - /// Id of the subprocedure to truncate this table from the engine. - /// - /// This id is `Some` while the procedure is in [TruncateTableState::EngineTruncateTable] - /// state. - subprocedure_id: Option, -} - -impl TruncateTableData { - fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.request.catalog_name, - schema: &self.request.schema_name, - table: &self.request.table_name, - } - } -} - -#[cfg(test)] -mod tests { - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - - use super::*; - use crate::test_util::TestEnv; - - #[tokio::test] - async fn test_truncate_not_exists_table() { - common_telemetry::init_default_ut_logging(); - let TestEnv { - dir: _, - table_engine, - procedure_manager: _, - catalog_manager, - } = TestEnv::new("truncate"); - let table_name = "test_truncate"; - - let request = TruncateTableRequest { - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - table_id: 0, - }; - - let mut procedure = - TruncateTableProcedure::new(request, catalog_manager.clone(), table_engine.clone()); - assert!(procedure.on_prepare().await.is_err()); - } -}