From c49bc37c1b59a0e6ed7aa860116547d302282256 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 9 Apr 2024 15:18:12 +0800 Subject: [PATCH] meta notifies frontend after recovery --- proto/meta.proto | 3 +++ src/common/common_service/src/observer_manager.rs | 1 + src/frontend/src/observer/observer_manager.rs | 7 +++++++ src/frontend/src/session.rs | 3 ++- src/meta/src/barrier/recovery.rs | 7 ++++++- src/rpc_client/src/lib.rs | 4 ++++ 6 files changed, 23 insertions(+), 2 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index f377486368f2f..6a1460b4453a7 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -414,6 +414,8 @@ message RelationGroup { repeated Relation relations = 1; } +message Recovery {} + message SubscribeResponse { enum Operation { UNSPECIFIED = 0; @@ -443,6 +445,7 @@ message SubscribeResponse { catalog.Connection connection = 22; FragmentParallelUnitMappings serving_parallel_unit_mappings = 23; hummock.HummockVersionStats hummock_stats = 24; + Recovery recovery = 25; } } diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index 92c10ebd2900d..53225ab3515e4 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -160,6 +160,7 @@ where Info::ServingParallelUnitMappings(_) => true, Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(), Info::HummockStats(_) => true, + Info::Recovery(_) => true, }); self.observer_states diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index c66c602394e69..71b52573b5301 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -27,6 +27,7 @@ use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{FragmentParallelUnitMapping, MetaSnapshot, SubscribeResponse}; +use risingwave_rpc_client::ComputeClientPoolRef; use tokio::sync::watch::Sender; use crate::catalog::root_catalog::Catalog; @@ -43,6 +44,7 @@ pub struct FrontendObserverNode { user_info_updated_tx: Sender, hummock_snapshot_manager: HummockSnapshotManagerRef, system_params_manager: LocalSystemParamsManagerRef, + compute_client_pool: ComputeClientPoolRef, } impl ObserverState for FrontendObserverNode { @@ -96,6 +98,9 @@ impl ObserverState for FrontendObserverNode { Info::ServingParallelUnitMappings(m) => { self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation()); } + Info::Recovery(_) => { + self.compute_client_pool.invalidate_all(); + } } } @@ -192,6 +197,7 @@ impl FrontendObserverNode { user_info_updated_tx: Sender, hummock_snapshot_manager: HummockSnapshotManagerRef, system_params_manager: LocalSystemParamsManagerRef, + compute_client_pool: ComputeClientPoolRef, ) -> Self { Self { worker_node_manager, @@ -201,6 +207,7 @@ impl FrontendObserverNode { user_info_updated_tx, hummock_snapshot_manager, system_params_manager, + compute_client_pool, } } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index e4354980c55fb..89a2347f9a83c 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -286,7 +286,7 @@ impl FrontendEnv { Arc::new(ComputeClientPool::new(config.server.connection_pool_size)); let query_manager = QueryManager::new( worker_node_manager.clone(), - compute_client_pool, + compute_client_pool.clone(), catalog_reader.clone(), Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()), batch_config.distributed_query_limit, @@ -311,6 +311,7 @@ impl FrontendEnv { user_info_updated_tx, hummock_snapshot_manager.clone(), system_params_manager.clone(), + compute_client_pool, ); let observer_manager = ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index e862f828e2041..8060821004b87 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -23,8 +23,9 @@ use risingwave_common::catalog::TableId; use risingwave_common::config::DefaultParallelism; use risingwave_meta_model_v2::StreamingParallelism; use risingwave_pb::common::ActorInfo; +use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::State; -use risingwave_pb::meta::PausedReason; +use risingwave_pb::meta::{PausedReason, Recovery}; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::AddMutation; @@ -543,6 +544,10 @@ impl GlobalBarrierManager { paused = ?self.state.paused_reason(), "recovery success" ); + + self.env + .notification_manager() + .notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {})); } } diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index fabd1dabeca01..6058371556b6a 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -129,6 +129,10 @@ where .unwrap() .clone()) } + + pub fn invalidate_all(&self) { + self.clients.invalidate_all() + } } /// `ExtraInfoSource` is used by heartbeat worker to pull extra info that needs to be piggybacked.