diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index ae75453d8015a..de88c1ae17608 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -35,6 +35,7 @@ use risingwave_pb::stream_plan::{
     UpdateMutation,
 };
 use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest};
+use thiserror_ext::AsReport;
 use uuid::Uuid;
 
 use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo};
@@ -857,7 +858,7 @@ impl CommandContext {
                 let table_id = table_fragments.table_id().table_id;
                 tracing::warn!(
                     table_id,
-                    reason=?e,
+                    error = %e.as_report(),
                     "cancel_create_table_procedure failed for CancelStreamingJob",
                 );
                 // If failed, check that table is not in meta store.
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index f91262117be16..0884560045398 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -37,6 +37,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::meta::PausedReason;
 use risingwave_pb::stream_plan::barrier::BarrierKind;
 use risingwave_pb::stream_service::BarrierCompleteResponse;
+use thiserror_ext::AsReport;
 use tokio::sync::oneshot::{Receiver, Sender};
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
@@ -668,7 +669,7 @@ impl GlobalBarrierManager {
             // back to frontend
             fail_point!("inject_barrier_err_success");
             let fail_node = self.checkpoint_control.barrier_failed();
-            tracing::warn!("Failed to complete epoch {}: {:?}", prev_epoch, err);
+            tracing::warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch");
             self.failure_recovery(err, fail_node).await;
             return;
         }
@@ -693,7 +694,7 @@ impl GlobalBarrierManager {
                 .drain(index..)
                 .chain(self.checkpoint_control.barrier_failed().into_iter())
                 .collect_vec();
-            tracing::warn!("Failed to commit epoch {}: {:?}", prev_epoch, err);
+            tracing::warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch");
             self.failure_recovery(err, fail_nodes).await;
         }
     }
@@ -728,7 +729,7 @@ impl GlobalBarrierManager {
             let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch
             let span = tracing::info_span!(
                 "failure_recovery",
-                %err,
+                error = %err.as_report(),
                 prev_epoch = prev_epoch.value().0
             );
 
@@ -741,7 +742,7 @@ impl GlobalBarrierManager {
                 .await;
             self.context.set_status(BarrierManagerStatus::Running).await;
         } else {
-            panic!("failed to execute barrier: {:?}", err);
+            panic!("failed to execute barrier: {}", err.as_report());
         }
     }
 
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index d9a8b58226456..e0ace5f9678a4 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -16,7 +16,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
-use anyhow::anyhow;
+use anyhow::{anyhow, Context};
 use futures::future::try_join_all;
 use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
@@ -31,6 +31,7 @@ use risingwave_pb::stream_plan::AddMutation;
 use risingwave_pb::stream_service::{
     BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, UpdateActorsRequest,
 };
+use thiserror_ext::AsReport;
 use tokio::sync::oneshot;
 use tokio_retry::strategy::{jitter, ExponentialBackoff};
 use tracing::{debug, warn, Instrument};
@@ -197,9 +198,7 @@ impl GlobalBarrierManagerContext {
             tokio::spawn(async move {
                 let res: MetaResult<()> = try {
                     tracing::debug!("recovering stream job {}", table.id);
-                    finished
-                        .await
-                        .map_err(|e| anyhow!("failed to finish command: {}", e))?;
+                    finished.await.context("failed to finish command")?;
                     tracing::debug!("finished stream job {}", table.id);
 
                     // Once notified that job is finished we need to notify frontend.
@@ -212,8 +211,9 @@ impl GlobalBarrierManagerContext {
             };
             if let Err(e) = res.as_ref() {
                 tracing::error!(
-                    "stream job {} interrupted, will retry after recovery: {e:?}",
-                    table.id
+                    id = table.id,
+                    error = %e.as_report(),
+                    "stream job interrupted, will retry after recovery",
                 );
                 // NOTE(kwannoel): We should not cleanup stream jobs,
                 // we don't know if it's just due to CN killed,
@@ -283,16 +283,15 @@ impl GlobalBarrierManagerContext {
             tokio::spawn(async move {
                 let res: MetaResult<()> = try {
                     tracing::debug!("recovering stream job {}", id);
-                    finished
-                        .await
-                        .map_err(|e| anyhow!("failed to finish command: {}", e))?;
-                    tracing::debug!("finished stream job {}", id);
+                    finished.await.ok().context("failed to finish command")?;
+                    tracing::debug!(id, "finished stream job");
                     catalog_controller.finish_streaming_job(id).await?;
                 };
                 if let Err(e) = &res {
                     tracing::error!(
-                        "stream job {} interrupted, will retry after recovery: {e:?}",
-                        id
+                        id,
+                        error = %e.as_report(),
+                        "stream job interrupted, will retry after recovery",
                     );
                     // NOTE(kwannoel): We should not cleanup stream jobs,
                     // we don't know if it's just due to CN killed,
@@ -354,7 +353,7 @@ impl GlobalBarrierManagerContext {
         let mut info = if self.env.opts.enable_scale_in_when_recovery {
             let info = self.resolve_actor_info().await;
             let scaled = self.scale_actors(&info).await.inspect_err(|err| {
-                warn!(err = ?err, "scale actors failed");
+                warn!(error = %err.as_report(), "scale actors failed");
             })?;
             if scaled {
                 self.resolve_actor_info().await
@@ -364,13 +363,13 @@ impl GlobalBarrierManagerContext {
         } else {
             // Migrate actors in expired CN to newly joined one.
             self.migrate_actors().await.inspect_err(|err| {
-                warn!(err = ?err, "migrate actors failed");
+                warn!(error = %err.as_report(), "migrate actors failed");
             })?
         };
 
         // Reset all compute nodes, stop and drop existing actors.
         self.reset_compute_nodes(&info).await.inspect_err(|err| {
-            warn!(err = ?err, "reset compute nodes failed");
+            warn!(error = %err.as_report(), "reset compute nodes failed");
         })?;
 
         if scheduled_barriers.pre_apply_drop_scheduled().await {
@@ -379,10 +378,10 @@ impl GlobalBarrierManagerContext {
 
         // update and build all actors.
         self.update_actors(&info).await.inspect_err(|err| {
-            warn!(err = ?err, "update actors failed");
+            warn!(error = %err.as_report(), "update actors failed");
         })?;
         self.build_actors(&info).await.inspect_err(|err| {
-            warn!(err = ?err, "build_actors failed");
+            warn!(error = %err.as_report(), "build_actors failed");
         })?;
 
         // get split assignments for all actors
@@ -424,14 +423,14 @@ impl GlobalBarrierManagerContext {
             let res = match await_barrier_complete.await.result {
                 Ok(response) => {
                     if let Err(err) = command_ctx.post_collect().await {
-                        warn!(err = ?err, "post_collect failed");
+                        warn!(error = %err.as_report(), "post_collect failed");
                         Err(err)
                     } else {
                         Ok((new_epoch.clone(), response))
                     }
                 }
                 Err(err) => {
-                    warn!(err = ?err, "inject_barrier failed");
+                    warn!(error = %err.as_report(), "inject_barrier failed");
                     Err(err)
                 }
             };
@@ -674,8 +673,8 @@ impl GlobalBarrierManagerContext {
                 .await
             {
                 tracing::error!(
-                    "failed to apply reschedule for offline scaling in recovery: {}",
-                    e.to_string()
+                    error = %e.as_report(),
+                    "failed to apply reschedule for offline scaling in recovery",
                 );
 
                 mgr.fragment_manager
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index b9661a37d8e83..55c9fce4c4081 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -98,9 +98,9 @@ impl GlobalBarrierManagerContext {
         }
         rx.map(move |result| match result {
             Ok(completion) => completion,
-            Err(e) => BarrierCompletion {
+            Err(_e) => BarrierCompletion {
                 prev_epoch,
-                result: Err(anyhow!("failed to receive barrier completion result: {:?}", e).into()),
+                result: Err(anyhow!("failed to receive barrier completion result").into()),
             },
         })
     }
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index aab3234d620cb..26fd3ea8143ef 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Instant;
 
-use anyhow::anyhow;
+use anyhow::{anyhow, Context};
 use assert_matches::assert_matches;
 use risingwave_common::catalog::TableId;
 use risingwave_pb::hummock::HummockSnapshot;
@@ -279,20 +279,17 @@ impl BarrierScheduler {
 
         for (injected_rx, collect_rx, finish_rx) in contexts {
             // Wait for this command to be injected, and record the result.
-            let info = injected_rx
-                .await
-                .map_err(|e| anyhow!("failed to inject barrier: {}", e))?;
+            let info = injected_rx.await.ok().context("failed to inject barrier")?;
             infos.push(info);
 
             // Throw the error if it occurs when collecting this barrier.
             collect_rx
                 .await
-                .map_err(|e| anyhow!("failed to collect barrier: {}", e))??;
+                .ok()
+                .context("failed to collect barrier")??;
 
             // Wait for this command to be finished.
-            finish_rx
-                .await
-                .map_err(|e| anyhow!("failed to finish command: {}", e))?;
+            finish_rx.await.ok().context("failed to finish command")?;
         }
 
         Ok(infos)
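
For reference, a minimal standalone sketch (not part of the diff) of the two patterns this change adopts: logging errors through `thiserror_ext::AsReport`, which renders the full `source()` chain as a structured `error` field instead of interpolating `Debug` output into the message, and replacing `map_err(|e| anyhow!(...))` with `anyhow::Context`, which is implemented for both `Result` and `Option`. The `InjectError` type and `inject` function are illustrative stand-ins, and the sketch assumes `anyhow`, `thiserror`, `thiserror-ext`, `tracing`, and `tracing-subscriber` as dependencies.

```rust
use anyhow::Context;
use thiserror::Error;
use thiserror_ext::AsReport;

// Hypothetical error type standing in for `MetaError`.
#[derive(Debug, Error)]
#[error("barrier injection failed")]
struct InjectError {
    #[source]
    source: std::io::Error,
}

fn inject() -> Result<(), InjectError> {
    Err(InjectError {
        source: std::io::Error::new(std::io::ErrorKind::BrokenPipe, "connection reset"),
    })
}

fn main() {
    tracing_subscriber::fmt::init();

    if let Err(err) = inject() {
        // Before: `Debug` output interpolated into the message string.
        tracing::warn!("inject failed: {:?}", err);
        // After: a structured `error` field; `as_report()` displays the
        // whole chain, e.g. "barrier injection failed: connection reset".
        tracing::warn!(error = %err.as_report(), "inject failed");
    }

    // `Context` replaces `map_err(|e| anyhow!(...))?`. It also works on
    // `Option`, which is why `.ok().context(...)` suffices for a oneshot
    // receiver whose `RecvError` carries no detail worth preserving.
    let missing: Option<u32> = None;
    let res: anyhow::Result<u32> = missing.context("failed to receive result");
    assert!(res.is_err());
}
```

Discarding the received error outright (`.ok().context(...)` in `schedule.rs`, `Err(_e)` in `rpc.rs`) loses nothing here: a oneshot `RecvError` only signals that the sender was dropped.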