diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs
index 81939ad615b1..c21a9ee49201 100644
--- a/src/common/meta/src/cache_invalidator.rs
+++ b/src/common/meta/src/cache_invalidator.rs
@@ -15,7 +15,10 @@
 use std::sync::Arc;
 
 use crate::error::Result;
+use crate::flow_name::FlowName;
 use crate::instruction::CacheIdent;
+use crate::key::flow::flow_info::FlowInfoKey;
+use crate::key::flow::flow_name::FlowNameKey;
 use crate::key::schema_name::SchemaNameKey;
 use crate::key::table_info::TableInfoKey;
 use crate::key::table_name::TableNameKey;
@@ -82,9 +85,19 @@ where
                 let key: SchemaNameKey = schema_name.into();
                 self.invalidate_key(&key.to_bytes()).await;
             }
-            &CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
-                // TODO(weny): implements it
-                unimplemented!()
+            CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
+                // Do nothing
+            }
+            CacheIdent::FlowName(FlowName {
+                catalog_name,
+                flow_name,
+            }) => {
+                let key = FlowNameKey::new(catalog_name, flow_name);
+                self.invalidate_key(&key.to_bytes()).await
+            }
+            CacheIdent::FlowId(flow_id) => {
+                let key = FlowInfoKey::new(*flow_id);
+                self.invalidate_key(&key.to_bytes()).await;
             }
         }
     }
diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs
index 4678504aca84..474eec53c3d8 100644
--- a/src/common/meta/src/ddl/create_flow.rs
+++ b/src/common/meta/src/ddl/create_flow.rs
@@ -34,9 +34,11 @@ use strum::AsRefStr;
 use table::metadata::TableId;
 
 use super::utils::add_peer_context_if_needed;
+use crate::cache_invalidator::Context;
 use crate::ddl::utils::handle_retry_error;
 use crate::ddl::DdlContext;
 use crate::error::{self, Result};
+use crate::instruction::{CacheIdent, CreateFlow};
 use crate::key::flow::flow_info::FlowInfoValue;
 use crate::key::table_name::TableNameKey;
 use crate::key::FlowId;
@@ -173,6 +175,28 @@
             .create_flow_metadata(flow_id, (&self.data).into())
             .await?;
         info!("Created flow metadata for flow {flow_id}");
+        self.data.state = CreateFlowState::InvalidateFlowCache;
+        Ok(Status::executing(true))
+    }
+
+    async fn on_broadcast(&mut self) -> Result<Status> {
+        // Safety: The flow id must be allocated.
+        let flow_id = self.data.flow_id.unwrap();
+        let ctx = Context {
+            subject: Some("Invalidate flow cache by creating flow".to_string()),
+        };
+
+        self.context
+            .cache_invalidator
+            .invalidate(
+                &ctx,
+                &[CacheIdent::CreateFlow(CreateFlow {
+                    source_table_ids: self.data.source_table_ids.clone(),
+                    flownode_ids: self.data.peers.iter().map(|peer| peer.id).collect(),
+                })],
+            )
+            .await?;
+
         Ok(Status::done_with_output(flow_id))
     }
 }
@@ -194,6 +218,7 @@ impl Procedure for CreateFlowProcedure {
             CreateFlowState::Prepare => self.on_prepare().await,
             CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
             CreateFlowState::CreateMetadata => self.on_create_metadata().await,
+            CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
         }
         .map_err(handle_retry_error)
     }
@@ -227,6 +252,8 @@ pub enum CreateFlowState {
     Prepare,
     /// Creates flows on the flownode.
     CreateFlows,
+    /// Invalidate flow cache.
+    InvalidateFlowCache,
     /// Create metadata.
     CreateMetadata,
 }
diff --git a/src/common/meta/src/ddl/drop_flow.rs b/src/common/meta/src/ddl/drop_flow.rs
index 1a32781a9ff5..db5fa7901cbf 100644
--- a/src/common/meta/src/ddl/drop_flow.rs
+++ b/src/common/meta/src/ddl/drop_flow.rs
@@ -29,8 +29,11 @@ use snafu::{ensure, ResultExt};
 use strum::AsRefStr;
 
 use super::utils::{add_peer_context_if_needed, handle_retry_error};
+use crate::cache_invalidator::Context;
 use crate::ddl::DdlContext;
 use crate::error::{self, Result};
+use crate::flow_name::FlowName;
+use crate::instruction::{CacheIdent, DropFlow};
 use crate::key::flow::flow_info::FlowInfoValue;
 use crate::lock_key::{CatalogLock, FlowLock};
 use crate::peer::Peer;
@@ -145,7 +148,29 @@ impl DropFlowProcedure {
     }
 
     async fn on_broadcast(&mut self) -> Result<Status> {
-        // TODO(weny): invalidates cache.
+        let flow_id = self.data.task.flow_id;
+        let ctx = Context {
+            subject: Some("Invalidate flow cache by dropping flow".to_string()),
+        };
+        let flow_info_value = self.data.flow_info_value.as_ref().unwrap();
+
+        self.context
+            .cache_invalidator
+            .invalidate(
+                &ctx,
+                &[
+                    CacheIdent::FlowId(flow_id),
+                    CacheIdent::FlowName(FlowName {
+                        catalog_name: flow_info_value.catalog_name.to_string(),
+                        flow_name: flow_info_value.flow_name.to_string(),
+                    }),
+                    CacheIdent::DropFlow(DropFlow {
+                        source_table_ids: flow_info_value.source_table_ids.clone(),
+                        flownode_ids: flow_info_value.flownode_ids.values().cloned().collect(),
+                    }),
+                ],
+            )
+            .await?;
         self.data.state = DropFlowState::DropFlows;
         Ok(Status::executing(true))
     }
diff --git a/src/common/meta/src/flow_name.rs b/src/common/meta/src/flow_name.rs
new file mode 100644
index 000000000000..236e5268476a
--- /dev/null
+++ b/src/common/meta/src/flow_name.rs
@@ -0,0 +1,33 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::{Display, Formatter};
+
+use serde::{Deserialize, Serialize};
+
+/// The owned flow name.
+#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
+pub struct FlowName {
+    pub catalog_name: String,
+    pub flow_name: String,
+}
+
+impl Display for FlowName {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str(&common_catalog::format_full_flow_name(
+            &self.catalog_name,
+            &self.flow_name,
+        ))
+    }
+}
diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index 79a641edc2c6..7820985b6571 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -21,7 +21,9 @@
 use store_api::storage::{RegionId, RegionNumber};
 use strum::Display;
 use table::metadata::TableId;
 
+use crate::flow_name::FlowName;
 use crate::key::schema_name::SchemaName;
+use crate::key::FlowId;
 use crate::table_name::TableName;
 use crate::{ClusterId, DatanodeId, FlownodeId};
@@ -155,6 +157,8 @@ pub struct UpgradeRegion {
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 /// The identifier of cache.
 pub enum CacheIdent {
+    FlowId(FlowId),
+    FlowName(FlowName),
     TableId(TableId),
     TableName(TableName),
     SchemaName(SchemaName),
diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs
index 971e92ad79a9..5398a62a6752 100644
--- a/src/common/meta/src/lib.rs
+++ b/src/common/meta/src/lib.rs
@@ -26,6 +26,7 @@ pub mod ddl;
 pub mod ddl_manager;
 pub mod distributed_time_constants;
 pub mod error;
+pub mod flow_name;
 pub mod heartbeat;
 pub mod instruction;
 pub mod key;
diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs
index 81459f288748..50fcedb62bed 100644
--- a/src/common/meta/src/rpc/ddl.rs
+++ b/src/common/meta/src/rpc/ddl.rs
@@ -196,7 +196,7 @@ impl TryFrom<Task> for DdlTask {
                 Ok(DdlTask::DropDatabase(drop_database.try_into()?))
             }
             Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
-            Task::DropFlowTask(_) => unimplemented!(),
+            Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
         }
     }
 }
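Usage sketch (not part of the patch): the snippet below mirrors the ident list that `DropFlowProcedure::on_broadcast` broadcasts, showing how the two new `CacheIdent` variants reach the new match arms in `cache_invalidator.rs`. The helper name `invalidate_flow_cache` and the `&dyn CacheInvalidator` parameter are illustrative assumptions; only the `Context`, `CacheIdent::FlowId`/`CacheIdent::FlowName`, and `invalidate` call shapes come from the diff above.

```rust
use common_meta::cache_invalidator::{CacheInvalidator, Context};
use common_meta::error::Result;
use common_meta::flow_name::FlowName;
use common_meta::instruction::CacheIdent;
use common_meta::key::FlowId;

/// Illustrative helper: broadcast the same idents that
/// `DropFlowProcedure::on_broadcast` sends, so that both the flow info entry
/// and the name-to-id mapping are evicted from the caches.
async fn invalidate_flow_cache(
    invalidator: &dyn CacheInvalidator,
    flow_id: FlowId,
    catalog_name: &str,
    flow_name: &str,
) -> Result<()> {
    let ctx = Context {
        subject: Some("Invalidate flow cache".to_string()),
    };
    invalidator
        .invalidate(
            &ctx,
            &[
                // Handled by the new `CacheIdent::FlowId` arm, which invalidates
                // the key built from `FlowInfoKey::new(flow_id)`.
                CacheIdent::FlowId(flow_id),
                // Handled by the new `CacheIdent::FlowName` arm, which invalidates
                // the key built from `FlowNameKey::new(catalog_name, flow_name)`.
                CacheIdent::FlowName(FlowName {
                    catalog_name: catalog_name.to_string(),
                    flow_name: flow_name.to_string(),
                }),
            ],
        )
        .await
}
```

Note that this KV-backed invalidator treats `CacheIdent::CreateFlow`/`CacheIdent::DropFlow` as no-ops ("Do nothing" above); the procedures still broadcast them alongside the flow id/name idents.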