diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs index 8c3d3dcf17f38..d0193a7717d2c 100644 --- a/src/jni_core/src/jvm_runtime.rs +++ b/src/jni_core/src/jvm_runtime.rs @@ -219,14 +219,14 @@ pub fn execute_with_jni_env( Ok(true) => env .exception_clear() .inspect_err(|e| { - tracing::warn!("Exception occurred but failed to clear: {:?}", e); + tracing::warn!(error = %e.as_report(), "Exception occurred but failed to clear"); }) .unwrap(), Ok(false) => { // No exception, do nothing } Err(e) => { - tracing::warn!("Failed to check exception: {:?}", e); + tracing::warn!(error = %e.as_report(), "Failed to check exception"); } } diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index ab569f5ed468e..51f8e63dd6eb0 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -34,6 +34,7 @@ use risingwave_pb::hummock::{ compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct, GroupDelta, GroupDestroy, GroupMetaChange, GroupTableChange, }; +use thiserror_ext::AsReport; use tokio::sync::{OnceCell, RwLock}; use tracing::warn; @@ -397,7 +398,9 @@ impl HummockManager { pub async fn unregister_table_ids_fail_fast(&self, table_ids: &[StateTableId]) { self.unregister_table_ids(table_ids) .await - .unwrap_or_else(|e| panic!("unregister table ids fail: {table_ids:?} {e}")); + .unwrap_or_else(|e| { + panic!("unregister table ids fail: {table_ids:?} {}", e.as_report()) + }); } pub async fn update_compaction_config( diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index e17f90aaad170..a9b4b04c35f49 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -17,6 +17,7 @@ use std::collections::HashSet; use std::ops::DerefMut; use std::time::Duration; +use anyhow::Context; use function_name::named; use futures::{stream, StreamExt}; use itertools::Itertools; @@ -266,8 +267,7 @@ pub async fn collect_global_gc_watermark( } let mut buffered = stream::iter(worker_futures).buffer_unordered(workers.len()); while let Some(worker_result) = buffered.next().await { - let worker_watermark = worker_result - .map_err(|e| anyhow::anyhow!("Failed to collect GC watermark: {:#?}", e))?; + let worker_watermark = worker_result.context("Failed to collect GC watermark")?; // None means either the worker has gone or the worker has not set a watermark. global_watermark = cmp::min( global_watermark, diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 63567145d0254..aaeb7b3d105f5 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -19,6 +19,7 @@ use std::sync::atomic::AtomicBool; use std::sync::{Arc, LazyLock}; use std::time::{Duration, Instant, SystemTime}; +use anyhow::Context; use arc_swap::ArcSwap; use bytes::Bytes; use fail::fail_point; @@ -2037,9 +2038,9 @@ impl HummockManager { Ok(_) => true, Err(e) => { tracing::error!( - "failed to send compaction request for compaction group {}. {}", + error = %e.as_report(), + "failed to send compaction request for compaction group {}", compaction_group, - e ); false } @@ -2081,27 +2082,28 @@ impl HummockManager { .into()); } Err(err) => { - tracing::warn!("Failed to get compaction task: {:#?}.", err); - return Err(anyhow::anyhow!( - "Failed to get compaction task: {:#?} compaction_group {}", - err, - compaction_group - ) - .into()); + tracing::warn!(error = %err.as_report(), "Failed to get compaction task"); + + return Err(anyhow::anyhow!(err) + .context(format!( + "Failed to get compaction task for compaction_group {}", + compaction_group, + )) + .into()); } }; // 3. send task to compactor let compact_task_string = compact_task_to_string(&compact_task); - if let Err(e) = compactor.send_event(ResponseEvent::CompactTask(compact_task)) { - // TODO: shall we need to cancel on meta ? - return Err(anyhow::anyhow!( - "Failed to trigger compaction task: {:#?} compaction_group {}", - e, - compaction_group - ) - .into()); - } + // TODO: shall we need to cancel on meta ? + compactor + .send_event(ResponseEvent::CompactTask(compact_task)) + .with_context(|| { + format!( + "Failed to trigger compaction task for compaction_group {}", + compaction_group, + ) + })?; tracing::info!( "Trigger manual compaction task. {}. cost time: {:?}", @@ -2422,8 +2424,12 @@ impl HummockManager { ) .await { - tracing::error!("Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat - until we can successfully report its status. task_id: {}, ERR: {e:?}", task.task_id); + tracing::error!( + task_id = task.task_id, + error = %e.as_report(), + "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat + until we can successfully report its status", + ); } } } @@ -2745,7 +2751,7 @@ impl HummockManager { } (Some(Err(err)), _stream) => { - tracing::warn!("compactor {} leaving the cluster with err {:?}", context_id, err); + tracing::warn!(error = %err.as_report(), "compactor {} leaving the cluster with err", context_id); hummock_manager.compactor_manager .remove_compactor(context_id); continue @@ -2815,10 +2821,10 @@ impl HummockManager { ResponseEvent::CompactTask(compact_task) ) { tracing::warn!( - "Failed to send task {} to {}. {:#?}", + error = %e.as_report(), + "Failed to send task {} to {}", task_id, compactor.context_id(), - e ); compactor_alive = false; @@ -2831,7 +2837,7 @@ impl HummockManager { break; } Err(err) => { - tracing::warn!("Failed to get compaction task: {:#?}.", err); + tracing::warn!(error = %err.as_report(), "Failed to get compaction task"); break; } }; @@ -2842,9 +2848,9 @@ impl HummockManager { if compactor_alive { if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})){ tracing::warn!( - "Failed to send ask to {}. {:#?}", + error = %e.as_report(), + "Failed to send ask to {}", context_id, - e ); compactor_alive = false; @@ -2864,7 +2870,7 @@ impl HummockManager { }) => { if let Err(e) = hummock_manager.report_compact_task(task_id, TaskStatus::try_from(task_status).unwrap(), sorted_output_ssts, Some(table_stats_change)) .await { - tracing::error!("report compact_tack fail {e:?}"); + tracing::error!(error = %e.as_report(), "report compact_tack fail"); } }, @@ -2888,8 +2894,12 @@ impl HummockManager { .cancel_compact_task(task.task_id, TaskStatus::HeartbeatCanceled) .await { - tracing::error!("Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat - until we can successfully report its status. task_id: {}, ERR: {e:?}", task.task_id); + tracing::error!( + task_id = task.task_id, + error = %e.as_report(), + "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat + until we can successfully report its status." + ); } if let Some(compactor) = compactor_manager.get_compactor(context_id) { @@ -2943,10 +2953,10 @@ impl HummockManager { for cg_id in self.compaction_group_ids().await { if let Err(e) = self.compaction_state.try_sched_compaction(cg_id, task_type) { tracing::warn!( - "Failed to schedule {:?} compaction for compaction group {}. {}", + error = %e.as_report(), + "Failed to schedule {:?} compaction for compaction group {}", task_type, cg_id, - e ); } } @@ -3065,10 +3075,10 @@ impl HummockManager { } Err(e) => { tracing::info!( - "failed to move state table [{}] from group-{} because {:?}", + error = %e.as_report(), + "failed to move state table [{}] from group-{}", table_id, parent_group_id, - e ) } } diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index bca0d13cfd241..5cf270076e6c3 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -55,15 +55,12 @@ pub struct HummockVersionSafePoint { impl Drop for HummockVersionSafePoint { fn drop(&mut self) { - if let Err(e) = self + if self .event_sender .send(HummockManagerEvent::DropSafePoint(self.id)) + .is_err() { - tracing::debug!( - "failed to drop hummock version safe point {}. {}", - self.id, - e - ); + tracing::debug!("failed to drop hummock version safe point {}", self.id); } } } diff --git a/src/meta/src/hummock/manager/worker.rs b/src/meta/src/hummock/manager/worker.rs index e496a94e614ff..549365607b9b3 100644 --- a/src/meta/src/hummock/manager/worker.rs +++ b/src/meta/src/hummock/manager/worker.rs @@ -17,6 +17,7 @@ use std::time::Duration; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::common::WorkerType; use sync_point::sync_point; +use thiserror_ext::AsReport; use tokio::task::JoinHandle; use tokio_retry::strategy::{jitter, ExponentialBackoff}; @@ -107,9 +108,9 @@ impl HummockManager { || async { if let Err(err) = self.release_contexts(vec![worker_node.id]).await { tracing::warn!( - "Failed to release hummock context {}. {}. Will retry.", + error = %err.as_report(), + "Failed to release hummock context {}, will retry", worker_node.id, - err ); return Err(err); } diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index fc77e9284fca8..bdc674830458b 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -40,6 +40,7 @@ use risingwave_pb::hummock::{ }; use risingwave_rpc_client::error::{Result, RpcError}; use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient}; +use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -119,7 +120,7 @@ impl MockHummockMetaClient { } fn mock_err(error: super::error::Error) -> RpcError { - anyhow!("mock error: {}", error).into() + anyhow!(error).context("mock error").into() } #[async_trait] @@ -326,7 +327,7 @@ impl HummockMetaClient for MockHummockMetaClient { ) .await { - tracing::error!("report compact_tack fail {e:?}"); + tracing::error!(error = %e.as_report(), "report compact_tack fail"); } } } diff --git a/src/meta/src/hummock/mod.rs b/src/meta/src/hummock/mod.rs index 6f948fd413ba7..65017943cb90f 100644 --- a/src/meta/src/hummock/mod.rs +++ b/src/meta/src/hummock/mod.rs @@ -16,6 +16,7 @@ pub mod compactor_manager; pub mod error; mod manager; pub use manager::*; +use thiserror_ext::AsReport; mod level_handler; mod metrics_utils; @@ -84,7 +85,7 @@ pub fn start_vacuum_metadata_loop( } } if let Err(err) = vacuum.vacuum_metadata().await { - tracing::warn!("Vacuum metadata error {:#?}", err); + tracing::warn!(error = %err.as_report(), "Vacuum metadata error"); } } }); @@ -111,7 +112,7 @@ pub fn start_vacuum_object_loop( } } if let Err(err) = vacuum.vacuum_object().await { - tracing::warn!("Vacuum object error {:#?}", err); + tracing::warn!(error = %err.as_report(), "Vacuum object error"); } } }); @@ -146,7 +147,7 @@ pub fn start_checkpoint_loop( .create_version_checkpoint(min_delta_log_num) .await { - tracing::warn!("Hummock version checkpoint error {:#?}", err); + tracing::warn!(error = %err.as_report(), "Hummock version checkpoint error"); } } }); diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index d03a581087bb2..10f748b671659 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -20,6 +20,7 @@ use itertools::Itertools; use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::VacuumTask; +use thiserror_ext::AsReport; use super::CompactorManagerRef; use crate::backup_restore::BackupManagerRef; @@ -148,9 +149,9 @@ impl VacuumManager { } Err(err) => { tracing::warn!( - "Failed to send vacuum task to worker {}: {:#?}", + error = %err.as_report(), + "Failed to send vacuum task to worker {}", compactor.context_id(), - err ); self.compactor_manager .remove_compactor(compactor.context_id()); diff --git a/src/object_store/src/object/error.rs b/src/object_store/src/object/error.rs index c84527e6bdb03..f1c18bb391188 100644 --- a/src/object_store/src/object/error.rs +++ b/src/object_store/src/object/error.rs @@ -22,6 +22,7 @@ use aws_sdk_s3::primitives::ByteStreamError; use aws_smithy_types::body::SdkBody; use risingwave_common::error::BoxedError; use thiserror::Error; +use thiserror_ext::AsReport; use tokio::sync::oneshot::error::RecvError; #[derive(Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)] @@ -97,13 +98,13 @@ where impl From for ObjectError { fn from(e: RecvError) -> Self { - ObjectErrorInner::Internal(e.to_string()).into() + ObjectErrorInner::Internal(e.to_report_string()).into() } } impl From for ObjectError { fn from(e: ByteStreamError) -> Self { - ObjectErrorInner::Internal(e.to_string()).into() + ObjectErrorInner::Internal(e.to_report_string()).into() } } diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 32adba09d2bb1..cfb3d3d699b42 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -37,6 +37,7 @@ pub mod object_metrics; pub use error::*; use object_metrics::ObjectStoreMetrics; +use thiserror_ext::AsReport; pub type ObjectStoreRef = Arc; pub type ObjectStreamingUploader = MonitoredStreamingUploader; @@ -274,7 +275,7 @@ fn try_update_failure_metric( operation_type: &'static str, ) { if let Err(e) = &result { - tracing::error!("{:?} failed because of: {:?}", operation_type, e); + tracing::error!(error = %e.as_report(), "{} failed", operation_type); metrics .failure_count .with_label_values(&[operation_type]) diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 9cf89c65f5581..19bddcfc7ac52 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -20,6 +20,7 @@ use futures::{stream, StreamExt, TryStreamExt}; use opendal::services::Memory; use opendal::{Metakey, Operator, Writer}; use risingwave_common::range::RangeBoundsExt; +use thiserror_ext::AsReport; use crate::object::{ BoxedStreamingUploader, ObjectDataStream, ObjectError, ObjectMetadata, ObjectMetadataIter, @@ -116,9 +117,9 @@ impl ObjectStore for OpendalObjectStore { )); let range: Range = (range.start as u64)..(range.end as u64); let reader = self.op.reader_with(path).range(range).await?; - let stream = reader - .into_stream() - .map(|item| item.map_err(|e| ObjectError::internal(format!("OpendalError: {:?}", e)))); + let stream = reader.into_stream().map(|item| { + item.map_err(|e| ObjectError::internal(format!("OpendalError: {}", e.as_report()))) + }); Ok(Box::pin(stream)) } diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 5c6f056d9e535..7bff2292a1c7f 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -47,6 +47,7 @@ use itertools::Itertools; use risingwave_common::config::{ObjectStoreConfig, S3ObjectStoreConfig}; use risingwave_common::monitor::connection::monitor_connector; use risingwave_common::range::RangeBoundsExt; +use thiserror_ext::AsReport; use tokio::task::JoinHandle; use tokio_retry::strategy::{jitter, ExponentialBackoff}; @@ -281,7 +282,7 @@ impl StreamingUploader for S3StreamingUploader { Ok(()) } } else if let Err(e) = self.flush_multipart_and_complete().await { - tracing::warn!("Failed to upload object {}: {:?}", self.key, e); + tracing::warn!(key = self.key, error = %e.as_report(), "Failed to upload object"); self.abort_multipart_upload().await?; Err(e) } else { diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index d93e0b46ade9e..6d2c01527ceb9 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -304,12 +304,12 @@ pub async fn compactor_serve( _ = tokio::signal::ctrl_c() => {}, _ = &mut shutdown_recv => { for (join_handle, shutdown_sender) in sub_tasks { - if let Err(err) = shutdown_sender.send(()) { - tracing::warn!("Failed to send shutdown: {:?}", err); + if shutdown_sender.send(()).is_err() { + tracing::warn!("Failed to send shutdown"); continue; } - if let Err(err) = join_handle.await { - tracing::warn!("Failed to join shutdown: {:?}", err); + if join_handle.await.is_err() { + tracing::warn!("Failed to join shutdown"); } } }, @@ -414,12 +414,12 @@ pub async fn shared_compactor_serve( tokio::select! { _ = tokio::signal::ctrl_c() => {}, _ = &mut shutdown_recv => { - if let Err(err) = shutdown_sender.send(()) { - tracing::warn!("Failed to send shutdown: {:?}", err); - } - if let Err(err) = join_handle.await { - tracing::warn!("Failed to join shutdown: {:?}", err); - } + if shutdown_sender.send(()).is_err() { + tracing::warn!("Failed to send shutdown"); + } + if join_handle.await.is_err() { + tracing::warn!("Failed to join shutdown"); + } }, } }, diff --git a/src/storage/hummock_test/src/bin/replay/main.rs b/src/storage/hummock_test/src/bin/replay/main.rs index e8637de487734..d5190532f2d98 100644 --- a/src/storage/hummock_test/src/bin/replay/main.rs +++ b/src/storage/hummock_test/src/bin/replay/main.rs @@ -16,6 +16,9 @@ #![feature(coroutines)] #![feature(stmt_expr_attributes)] #![feature(proc_macro_hygiene)] +#![feature(register_tool)] +#![register_tool(rw)] +#![allow(rw::format_error)] // test code #[macro_use] mod replay_impl; diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index ac3ef6eb02e8c..019139eedeedc 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -27,6 +27,7 @@ use risingwave_hummock_sdk::key::{get_table_id, TABLE_PREFIX_LEN}; use risingwave_pb::catalog::Table; use risingwave_rpc_client::error::{Result as RpcResult, RpcError}; use risingwave_rpc_client::MetaClient; +use thiserror_ext::AsReport; use crate::hummock::{HummockError, HummockResult}; @@ -315,8 +316,8 @@ impl FilterKeyExtractorManagerInner { .await .map_err(|e| { HummockError::other(format!( - "request rpc list_tables for meta failed because {:?}", - e + "request rpc list_tables for meta failed: {}", + e.as_report() )) })?; let mut guard = self.table_id_to_filter_key_extractor.write(); diff --git a/src/storage/src/hummock/backup_reader.rs b/src/storage/src/hummock/backup_reader.rs index d8757e5f5ea10..0b99970f26e75 100644 --- a/src/storage/src/hummock/backup_reader.rs +++ b/src/storage/src/hummock/backup_reader.rs @@ -30,6 +30,7 @@ use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; +use thiserror_ext::AsReport; use crate::error::{StorageError, StorageResult}; use crate::hummock::local_version::pinned_version::{PinVersionAction, PinnedVersion}; @@ -131,7 +132,7 @@ impl BackupReader { } if let Err(e) = current_store.0.refresh_manifest().await { // reschedule refresh request - tracing::warn!("failed to refresh backup manifest, will retry. {}", e); + tracing::warn!(error = %e.as_report(), "failed to refresh backup manifest, will retry"); let backup_reader_clone = backup_reader.clone(); tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(60)).await; @@ -155,10 +156,9 @@ impl BackupReader { } pub fn try_refresh_manifest(self: &BackupReaderRef, min_manifest_id: u64) { - let _ = self - .refresh_tx - .send(min_manifest_id) - .inspect_err(|e| tracing::warn!("failed to send refresh_manifest request {}", e)); + let _ = self.refresh_tx.send(min_manifest_id).inspect_err(|_| { + tracing::warn!(min_manifest_id, "failed to send refresh_manifest request") + }); } /// Tries to get a hummock version eligible for querying `epoch`. @@ -198,7 +198,11 @@ impl BackupReader { // TODO: change to v2 let snapshot: meta_snapshot_v1::MetaSnapshotV1 = current_store.0.get(snapshot_id).await.map_err(|e| { - format!("failed to get meta snapshot {}. {}", snapshot_id, e) + format!( + "failed to get meta snapshot {}: {}", + snapshot_id, + e.as_report() + ) })?; let version_holder = build_version_holder(snapshot); let version_clone = version_holder.0.clone(); @@ -238,10 +242,9 @@ impl BackupReader { if let Err(e) = self.set_store(config.clone()).await { // Retry is driven by periodic system params notification. tracing::warn!( - "failed to update backup reader: url={}, dir={}, {:#?}", - config.0, - config.1, - e + url = config.0, dir = config.1, + error = %e.as_report(), + "failed to update backup reader", ); } } diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 1778e263c54f9..b542eb141492e 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -30,6 +30,7 @@ use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, Table use risingwave_hummock_sdk::{can_concat, EpochWithGap, HummockEpoch}; use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType}; +use thiserror_ext::AsReport; use tokio::sync::oneshot::Receiver; use super::iterator::MonitoredCompactorIterator; @@ -327,7 +328,7 @@ pub async fn compact( .await { Err(e) => { - tracing::error!("Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error {:?}", compact_task.existing_table_ids, e); + tracing::error!(error = %e.as_report(), "Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error", compact_task.existing_table_ids); let task_status = TaskStatus::ExecuteFailed; return compact_done(compact_task, context.clone(), vec![], task_status); } @@ -407,7 +408,7 @@ pub async fn compact( } } Err(e) => { - tracing::warn!("Failed to generate_splits {:#?}", e); + tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); task_status = TaskStatus::ExecuteFailed; return compact_done(compact_task, context.clone(), vec![], task_status); } @@ -527,9 +528,9 @@ pub async fn compact( Err(e) => { task_status = TaskStatus::ExecuteFailed; tracing::warn!( - "Compaction task {} failed with error: {:#?}", + error = %e.as_report(), + "Compaction task {} failed with error", compact_task.task_id, - e ); } } @@ -597,18 +598,18 @@ pub async fn compact( Some(Ok(Err(e))) => { task_status = TaskStatus::ExecuteFailed; tracing::warn!( - "Compaction task {} failed with error: {:#?}", + error = %e.as_report(), + "Compaction task {} failed with error", compact_task.task_id, - e ); break; } Some(Err(e)) => { task_status = TaskStatus::JoinHandleFailed; tracing::warn!( - "Compaction task {} failed with join handle error: {:#?}", + error = %e.as_report(), + "Compaction task {} failed with join handle error", compact_task.task_id, - e ); break; } diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 0b59f590ac188..158b5b33d414a 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -22,6 +22,7 @@ use risingwave_pb::hummock::report_compaction_task_request::{ }; use risingwave_pb::hummock::{ReportFullScanTaskRequest, ReportVacuumTaskRequest}; use risingwave_rpc_client::GrpcCompactorProxyClient; +use thiserror_ext::AsReport; use tokio::sync::mpsc; use tonic::Request; @@ -344,8 +345,8 @@ pub fn start_compactor( Err(e) => { tracing::warn!( - "Subscribing to compaction tasks failed with error: {}. Will retry.", - e + error = %e.as_report(), + "Subscribing to compaction tasks failed with error. Will retry.", ); continue 'start_stream; } @@ -385,7 +386,7 @@ pub fn start_compactor( .expect("Clock may have gone backwards") .as_millis() as u64, }) { - tracing::warn!("Failed to report task progress. {e:?}"); + tracing::warn!(error = %e.as_report(), "Failed to report task progress"); // re subscribe stream continue 'start_stream; } @@ -408,7 +409,7 @@ pub fn start_compactor( .expect("Clock may have gone backwards") .as_millis() as u64, }) { - tracing::warn!("Failed to pull task {e:?}"); + tracing::warn!(error = %e.as_report(), "Failed to pull task"); // re subscribe stream continue 'start_stream; @@ -476,7 +477,7 @@ pub fn start_compactor( compactor_runner::compact(context.clone(), compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await }, Err(err) => { - tracing::warn!("Failed to track pending SST object id. {:#?}", err); + tracing::warn!(error = %err.as_report(), "Failed to track pending SST object id"); let mut compact_task = compact_task; compact_task.set_task_status(TaskStatus::TrackSstObjectIdFailed); (compact_task, HashMap::default()) @@ -500,11 +501,11 @@ pub fn start_compactor( .expect("Clock may have gone backwards") .as_millis() as u64, }) { - tracing::warn!("Failed to report task {task_id:?} . {e:?}"); + tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); if enable_check_compaction_result && need_check_task { match check_compaction_result(&compact_task, context.clone()).await { Err(e) => { - tracing::warn!("Failed to check compaction task {} because: {:?}",compact_task.task_id, e); + tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id); }, Ok(true) => (), Ok(false) => { @@ -525,7 +526,7 @@ pub fn start_compactor( Vacuum::report_vacuum_task(vacuum_task, meta_client).await; } Err(e) => { - tracing::warn!("Failed to vacuum task: {:#?}", e) + tracing::warn!(error = %e.as_report(), "Failed to vacuum task") } } } @@ -535,7 +536,7 @@ pub fn start_compactor( Vacuum::report_full_scan_task(object_ids, total_object_count, total_object_size, meta_client).await; } Err(e) => { - tracing::warn!("Failed to iter object: {:#?}", e); + tracing::warn!(error = %e.as_report(), "Failed to iter object"); } } } @@ -620,7 +621,7 @@ pub fn start_shared_compactor( )), }; if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{ - tracing::warn!("Failed to report heartbeat {:#?}", e); + tracing::warn!(error = %e.as_report(), "Failed to report heartbeat"); } continue } @@ -696,7 +697,7 @@ pub fn start_shared_compactor( if enable_check_compaction_result && need_check_task { match check_compaction_result(&compact_task, context.clone()).await { Err(e) => { - tracing::warn!("Failed to check compaction task {} because: {:?}",compact_task.task_id, e); + tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id); }, Ok(true) => (), Ok(false) => { @@ -705,7 +706,7 @@ pub fn start_shared_compactor( } } } - Err(e) => tracing::warn!("Failed to report task {task_id:?} . {e:?}"), + Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"), } } @@ -722,11 +723,11 @@ pub fn start_shared_compactor( }; match cloned_grpc_proxy_client.report_vacuum_task(report_vacuum_task_request).await { Ok(_) => tracing::info!("Finished vacuuming SSTs"), - Err(e) => tracing::warn!("Failed to report vacuum task: {:#?}", e), + Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report vacuum task"), } } Err(e) => { - tracing::warn!("Failed to vacuum task: {:#?}", e) + tracing::warn!(error = %e.as_report(), "Failed to vacuum task") } } } @@ -745,11 +746,11 @@ pub fn start_shared_compactor( .await { Ok(_) => tracing::info!("Finished full scan SSTs"), - Err(e) => tracing::warn!("Failed to report full scan task: {:#?}", e), + Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report full scan task"), } } Err(e) => { - tracing::warn!("Failed to iter object: {:#?}", e); + tracing::warn!(error = %e.as_report(), "Failed to iter object"); } } } diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 73f5d154d79ff..8b94364fb2f8a 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -28,6 +28,7 @@ use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, UserKey}; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::compact_task; +use thiserror_ext::AsReport; use tracing::error; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager}; @@ -203,14 +204,14 @@ async fn compact_shared_buffer( } Ok(Err(e)) => { compact_success = false; - tracing::warn!("Shared Buffer Compaction failed with error: {:#?}", e); + tracing::warn!(error = %e.as_report(), "Shared Buffer Compaction failed with error"); err = Some(e); } Err(e) => { compact_success = false; tracing::warn!( - "Shared Buffer Compaction failed with future error: {:#?}", - e + error = %e.as_report(), + "Shared Buffer Compaction failed with future error", ); err = Some(HummockError::compaction_executor( "failed while execute in tokio", @@ -265,7 +266,7 @@ async fn compact_shared_buffer( .await { Err(e) => { - tracing::warn!("Failed check flush result of memtable because of {:?}", e); + tracing::warn!(error = %e.as_report(), "Failed check flush result of memtable"); } Ok(true) => (), Ok(false) => { diff --git a/src/storage/src/hummock/error.rs b/src/storage/src/hummock/error.rs index e3dcf712f0e51..d1b662ce148bd 100644 --- a/src/storage/src/hummock/error.rs +++ b/src/storage/src/hummock/error.rs @@ -14,6 +14,7 @@ use risingwave_object_store::object::ObjectError; use thiserror::Error; +use thiserror_ext::AsReport; use tokio::sync::oneshot::error::RecvError; // TODO(error-handling): should prefer use error types than strings. @@ -152,7 +153,7 @@ impl HummockError { impl From for HummockError { fn from(error: prost::DecodeError) -> Self { - HummockErrorInner::DecodeError(error.to_string()).into() + HummockErrorInner::DecodeError(error.to_report_string()).into() } } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 1fefdb0f5ea58..9d6e72dc488df 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -22,6 +22,7 @@ use await_tree::InstrumentAwait; use parking_lot::RwLock; use prometheus::core::{AtomicU64, GenericGauge}; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; +use thiserror_ext::AsReport; use tokio::spawn; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, info, trace, warn}; @@ -140,7 +141,7 @@ async fn flush_imms( .add_watermark_object_id(Some(*epoch)) .await .inspect_err(|e| { - error!("unable to set watermark sst id. epoch: {}, {:?}", epoch, e); + error!(epoch, error = %e.as_report(), "unable to set watermark sst id"); }); } compact( @@ -707,6 +708,9 @@ fn to_sync_result(result: &HummockResult) -> HummockResult Err(HummockError::other(format!("sync task failed for {:?}", e))), + Err(e) => Err(HummockError::other(format!( + "sync task failed: {}", + e.as_report() + ))), } } diff --git a/src/storage/src/hummock/event_handler/mod.rs b/src/storage/src/hummock/event_handler/mod.rs index 42b636fa8a1bd..ffce8c622fbd6 100644 --- a/src/storage/src/hummock/event_handler/mod.rs +++ b/src/storage/src/hummock/event_handler/mod.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use parking_lot::RwLock; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::HummockEpoch; +use thiserror_ext::AsReport; use tokio::sync::{mpsc, oneshot}; use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; @@ -192,10 +193,10 @@ impl Drop for LocalInstanceGuard { }) .unwrap_or_else(|err| { tracing::error!( - "LocalInstanceGuard table_id {:?} instance_id {} Drop SendError {:?}", - self.table_id, - self.instance_id, - err + error = %err.as_report(), + table_id = %self.table_id, + instance_id = self.instance_id, + "LocalInstanceGuard Drop SendError", ) }) } diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs index d124d067ac67d..5e7a285d70700 100644 --- a/src/storage/src/hummock/event_handler/refiller.rs +++ b/src/storage/src/hummock/event_handler/refiller.rs @@ -33,6 +33,7 @@ use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; use risingwave_hummock_sdk::{HummockSstableObjectId, KeyComparator}; +use thiserror_ext::AsReport; use tokio::sync::Semaphore; use tokio::task::JoinHandle; @@ -304,7 +305,7 @@ impl CacheRefillTask { let holders = match Self::meta_cache_refill(&context, delta).await { Ok(holders) => holders, Err(e) => { - tracing::warn!("meta cache refill error: {:?}", e); + tracing::warn!(error = %e.as_report(), "meta cache refill error"); return; } }; @@ -514,7 +515,9 @@ impl CacheRefillTask { }); let parent_ssts = match try_join_all(futures).await { Ok(parent_ssts) => parent_ssts.into_iter().flatten().collect_vec(), - Err(e) => return tracing::error!("get old meta from cache error: {}", e), + Err(e) => { + return tracing::error!(error = %e.as_report(), "get old meta from cache error") + } }; let units = Self::get_units_to_refill_by_inheritance(context, &holders, &parent_ssts); @@ -525,7 +528,7 @@ impl CacheRefillTask { async move { let sst = ssts.get(&unit.sst_obj_id).unwrap(); if let Err(e) = Self::data_file_cache_refill_unit(context, sst, unit).await { - tracing::error!("data file cache unit refill error: {}", e); + tracing::error!(error = %e.as_report(), "data file cache unit refill error"); } } }); diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index af64d62acf16d..12c775165a75b 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -35,6 +35,7 @@ use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, }; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, LocalSstableInfo}; +use thiserror_ext::AsReport; use tokio::task::JoinHandle; use tracing::{debug, error, info}; @@ -137,8 +138,8 @@ impl MergingImmTask { Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) { Ok(task_result) => task_result, Err(err) => Err(HummockError::other(format!( - "fail to join imm merge join handle: {:?}", - err + "fail to join imm merge join handle: {}", + err.as_report() ))), }) } @@ -230,8 +231,8 @@ impl UploadingTask { }), Err(err) => Err(HummockError::other(format!( - "fail to join upload join handle: {:?}", - err + "fail to join upload join handle: {}", + err.as_report() ))), }) } @@ -244,8 +245,9 @@ impl UploadingTask { Ok(sstables) => return Poll::Ready(sstables), Err(e) => { error!( - "a flush task {:?} failed, start retry. Task info: {:?}", - self.task_info, e + error = %e.as_report(), + task_info = ?self.task_info, + "a flush task failed, start retry", ); self.join_handle = (self.spawn_upload_task)(self.payload.clone(), self.task_info.clone()); diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 419bcfc6f5155..46ef8edc442b3 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -25,6 +25,7 @@ use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSIO use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::PbLevel; use risingwave_rpc_client::HummockMetaClient; +use thiserror_ext::AsReport; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio_retry::strategy::jitter; @@ -274,8 +275,8 @@ pub(crate) async fn start_pinned_version_worker( Err(err) => { let retry_after = retry_backoff.next().unwrap_or(max_retry_interval); tracing::warn!( - "Failed to unpin version {:?}. Will retry after about {} milliseconds", - err, + error = %err.as_report(), + "Failed to unpin version. Will retry after about {} milliseconds", retry_after.as_millis() ); tokio::time::sleep(retry_after).await; diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index 2921eb064de9f..e96d575ce599b 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -81,7 +81,7 @@ impl ObserverState for HummockObserverNode { ), )) .inspect_err(|e| { - tracing::error!("unable to send version delta: {:?}", e); + tracing::error!(event = ?e.0, "unable to send version delta"); }); } @@ -131,7 +131,7 @@ impl ObserverState for HummockObserverNode { )), )) .inspect_err(|e| { - tracing::error!("unable to send full version: {:?}", e); + tracing::error!(event = ?e.0, "unable to send full version"); }); let snapshot_version = snapshot.version.unwrap(); self.version = snapshot_version.catalog_version; diff --git a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs index 729ffb59303c4..7acf9070ae994 100644 --- a/src/storage/src/hummock/sstable/forward_sstable_iterator.rs +++ b/src/storage/src/hummock/sstable/forward_sstable_iterator.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use risingwave_hummock_sdk::key::FullKey; +use thiserror_ext::AsReport; use super::super::{HummockResult, HummockValue}; use crate::hummock::block_stream::BlockStream; @@ -131,14 +132,20 @@ impl SstableIterator { if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx { match self .sstable_store - .prefetch_blocks(self.sst.value(), idx, self.preload_end_block_idx, - self.options.cache_policy, - &mut self.stats, + .prefetch_blocks( + self.sst.value(), + idx, + self.preload_end_block_idx, + self.options.cache_policy, + &mut self.stats, ) .verbose_instrument_await("prefetch_blocks") - .await { + .await + { Ok(preload_stream) => self.preload_stream = Some(preload_stream), - Err(e) => tracing::warn!("failed to create stream for prefetch data because of {:?}, fall back to block get.", e), + Err(e) => { + tracing::warn!(error = %e.as_report(), "failed to create stream for prefetch data, fall back to block get") + } } } @@ -177,7 +184,7 @@ impl SstableIterator { } if self.preload_stream.is_none() && idx + 1 < self.preload_end_block_idx { if let Err(e) = ret { - tracing::warn!("recreate stream because the connection to remote storage has closed, reason: {:?}", e); + tracing::warn!(error = %e.as_report(), "recreate stream because the connection to remote storage has closed"); if self.preload_retry_times >= self.options.max_preload_retry_times { break; } @@ -200,7 +207,7 @@ impl SstableIterator { self.preload_stream = Some(stream); } Err(e) => { - tracing::warn!("failed to recreate stream meet IO error: {:?}", e); + tracing::warn!(error = %e.as_report(), "failed to recreate stream meet IO error"); break; } } diff --git a/src/storage/src/hummock/sstable/sstable_object_id_manager.rs b/src/storage/src/hummock/sstable/sstable_object_id_manager.rs index 21fe86dcaad74..c32867e45d1e8 100644 --- a/src/storage/src/hummock/sstable/sstable_object_id_manager.rs +++ b/src/storage/src/hummock/sstable/sstable_object_id_manager.rs @@ -26,6 +26,7 @@ use risingwave_pb::hummock::GetNewSstIdsRequest; use risingwave_pb::meta::heartbeat_request::extra_info::Info; use risingwave_rpc_client::{ExtraInfoSource, GrpcCompactorProxyClient, HummockMetaClient}; use sync_point::sync_point; +use thiserror_ext::AsReport; use tokio::sync::oneshot; use crate::hummock::{HummockError, HummockResult}; @@ -284,8 +285,8 @@ impl GetObjectId for SharedComapctorObjectIdManager { Ok(start_id) } Err(e) => Err(HummockError::other(format!( - "Fail to get new sst id, {}", - e + "Fail to get new sst id: {}", + e.as_report() ))), } } diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 9918274b8ee91..06bb5ee054866 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -33,6 +33,7 @@ use risingwave_object_store::object::{ ObjectError, ObjectMetadataIter, ObjectStoreRef, ObjectStreamingUploader, }; use risingwave_pb::hummock::SstableInfo; +use thiserror_ext::AsReport; use tokio::task::JoinHandle; use tokio::time::Instant; use zstd::zstd_safe::WriteBuf; @@ -286,7 +287,7 @@ impl SstableStore { pub fn delete_cache(&self, object_id: HummockSstableObjectId) { self.meta_cache.erase(object_id, &object_id); if let Err(e) = self.meta_file_cache.remove(&object_id) { - tracing::warn!("meta file cache remove error: {}", e); + tracing::warn!(error = %e.as_report(), "meta file cache remove error"); } } @@ -561,7 +562,7 @@ impl SstableStore { pub fn clear_block_cache(&self) { self.block_cache.clear(); if let Err(e) = self.data_file_cache.clear() { - tracing::warn!("data file cache clear error: {}", e); + tracing::warn!(error = %e.as_report(), "data file cache clear error"); } } @@ -569,7 +570,7 @@ impl SstableStore { pub fn clear_meta_cache(&self) { self.meta_cache.clear(); if let Err(e) = self.meta_file_cache.clear() { - tracing::warn!("meta file cache clear error: {}", e); + tracing::warn!(error = %e.as_report(), "meta file cache clear error"); } } @@ -714,8 +715,8 @@ impl SstableStore { Ok(Err(e)) => return Err(HummockError::from(e)), Err(e) => { return Err(HummockError::other(format!( - "failed to get result, this read request may be canceled: {:?}", - e + "failed to get result, this read request may be canceled: {}", + e.as_report() ))) } }; diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 96049bd90d140..d1308e32a9aef 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -32,7 +32,7 @@ use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::oneshot; -use tracing::log::error; +use tracing::error; use super::local_hummock_storage::LocalHummockStorage; use super::version::{CommittedVersion, HummockVersionReader}; @@ -69,7 +69,7 @@ impl Drop for HummockStorageShutdownGuard { let _ = self .shutdown_sender .send(HummockEvent::Shutdown) - .inspect_err(|e| error!("unable to send shutdown: {:?}", e)); + .inspect_err(|e| error!(event = ?e.0, "unable to send shutdown")); } } diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 0b4700500f40b..f40e6b1aa0433 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -578,7 +578,7 @@ pub(crate) async fn wait_for_epoch( } loop { match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await { - Err(elapsed) => { + Err(_) => { // The reason that we need to retry here is batch scan in // chain/rearrange_chain is waiting for an // uncommitted epoch carried by the CreateMV barrier, which @@ -589,9 +589,8 @@ pub(crate) async fn wait_for_epoch( // CN with the same distribution as the upstream MV. // See #3845 for more details. tracing::warn!( - "wait_epoch {:?} timeout when waiting for version update elapsed {:?}s", - wait_epoch, - elapsed + epoch = wait_epoch, + "wait_epoch timeout when waiting for version update", ); continue; } diff --git a/src/storage/src/hummock/vacuum.rs b/src/storage/src/hummock/vacuum.rs index 0a5bbf1445417..5242a6eae0784 100644 --- a/src/storage/src/hummock/vacuum.rs +++ b/src/storage/src/hummock/vacuum.rs @@ -21,6 +21,7 @@ use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_object_store::object::ObjectMetadataIter; use risingwave_pb::hummock::{FullScanTask, VacuumTask}; use risingwave_rpc_client::HummockMetaClient; +use thiserror_ext::AsReport; use super::{HummockError, HummockResult}; use crate::hummock::{SstableStore, SstableStoreRef}; @@ -48,7 +49,7 @@ impl Vacuum { tracing::info!("Finished vacuuming SSTs"); } Err(e) => { - tracing::warn!("Failed to report vacuum task: {:#?}", e); + tracing::warn!(error = %e.as_report(), "Failed to report vacuum task"); return false; } } @@ -128,7 +129,7 @@ impl Vacuum { tracing::info!("Finished full scan SSTs"); } Err(e) => { - tracing::warn!("Failed to report full scan task: {:#?}", e); + tracing::warn!(error = %e.as_report(), "Failed to report full scan task"); return false; } } diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index 426c2f9cd035d..de57a3f98f6e5 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -29,6 +29,7 @@ use risingwave_common::hash::VnodeBitmapExt; use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange}; use risingwave_hummock_sdk::table_watermark::WatermarkDirection; use thiserror::Error; +use thiserror_ext::AsReport; use tracing::error; use crate::error::{StorageError, StorageResult}; @@ -652,7 +653,7 @@ impl LocalStateStore for MemtableLocalState table_id: self.table_id, }, ) { - error!(err = ?e, "failed to write delete ranges of table watermark"); + error!(error = %e.as_report(), "failed to write delete ranges of table watermark"); } } } diff --git a/src/storage/src/monitor/hummock_state_store_metrics.rs b/src/storage/src/monitor/hummock_state_store_metrics.rs index 45b72bd1eee86..6072d2676f492 100644 --- a/src/storage/src/monitor/hummock_state_store_metrics.rs +++ b/src/storage/src/monitor/hummock_state_store_metrics.rs @@ -29,6 +29,7 @@ use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use risingwave_common::{ register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry, }; +use thiserror_ext::AsReport; use tracing::warn; /// [`HummockStateStoreMetrics`] stores the performance and IO metrics of `XXXStore` such as @@ -516,8 +517,8 @@ pub fn monitor_cache(memory_collector: Arc) { let collector = Box::new(StateStoreCollector::new(memory_collector)); if let Err(e) = GLOBAL_METRICS_REGISTRY.register(collector) { warn!( - "unable to monitor cache. May have been registered if in all-in-one deployment: {:?}", - e + "unable to monitor cache. May have been registered if in all-in-one deployment: {}", + e.as_report() ); } } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index facea1f090903..4043015163449 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -22,6 +22,7 @@ use futures_async_stream::try_stream; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; +use thiserror_ext::AsReport; use tokio::time::Instant; use tracing::error; @@ -99,7 +100,7 @@ impl MonitoredStateStore { let iter_stream = iter_stream_future .verbose_instrument_await("store_create_iter") .await - .inspect_err(|e| error!("Failed in iter: {:?}", e))?; + .inspect_err(|e| error!(error = %e.as_report(), "Failed in iter"))?; self.storage_metrics .iter_init_duration @@ -153,7 +154,7 @@ impl MonitoredStateStore { .verbose_instrument_await("store_get") .instrument(tracing::trace_span!("store_get")) .await - .inspect_err(|e| error!("Failed in get: {:?}", e))?; + .inspect_err(|e| error!(error = %e.as_report(), "Failed in get"))?; timer.observe_duration(); @@ -303,7 +304,7 @@ impl StateStore for MonitoredStateStore { self.inner .try_wait_epoch(epoch) .verbose_instrument_await("store_wait_epoch") - .inspect_err(|e| error!("Failed in wait_epoch: {:?}", e)) + .inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch")) } async fn sync(&self, epoch: u64) -> StorageResult { @@ -315,7 +316,7 @@ impl StateStore for MonitoredStateStore { .sync(epoch) .instrument_await("store_sync") .await - .inspect_err(|e| error!("Failed in sync: {:?}", e))?; + .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?; timer.observe_duration(); if sync_result.sync_size != 0 { self.storage_metrics @@ -340,7 +341,7 @@ impl StateStore for MonitoredStateStore { self.inner .clear_shared_buffer() .verbose_instrument_await("store_clear_shared_buffer") - .inspect_err(|e| error!("Failed in clear_shared_buffer: {:?}", e)) + .inspect_err(|e| error!(error = %e.as_report(), "Failed in clear_shared_buffer")) } async fn new_local(&self, option: NewLocalOptions) -> Self::Local { @@ -393,7 +394,7 @@ impl MonitoredStateStoreIter { while let Some((key, value)) = inner .try_next() .await - .inspect_err(|e| error!("Failed in next: {:?}", e))? + .inspect_err(|e| error!(error = %e.as_report(), "Failed in next"))? { stats.total_items += 1; stats.total_size += key.encoded_len() + value.len();