diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index eb3181e2d4b6..2304d49a70b9 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -18,12 +18,14 @@ use std::{fs, path}; use async_trait::async_trait; use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; -use common_catalog::consts::MIN_USER_TABLE_ID; +use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; use common_config::{metadata_store_dir, KvBackendConfig}; use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator}; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; -use common_meta::ddl::ProcedureExecutorRef; +use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef}; +use common_meta::ddl::{DdlContext, ProcedureExecutorRef}; use common_meta::ddl_manager::DdlManager; +use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; @@ -45,6 +47,7 @@ use frontend::server::Services; use frontend::service_config::{ GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, }; +use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ}; use mito2::config::MitoConfig; use serde::{Deserialize, Serialize}; use servers::export_metrics::ExportMetricsOption; @@ -411,30 +414,40 @@ impl StartCommand { let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); let table_id_sequence = Arc::new( - SequenceBuilder::new("table_id", kv_backend.clone()) + SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) .initial(MIN_USER_TABLE_ID as u64) .step(10) .build(), ); + let flow_task_id_sequence = Arc::new( + SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_FLOW_TASK_ID as u64) + .step(10) + .build(), + ); let wal_options_allocator = Arc::new(WalOptionsAllocator::new( opts.wal_meta.clone(), kv_backend.clone(), )); - let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; - + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); let table_meta_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), )); + let flow_task_meta_allocator = Arc::new( + FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence), + ); let ddl_task_executor = Self::create_ddl_task_executor( - table_metadata_manager, procedure_manager.clone(), node_manager.clone(), multi_cache_invalidator, + table_metadata_manager, table_meta_allocator, + flow_task_metadata_manager, + flow_task_meta_allocator, ) .await?; @@ -462,20 +475,26 @@ impl StartCommand { } pub async fn create_ddl_task_executor( - table_metadata_manager: TableMetadataManagerRef, procedure_manager: ProcedureManagerRef, node_manager: NodeManagerRef, cache_invalidator: CacheInvalidatorRef, - table_meta_allocator: TableMetadataAllocatorRef, + table_metadata_manager: TableMetadataManagerRef, + table_metadata_allocator: TableMetadataAllocatorRef, + flow_task_metadata_manager: FlowTaskMetadataManagerRef, + flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef, ) -> Result { let procedure_executor: ProcedureExecutorRef = Arc::new( DdlManager::try_new( + DdlContext { + node_manager, + cache_invalidator, + memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), + table_metadata_manager, + table_metadata_allocator, + flow_task_metadata_manager, + flow_task_metadata_allocator, + }, procedure_manager, - node_manager, - cache_invalidator, - table_metadata_manager, - table_meta_allocator, - Arc::new(MemoryRegionKeeper::default()), true, ) .context(InitDdlManagerSnafu)?, diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 8834b6239f91..175435d89842 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -19,6 +19,9 @@ pub const DEFAULT_CATALOG_NAME: &str = "greptime"; pub const DEFAULT_SCHEMA_NAME: &str = "public"; pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = "greptime_private"; +/// Reserves [0,MIN_USER_FLOW_TASK_ID) for internal usage. +/// User defined table id starts from this value. +pub const MIN_USER_FLOW_TASK_ID: u32 = 1024; /// Reserves [0,MIN_USER_TABLE_ID) for internal usage. /// User defined table id starts from this value. pub const MIN_USER_TABLE_ID: u32 = 1024; diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 3feea55253ef..5bdcb1f68e2b 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -18,9 +18,11 @@ use std::sync::Arc; use common_telemetry::tracing_context::W3cTrace; use store_api::storage::{RegionNumber, TableId}; -use self::table_meta::TableMetadataAllocatorRef; use crate::cache_invalidator::CacheInvalidatorRef; +use crate::ddl::table_meta::TableMetadataAllocatorRef; +use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef; use crate::error::Result; +use crate::key::flow_task::FlowTaskMetadataManagerRef; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::TableMetadataManagerRef; use crate::node_manager::NodeManagerRef; @@ -31,6 +33,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu pub mod alter_logical_tables; pub mod alter_table; pub mod create_database; +pub mod create_flow; pub mod create_logical_tables; pub mod create_table; mod create_table_template; @@ -38,6 +41,7 @@ pub mod drop_database; pub mod drop_table; mod physical_table_metadata; pub mod table_meta; +pub mod task_meta; #[cfg(any(test, feature = "testing"))] pub mod test_util; #[cfg(test)] @@ -93,11 +97,21 @@ pub struct TableMetadata { pub region_wal_options: HashMap, } +/// The context of ddl. #[derive(Clone)] pub struct DdlContext { + /// Sends querying and requests to nodes. pub node_manager: NodeManagerRef, + /// Cache invalidation. pub cache_invalidator: CacheInvalidatorRef, - pub table_metadata_manager: TableMetadataManagerRef, + /// Keep tracking operating regions. pub memory_region_keeper: MemoryRegionKeeperRef, + /// Table metadata manager. + pub table_metadata_manager: TableMetadataManagerRef, + /// Allocator for table metadata. pub table_metadata_allocator: TableMetadataAllocatorRef, + /// Flow task metadata manager. + pub flow_task_metadata_manager: FlowTaskMetadataManagerRef, + /// Allocator for flow task metadata. + pub flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef, } diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs new file mode 100644 index 000000000000..018c7dc84276 --- /dev/null +++ b/src/common/meta/src/ddl/create_flow.rs @@ -0,0 +1,246 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod check; +mod metadata; + +use std::collections::BTreeMap; + +use api::v1::flow::flow_request::Body as PbFlowRequest; +use api::v1::flow::{CreateRequest, FlowRequest}; +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, +}; +use common_telemetry::info; +use futures::future::join_all; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use strum::AsRefStr; +use table::metadata::TableId; + +use super::utils::add_peer_context_if_needed; +use crate::ddl::utils::handle_retry_error; +use crate::ddl::DdlContext; +use crate::error::Result; +use crate::key::flow_task::flow_task_info::FlowTaskInfoValue; +use crate::key::FlowTaskId; +use crate::lock_key::{CatalogLock, FlowTaskNameLock, TableNameLock}; +use crate::peer::Peer; +use crate::rpc::ddl::CreateFlowTask; +use crate::{metrics, ClusterId}; + +/// The procedure of flow task creation. +pub struct CreateFlowProcedure { + pub context: DdlContext, + pub data: CreateFlowTaskData, +} + +impl CreateFlowProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow"; + + /// Returns a new [CreateFlowProcedure]. + pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self { + Self { + context, + data: CreateFlowTaskData { + cluster_id, + task, + flow_task_id: None, + peers: vec![], + source_table_ids: vec![], + state: CreateFlowTaskState::CreateMetadata, + }, + } + } + + /// Deserializes from `json`. + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(CreateFlowProcedure { context, data }) + } + + async fn on_prepare(&mut self) -> Result { + self.check_creation().await?; + self.collect_source_tables().await?; + self.allocate_flow_task_id().await?; + self.data.state = CreateFlowTaskState::CreateFlows; + + Ok(Status::executing(true)) + } + + async fn on_flownode_create_flows(&mut self) -> Result { + // Safety: must be allocated. + let mut create_flow_task = Vec::with_capacity(self.data.peers.len()); + for peer in &self.data.peers { + let requester = self.context.node_manager.flownode(peer).await; + let request = FlowRequest { + body: Some(PbFlowRequest::Create((&self.data).into())), + }; + create_flow_task.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer.clone())) + }); + } + + join_all(create_flow_task) + .await + .into_iter() + .collect::>>()?; + + self.data.state = CreateFlowTaskState::CreateMetadata; + Ok(Status::executing(true)) + } + + /// Creates flow task metadata. + /// + /// Abort(not-retry): + /// - Failed to create table metadata. + async fn on_create_metadata(&mut self) -> Result { + // Safety: The flow task id must be allocated. + let flow_task_id = self.data.flow_task_id.unwrap(); + // TODO(weny): Support `or_replace`. + self.context + .flow_task_metadata_manager + .create_flow_task_metadata(flow_task_id, (&self.data).into()) + .await?; + info!("Created flow task metadata for flow {flow_task_id}"); + Ok(Status::done_with_output(flow_task_id)) + } +} + +#[async_trait] +impl Procedure for CreateFlowProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &self.data.state; + + let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW_TASK + .with_label_values(&[state.as_ref()]) + .start_timer(); + + match state { + CreateFlowTaskState::Prepare => self.on_prepare().await, + CreateFlowTaskState::CreateFlows => self.on_flownode_create_flows().await, + CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await, + } + .map_err(handle_retry_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let catalog_name = &self.data.task.catalog_name; + let task_name = &self.data.task.task_name; + let sink_table_name = &self.data.task.sink_table_name; + + LockKey::new(vec![ + CatalogLock::Read(catalog_name).into(), + TableNameLock::new( + &sink_table_name.catalog_name, + &sink_table_name.schema_name, + &sink_table_name.catalog_name, + ) + .into(), + FlowTaskNameLock::new(catalog_name, task_name).into(), + ]) + } +} + +/// The state of [CreateFlowTaskProcedure]. +#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] +pub enum CreateFlowTaskState { + /// Prepares to create the flow. + Prepare, + /// Creates flows on the flownode. + CreateFlows, + /// Create metadata. + CreateMetadata, +} + +/// The serializable data. +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateFlowTaskData { + pub(crate) cluster_id: ClusterId, + pub(crate) state: CreateFlowTaskState, + pub(crate) task: CreateFlowTask, + pub(crate) flow_task_id: Option, + pub(crate) peers: Vec, + pub(crate) source_table_ids: Vec, +} + +impl From<&CreateFlowTaskData> for CreateRequest { + fn from(value: &CreateFlowTaskData) -> Self { + let flow_task_id = value.flow_task_id.unwrap(); + let source_table_ids = &value.source_table_ids; + + CreateRequest { + task_id: Some(api::v1::flow::TaskId { id: flow_task_id }), + source_table_ids: source_table_ids + .iter() + .map(|table_id| api::v1::TableId { id: *table_id }) + .collect_vec(), + sink_table_name: Some(value.task.sink_table_name.clone().into()), + // Always be true + create_if_not_exists: true, + expire_when: value.task.expire_when.clone(), + comment: value.task.comment.clone(), + sql: value.task.sql.clone(), + task_options: value.task.options.clone(), + } + } +} + +impl From<&CreateFlowTaskData> for FlowTaskInfoValue { + fn from(value: &CreateFlowTaskData) -> Self { + let CreateFlowTask { + catalog_name, + task_name, + sink_table_name, + expire_when, + comment, + sql, + options, + .. + } = value.task.clone(); + + let flownode_ids = value + .peers + .iter() + .enumerate() + .map(|(idx, peer)| (idx as u32, peer.id)) + .collect::>(); + + FlowTaskInfoValue { + source_table_ids: value.source_table_ids.clone(), + sink_table_name, + flownode_ids, + catalog_name, + task_name, + raw_sql: sql, + expire_when, + comment, + options, + } + } +} diff --git a/src/common/meta/src/ddl/create_flow/check.rs b/src/common/meta/src/ddl/create_flow/check.rs new file mode 100644 index 000000000000..6aa1ecb3ed00 --- /dev/null +++ b/src/common/meta/src/ddl/create_flow/check.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ensure; + +use crate::ddl::create_flow::CreateFlowProcedure; +use crate::error::{self, Result}; +use crate::key::table_name::TableNameKey; + +impl CreateFlowProcedure { + /// Checks: + /// - The new task name doesn't exist. + /// - The sink table doesn't exist. + pub(crate) async fn check_creation(&self) -> Result<()> { + let catalog_name = &self.data.task.catalog_name; + let task_name = &self.data.task.task_name; + let sink_table_name = &self.data.task.sink_table_name; + + // Ensures the task name doesn't exist. + let exists = self + .context + .flow_task_metadata_manager + .flow_task_name_manager() + .exists(catalog_name, task_name) + .await?; + ensure!( + !exists, + error::TaskAlreadyExistsSnafu { + task_name: format!("{}.{}", catalog_name, task_name), + } + ); + + // Ensures sink table doesn't exist. + let exists = self + .context + .table_metadata_manager + .table_name_manager() + .exists(TableNameKey::new( + &sink_table_name.catalog_name, + &sink_table_name.schema_name, + &sink_table_name.table_name, + )) + .await?; + ensure!( + !exists, + error::TableAlreadyExistsSnafu { + table_name: sink_table_name.to_string(), + } + ); + + Ok(()) + } +} diff --git a/src/common/meta/src/ddl/create_flow/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs new file mode 100644 index 000000000000..ce35ae91ca98 --- /dev/null +++ b/src/common/meta/src/ddl/create_flow/metadata.rs @@ -0,0 +1,73 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::OptionExt; + +use crate::ddl::create_flow::CreateFlowProcedure; +use crate::error::{self, Result}; +use crate::key::table_name::TableNameKey; + +impl CreateFlowProcedure { + /// Allocates the [FlowTaskId]. + pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> { + // TODO(weny, ruihang): We don't support the partitions. It's always be 1, now. + let partitions = 1; + let (flow_task_id, peers) = self + .context + .flow_task_metadata_allocator + .create(partitions) + .await?; + self.data.flow_task_id = Some(flow_task_id); + self.data.peers = peers; + + Ok(()) + } + + /// Ensures all source tables exist and collects source table ids + pub(crate) async fn collect_source_tables(&mut self) -> Result<()> { + // Ensures all source tables exist. + let keys = self + .data + .task + .source_table_names + .iter() + .map(|name| TableNameKey::new(&name.catalog_name, &name.schema_name, &name.table_name)) + .collect::>(); + + let source_table_ids = self + .context + .table_metadata_manager + .table_name_manager() + .batch_get(keys) + .await?; + + let source_table_ids = self + .data + .task + .source_table_names + .iter() + .zip(source_table_ids) + .map(|(name, table_id)| { + Ok(table_id + .with_context(|| error::TableNotFoundSnafu { + table_name: name.to_string(), + })? + .table_id()) + }) + .collect::>>()?; + + self.data.source_table_ids = source_table_ids; + Ok(()) + } +} diff --git a/src/common/meta/src/ddl/task_meta.rs b/src/common/meta/src/ddl/task_meta.rs new file mode 100644 index 000000000000..3e8a4fb36cfc --- /dev/null +++ b/src/common/meta/src/ddl/task_meta.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use tonic::async_trait; + +use crate::error::Result; +use crate::key::FlowTaskId; +use crate::peer::Peer; +use crate::sequence::SequenceRef; + +/// The reference of [FlowTaskMetadataAllocator]. +pub type FlowTaskMetadataAllocatorRef = Arc; + +/// [FlowTaskMetadataAllocator] provides the ability of: +/// - [FlowTaskId] Allocation. +/// - [FlownodeId] Selection. +#[derive(Clone)] +pub struct FlowTaskMetadataAllocator { + flow_task_id_sequence: SequenceRef, + partition_peer_allocator: PartitionPeerAllocatorRef, +} + +impl FlowTaskMetadataAllocator { + /// Returns the [FlowTaskMetadataAllocator] with [NoopPartitionPeerAllocator]. + pub fn with_noop_peer_allocator(flow_task_id_sequence: SequenceRef) -> Self { + Self { + flow_task_id_sequence, + partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator), + } + } + + /// Allocates a the [FlowTaskId]. + pub(crate) async fn allocate_flow_task_id(&self) -> Result { + let flow_task_id = self.flow_task_id_sequence.next().await? as FlowTaskId; + Ok(flow_task_id) + } + + /// Allocates the [FlowTaskId] and [Peer]s. + pub async fn create(&self, partitions: usize) -> Result<(FlowTaskId, Vec)> { + let flow_task_id = self.allocate_flow_task_id().await?; + let peers = self.partition_peer_allocator.alloc(partitions).await?; + + Ok((flow_task_id, peers)) + } +} + +/// Allocates [Peer]s for partitions. +#[async_trait] +pub trait PartitionPeerAllocator: Send + Sync { + /// Allocates [Peer] nodes for storing partitions. + async fn alloc(&self, partitions: usize) -> Result>; +} + +/// [PartitionPeerAllocatorRef] allocates [Peer]s for partitions. +pub type PartitionPeerAllocatorRef = Arc; + +struct NoopPartitionPeerAllocator; + +#[async_trait] +impl PartitionPeerAllocator for NoopPartitionPeerAllocator { + async fn alloc(&self, partitions: usize) -> Result> { + Ok(vec![Peer::default(); partitions]) + } +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 8db6198bd609..2f3d3a4eb45c 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -19,18 +19,18 @@ use common_procedure::{ }; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{debug, info, tracing}; +use derive_builder::Builder; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::TableId; -use crate::cache_invalidator::CacheInvalidatorRef; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::create_database::CreateDatabaseProcedure; +use crate::ddl::create_flow::CreateFlowProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_database::DropDatabaseProcedure; use crate::ddl::drop_table::DropTableProcedure; -use crate::ddl::table_meta::TableMetadataAllocatorRef; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor}; use crate::error::{ @@ -42,15 +42,13 @@ use crate::error::{ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; -use crate::node_manager::NodeManagerRef; -use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::DdlTask::{ - AlterLogicalTables, AlterTable, CreateDatabase, CreateLogicalTables, CreateTable, DropDatabase, - DropLogicalTables, DropTable, TruncateTable, + AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable, + DropDatabase, DropLogicalTables, DropTable, TruncateTable, }; use crate::rpc::ddl::{ - AlterTableTask, CreateDatabaseTask, CreateTableTask, DropDatabaseTask, DropTableTask, - SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, + AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, DropDatabaseTask, + DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, }; use crate::rpc::procedure; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; @@ -62,33 +60,22 @@ pub type DdlManagerRef = Arc; pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader; /// The [DdlManager] provides the ability to execute Ddl. +#[derive(Builder)] pub struct DdlManager { + ddl_context: DdlContext, procedure_manager: ProcedureManagerRef, - node_manager: NodeManagerRef, - cache_invalidator: CacheInvalidatorRef, - table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocatorRef, - memory_region_keeper: MemoryRegionKeeperRef, } -/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered. impl DdlManager { + /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered. pub fn try_new( + ddl_context: DdlContext, procedure_manager: ProcedureManagerRef, - datanode_clients: NodeManagerRef, - cache_invalidator: CacheInvalidatorRef, - table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocatorRef, - memory_region_keeper: MemoryRegionKeeperRef, register_loaders: bool, ) -> Result { let manager = Self { + ddl_context, procedure_manager, - node_manager: datanode_clients, - cache_invalidator, - table_metadata_manager, - table_metadata_allocator, - memory_region_keeper, }; if register_loaders { manager.register_loaders()?; @@ -98,21 +85,16 @@ impl DdlManager { /// Returns the [TableMetadataManagerRef]. pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { - &self.table_metadata_manager + &self.ddl_context.table_metadata_manager } /// Returns the [DdlContext] pub fn create_context(&self) -> DdlContext { - DdlContext { - node_manager: self.node_manager.clone(), - cache_invalidator: self.cache_invalidator.clone(), - table_metadata_manager: self.table_metadata_manager.clone(), - memory_region_keeper: self.memory_region_keeper.clone(), - table_metadata_allocator: self.table_metadata_allocator.clone(), - } + self.ddl_context.clone() } - fn register_loaders(&self) -> Result<()> { + /// Registers all Ddl loaders. + pub fn register_loaders(&self) -> Result<()> { let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = vec![ ( CreateTableProcedure::TYPE_NAME, @@ -133,6 +115,15 @@ impl DdlManager { }) }, ), + ( + CreateFlowProcedure::TYPE_NAME, + &|context: DdlContext| -> BoxedProcedureLoader { + Box::new(move |json: &str| { + let context = context.clone(); + CreateFlowProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }) + }, + ), ( AlterTableProcedure::TYPE_NAME, &|context: DdlContext| -> BoxedProcedureLoader { @@ -331,6 +322,20 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + #[tracing::instrument(skip_all)] + /// Submits and executes a create flow task. + pub async fn submit_create_flow_task( + &self, + cluster_id: ClusterId, + create_flow: CreateFlowTask, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + let procedure = CreateFlowProcedure::new(cluster_id, create_flow, context); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + #[tracing::instrument(skip_all)] /// Submits and executes a truncate table task. pub async fn submit_truncate_table_task( @@ -419,7 +424,7 @@ async fn handle_alter_table_task( let table_ref = alter_table_task.table_ref(); let table_id = ddl_manager - .table_metadata_manager + .table_metadata_manager() .table_name_manager() .get(TableNameKey::new( table_ref.catalog, @@ -517,7 +522,7 @@ async fn handle_create_logical_table_tasks( } ); let physical_table_id = utils::check_and_get_physical_table_id( - &ddl_manager.table_metadata_manager, + ddl_manager.table_metadata_manager(), &create_table_tasks, ) .await?; @@ -591,6 +596,35 @@ async fn handle_drop_database_task( }) } +async fn handle_create_flow_task( + ddl_manager: &DdlManager, + cluster_id: ClusterId, + create_flow_task: CreateFlowTask, +) -> Result { + let (id, output) = ddl_manager + .submit_create_flow_task(cluster_id, create_flow_task.clone()) + .await?; + + let procedure_id = id.to_string(); + let output = output.context(ProcedureOutputSnafu { + procedure_id: &procedure_id, + err_msg: "empty output", + })?; + let flow_id = *(output.downcast_ref::().context(ProcedureOutputSnafu { + procedure_id: &procedure_id, + err_msg: "downcast to `u32`", + })?); + info!( + "Flow {}.{}({flow_id}) is created via procedure_id {id:?}", + create_flow_task.catalog_name, create_flow_task.task_name, + ); + + Ok(SubmitDdlTaskResponse { + key: procedure_id.into(), + ..Default::default() + }) +} + async fn handle_alter_logical_table_tasks( ddl_manager: &DdlManager, cluster_id: ClusterId, @@ -610,7 +644,7 @@ async fn handle_alter_logical_table_tasks( table: &alter_table_tasks[0].alter_table.table_name, }; let physical_table_id = - utils::get_physical_table_id(&ddl_manager.table_metadata_manager, first_table).await?; + utils::get_physical_table_id(ddl_manager.table_metadata_manager(), first_table).await?; let num_logical_tables = alter_table_tasks.len(); let (id, _) = ddl_manager @@ -670,6 +704,9 @@ impl ProcedureExecutor for DdlManager { DropDatabase(drop_database_task) => { handle_drop_database_task(self, cluster_id, drop_database_task).await } + CreateFlow(create_flow_task) => { + handle_create_flow_task(self, cluster_id, create_flow_task).await + } } } .trace(span) @@ -720,7 +757,10 @@ mod tests { use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::table_meta::TableMetadataAllocator; + use crate::ddl::task_meta::FlowTaskMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; + use crate::ddl::DdlContext; + use crate::key::flow_task::FlowTaskMetadataManager; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager}; @@ -730,7 +770,7 @@ mod tests { use crate::state_store::KvStateStore; use crate::wal_options_allocator::WalOptionsAllocator; - /// A dummy implemented [DatanodeManager]. + /// A dummy implemented [NodeManager]. pub struct DummyDatanodeManager; #[async_trait::async_trait] @@ -748,20 +788,30 @@ mod tests { fn test_try_new() { let kv_backend = Arc::new(MemoryKvBackend::new()); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( + Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), + Arc::new(WalOptionsAllocator::default()), + )); + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); + let flow_task_metadata_allocator = + Arc::new(FlowTaskMetadataAllocator::with_noop_peer_allocator( + Arc::new(SequenceBuilder::new("flow-test", kv_backend.clone()).build()), + )); let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store)); let _ = DdlManager::try_new( + DdlContext { + node_manager: Arc::new(DummyDatanodeManager), + cache_invalidator: Arc::new(DummyCacheInvalidator), + table_metadata_manager, + table_metadata_allocator, + flow_task_metadata_manager, + flow_task_metadata_allocator, + memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), + }, procedure_manager.clone(), - Arc::new(DummyDatanodeManager), - Arc::new(DummyCacheInvalidator), - table_metadata_manager.clone(), - Arc::new(TableMetadataAllocator::new( - Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), - Arc::new(WalOptionsAllocator::default()), - )), - Arc::new(MemoryRegionKeeper::default()), true, ); diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 323b2c0fee10..40d5070c3bb3 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -23,7 +23,6 @@ use snafu::{Location, Snafu}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; -use crate::key::FlowTaskId; use crate::peer::Peer; use crate::DatanodeId; @@ -242,14 +241,9 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Task already exists, task: {}, flow_task_id: {}", - task_name, - flow_task_id - ))] + #[snafu(display("Task already exists: {}", task_name))] TaskAlreadyExists { task_name: String, - flow_task_id: FlowTaskId, location: Location, }, diff --git a/src/common/meta/src/key/flow_task.rs b/src/common/meta/src/key/flow_task.rs index f5fc9b4793cd..5e1c2e427ab6 100644 --- a/src/common/meta/src/key/flow_task.rs +++ b/src/common/meta/src/key/flow_task.rs @@ -18,6 +18,7 @@ pub(crate) mod flownode_task; pub(crate) mod table_task; use std::ops::Deref; +use std::sync::Arc; use common_telemetry::info; use snafu::{ensure, OptionExt}; @@ -82,12 +83,14 @@ impl> MetaKey> for FlowTaskScoped { } } +pub type FlowTaskMetadataManagerRef = Arc; + /// The manager of metadata, provides ability to: /// - Create metadata of the task. /// - Retrieve metadata of the task. /// - Delete metadata of the task. pub struct FlowTaskMetadataManager { - flow_task_manager: FlowTaskInfoManager, + flow_task_info_manager: FlowTaskInfoManager, flownode_task_manager: FlownodeTaskManager, table_task_manager: TableTaskManager, flow_task_name_manager: FlowTaskNameManager, @@ -98,7 +101,7 @@ impl FlowTaskMetadataManager { /// Returns a new [FlowTaskMetadataManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { - flow_task_manager: FlowTaskInfoManager::new(kv_backend.clone()), + flow_task_info_manager: FlowTaskInfoManager::new(kv_backend.clone()), flow_task_name_manager: FlowTaskNameManager::new(kv_backend.clone()), flownode_task_manager: FlownodeTaskManager::new(kv_backend.clone()), table_task_manager: TableTaskManager::new(kv_backend.clone()), @@ -106,9 +109,14 @@ impl FlowTaskMetadataManager { } } - /// Returns the [FlowTaskManager]. - pub fn flow_task_manager(&self) -> &FlowTaskInfoManager { - &self.flow_task_manager + /// Returns the [FlowTaskNameManager]. + pub fn flow_task_name_manager(&self) -> &FlowTaskNameManager { + &self.flow_task_name_manager + } + + /// Returns the [FlowTaskInfoManager]. + pub fn flow_task_info_manager(&self) -> &FlowTaskInfoManager { + &self.flow_task_info_manager } /// Returns the [FlownodeTaskManager]. @@ -135,7 +143,7 @@ impl FlowTaskMetadataManager { )?; let (create_flow_task_txn, on_create_flow_task_failure) = - self.flow_task_manager.build_create_txn( + self.flow_task_info_manager.build_create_txn( &flow_task_value.catalog_name, flow_task_id, &flow_task_value, @@ -192,7 +200,6 @@ impl FlowTaskMetadataManager { "{}.{}", flow_task_value.catalog_name, flow_task_value.task_name ), - flow_task_id, } .fail(); } @@ -223,6 +230,7 @@ mod tests { use crate::key::flow_task::table_task::TableTaskKey; use crate::key::scope::CatalogScoped; use crate::kv_backend::memory::MemoryKvBackend; + use crate::table_name::TableName; #[derive(Debug)] struct MockKey { @@ -273,11 +281,16 @@ mod tests { let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv.clone()); let task_id = 10; let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: catalog_name.to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -294,7 +307,7 @@ mod tests { .await .unwrap(); let got = flow_metadata_manager - .flow_task_manager() + .flow_task_info_manager() .get(catalog_name, task_id) .await .unwrap() @@ -332,11 +345,17 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); let task_id = 10; + let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -349,10 +368,10 @@ mod tests { .unwrap(); // Creates again. let flow_task_value = FlowTaskInfoValue { - catalog_name: "greptime".to_string(), + catalog_name: catalog_name.to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -371,11 +390,17 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowTaskMetadataManager::new(mem_kv); let task_id = 10; + let catalog_name = "greptime"; + let sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2049, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: sink_table_name.clone(), flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), @@ -387,11 +412,16 @@ mod tests { .await .unwrap(); // Creates again. + let another_sink_table_name = TableName { + catalog_name: catalog_name.to_string(), + schema_name: "my_schema".to_string(), + table_name: "another_sink_table".to_string(), + }; let flow_task_value = FlowTaskInfoValue { catalog_name: "greptime".to_string(), task_name: "task".to_string(), - source_tables: vec![1024, 1025, 1026], - sink_table: 2048, + source_table_ids: vec![1024, 1025, 1026], + sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), expire_when: "expr".to_string(), diff --git a/src/common/meta/src/key/flow_task/flow_task_info.rs b/src/common/meta/src/key/flow_task/flow_task_info.rs index f30d3217f8a6..371ab96a1e41 100644 --- a/src/common/meta/src/key/flow_task/flow_task_info.rs +++ b/src/common/meta/src/key/flow_task/flow_task_info.rs @@ -29,6 +29,7 @@ use crate::key::{ }; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; +use crate::table_name::TableName; use crate::FlownodeId; const FLOW_TASK_INFO_KEY_PREFIX: &str = "info"; @@ -117,9 +118,9 @@ impl MetaKey for FlowTaskInfoKeyInner { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FlowTaskInfoValue { /// The source tables used by the task. - pub(crate) source_tables: Vec, + pub(crate) source_table_ids: Vec, /// The sink table used by the task. - pub(crate) sink_table: TableId, + pub(crate) sink_table_name: TableName, /// Which flow nodes this task is running on. pub(crate) flownode_ids: BTreeMap, /// The catalog name. @@ -144,7 +145,7 @@ impl FlowTaskInfoValue { /// Returns the `source_table`. pub fn source_table_ids(&self) -> &[TableId] { - &self.source_tables + &self.source_table_ids } } diff --git a/src/common/meta/src/key/flow_task/flow_task_name.rs b/src/common/meta/src/key/flow_task/flow_task_name.rs index 9828283e6401..eaf6da5ae848 100644 --- a/src/common/meta/src/key/flow_task/flow_task_name.rs +++ b/src/common/meta/src/key/flow_task/flow_task_name.rs @@ -149,6 +149,13 @@ impl FlowTaskNameManager { .transpose() } + /// Returns true if the `task` exists. + pub async fn exists(&self, catalog: &str, task: &str) -> Result { + let key = FlowTaskNameKey::new(catalog.to_string(), task.to_string()); + let raw_key = key.to_bytes(); + self.kv_backend.exists(&raw_key).await + } + /// Builds a create flow task name transaction. /// It's expected that the `__flow_task/{catalog}/name/{task_name}` wasn't occupied. /// Otherwise, the transaction will retrieve existing value. diff --git a/src/common/meta/src/lock_key.rs b/src/common/meta/src/lock_key.rs index ad09c064d31d..456d1ccffad7 100644 --- a/src/common/meta/src/lock_key.rs +++ b/src/common/meta/src/lock_key.rs @@ -22,6 +22,7 @@ const CATALOG_LOCK_PREFIX: &str = "__catalog_lock"; const SCHEMA_LOCK_PREFIX: &str = "__schema_lock"; const TABLE_LOCK_PREFIX: &str = "__table_lock"; const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock"; +const FLOW_TASK_NAME_LOCK_PREFIX: &str = "__flow_task_name_lock"; const REGION_LOCK_PREFIX: &str = "__region_lock"; /// [CatalogLock] acquires the lock on the tenant level. @@ -110,6 +111,32 @@ impl From for StringKey { } } +/// [FlowTaskNameLock] prevents any procedures trying to create a flow task named it. +pub enum FlowTaskNameLock { + Write(String), +} + +impl FlowTaskNameLock { + pub fn new(catalog: &str, table: &str) -> Self { + Self::Write(format!("{catalog}.{table}")) + } +} + +impl Display for FlowTaskNameLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let FlowTaskNameLock::Write(name) = self; + write!(f, "{}/{}", FLOW_TASK_NAME_LOCK_PREFIX, name) + } +} + +impl From for StringKey { + fn from(value: FlowTaskNameLock) -> Self { + match value { + FlowTaskNameLock::Write(_) => StringKey::Exclusive(value.to_string()), + } + } +} + /// [TableLock] acquires the lock on the table level. /// /// Note: Allows to read/modify the corresponding table's [TableInfoValue](crate::key::table_info::TableInfoValue), diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 4e810195bb2b..0a47b1de1463 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -39,6 +39,12 @@ lazy_static! { &["step"] ) .unwrap(); + pub static ref METRIC_META_PROCEDURE_CREATE_FLOW_TASK: HistogramVec = register_histogram_vec!( + "greptime_meta_procedure_create_flow_task", + "meta procedure create flow task", + &["step"] + ) + .unwrap(); pub static ref METRIC_META_PROCEDURE_CREATE_TABLES: HistogramVec = register_histogram_vec!( "greptime_meta_procedure_create_tables", "meta procedure create tables", diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 3a88ea11bf2e..911cab18df4c 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -51,9 +51,14 @@ pub enum DdlTask { AlterLogicalTables(Vec), CreateDatabase(CreateDatabaseTask), DropDatabase(DropDatabaseTask), + CreateFlow(CreateFlowTask), } impl DdlTask { + pub fn new_create_flow(expr: CreateFlowTask) -> Self { + DdlTask::CreateFlow(expr) + } + pub fn new_create_table( expr: CreateTableExpr, partitions: Vec, @@ -182,7 +187,7 @@ impl TryFrom for DdlTask { Task::DropDatabaseTask(drop_database) => { Ok(DdlTask::DropDatabase(drop_database.try_into()?)) } - Task::CreateFlowTask(_) => unimplemented!(), + Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)), Task::DropFlowTask(_) => unimplemented!(), } } @@ -228,6 +233,7 @@ impl TryFrom for PbDdlTaskRequest { } DdlTask::CreateDatabase(task) => Task::CreateDatabaseTask(task.try_into()?), DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?), + DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()), }; Ok(Self { @@ -724,9 +730,10 @@ impl TryFrom for PbDropDatabaseTask { } /// Create flow task +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateFlowTask { pub catalog_name: String, - pub flow_name: String, + pub task_name: String, pub source_table_names: Vec, pub sink_table_name: TableName, pub or_replace: bool, @@ -758,7 +765,7 @@ impl TryFrom for CreateFlowTask { Ok(CreateFlowTask { catalog_name, - flow_name: task_name, + task_name, source_table_names: source_table_names.into_iter().map(Into::into).collect(), sink_table_name: sink_table_name .context(error::InvalidProtoMsgSnafu { @@ -779,7 +786,7 @@ impl From for PbCreateFlowTask { fn from( CreateFlowTask { catalog_name, - flow_name: task_name, + task_name, source_table_names, sink_table_name, or_replace, diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 3d282e8caff3..fe5bf0c439be 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -21,8 +21,10 @@ use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; use crate::ddl::table_meta::TableMetadataAllocator; +use crate::ddl::task_meta::FlowTaskMetadataAllocator; use crate::ddl::DdlContext; use crate::error::Result; +use crate::key::flow_task::FlowTaskMetadataManager; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::KvBackendRef; @@ -99,19 +101,29 @@ pub fn new_ddl_context_with_kv_backend( kv_backend: KvBackendRef, ) -> DdlContext { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - + let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( + Arc::new( + SequenceBuilder::new("test", kv_backend.clone()) + .initial(1024) + .build(), + ), + Arc::new(WalOptionsAllocator::default()), + )); + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); + let flow_task_metadata_allocator = Arc::new( + FlowTaskMetadataAllocator::with_noop_peer_allocator(Arc::new( + SequenceBuilder::new("flow-test", kv_backend) + .initial(1024) + .build(), + )), + ); DdlContext { node_manager, cache_invalidator: Arc::new(DummyCacheInvalidator), memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), - table_metadata_allocator: Arc::new(TableMetadataAllocator::new( - Arc::new( - SequenceBuilder::new("test", kv_backend) - .initial(1024) - .build(), - ), - Arc::new(WalOptionsAllocator::default()), - )), + table_metadata_allocator, table_metadata_manager, + flow_task_metadata_allocator, + flow_task_metadata_manager, } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index dd0fbb1fde1a..308d08a20ae0 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -59,6 +59,7 @@ use crate::service::store::cached_kv::LeaderCachedKvBackend; use crate::state::{become_follower, become_leader, StateRef}; pub const TABLE_ID_SEQ: &str = "table_id"; +pub const FLOW_TASK_ID_SEQ: &str = "flow_id"; pub const METASRV_HOME: &str = "/tmp/metasrv"; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 223ccf11d147..ab17088745c5 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -18,16 +18,19 @@ use std::time::Duration; use client::client_manager::DatanodeClients; use common_base::Plugins; -use common_catalog::consts::MIN_USER_TABLE_ID; +use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; -use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; +use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; +use common_meta::ddl::DdlContext; +use common_meta::ddl_manager::DdlManager; use common_meta::distributed_time_constants; -use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::key::flow_task::FlowTaskMetadataManager; +use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::node_manager::NodeManagerRef; -use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; +use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; use common_meta::wal_options_allocator::WalOptionsAllocator; @@ -35,6 +38,7 @@ use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; +use super::FLOW_TASK_ID_SEQ; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::error::{self, Result}; @@ -201,6 +205,9 @@ impl MetasrvBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new( leader_cached_kv_backend.clone() as _, )); + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new( + leader_cached_kv_backend.clone() as _, + )); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); let selector_ctx = SelectorContext { server_addr: options.server_addr.clone(), @@ -231,24 +238,55 @@ impl MetasrvBuilder { peer_allocator, )) }); - - let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); - - let ddl_manager = build_ddl_manager( - &options, - node_manager, - &procedure_manager, - &mailbox, - &table_metadata_manager, - &table_metadata_allocator, - &opening_region_keeper, - )?; + // TODO(weny): use the real allocator. + let flow_task_metadata_allocator = Arc::new( + FlowTaskMetadataAllocator::with_noop_peer_allocator(Arc::new( + SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_FLOW_TASK_ID as u64) + .step(10) + .build(), + )), + ); + let memory_region_keeper = Arc::new(MemoryRegionKeeper::default()); + let node_manager = node_manager.unwrap_or_else(|| { + let datanode_client_channel_config = ChannelConfig::new() + .timeout(Duration::from_millis( + options.datanode.client_options.timeout_millis, + )) + .connect_timeout(Duration::from_millis( + options.datanode.client_options.connect_timeout_millis, + )) + .tcp_nodelay(options.datanode.client_options.tcp_nodelay); + Arc::new(DatanodeClients::new(datanode_client_channel_config)) + }); + let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new( + mailbox.clone(), + MetasrvInfo { + server_addr: options.server_addr.clone(), + }, + )); + let ddl_manager = Arc::new( + DdlManager::try_new( + DdlContext { + node_manager, + cache_invalidator, + memory_region_keeper: memory_region_keeper.clone(), + table_metadata_manager: table_metadata_manager.clone(), + table_metadata_allocator: table_metadata_allocator.clone(), + flow_task_metadata_manager: flow_task_metadata_manager.clone(), + flow_task_metadata_allocator: flow_task_metadata_allocator.clone(), + }, + procedure_manager.clone(), + true, + ) + .context(error::InitDdlManagerSnafu)?, + ); let region_migration_manager = Arc::new(RegionMigrationManager::new( procedure_manager.clone(), DefaultContextFactory::new( table_metadata_manager.clone(), - opening_region_keeper.clone(), + memory_region_keeper.clone(), mailbox.clone(), options.server_addr.clone(), ), @@ -289,7 +327,7 @@ impl MetasrvBuilder { let region_lease_handler = RegionLeaseHandler::new( distributed_time_constants::REGION_LEASE_SECS, table_metadata_manager.clone(), - opening_region_keeper.clone(), + memory_region_keeper.clone(), ); let group = HeartbeatHandlerGroup::new(pushers); @@ -344,7 +382,7 @@ impl MetasrvBuilder { ) .await, plugins: plugins.unwrap_or_else(Plugins::default), - memory_region_keeper: opening_region_keeper, + memory_region_keeper, region_migration_manager, }) } @@ -390,47 +428,6 @@ fn build_procedure_manager( Arc::new(LocalManager::new(manager_config, Arc::new(state_store))) } -fn build_ddl_manager( - options: &MetasrvOptions, - datanode_clients: Option, - procedure_manager: &ProcedureManagerRef, - mailbox: &MailboxRef, - table_metadata_manager: &TableMetadataManagerRef, - table_metadata_allocator: &TableMetadataAllocatorRef, - memory_region_keeper: &MemoryRegionKeeperRef, -) -> Result { - let datanode_clients = datanode_clients.unwrap_or_else(|| { - let datanode_client_channel_config = ChannelConfig::new() - .timeout(Duration::from_millis( - options.datanode.client_options.timeout_millis, - )) - .connect_timeout(Duration::from_millis( - options.datanode.client_options.connect_timeout_millis, - )) - .tcp_nodelay(options.datanode.client_options.tcp_nodelay); - Arc::new(DatanodeClients::new(datanode_client_channel_config)) - }); - let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new( - mailbox.clone(), - MetasrvInfo { - server_addr: options.server_addr.clone(), - }, - )); - - Ok(Arc::new( - DdlManager::try_new( - procedure_manager.clone(), - datanode_clients, - cache_invalidator, - table_metadata_manager.clone(), - table_metadata_allocator.clone(), - memory_region_keeper.clone(), - true, - ) - .context(error::InitDdlManagerSnafu)?, - )) -} - impl Default for MetasrvBuilder { fn default() -> Self { Self::new() diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index f614f33b00d8..55a9119db3ed 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -106,7 +106,9 @@ pub mod test_data { use chrono::DateTime; use common_catalog::consts::MITO2_ENGINE; use common_meta::ddl::table_meta::TableMetadataAllocator; + use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; use common_meta::ddl::DdlContext; + use common_meta::key::flow_task::FlowTaskMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::node_manager::NodeManagerRef; @@ -194,8 +196,16 @@ pub mod test_data { let mailbox_sequence = SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence); - let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( + Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()), + Arc::new(WalOptionsAllocator::default()), + )); + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); + let flow_task_metadata_allocator = + Arc::new(FlowTaskMetadataAllocator::with_noop_peer_allocator( + Arc::new(SequenceBuilder::new("test", kv_backend).build()), + )); DdlContext { node_manager, cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( @@ -204,12 +214,11 @@ pub mod test_data { server_addr: "127.0.0.1:4321".to_string(), }, )), - table_metadata_manager: table_metadata_manager.clone(), + table_metadata_manager, + table_metadata_allocator, + flow_task_metadata_manager, + flow_task_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), - table_metadata_allocator: Arc::new(TableMetadataAllocator::new( - Arc::new(SequenceBuilder::new("test", kv_backend).build()), - Arc::new(WalOptionsAllocator::default()), - )), } } } diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index 4547770ef433..4e7aef084fb2 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -18,11 +18,12 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; use api::v1::{ AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDataTypeExtension, - CreateFlowTaskExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, - TableName, + CreateTableExpr, DropColumn, DropColumns, RenameTable, SemanticType, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; +use common_meta::rpc::ddl::CreateFlowTask; +use common_meta::table_name::TableName; use common_time::Timezone; use datafusion::sql::planner::object_name_to_table_reference; use datatypes::schema::{ColumnSchema, COMMENT_KEY}; @@ -494,7 +495,7 @@ pub(crate) fn to_alter_expr( pub fn to_create_flow_task_expr( create_flow: CreateFlow, query_ctx: QueryContextRef, -) -> Result { +) -> Result { // retrieve sink table name let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true) @@ -538,22 +539,20 @@ pub fn to_create_flow_task_expr( }) .collect::>>()?; - Ok(CreateFlowTaskExpr { + Ok(CreateFlowTask { catalog_name: query_ctx.current_catalog().to_string(), task_name: create_flow.flow_name.to_string(), source_table_names, - sink_table_name: Some(sink_table_name), - create_if_not_exists: create_flow.if_not_exists, + sink_table_name, or_replace: create_flow.or_replace, - // TODO(ruihang): change this field to optional in proto + create_if_not_exists: create_flow.if_not_exists, expire_when: create_flow .expire_when .map(|e| e.to_string()) .unwrap_or_default(), - // TODO(ruihang): change this field to optional in proto comment: create_flow.comment.unwrap_or_default(), sql: create_flow.query.to_string(), - task_options: HashMap::new(), + options: HashMap::new(), }) } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index b5966351896c..cf908611f52a 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -27,7 +27,7 @@ use common_meta::ddl::ExecutorContext; use common_meta::instruction::CacheIdent; use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::key::NAME_PATTERN; -use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use common_meta::rpc::ddl::{CreateFlowTask, DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{Partition, Partition as MetaPartition}; use common_meta::table_name::TableName; use common_query::Output; @@ -323,14 +323,27 @@ impl StatementExecutor { } #[tracing::instrument(skip_all)] - pub async fn create_flow(&self, stmt: CreateFlow, query_ctx: QueryContextRef) -> Result<()> { + pub async fn create_flow( + &self, + stmt: CreateFlow, + query_ctx: QueryContextRef, + ) -> Result { // TODO(ruihang): do some verification + let expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?; - let _expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?; + self.create_flow_procedure(expr).await?; + Ok(Output::new_with_affected_rows(0)) + } - // TODO: invoke procedure + async fn create_flow_procedure(&self, expr: CreateFlowTask) -> Result { + let request = SubmitDdlTaskRequest { + task: DdlTask::new_create_flow(expr), + }; - Ok(()) + self.procedure_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(error::ExecuteDdlSnafu) } #[tracing::instrument(skip_all)] diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 79fbef604e24..0931e87d05d4 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -17,11 +17,14 @@ use std::sync::Arc; use catalog::kvbackend::KvBackendCatalogManager; use cmd::options::MixOptions; use common_base::Plugins; -use common_catalog::consts::MIN_USER_TABLE_ID; +use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID}; use common_config::KvBackendConfig; use common_meta::cache_invalidator::MultiCacheInvalidator; use common_meta::ddl::table_meta::TableMetadataAllocator; +use common_meta::ddl::task_meta::FlowTaskMetadataAllocator; +use common_meta::ddl::DdlContext; use common_meta::ddl_manager::DdlManager; +use common_meta::key::flow_task::FlowTaskMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::region_keeper::MemoryRegionKeeper; @@ -35,6 +38,7 @@ use datanode::datanode::DatanodeBuilder; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; +use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ}; use servers::Mode; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -125,7 +129,7 @@ impl GreptimeDbStandaloneBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap(); - + let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone())); let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default()); let catalog_manager = KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await; @@ -133,28 +137,41 @@ impl GreptimeDbStandaloneBuilder { let node_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); let table_id_sequence = Arc::new( - SequenceBuilder::new("table_id", kv_backend.clone()) + SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) .initial(MIN_USER_TABLE_ID as u64) .step(10) .build(), ); + let flow_task_id_sequence = Arc::new( + SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_FLOW_TASK_ID as u64) + .step(10) + .build(), + ); let wal_options_allocator = Arc::new(WalOptionsAllocator::new( mix_options.wal_meta.clone(), kv_backend.clone(), )); - let table_meta_allocator = Arc::new(TableMetadataAllocator::new( + let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), )); + let flow_task_metadata_allocator = Arc::new( + FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence), + ); let ddl_task_executor = Arc::new( DdlManager::try_new( + DdlContext { + node_manager: node_manager.clone(), + cache_invalidator: multi_cache_invalidator, + memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), + table_metadata_manager, + table_metadata_allocator, + flow_task_metadata_manager, + flow_task_metadata_allocator, + }, procedure_manager.clone(), - node_manager.clone(), - multi_cache_invalidator, - table_metadata_manager, - table_meta_allocator, - Arc::new(MemoryRegionKeeper::default()), register_procedure_loaders, ) .unwrap(),