Skip to content

Commit

Permalink
meta notifies frontend after recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Apr 9, 2024
1 parent 21bea68 commit c49bc37
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 2 deletions.
3 changes: 3 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ message RelationGroup {
repeated Relation relations = 1;
}

message Recovery {}

message SubscribeResponse {
enum Operation {
UNSPECIFIED = 0;
Expand Down Expand Up @@ -443,6 +445,7 @@ message SubscribeResponse {
catalog.Connection connection = 22;
FragmentParallelUnitMappings serving_parallel_unit_mappings = 23;
hummock.HummockVersionStats hummock_stats = 24;
Recovery recovery = 25;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ where
Info::ServingParallelUnitMappings(_) => true,
Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(),
Info::HummockStats(_) => true,
Info::Recovery(_) => true,
});

self.observer_states
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{FragmentParallelUnitMapping, MetaSnapshot, SubscribeResponse};
use risingwave_rpc_client::ComputeClientPoolRef;
use tokio::sync::watch::Sender;

use crate::catalog::root_catalog::Catalog;
Expand All @@ -43,6 +44,7 @@ pub struct FrontendObserverNode {
user_info_updated_tx: Sender<UserInfoVersion>,
hummock_snapshot_manager: HummockSnapshotManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
compute_client_pool: ComputeClientPoolRef,
}

impl ObserverState for FrontendObserverNode {
Expand Down Expand Up @@ -96,6 +98,9 @@ impl ObserverState for FrontendObserverNode {
Info::ServingParallelUnitMappings(m) => {
self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation());
}
Info::Recovery(_) => {
self.compute_client_pool.invalidate_all();
}
}
}

Expand Down Expand Up @@ -192,6 +197,7 @@ impl FrontendObserverNode {
user_info_updated_tx: Sender<UserInfoVersion>,
hummock_snapshot_manager: HummockSnapshotManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
compute_client_pool: ComputeClientPoolRef,
) -> Self {
Self {
worker_node_manager,
Expand All @@ -201,6 +207,7 @@ impl FrontendObserverNode {
user_info_updated_tx,
hummock_snapshot_manager,
system_params_manager,
compute_client_pool,
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl FrontendEnv {
Arc::new(ComputeClientPool::new(config.server.connection_pool_size));
let query_manager = QueryManager::new(
worker_node_manager.clone(),
compute_client_pool,
compute_client_pool.clone(),
catalog_reader.clone(),
Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
batch_config.distributed_query_limit,
Expand All @@ -311,6 +311,7 @@ impl FrontendEnv {
user_info_updated_tx,
hummock_snapshot_manager.clone(),
system_params_manager.clone(),
compute_client_pool,
);
let observer_manager =
ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
Expand Down
7 changes: 6 additions & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use risingwave_common::catalog::TableId;
use risingwave_common::config::DefaultParallelism;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::State;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::meta::{PausedReason, Recovery};
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::AddMutation;
Expand Down Expand Up @@ -543,6 +544,10 @@ impl GlobalBarrierManager {
paused = ?self.state.paused_reason(),
"recovery success"
);

self.env
.notification_manager()
.notify_frontend_without_version(Operation::Update, Info::Recovery(Recovery {}));
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ where
.unwrap()
.clone())
}

pub fn invalidate_all(&self) {
self.clients.invalidate_all()
}
}

/// `ExtraInfoSource` is used by heartbeat worker to pull extra info that needs to be piggybacked.
Expand Down

0 comments on commit c49bc37

Please sign in to comment.