From a0acc6bd1eddacffba39c4bbe8c9495f8322e1e0 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 29 Jul 2024 15:20:30 +0800 Subject: [PATCH] fix ci --- src/meta/src/barrier/mod.rs | 16 +++++++------- .../src/hummock/event_handler/uploader/mod.rs | 21 ++++++++++++++++++- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index ed1a936da784e..f58cb3d395033 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -390,16 +390,14 @@ impl CheckpointControl { } } - /// Return whether the collect failure on `worker_id` should trigger a recovery - fn on_collect_failure(&self, worker_id: WorkerId, e: &MetaError) -> bool { + /// Return the earliest command waiting on the `worker_id`. + fn command_wait_collect_from_worker(&self, worker_id: WorkerId) -> Option<&CommandContext> { for epoch_node in self.command_ctx_queue.values() { if epoch_node.state.node_to_collect.contains(&worker_id) { - self.context - .report_collect_failure(&epoch_node.command_ctx, e); - return true; + return Some(&epoch_node.command_ctx); } } - false + None } } @@ -701,10 +699,14 @@ impl GlobalBarrierManager { } Err(e) => { - if self.checkpoint_control.on_collect_failure(worker_id, &e) + let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id); + if failed_command.is_some() || self.state.inflight_actor_infos.actor_map.contains_key(&worker_id) { let errors = self.control_stream_manager.collect_errors(worker_id, e).await; let err = merge_node_rpc_errors("get error from control stream", errors); + if let Some(failed_command) = failed_command { + self.context.report_collect_failure(failed_command, &err); + } self.failure_recovery(err).await; } else { warn!(e = ?e.as_report(), worker_id, "no barrier to collect from worker, ignore err"); diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 884b29de5edd3..8210a998974c4 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -850,7 +850,26 @@ impl UnsyncData { // called `start_epoch` because we have stopped writing on it. if !table_data.unsync_epochs.contains_key(&next_epoch) { if let Some(stopped_next_epoch) = table_data.stopped_next_epoch { - assert_eq!(stopped_next_epoch, next_epoch); + if stopped_next_epoch != next_epoch { + let table_id = table_data.table_id.table_id; + let unsync_epochs = table_data.unsync_epochs.keys().collect_vec(); + if cfg!(debug_assertions) { + panic!( + "table_id {} stop epoch {} different to prev stop epoch {}. unsync epochs: {:?}, syncing epochs {:?}, max_synced_epoch {:?}", + table_id, next_epoch, stopped_next_epoch, unsync_epochs, table_data.syncing_epochs, table_data.max_synced_epoch + ); + } else { + warn!( + table_id, + stopped_next_epoch, + next_epoch, + ?unsync_epochs, + syncing_epochs = ?table_data.syncing_epochs, + max_synced_epoch = ?table_data.max_synced_epoch, + "different stop epoch" + ); + } + } } else { if let Some(max_epoch) = table_data.max_epoch() { assert_gt!(next_epoch, max_epoch);