From 92a8e863ded618fe1be93f799360015b4f8f28b6 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Wed, 27 Mar 2024 19:39:23 +0800 Subject: [PATCH] chore: do not reply for broadcast msg (#3595) --- src/common/meta/src/instruction.rs | 4 -- .../handler/invalidate_table_cache.rs | 41 ++++++------------- src/frontend/src/heartbeat/handler/tests.rs | 16 +------- 3 files changed, 15 insertions(+), 46 deletions(-) diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 4b0055551615..caefe2eb221e 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -203,7 +203,6 @@ pub enum InstructionReply { OpenRegion(SimpleReply), CloseRegion(SimpleReply), UpgradeRegion(UpgradeRegionReply), - InvalidateTableCache(SimpleReply), DowngradeRegion(DowngradeRegionReply), } @@ -213,9 +212,6 @@ impl Display for InstructionReply { Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply), Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply), Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply), - Self::InvalidateTableCache(reply) => { - write!(f, "InstructionReply::Invalidate({})", reply) - } Self::DowngradeRegion(reply) => { write!(f, "InstructionReply::DowngradeRegion({})", reply) } diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 48abd8fadda4..3cda489035b1 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -18,8 +18,8 @@ use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; -use common_telemetry::error; +use common_meta::instruction::Instruction; +use common_telemetry::debug; #[derive(Clone)] pub struct InvalidateTableCacheHandler { @@ -36,35 +36,20 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { } async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { - let mailbox = ctx.mailbox.clone(); - let cache_invalidator = self.cache_invalidator.clone(); - - let (meta, invalidator) = match ctx.incoming_message.take() { - Some((meta, Instruction::InvalidateCaches(caches))) => (meta, async move { - cache_invalidator - .invalidate(&Context::default(), caches) - .await - }), - _ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"), + let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else { + unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'") }; - let _handle = common_runtime::spawn_bg(async move { - // Local cache invalidation always succeeds. - let _ = invalidator.await; + debug!( + "InvalidateTableCacheHandler: invalidating caches: {:?}", + caches + ); - if let Err(e) = mailbox - .send(( - meta, - InstructionReply::InvalidateTableCache(SimpleReply { - result: true, - error: None, - }), - )) - .await - { - error!(e; "Failed to send reply to mailbox"); - } - }); + // Invalidate local cache always success + let _ = self + .cache_invalidator + .invalidate(&Context::default(), caches) + .await?; Ok(HandleControl::Done) } diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index f23558cc7e16..1b6885ddb777 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -22,7 +21,7 @@ use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; -use common_meta::instruction::{CacheIdent, Instruction, InstructionReply, SimpleReply}; +use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::key::table_info::TableInfoKey; use common_meta::key::TableMetaKey; use partition::manager::TableRouteCacheInvalidator; @@ -67,7 +66,7 @@ async fn test_invalidate_table_cache_handler() { InvalidateTableCacheHandler::new(backend.clone()), )])); - let (tx, mut rx) = mpsc::channel(8); + let (tx, _) = mpsc::channel(8); let mailbox = Arc::new(HeartbeatMailbox::new(tx)); // removes a valid key @@ -78,11 +77,6 @@ async fn test_invalidate_table_cache_handler() { ) .await; - let (_, reply) = rx.recv().await.unwrap(); - assert_matches!( - reply, - InstructionReply::InvalidateTableCache(SimpleReply { result: true, .. }) - ); assert!(!backend .inner .lock() @@ -96,12 +90,6 @@ async fn test_invalidate_table_cache_handler() { Instruction::InvalidateCaches(vec![CacheIdent::TableId(0)]), ) .await; - - let (_, reply) = rx.recv().await.unwrap(); - assert_matches!( - reply, - InstructionReply::InvalidateTableCache(SimpleReply { result: true, .. }) - ); } pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta {