Skip to content

Commit

Permalink
fix recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Nov 14, 2024
1 parent 890cc6c commit 1d0da42
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions src/meta/src/barrier/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::HashMap;
use std::mem::replace;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use arc_swap::ArcSwap;
Expand Down Expand Up @@ -46,7 +46,7 @@ use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager};
use crate::barrier::schedule::PeriodicBarriers;
use crate::barrier::{
schedule, BarrierKind, BarrierManagerRequest, BarrierManagerStatus,
BarrierWorkerRuntimeInfoSnapshot, RecoveryReason, TracedEpoch,
BarrierWorkerRuntimeInfoSnapshot, InflightSubscriptionInfo, RecoveryReason, TracedEpoch,
};
use crate::error::MetaErrorInner;
use crate::hummock::HummockManagerRef;
Expand Down Expand Up @@ -558,9 +558,10 @@ impl<C: GlobalBarrierWorkerContext> GlobalBarrierWorker<C> {

let mut control_stream_manager = ControlStreamManager::new(self.env.clone());
let reset_start_time = Instant::now();
let empty_subscriptions = LazyLock::new(InflightSubscriptionInfo::default);
control_stream_manager
.reset(
subscription_infos.iter().map(|(database_id, subscriptions)| (*database_id, subscriptions)),
database_fragment_infos.keys().map(|database_id| (*database_id, subscription_infos.get(database_id).unwrap_or_else(|| &*empty_subscriptions))),
active_streaming_nodes.current(),
&*self.context,
)
Expand Down

0 comments on commit 1d0da42

Please sign in to comment.