Skip to content

Commit

Permalink
refactor(error): clean-up direct error formatting (part 4) (#14691)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Jan 22, 2024
1 parent 47f0e8b commit 6ca7ee3
Show file tree
Hide file tree
Showing 36 changed files with 213 additions and 161 deletions.
4 changes: 2 additions & 2 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,14 @@ pub fn execute_with_jni_env<T>(
Ok(true) => env
.exception_clear()
.inspect_err(|e| {
tracing::warn!("Exception occurred but failed to clear: {:?}", e);
tracing::warn!(error = %e.as_report(), "Exception occurred but failed to clear");
})
.unwrap(),
Ok(false) => {
// No exception, do nothing
}
Err(e) => {
tracing::warn!("Failed to check exception: {:?}", e);
tracing::warn!(error = %e.as_report(), "Failed to check exception");
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use risingwave_pb::hummock::{
compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct,
GroupDelta, GroupDestroy, GroupMetaChange, GroupTableChange,
};
use thiserror_ext::AsReport;
use tokio::sync::{OnceCell, RwLock};
use tracing::warn;

Expand Down Expand Up @@ -397,7 +398,9 @@ impl HummockManager {
pub async fn unregister_table_ids_fail_fast(&self, table_ids: &[StateTableId]) {
self.unregister_table_ids(table_ids)
.await
.unwrap_or_else(|e| panic!("unregister table ids fail: {table_ids:?} {e}"));
.unwrap_or_else(|e| {
panic!("unregister table ids fail: {table_ids:?} {}", e.as_report())
});
}

pub async fn update_compaction_config(
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashSet;
use std::ops::DerefMut;
use std::time::Duration;

use anyhow::Context;
use function_name::named;
use futures::{stream, StreamExt};
use itertools::Itertools;
Expand Down Expand Up @@ -266,8 +267,7 @@ pub async fn collect_global_gc_watermark(
}
let mut buffered = stream::iter(worker_futures).buffer_unordered(workers.len());
while let Some(worker_result) = buffered.next().await {
let worker_watermark = worker_result
.map_err(|e| anyhow::anyhow!("Failed to collect GC watermark: {:#?}", e))?;
let worker_watermark = worker_result.context("Failed to collect GC watermark")?;
// None means either the worker has gone or the worker has not set a watermark.
global_watermark = cmp::min(
global_watermark,
Expand Down
76 changes: 43 additions & 33 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::{Arc, LazyLock};
use std::time::{Duration, Instant, SystemTime};

use anyhow::Context;
use arc_swap::ArcSwap;
use bytes::Bytes;
use fail::fail_point;
Expand Down Expand Up @@ -2037,9 +2038,9 @@ impl HummockManager {
Ok(_) => true,
Err(e) => {
tracing::error!(
"failed to send compaction request for compaction group {}. {}",
error = %e.as_report(),
"failed to send compaction request for compaction group {}",
compaction_group,
e
);
false
}
Expand Down Expand Up @@ -2081,27 +2082,28 @@ impl HummockManager {
.into());
}
Err(err) => {
tracing::warn!("Failed to get compaction task: {:#?}.", err);
return Err(anyhow::anyhow!(
"Failed to get compaction task: {:#?} compaction_group {}",
err,
compaction_group
)
.into());
tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

return Err(anyhow::anyhow!(err)
.context(format!(
"Failed to get compaction task for compaction_group {}",
compaction_group,
))
.into());
}
};

// 3. send task to compactor
let compact_task_string = compact_task_to_string(&compact_task);
if let Err(e) = compactor.send_event(ResponseEvent::CompactTask(compact_task)) {
// TODO: shall we need to cancel on meta ?
return Err(anyhow::anyhow!(
"Failed to trigger compaction task: {:#?} compaction_group {}",
e,
compaction_group
)
.into());
}
// TODO: shall we need to cancel on meta ?
compactor
.send_event(ResponseEvent::CompactTask(compact_task))
.with_context(|| {
format!(
"Failed to trigger compaction task for compaction_group {}",
compaction_group,
)
})?;

tracing::info!(
"Trigger manual compaction task. {}. cost time: {:?}",
Expand Down Expand Up @@ -2422,8 +2424,12 @@ impl HummockManager {
)
.await
{
tracing::error!("Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
until we can successfully report its status. task_id: {}, ERR: {e:?}", task.task_id);
tracing::error!(
task_id = task.task_id,
error = %e.as_report(),
"Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
until we can successfully report its status",
);
}
}
}
Expand Down Expand Up @@ -2745,7 +2751,7 @@ impl HummockManager {
}

(Some(Err(err)), _stream) => {
tracing::warn!("compactor {} leaving the cluster with err {:?}", context_id, err);
tracing::warn!(error = %err.as_report(), "compactor {} leaving the cluster with err", context_id);
hummock_manager.compactor_manager
.remove_compactor(context_id);
continue
Expand Down Expand Up @@ -2815,10 +2821,10 @@ impl HummockManager {
ResponseEvent::CompactTask(compact_task)
) {
tracing::warn!(
"Failed to send task {} to {}. {:#?}",
error = %e.as_report(),
"Failed to send task {} to {}",
task_id,
compactor.context_id(),
e
);

compactor_alive = false;
Expand All @@ -2831,7 +2837,7 @@ impl HummockManager {
break;
}
Err(err) => {
tracing::warn!("Failed to get compaction task: {:#?}.", err);
tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
break;
}
};
Expand All @@ -2842,9 +2848,9 @@ impl HummockManager {
if compactor_alive {
if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})){
tracing::warn!(
"Failed to send ask to {}. {:#?}",
error = %e.as_report(),
"Failed to send ask to {}",
context_id,
e
);

compactor_alive = false;
Expand All @@ -2864,7 +2870,7 @@ impl HummockManager {
}) => {
if let Err(e) = hummock_manager.report_compact_task(task_id, TaskStatus::try_from(task_status).unwrap(), sorted_output_ssts, Some(table_stats_change))
.await {
tracing::error!("report compact_tack fail {e:?}");
tracing::error!(error = %e.as_report(), "report compact_tack fail");
}
},

Expand All @@ -2888,8 +2894,12 @@ impl HummockManager {
.cancel_compact_task(task.task_id, TaskStatus::HeartbeatCanceled)
.await
{
tracing::error!("Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
until we can successfully report its status. task_id: {}, ERR: {e:?}", task.task_id);
tracing::error!(
task_id = task.task_id,
error = %e.as_report(),
"Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
until we can successfully report its status."
);
}

if let Some(compactor) = compactor_manager.get_compactor(context_id) {
Expand Down Expand Up @@ -2943,10 +2953,10 @@ impl HummockManager {
for cg_id in self.compaction_group_ids().await {
if let Err(e) = self.compaction_state.try_sched_compaction(cg_id, task_type) {
tracing::warn!(
"Failed to schedule {:?} compaction for compaction group {}. {}",
error = %e.as_report(),
"Failed to schedule {:?} compaction for compaction group {}",
task_type,
cg_id,
e
);
}
}
Expand Down Expand Up @@ -3065,10 +3075,10 @@ impl HummockManager {
}
Err(e) => {
tracing::info!(
"failed to move state table [{}] from group-{} because {:?}",
error = %e.as_report(),
"failed to move state table [{}] from group-{}",
table_id,
parent_group_id,
e
)
}
}
Expand Down
9 changes: 3 additions & 6 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,12 @@ pub struct HummockVersionSafePoint {

impl Drop for HummockVersionSafePoint {
fn drop(&mut self) {
if let Err(e) = self
if self
.event_sender
.send(HummockManagerEvent::DropSafePoint(self.id))
.is_err()
{
tracing::debug!(
"failed to drop hummock version safe point {}. {}",
self.id,
e
);
tracing::debug!("failed to drop hummock version safe point {}", self.id);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/hummock/manager/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Duration;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::common::WorkerType;
use sync_point::sync_point;
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;
use tokio_retry::strategy::{jitter, ExponentialBackoff};

Expand Down Expand Up @@ -107,9 +108,9 @@ impl HummockManager {
|| async {
if let Err(err) = self.release_contexts(vec![worker_node.id]).await {
tracing::warn!(
"Failed to release hummock context {}. {}. Will retry.",
error = %err.as_report(),
"Failed to release hummock context {}, will retry",
worker_node.id,
err
);
return Err(err);
}
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use risingwave_pb::hummock::{
};
use risingwave_rpc_client::error::{Result, RpcError};
use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::UnboundedReceiverStream;
Expand Down Expand Up @@ -119,7 +120,7 @@ impl MockHummockMetaClient {
}

fn mock_err(error: super::error::Error) -> RpcError {
anyhow!("mock error: {}", error).into()
anyhow!(error).context("mock error").into()
}

#[async_trait]
Expand Down Expand Up @@ -326,7 +327,7 @@ impl HummockMetaClient for MockHummockMetaClient {
)
.await
{
tracing::error!("report compact_tack fail {e:?}");
tracing::error!(error = %e.as_report(), "report compact_tack fail");
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod compactor_manager;
pub mod error;
mod manager;
pub use manager::*;
use thiserror_ext::AsReport;

mod level_handler;
mod metrics_utils;
Expand Down Expand Up @@ -84,7 +85,7 @@ pub fn start_vacuum_metadata_loop(
}
}
if let Err(err) = vacuum.vacuum_metadata().await {
tracing::warn!("Vacuum metadata error {:#?}", err);
tracing::warn!(error = %err.as_report(), "Vacuum metadata error");
}
}
});
Expand All @@ -111,7 +112,7 @@ pub fn start_vacuum_object_loop(
}
}
if let Err(err) = vacuum.vacuum_object().await {
tracing::warn!("Vacuum object error {:#?}", err);
tracing::warn!(error = %err.as_report(), "Vacuum object error");
}
}
});
Expand Down Expand Up @@ -146,7 +147,7 @@ pub fn start_checkpoint_loop(
.create_version_checkpoint(min_delta_log_num)
.await
{
tracing::warn!("Hummock version checkpoint error {:#?}", err);
tracing::warn!(error = %err.as_report(), "Hummock version checkpoint error");
}
}
});
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/hummock/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use itertools::Itertools;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::VacuumTask;
use thiserror_ext::AsReport;

use super::CompactorManagerRef;
use crate::backup_restore::BackupManagerRef;
Expand Down Expand Up @@ -148,9 +149,9 @@ impl VacuumManager {
}
Err(err) => {
tracing::warn!(
"Failed to send vacuum task to worker {}: {:#?}",
error = %err.as_report(),
"Failed to send vacuum task to worker {}",
compactor.context_id(),
err
);
self.compactor_manager
.remove_compactor(compactor.context_id());
Expand Down
5 changes: 3 additions & 2 deletions src/object_store/src/object/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use aws_sdk_s3::primitives::ByteStreamError;
use aws_smithy_types::body::SdkBody;
use risingwave_common::error::BoxedError;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::error::RecvError;

#[derive(Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)]
Expand Down Expand Up @@ -97,13 +98,13 @@ where

impl From<RecvError> for ObjectError {
fn from(e: RecvError) -> Self {
ObjectErrorInner::Internal(e.to_string()).into()
ObjectErrorInner::Internal(e.to_report_string()).into()
}
}

impl From<ByteStreamError> for ObjectError {
fn from(e: ByteStreamError) -> Self {
ObjectErrorInner::Internal(e.to_string()).into()
ObjectErrorInner::Internal(e.to_report_string()).into()
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub mod object_metrics;

pub use error::*;
use object_metrics::ObjectStoreMetrics;
use thiserror_ext::AsReport;

pub type ObjectStoreRef = Arc<ObjectStoreImpl>;
pub type ObjectStreamingUploader = MonitoredStreamingUploader;
Expand Down Expand Up @@ -274,7 +275,7 @@ fn try_update_failure_metric<T>(
operation_type: &'static str,
) {
if let Err(e) = &result {
tracing::error!("{:?} failed because of: {:?}", operation_type, e);
tracing::error!(error = %e.as_report(), "{} failed", operation_type);
metrics
.failure_count
.with_label_values(&[operation_type])
Expand Down
Loading

0 comments on commit 6ca7ee3

Please sign in to comment.