Skip to content

Commit

Permalink
feat: support to invalidate flow cache (GreptimeTeam#3926)
Browse files Browse the repository at this point in the history
* feat: add `FlowName` & `FlowId` to `CacheIdent`

* feat: support to invalidate flow cache

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored May 14, 2024
1 parent 6214180 commit f16ce3c
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 5 deletions.
19 changes: 16 additions & 3 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use std::sync::Arc;

use crate::error::Result;
use crate::flow_name::FlowName;
use crate::instruction::CacheIdent;
use crate::key::flow::flow_info::FlowInfoKey;
use crate::key::flow::flow_name::FlowNameKey;
use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
Expand Down Expand Up @@ -82,9 +85,19 @@ where
let key: SchemaNameKey = schema_name.into();
self.invalidate_key(&key.to_bytes()).await;
}
&CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
// TODO(weny): implements it
unimplemented!()
CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
// Do nothing
}
CacheIdent::FlowName(FlowName {
catalog_name,
flow_name,
}) => {
let key = FlowNameKey::new(catalog_name, flow_name);
self.invalidate_key(&key.to_bytes()).await
}
CacheIdent::FlowId(flow_id) => {
let key = FlowInfoKey::new(*flow_id);
self.invalidate_key(&key.to_bytes()).await;
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ use strum::AsRefStr;
use table::metadata::TableId;

use super::utils::add_peer_context_if_needed;
use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::FlowId;
Expand Down Expand Up @@ -173,6 +175,28 @@ impl CreateFlowProcedure {
.create_flow_metadata(flow_id, (&self.data).into())
.await?;
info!("Created flow metadata for flow {flow_id}");
self.data.state = CreateFlowState::InvalidateFlowCache;
Ok(Status::executing(true))
}

async fn on_broadcast(&mut self) -> Result<Status> {
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
let ctx = Context {
subject: Some("Invalidate flow cache by creating flow".to_string()),
};

self.context
.cache_invalidator
.invalidate(
&ctx,
&[CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownode_ids: self.data.peers.iter().map(|peer| peer.id).collect(),
})],
)
.await?;

Ok(Status::done_with_output(flow_id))
}
}
Expand All @@ -194,6 +218,7 @@ impl Procedure for CreateFlowProcedure {
CreateFlowState::Prepare => self.on_prepare().await,
CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
CreateFlowState::CreateMetadata => self.on_create_metadata().await,
CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
}
.map_err(handle_retry_error)
}
Expand Down Expand Up @@ -227,6 +252,8 @@ pub enum CreateFlowState {
Prepare,
/// Creates flows on the flownode.
CreateFlows,
/// Invalidate flow cache.
InvalidateFlowCache,
/// Create metadata.
CreateMetadata,
}
Expand Down
27 changes: 26 additions & 1 deletion src/common/meta/src/ddl/drop_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ use snafu::{ensure, ResultExt};
use strum::AsRefStr;

use super::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::flow_name::FlowName;
use crate::instruction::{CacheIdent, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::lock_key::{CatalogLock, FlowLock};
use crate::peer::Peer;
Expand Down Expand Up @@ -145,7 +148,29 @@ impl DropFlowProcedure {
}

async fn on_broadcast(&mut self) -> Result<Status> {
// TODO(weny): invalidates cache.
let flow_id = self.data.task.flow_id;
let ctx = Context {
subject: Some("Invalidate flow cache by dropping flow".to_string()),
};
let flow_info_value = self.data.flow_info_value.as_ref().unwrap();

self.context
.cache_invalidator
.invalidate(
&ctx,
&[
CacheIdent::FlowId(flow_id),
CacheIdent::FlowName(FlowName {
catalog_name: flow_info_value.catalog_name.to_string(),
flow_name: flow_info_value.flow_name.to_string(),
}),
CacheIdent::DropFlow(DropFlow {
source_table_ids: flow_info_value.source_table_ids.clone(),
flownode_ids: flow_info_value.flownode_ids.values().cloned().collect(),
}),
],
)
.await?;
self.data.state = DropFlowState::DropFlows;
Ok(Status::executing(true))
}
Expand Down
33 changes: 33 additions & 0 deletions src/common/meta/src/flow_name.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Display, Formatter};

use serde::{Deserialize, Serialize};

/// The owned flow name.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct FlowName {
pub catalog_name: String,
pub flow_name: String,
}

impl Display for FlowName {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(&common_catalog::format_full_flow_name(
&self.catalog_name,
&self.flow_name,
))
}
}
4 changes: 4 additions & 0 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use store_api::storage::{RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;

use crate::flow_name::FlowName;
use crate::key::schema_name::SchemaName;
use crate::key::FlowId;
use crate::table_name::TableName;
use crate::{ClusterId, DatanodeId, FlownodeId};

Expand Down Expand Up @@ -155,6 +157,8 @@ pub struct UpgradeRegion {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
/// The identifier of cache.
pub enum CacheIdent {
FlowId(FlowId),
FlowName(FlowName),
TableId(TableId),
TableName(TableName),
SchemaName(SchemaName),
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod ddl;
pub mod ddl_manager;
pub mod distributed_time_constants;
pub mod error;
pub mod flow_name;
pub mod heartbeat;
pub mod instruction;
pub mod key;
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl TryFrom<Task> for DdlTask {
Ok(DdlTask::DropDatabase(drop_database.try_into()?))
}
Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)),
Task::DropFlowTask(_) => unimplemented!(),
Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)),
}
}
}
Expand Down

0 comments on commit f16ce3c

Please sign in to comment.