Skip to content

Commit

Permalink
fix barrier
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jan 19, 2024
1 parent f643050 commit b7da2f8
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 36 deletions.
3 changes: 2 additions & 1 deletion src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use risingwave_pb::stream_plan::{
UpdateMutation,
};
use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest};
use thiserror_ext::AsReport;
use uuid::Uuid;

use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo};
Expand Down Expand Up @@ -857,7 +858,7 @@ impl CommandContext {
let table_id = table_fragments.table_id().table_id;
tracing::warn!(
table_id,
reason=?e,
error = %e.as_report(),
"cancel_create_table_procedure failed for CancelStreamingJob",
);
// If failed, check that table is not in meta store.
Expand Down
9 changes: 5 additions & 4 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -668,7 +669,7 @@ impl GlobalBarrierManager {
// back to frontend
fail_point!("inject_barrier_err_success");
let fail_node = self.checkpoint_control.barrier_failed();
tracing::warn!("Failed to complete epoch {}: {:?}", prev_epoch, err);
tracing::warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch");
self.failure_recovery(err, fail_node).await;
return;
}
Expand All @@ -693,7 +694,7 @@ impl GlobalBarrierManager {
.drain(index..)
.chain(self.checkpoint_control.barrier_failed().into_iter())
.collect_vec();
tracing::warn!("Failed to commit epoch {}: {:?}", prev_epoch, err);
tracing::warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch");
self.failure_recovery(err, fail_nodes).await;
}
}
Expand Down Expand Up @@ -728,7 +729,7 @@ impl GlobalBarrierManager {
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch
let span = tracing::info_span!(
"failure_recovery",
%err,
error = %err.as_report(),
prev_epoch = prev_epoch.value().0
);

Expand All @@ -741,7 +742,7 @@ impl GlobalBarrierManager {
.await;
self.context.set_status(BarrierManagerStatus::Running).await;
} else {
panic!("failed to execute barrier: {:?}", err);
panic!("failed to execute barrier: {}", err.as_report());
}
}

Expand Down
41 changes: 20 additions & 21 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
Expand All @@ -31,6 +31,7 @@ use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_service::{
BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, UpdateActorsRequest,
};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tracing::{debug, warn, Instrument};
Expand Down Expand Up @@ -197,9 +198,7 @@ impl GlobalBarrierManagerContext {
tokio::spawn(async move {
let res: MetaResult<()> = try {
tracing::debug!("recovering stream job {}", table.id);
finished
.await
.map_err(|e| anyhow!("failed to finish command: {}", e))?;
finished.await.context("failed to finish command")?;

tracing::debug!("finished stream job {}", table.id);
// Once notified that job is finished we need to notify frontend.
Expand All @@ -212,8 +211,9 @@ impl GlobalBarrierManagerContext {
};
if let Err(e) = res.as_ref() {
tracing::error!(
"stream job {} interrupted, will retry after recovery: {e:?}",
table.id
id = table.id,
error = %e.as_report(),
"stream job interrupted, will retry after recovery",
);
// NOTE(kwannoel): We should not clean up stream jobs,
// we don't know if it's just due to the CN being killed,
Expand Down Expand Up @@ -283,16 +283,15 @@ impl GlobalBarrierManagerContext {
tokio::spawn(async move {
let res: MetaResult<()> = try {
tracing::debug!("recovering stream job {}", id);
finished
.await
.map_err(|e| anyhow!("failed to finish command: {}", e))?;
tracing::debug!("finished stream job {}", id);
finished.await.ok().context("failed to finish command")?;
tracing::debug!(id, "finished stream job");
catalog_controller.finish_streaming_job(id).await?;
};
if let Err(e) = &res {
tracing::error!(
"stream job {} interrupted, will retry after recovery: {e:?}",
id
id,
error = %e.as_report(),
"stream job interrupted, will retry after recovery",
);
// NOTE(kwannoel): We should not clean up stream jobs,
// we don't know if it's just due to the CN being killed,
Expand Down Expand Up @@ -354,7 +353,7 @@ impl GlobalBarrierManagerContext {
let mut info = if self.env.opts.enable_scale_in_when_recovery {
let info = self.resolve_actor_info().await;
let scaled = self.scale_actors(&info).await.inspect_err(|err| {
warn!(err = ?err, "scale actors failed");
warn!(error = %err.as_report(), "scale actors failed");
})?;
if scaled {
self.resolve_actor_info().await
Expand All @@ -364,13 +363,13 @@ impl GlobalBarrierManagerContext {
} else {
// Migrate actors in expired CN to newly joined one.
self.migrate_actors().await.inspect_err(|err| {
warn!(err = ?err, "migrate actors failed");
warn!(error = %err.as_report(), "migrate actors failed");
})?
};

// Reset all compute nodes, stop and drop existing actors.
self.reset_compute_nodes(&info).await.inspect_err(|err| {
warn!(err = ?err, "reset compute nodes failed");
warn!(error = %err.as_report(), "reset compute nodes failed");
})?;

if scheduled_barriers.pre_apply_drop_scheduled().await {
Expand All @@ -379,10 +378,10 @@ impl GlobalBarrierManagerContext {

// update and build all actors.
self.update_actors(&info).await.inspect_err(|err| {
warn!(err = ?err, "update actors failed");
warn!(error = %err.as_report(), "update actors failed");
})?;
self.build_actors(&info).await.inspect_err(|err| {
warn!(err = ?err, "build_actors failed");
warn!(error = %err.as_report(), "build_actors failed");
})?;

// get split assignments for all actors
Expand Down Expand Up @@ -424,14 +423,14 @@ impl GlobalBarrierManagerContext {
let res = match await_barrier_complete.await.result {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
warn!(err = ?err, "post_collect failed");
warn!(error = %err.as_report(), "post_collect failed");
Err(err)
} else {
Ok((new_epoch.clone(), response))
}
}
Err(err) => {
warn!(err = ?err, "inject_barrier failed");
warn!(error = %err.as_report(), "inject_barrier failed");
Err(err)
}
};
Expand Down Expand Up @@ -674,8 +673,8 @@ impl GlobalBarrierManagerContext {
.await
{
tracing::error!(
"failed to apply reschedule for offline scaling in recovery: {}",
e.to_string()
error = %e.as_report(),
"failed to apply reschedule for offline scaling in recovery",
);

mgr.fragment_manager
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ impl GlobalBarrierManagerContext {
}
rx.map(move |result| match result {
Ok(completion) => completion,
Err(e) => BarrierCompletion {
Err(_e) => BarrierCompletion {
prev_epoch,
result: Err(anyhow!("failed to receive barrier completion result: {:?}", e).into()),
result: Err(anyhow!("failed to receive barrier completion result").into()),
},
})
}
Expand Down
13 changes: 5 additions & 8 deletions src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use assert_matches::assert_matches;
use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::HummockSnapshot;
Expand Down Expand Up @@ -279,20 +279,17 @@ impl BarrierScheduler {

for (injected_rx, collect_rx, finish_rx) in contexts {
// Wait for this command to be injected, and record the result.
let info = injected_rx
.await
.map_err(|e| anyhow!("failed to inject barrier: {}", e))?;
let info = injected_rx.await.ok().context("failed to inject barrier")?;
infos.push(info);

// Throw the error if it occurs when collecting this barrier.
collect_rx
.await
.map_err(|e| anyhow!("failed to collect barrier: {}", e))??;
.ok()
.context("failed to collect barrier")??;

// Wait for this command to be finished.
finish_rx
.await
.map_err(|e| anyhow!("failed to finish command: {}", e))?;
finish_rx.await.ok().context("failed to finish command")?;
}

Ok(infos)
Expand Down

0 comments on commit b7da2f8

Please sign in to comment.