Skip to content

Commit

Permalink
fix ci
Browse files: browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jul 29, 2024
1 parent 1395241 commit a0acc6b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 8 deletions.
16 changes: 9 additions & 7 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,16 +390,14 @@ impl CheckpointControl {
}
}

/// Return whether the collect failure on `worker_id` should trigger a recovery
fn on_collect_failure(&self, worker_id: WorkerId, e: &MetaError) -> bool {
/// Return the earliest command waiting on the `worker_id`.
fn command_wait_collect_from_worker(&self, worker_id: WorkerId) -> Option<&CommandContext> {
for epoch_node in self.command_ctx_queue.values() {
if epoch_node.state.node_to_collect.contains(&worker_id) {
self.context
.report_collect_failure(&epoch_node.command_ctx, e);
return true;
return Some(&epoch_node.command_ctx);
}
}
false
None
}
}

Expand Down Expand Up @@ -701,10 +699,14 @@ impl GlobalBarrierManager {

}
Err(e) => {
if self.checkpoint_control.on_collect_failure(worker_id, &e)
let failed_command = self.checkpoint_control.command_wait_collect_from_worker(worker_id);
if failed_command.is_some()
|| self.state.inflight_actor_infos.actor_map.contains_key(&worker_id) {
let errors = self.control_stream_manager.collect_errors(worker_id, e).await;
let err = merge_node_rpc_errors("get error from control stream", errors);
if let Some(failed_command) = failed_command {
self.context.report_collect_failure(failed_command, &err);
}
self.failure_recovery(err).await;
} else {
warn!(e = ?e.as_report(), worker_id, "no barrier to collect from worker, ignore err");
Expand Down
21 changes: 20 additions & 1 deletion src/storage/src/hummock/event_handler/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,26 @@ impl UnsyncData {
// called `start_epoch` because we have stopped writing on it.
if !table_data.unsync_epochs.contains_key(&next_epoch) {
if let Some(stopped_next_epoch) = table_data.stopped_next_epoch {
assert_eq!(stopped_next_epoch, next_epoch);
if stopped_next_epoch != next_epoch {
let table_id = table_data.table_id.table_id;
let unsync_epochs = table_data.unsync_epochs.keys().collect_vec();
if cfg!(debug_assertions) {
panic!(
"table_id {} stop epoch {} different to prev stop epoch {}. unsync epochs: {:?}, syncing epochs {:?}, max_synced_epoch {:?}",
table_id, next_epoch, stopped_next_epoch, unsync_epochs, table_data.syncing_epochs, table_data.max_synced_epoch
);
} else {
warn!(
table_id,
stopped_next_epoch,
next_epoch,
?unsync_epochs,
syncing_epochs = ?table_data.syncing_epochs,
max_synced_epoch = ?table_data.max_synced_epoch,
"different stop epoch"
);
}
}
} else {
if let Some(max_epoch) = table_data.max_epoch() {
assert_gt!(next_epoch, max_epoch);
Expand Down

0 comments on commit a0acc6b

Please sign in to comment.