Skip to content

Commit

Permalink
feat: support invalidate schema name key cache (#3725)
Browse files Browse the repository at this point in the history
* feat: support invalidate schema name key cache

* fix: remove pub for invalidate_schema_cache

* refactor: add DropMetadataBroadcast State Op

* fix: delete files
  • Loading branch information
poltao authored Apr 18, 2024
1 parent 6494553 commit 4248dfc
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 25 deletions.
5 changes: 5 additions & 0 deletions src/common/meta/src/cache_invalidator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::sync::RwLock;

use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteKey;
Expand Down Expand Up @@ -107,6 +108,10 @@ where
let key: TableNameKey = (&table_name).into();
self.invalidate_key(&key.as_raw_key()).await
}
CacheIdent::SchemaName(schema_name) => {
let key: SchemaNameKey = (&schema_name).into();
self.invalidate_key(&key.as_raw_key()).await;
}
}
}
Ok(())
Expand Down
74 changes: 67 additions & 7 deletions src/common/meta/src/ddl/drop_database/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use common_procedure::Status;
use serde::{Deserialize, Serialize};

use super::end::DropDatabaseEnd;
use crate::cache_invalidator::Context;
use crate::ddl::drop_database::{DropDatabaseContext, State};
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::schema_name::SchemaNameKey;
use crate::instruction::CacheIdent;
use crate::key::schema_name::{SchemaName, SchemaNameKey};

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropDatabaseRemoveMetadata;
Expand All @@ -40,7 +42,53 @@ impl State for DropDatabaseRemoveMetadata {
.delete(SchemaNameKey::new(&ctx.catalog, &ctx.schema))
.await?;

return Ok((Box::new(DropDatabaseEnd), Status::done()));
return Ok((Box::new(DropMetadataBroadcast), Status::executing(true)));
}

fn as_any(&self) -> &dyn Any {
self
}
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropMetadataBroadcast;

impl DropMetadataBroadcast {
/// Invalidates frontend caches
async fn invalidate_schema_cache(
&self,
ddl_ctx: &DdlContext,
db_ctx: &mut DropDatabaseContext,
) -> Result<()> {
let cache_invalidator = &ddl_ctx.cache_invalidator;
let ctx = Context {
subject: Some("Invalidate schema cache by dropping database".to_string()),
};

cache_invalidator
.invalidate(
&ctx,
vec![CacheIdent::SchemaName(SchemaName {
catalog_name: db_ctx.catalog.clone(),
schema_name: db_ctx.schema.clone(),
})],
)
.await?;

Ok(())
}
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for DropMetadataBroadcast {
async fn next(
&mut self,
ddl_ctx: &DdlContext,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
self.invalidate_schema_cache(ddl_ctx, ctx).await?;
Ok((Box::new(DropDatabaseEnd), Status::done()))
}

fn as_any(&self) -> &dyn Any {
Expand All @@ -53,7 +101,7 @@ mod tests {
use std::sync::Arc;

use crate::ddl::drop_database::end::DropDatabaseEnd;
use crate::ddl::drop_database::metadata::DropDatabaseRemoveMetadata;
use crate::ddl::drop_database::metadata::{DropDatabaseRemoveMetadata, DropMetadataBroadcast};
use crate::ddl::drop_database::{DropDatabaseContext, State};
use crate::key::schema_name::SchemaNameKey;
use crate::test_util::{new_ddl_context, MockDatanodeManager};
Expand All @@ -76,14 +124,23 @@ mod tests {
tables: None,
};
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
state.as_any().downcast_ref::<DropDatabaseEnd>().unwrap();
assert!(status.is_done());
state
.as_any()
.downcast_ref::<DropMetadataBroadcast>()
.unwrap();
assert!(!status.is_done());
assert!(!ddl_context
.table_metadata_manager
.schema_manager()
.exists(SchemaNameKey::new("foo", "bar"))
.await
.unwrap());

let mut state = DropMetadataBroadcast;
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
state.as_any().downcast_ref::<DropDatabaseEnd>().unwrap();
assert!(status.is_done());

// Schema not exists
let mut state = DropDatabaseRemoveMetadata;
let mut ctx = DropDatabaseContext {
Expand All @@ -93,7 +150,10 @@ mod tests {
tables: None,
};
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
state.as_any().downcast_ref::<DropDatabaseEnd>().unwrap();
assert!(status.is_done());
state
.as_any()
.downcast_ref::<DropMetadataBroadcast>()
.unwrap();
assert!(!status.is_done());
}
}
2 changes: 2 additions & 0 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use store_api::storage::{RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;

use crate::key::schema_name::SchemaName;
use crate::table_name::TableName;
use crate::{ClusterId, DatanodeId};

Expand Down Expand Up @@ -156,6 +157,7 @@ pub struct UpgradeRegion {
pub enum CacheIdent {
TableId(TableId),
TableName(TableName),
SchemaName(SchemaName),
}

#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
Expand Down
15 changes: 15 additions & 0 deletions src/common/meta/src/key/schema_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,21 @@ impl SchemaManager {
}
}

#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct SchemaName {
pub catalog_name: String,
pub schema_name: String,
}

impl<'a> From<&'a SchemaName> for SchemaNameKey<'a> {
fn from(value: &'a SchemaName) -> Self {
Self {
catalog: &value.catalog_name,
schema: &value.schema_name,
}
}
}

#[cfg(test)]
mod tests {

Expand Down
80 changes: 62 additions & 18 deletions src/frontend/src/heartbeat/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::TableMetaKey;
use partition::manager::TableRouteCacheInvalidator;
Expand Down Expand Up @@ -53,6 +54,27 @@ impl TableRouteCacheInvalidator for MockTableRouteCacheInvalidator {
}
}

pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta {
MessageMeta {
id,
subject: subject.to_string(),
to: to.to_string(),
from: from.to_string(),
}
}

async fn handle_instruction(
executor: Arc<dyn HeartbeatResponseHandlerExecutor>,
mailbox: Arc<HeartbeatMailbox>,
instruction: Instruction,
) {
let response = HeartbeatResponse::default();
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction));
executor.handle(ctx).await.unwrap();
}

#[tokio::test]
async fn test_invalidate_table_cache_handler() {
let table_id = 1;
Expand Down Expand Up @@ -92,23 +114,45 @@ async fn test_invalidate_table_cache_handler() {
.await;
}

pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta {
MessageMeta {
id,
subject: subject.to_string(),
to: to.to_string(),
from: from.to_string(),
}
}
#[tokio::test]
async fn test_invalidate_schema_key_handler() {
let (catalog, schema) = ("foo", "bar");
let schema_key = SchemaNameKey { catalog, schema };
let inner = HashMap::from([(schema_key.as_raw_key(), 1)]);
let backend = Arc::new(MockKvCacheInvalidator {
inner: Mutex::new(inner),
});

async fn handle_instruction(
executor: Arc<dyn HeartbeatResponseHandlerExecutor>,
mailbox: Arc<HeartbeatMailbox>,
instruction: Instruction,
) {
let response = HeartbeatResponse::default();
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction));
executor.handle(ctx).await.unwrap();
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateTableCacheHandler::new(backend.clone()),
)]));

let (tx, _) = mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));

// removes a valid key
let valid_key = SchemaName {
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
};
handle_instruction(
executor.clone(),
mailbox.clone(),
Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(valid_key.clone())]),
)
.await;

assert!(!backend
.inner
.lock()
.unwrap()
.contains_key(&schema_key.as_raw_key()));

// removes a invalid key
handle_instruction(
executor,
mailbox,
Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(valid_key)]),
)
.await;
}

0 comments on commit 4248dfc

Please sign in to comment.