From 49105a44beb50b9ed331b0e30813f529119daa1f Mon Sep 17 00:00:00 2001 From: Blaise Bruer Date: Sun, 1 Sep 2024 23:12:28 -0500 Subject: [PATCH] Client action listeners will now timeout actions Listeners of the actions will now flag actions as timedout instad of just if the scheduler detects that the worker went offline. This is to support distributed schedulers. Since any worker or scheduler may now go offline we need to ensure the "owner" of the action is actually the one who cares about it, in this case the client. towards #359 --- nativelink-scheduler/src/simple_scheduler.rs | 14 +- .../src/simple_scheduler_state_manager.rs | 306 +++++++++++++++--- .../tests/simple_scheduler_test.rs | 94 ++++++ 3 files changed, 375 insertions(+), 39 deletions(-) diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index b347e37c1..82b12c34a 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::time::SystemTime; use async_trait::async_trait; use futures::Future; @@ -21,6 +22,7 @@ use nativelink_metric::{MetricsComponent, RootMetricsComponent}; use nativelink_util::action_messages::{ ActionInfo, ActionStage, ActionState, OperationId, WorkerId, }; +use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; use nativelink_util::operation_state_manager::{ ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, @@ -295,6 +297,7 @@ impl SimpleScheduler { tokio::time::sleep(Duration::from_millis(1)) }, task_change_notify, + SystemTime::now, ) } @@ -302,11 +305,14 @@ impl SimpleScheduler { Fut: Future + Send, F: Fn() -> Fut + Send + Sync + 'static, A: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, >( scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler, awaited_action_db: A, on_matching_engine_run: F, task_change_notify: Arc, + now_fn: NowFn, ) -> (Arc, Arc) { let platform_property_manager = Arc::new(PlatformPropertyManager::new( scheduler_cfg @@ -326,7 +332,13 @@ impl SimpleScheduler { } let worker_change_notify = Arc::new(Notify::new()); - let state_manager = SimpleSchedulerStateManager::new(max_job_retries, awaited_action_db); + let state_manager = SimpleSchedulerStateManager::new( + max_job_retries, + // TODO(allada) This should probably have its own config. + Duration::from_secs(worker_timeout_s), + awaited_action_db, + now_fn, + ); let worker_scheduler = ApiWorkerScheduler::new( state_manager.clone(), diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs index 246b42e9c..edcaeb938 100644 --- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs +++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs @@ -13,8 +13,10 @@ // limitations under the License. use std::ops::Bound; -use std::sync::Arc; +use std::sync::{Arc, Weak}; +use std::time::{Duration, SystemTime}; +use async_lock::Mutex; use async_trait::async_trait; use futures::{future, stream, StreamExt, TryStreamExt}; use nativelink_error::{make_err, Code, Error, ResultExt}; @@ -23,6 +25,7 @@ use nativelink_util::action_messages::{ ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata, OperationId, WorkerId, }; +use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; use nativelink_util::operation_state_manager::{ ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, @@ -121,20 +124,48 @@ fn apply_filter_predicate(awaited_action: &AwaitedAction, filter: &OperationFilt true } -struct ClientActionStateResult { - inner: MatchingEngineActionStateResult, +struct ClientActionStateResult +where + U: AwaitedActionSubscriber, + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ + inner: MatchingEngineActionStateResult, } -impl ClientActionStateResult { - fn new(sub: T) -> Self { +impl ClientActionStateResult +where + U: AwaitedActionSubscriber, + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ + fn new( + sub: U, + simple_scheduler_state_manager: Weak>, + no_event_action_timeout: Duration, + now_fn: NowFn, + ) -> Self { Self { - inner: MatchingEngineActionStateResult::new(sub), + inner: MatchingEngineActionStateResult::new( + sub, + simple_scheduler_state_manager, + no_event_action_timeout, + now_fn, + ), } } } #[async_trait] -impl ActionStateResult for ClientActionStateResult { +impl ActionStateResult for ClientActionStateResult +where + U: AwaitedActionSubscriber, + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ async fn as_state(&self) -> Result, Error> { self.inner.as_state().await } @@ -148,26 +179,101 @@ impl ActionStateResult for ClientActionStateResult { - awaited_action_sub: T, +struct MatchingEngineActionStateResult +where + U: AwaitedActionSubscriber, + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ + awaited_action_sub: U, + simple_scheduler_state_manager: Weak>, + no_event_action_timeout: Duration, + now_fn: NowFn, } -impl MatchingEngineActionStateResult { - fn new(awaited_action_sub: T) -> Self { - Self { awaited_action_sub } +impl MatchingEngineActionStateResult +where + U: AwaitedActionSubscriber, + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ + fn new( + awaited_action_sub: U, + simple_scheduler_state_manager: Weak>, + no_event_action_timeout: Duration, + now_fn: NowFn, + ) -> Self { + Self { + awaited_action_sub, + simple_scheduler_state_manager, + no_event_action_timeout, + now_fn, + } } } #[async_trait] -impl ActionStateResult for MatchingEngineActionStateResult { +impl ActionStateResult for MatchingEngineActionStateResult +where + U: AwaitedActionSubscriber, + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ async fn as_state(&self) -> Result, Error> { Ok(self.awaited_action_sub.borrow().state().clone()) } async fn changed(&mut self) -> Result, Error> { - self.awaited_action_sub - .changed() - .await - .map(|v| v.state().clone()) + let mut timeout_attempts = 0; + loop { + tokio::select! { + awaited_action_result = self.awaited_action_sub.changed() => { + return awaited_action_result + .err_tip(|| "In MatchingEngineActionStateResult::changed") + .map(|v| v.state().clone()); + } + _ = (self.now_fn)().sleep(self.no_event_action_timeout) => { + // Timeout happened, do additional checks below. + } + } + + let awaited_action = self.awaited_action_sub.borrow(); + + if matches!(awaited_action.state().stage, ActionStage::Queued) { + // Actions in queued state do not get periodically updated, + // so we don't need to timeout them. + continue; + } + + let simple_scheduler_state_manager = self + .simple_scheduler_state_manager + .upgrade() + .err_tip(|| format!("Failed to upgrade weak reference to SimpleSchedulerStateManager in MatchingEngineActionStateResult::changed at attempt: {timeout_attempts}"))?; + + event!( + Level::WARN, + ?awaited_action, + "OperationId {} timed out after {} seconds issuing a retry", + awaited_action.operation_id(), + self.no_event_action_timeout.as_secs_f32(), + ); + + simple_scheduler_state_manager + .timeout_operation_id(awaited_action.operation_id()) + .await + .err_tip(|| "In MatchingEngineActionStateResult::changed")?; + + if timeout_attempts >= MAX_UPDATE_RETRIES { + return Err(make_err!( + Code::Internal, + "Failed to update action after {} retries with no error set in MatchingEngineActionStateResult::changed", + MAX_UPDATE_RETRIES, + )); + } + timeout_attempts += 1; + } } async fn as_action_info(&self) -> Result, Error> { @@ -180,7 +286,12 @@ impl ActionStateResult for MatchingEngineActionState /// It also includes the workers that are available to execute actions based on allocation /// strategy. #[derive(MetricsComponent)] -pub struct SimpleSchedulerStateManager { +pub struct SimpleSchedulerStateManager +where + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ /// Database for storing the state of all actions. #[metric(group = "action_db")] action_db: T, @@ -190,16 +301,103 @@ pub struct SimpleSchedulerStateManager { // of always having it on every SimpleScheduler. #[metric(help = "Maximum number of times a job can be retried")] max_job_retries: usize, + + /// Duration after which an action is considered to be timed out if + /// no event is received. + #[metric( + help = "Duration after which an action is considered to be timed out if no event is received" + )] + no_event_action_timeout: Duration, + + // A lock to ensure only one timeout operation is running at a time + // on this service. + timeout_operation_mux: Mutex<()>, + + /// Weak reference to self. + weak_self: Weak, + + /// Function to get the current time. + now_fn: NowFn, } -impl SimpleSchedulerStateManager { - pub fn new(max_job_retries: usize, action_db: T) -> Arc { - Arc::new(Self { +impl SimpleSchedulerStateManager +where + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ + pub fn new( + max_job_retries: usize, + no_event_action_timeout: Duration, + action_db: T, + now_fn: NowFn, + ) -> Arc { + Arc::new_cyclic(|weak_self| Self { action_db, max_job_retries, + no_event_action_timeout, + timeout_operation_mux: Mutex::new(()), + weak_self: weak_self.clone(), + now_fn, }) } + /// Let the scheduler know that an operation has timed out from + /// the client side (ie: worker has not updated in a while). + async fn timeout_operation_id(&self, operation_id: &OperationId) -> Result<(), Error> { + // Ensure that only one timeout operation is running at a time. + // Failing to do this could result in the same operation being + // timed out multiple times at the same time. + // Note: We could implement this on a per-operation_id basis, but it is quite + // complex to manage the locks. + let _lock = self.timeout_operation_mux.lock().await; + + let awaited_action_subscriber = self + .action_db + .get_by_operation_id(operation_id) + .await + .err_tip(|| "In SimpleSchedulerStateManager::timeout_operation_id")? + .err_tip(|| { + format!("Operation id {operation_id} does not exist in SimpleSchedulerStateManager::timeout_operation_id") + })?; + + let awaited_action = awaited_action_subscriber.borrow(); + + // If the action is not executing, we should not timeout the action. + if !matches!(awaited_action.state().stage, ActionStage::Executing) { + return Ok(()); + } + + let last_worker_updated = awaited_action + .last_worker_updated_timestamp() + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(|e| { + make_err!( + Code::Internal, + "Failed to convert last_worker_updated to duration since epoch {e:?}" + ) + })?; + let worker_should_update_before = last_worker_updated + .checked_add(self.no_event_action_timeout) + .err_tip(|| "Timestamp too big in SimpleSchedulerStateManager::timeout_operation_id")?; + if worker_should_update_before < (self.now_fn)().elapsed() { + // The action was updated recently, we should not timeout the action. + // This is to prevent timing out actions that have recently been updated + // (like multiple clients timeout the same action at the same time). + return Ok(()); + } + + self.assign_operation( + operation_id, + Err(make_err!( + Code::DeadlineExceeded, + "Operation timed out after {} seconds", + self.no_event_action_timeout.as_secs_f32(), + )), + ) + .await + } + async fn inner_update_operation( &self, operation_id: &OperationId, @@ -223,16 +421,6 @@ impl SimpleSchedulerStateManager { let mut awaited_action = awaited_action_subscriber.borrow(); - // Make sure we don't update an action that is already completed. - if awaited_action.state().stage.is_finished() { - return Err(make_err!( - Code::Internal, - "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}", - awaited_action.state().stage, - maybe_worker_id, - )); - } - // Make sure the worker id matches the awaited action worker id. // This might happen if the worker sending the update is not the // worker that was assigned. @@ -260,6 +448,16 @@ impl SimpleSchedulerStateManager { return Err(err); } + // Make sure we don't update an action that is already completed. + if awaited_action.state().stage.is_finished() { + return Err(make_err!( + Code::Internal, + "Action {operation_id:?} is already completed with state {:?} - maybe_worker_id: {:?}", + awaited_action.state().stage, + maybe_worker_id, + )); + } + let stage = match &action_stage_result { Ok(stage) => stage.clone(), Err(err) => { @@ -450,7 +648,12 @@ impl SimpleSchedulerStateManager { } #[async_trait] -impl ClientStateManager for SimpleSchedulerStateManager { +impl ClientStateManager for SimpleSchedulerStateManager +where + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ async fn add_action( &self, client_operation_id: OperationId, @@ -460,15 +663,27 @@ impl ClientStateManager for SimpleSchedulerStateManager { .inner_add_operation(client_operation_id.clone(), action_info.clone()) .await?; - Ok(Box::new(ClientActionStateResult::new(sub))) + Ok(Box::new(ClientActionStateResult::new( + sub, + self.weak_self.clone(), + self.no_event_action_timeout, + self.now_fn.clone(), + ))) } async fn filter_operations<'a>( &'a self, filter: OperationFilter, ) -> Result, Error> { - self.inner_filter_operations(filter, move |rx| Box::new(ClientActionStateResult::new(rx))) - .await + self.inner_filter_operations(filter, move |rx| { + Box::new(ClientActionStateResult::new( + rx, + self.weak_self.clone(), + self.no_event_action_timeout, + self.now_fn.clone(), + )) + }) + .await } fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> { @@ -477,7 +692,12 @@ impl ClientStateManager for SimpleSchedulerStateManager { } #[async_trait] -impl WorkerStateManager for SimpleSchedulerStateManager { +impl WorkerStateManager for SimpleSchedulerStateManager +where + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ async fn update_operation( &self, operation_id: &OperationId, @@ -490,13 +710,23 @@ impl WorkerStateManager for SimpleSchedulerStateManager { } #[async_trait] -impl MatchingEngineStateManager for SimpleSchedulerStateManager { +impl MatchingEngineStateManager for SimpleSchedulerStateManager +where + T: AwaitedActionDb, + I: InstantWrapper, + NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static, +{ async fn filter_operations<'a>( &'a self, filter: OperationFilter, ) -> Result, Error> { self.inner_filter_operations(filter, |rx| { - Box::new(MatchingEngineActionStateResult::new(rx)) + Box::new(MatchingEngineActionStateResult::new( + rx, + self.weak_self.clone(), + self.no_event_action_timeout, + self.now_fn.clone(), + )) }) .await } diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs index 136223ec2..f7a1d56e0 100644 --- a/nativelink-scheduler/tests/simple_scheduler_test.rs +++ b/nativelink-scheduler/tests/simple_scheduler_test.rs @@ -13,7 +13,9 @@ // limitations under the License. use std::collections::HashMap; +use std::future::Future; use std::ops::Bound; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -166,6 +168,7 @@ async fn basic_add_action_with_one_worker_test() -> Result<(), Error> { ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -210,6 +213,78 @@ async fn basic_add_action_with_one_worker_test() -> Result<(), Error> { Ok(()) } +#[nativelink_test] +async fn client_does_not_receive_update_timeout() -> Result<(), Error> { + let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + + let task_change_notify = Arc::new(Notify::new()); + let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( + &nativelink_config::schedulers::SimpleScheduler { + worker_timeout_s: WORKER_TIMEOUT_S, + ..Default::default() + }, + memory_awaited_action_db_factory( + 0, + task_change_notify.clone(), + MockInstantWrapped::default, + ), + || async move {}, + task_change_notify.clone(), + MockInstantWrapped::default, + ); + let action_digest = DigestInfo::new([99u8; 32], 512); + + let _rx_from_worker = + setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + let mut action_listener = setup_action( + &scheduler, + action_digest, + HashMap::new(), + make_system_time(1), + ) + .await + .unwrap(); + + // Trigger a do_try_match to ensure we get a state change. + scheduler.do_try_match_for_test().await.unwrap(); + assert_eq!( + action_listener.changed().await.unwrap().stage, + ActionStage::Executing + ); + + async fn advance_time(duration: Duration, poll_fut: &mut Pin<&mut impl Future>) { + const STEP_AMOUNT: Duration = Duration::from_millis(100); + for _ in 0..(duration.as_millis() / STEP_AMOUNT.as_millis()) { + MockClock::advance(STEP_AMOUNT); + tokio::task::yield_now().await; + assert!(poll!(&mut *poll_fut).is_pending()); + } + } + + let changed_fut = action_listener.changed(); + tokio::pin!(changed_fut); + + { + // No update should have been received yet. + assert_eq!(poll!(&mut changed_fut).is_ready(), false); + } + // Advance our time by just under the timeout. + advance_time(Duration::from_secs(WORKER_TIMEOUT_S - 1), &mut changed_fut).await; + { + // Sill no update should have been received yet. + assert_eq!(poll!(&mut changed_fut).is_ready(), false); + } + // Advance it by just over the timeout. + MockClock::advance(Duration::from_secs(2)); + { + // Now we should have received a timeout and the action should have been + // put back in the queue. + assert_eq!(changed_fut.await.unwrap().stage, ActionStage::Queued); + } + + Ok(()) +} + #[nativelink_test] async fn find_executing_action() -> Result<(), Error> { let worker_id: WorkerId = WorkerId(Uuid::new_v4()); @@ -224,6 +299,7 @@ async fn find_executing_action() -> Result<(), Error> { ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -303,6 +379,7 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest1 = DigestInfo::new([99u8; 32], 512); let action_digest2 = DigestInfo::new([88u8; 32], 512); @@ -477,6 +554,7 @@ async fn set_drain_worker_pauses_and_resumes_worker_test() -> Result<(), Error> ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -563,6 +641,7 @@ async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), E ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); let mut platform_properties = HashMap::new(); @@ -653,6 +732,7 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -750,6 +830,7 @@ async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(), ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let worker_id: WorkerId = WorkerId(Uuid::new_v4()); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -904,6 +985,7 @@ async fn matching_engine_fails_sends_abort() -> Result<(), Error> { awaited_action, || async move {}, task_change_notify, + MockInstantWrapped::default, ); // Initial worker calls do_try_match, so send it no items. senders.tx_get_range_of_actions.send(vec![]).unwrap(); @@ -948,6 +1030,7 @@ async fn matching_engine_fails_sends_abort() -> Result<(), Error> { awaited_action, || async move {}, task_change_notify, + MockInstantWrapped::default, ); // senders.tx_get_awaited_action_by_id.send(Ok(None)).unwrap(); senders.tx_get_range_of_actions.send(vec![]).unwrap(); @@ -1005,6 +1088,7 @@ async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1126,6 +1210,7 @@ async fn update_action_sends_completed_result_to_client_test() -> Result<(), Err ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1224,6 +1309,7 @@ async fn update_action_sends_completed_result_after_disconnect() -> Result<(), E ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1339,6 +1425,7 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1434,6 +1521,7 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1574,6 +1662,7 @@ async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest1 = DigestInfo::new([11u8; 32], 512); let action_digest2 = DigestInfo::new([99u8; 32], 512); @@ -1731,6 +1820,7 @@ async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> { ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest1 = DigestInfo::new([11u8; 32], 512); let action_digest2 = DigestInfo::new([99u8; 32], 512); @@ -1797,6 +1887,7 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -1945,6 +2036,7 @@ async fn ensure_scheduler_drops_inner_spawn() -> Result<(), Error> { async move {} }, task_change_notify, + MockInstantWrapped::default, ); assert_eq!(dropped.load(Ordering::Relaxed), false); @@ -1973,6 +2065,7 @@ async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512); @@ -2047,6 +2140,7 @@ async fn client_reconnect_keeps_action_alive() -> Result<(), Error> { ), || async move {}, task_change_notify, + MockInstantWrapped::default, ); let action_digest = DigestInfo::new([99u8; 32], 512);