diff --git a/Cargo.lock b/Cargo.lock index fac9e97c5f8b9..587221e3c4dd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8490,6 +8490,7 @@ dependencies = [ "serde_with", "serde_yaml", "tempfile", + "thiserror-ext", "tracing", "tracing-subscriber", "workspace-hack", @@ -8622,6 +8623,7 @@ dependencies = [ "risingwave_stream", "serde", "serde_yaml", + "thiserror-ext", "tokio-stream", "toml 0.8.2", "tracing", @@ -9511,6 +9513,7 @@ dependencies = [ "sea-orm", "serde", "serde_json", + "thiserror-ext", "tracing", "workspace-hack", ] @@ -9536,6 +9539,7 @@ dependencies = [ "risingwave_pb", "sea-orm", "sync-point", + "thiserror-ext", "tokio-stream", "tracing", "workspace-hack", diff --git a/src/bench/Cargo.toml b/src/bench/Cargo.toml index 9dcc493117d9b..86e88b53a6149 100644 --- a/src/bench/Cargo.toml +++ b/src/bench/Cargo.toml @@ -34,6 +34,7 @@ risingwave_storage = { workspace = true } risingwave_stream = { workspace = true } serde = { version = "1", features = ["derive"] } serde_yaml = "0.9" +thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ "fs", "rt", diff --git a/src/bench/sink_bench/main.rs b/src/bench/sink_bench/main.rs index 943c2dc26e1b2..56e681cc3eac5 100644 --- a/src/bench/sink_bench/main.rs +++ b/src/bench/sink_bench/main.rs @@ -51,6 +51,7 @@ use risingwave_pb::connector_service::SinkPayloadFormat; use risingwave_stream::executor::test_utils::prelude::ColumnDesc; use risingwave_stream::executor::{Barrier, Message, MessageStreamItem, StreamExecutorError}; use serde::{Deserialize, Deserializer}; +use thiserror_ext::AsReport; use tokio::sync::oneshot::Sender; use tokio::time::{sleep, Instant}; @@ -298,7 +299,7 @@ where } let log_sinker = sink.new_log_sinker(sink_writer_param).await.unwrap(); if let Err(e) = log_sinker.consume_log_and_sink(&mut log_reader).await { - return Err(e.to_string()); + return Err(e.to_report_string()); } Err("Stream closed".to_string()) } diff --git a/src/expr/core/src/error.rs b/src/expr/core/src/error.rs index a9646c1d77035..925d237374317 100644 --- a/src/expr/core/src/error.rs +++ b/src/expr/core/src/error.rs @@ -156,7 +156,7 @@ impl MultiExprError { impl Display for MultiExprError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for (i, e) in self.0.iter().enumerate() { - writeln!(f, "{i}: {e}")?; + writeln!(f, "{i}: {}", e.as_report())?; } Ok(()) } diff --git a/src/expr/impl/src/scalar/external/iceberg.rs b/src/expr/impl/src/scalar/external/iceberg.rs index 3973efee559d6..cd616aa5e475a 100644 --- a/src/expr/impl/src/scalar/external/iceberg.rs +++ b/src/expr/impl/src/scalar/external/iceberg.rs @@ -29,6 +29,7 @@ use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; use risingwave_expr::expr::BoxedExpression; use risingwave_expr::{build_function, ExprError, Result}; +use thiserror_ext::AsReport; pub struct IcebergTransform { child: BoxedExpression, @@ -93,23 +94,29 @@ fn build(return_type: DataType, mut children: Vec) -> Result Result auto_parallelism, SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => { - let fixed_parallelism = v.parse().map_err(|e| { + let fixed_parallelism = v.parse::().map_err(|e| { ErrorCode::InvalidInputSyntax(format!( "target parallelism must be a valid number or auto: {}", - e + e.as_report() )) })?; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index f66bc33e32942..82023515d2c47 100644 --- a/src/frontend/src/handler/create_source.rs +++ 
b/src/frontend/src/handler/create_source.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashMap}; use std::rc::Rc; use std::sync::LazyLock; +use anyhow::Context; use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; @@ -385,8 +386,7 @@ pub(crate) async fn bind_columns_from_source( (Format::Plain, Encode::Csv) => { let chars = consume_string_from_options(&mut format_encode_options_to_consume, "delimiter")?.0; - let delimiter = - get_delimiter(chars.as_str()).map_err(|e| RwError::from(e.to_string()))?; + let delimiter = get_delimiter(chars.as_str()).context("failed to parse delimiter")?; let has_header = try_consume_string_from_options( &mut format_encode_options_to_consume, "without_header", diff --git a/src/meta/node/Cargo.toml b/src/meta/node/Cargo.toml index 4c1237dc16d24..a799c99b98c88 100644 --- a/src/meta/node/Cargo.toml +++ b/src/meta/node/Cargo.toml @@ -41,6 +41,7 @@ sea-orm = { version = "0.12.0", features = [ ] } serde = { version = "1", features = ["derive"] } serde_json = "1" +thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index acad2cfe73c64..71486dbece5cb 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Duration; +use anyhow::Context; use either::Either; use etcd_client::ConnectOptions; use futures::future::join_all; @@ -68,6 +69,7 @@ use risingwave_pb::meta::SystemParams; use risingwave_pb::user::user_service_server::UserServiceServer; use risingwave_rpc_client::ComputeClientPool; use sea_orm::{ConnectionTrait, DbBackend}; +use thiserror_ext::AsReport; use tokio::sync::oneshot::{channel as OneChannel, Receiver as OneReceiver}; use tokio::sync::watch; use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender}; @@ -165,7 +167,7 @@ pub async fn rpc_serve( let client = EtcdClient::connect(endpoints.clone(), Some(options.clone()), auth_enabled) .await - .map_err(|e| anyhow::anyhow!("failed to connect etcd {}", e))?; + .context("failed to connect etcd")?; let meta_store = EtcdMetaStore::new(client).into_ref(); if election_client.is_none() { @@ -234,7 +236,7 @@ pub fn rpc_serve_with_store( .run_once(lease_interval_secs as i64, stop_rx.clone()) .await { - tracing::error!("election error happened, {}", e.to_string()); + tracing::error!(error = %e.as_report(), "election error happened"); } }); @@ -252,8 +254,8 @@ pub fn rpc_serve_with_store( tokio::select! 
{ _ = svc_shutdown_rx_clone.changed() => return, res = is_leader_watcher.changed() => { - if let Err(err) = res { - tracing::error!("leader watcher recv failed {}", err.to_string()); + if res.is_err() { + tracing::error!("leader watcher recv failed"); } } } @@ -284,8 +286,8 @@ pub fn rpc_serve_with_store( return; } res = is_leader_watcher.changed() => { - if let Err(err) = res { - tracing::error!("leader watcher recv failed {}", err.to_string()); + if res.is_err() { + tracing::error!("leader watcher recv failed"); } } } @@ -771,13 +773,13 @@ pub async fn start_service_as_election_leader( match tokio::time::timeout(Duration::from_secs(1), join_all(handles)).await { Ok(results) => { for result in results { - if let Err(err) = result { - tracing::warn!("Failed to join shutdown: {:?}", err); + if result.is_err() { + tracing::warn!("Failed to join shutdown"); } } } - Err(e) => { - tracing::warn!("Join shutdown timeout: {:?}", e); + Err(_e) => { + tracing::warn!("Join shutdown timeout"); } } }; diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index c9b6619565cd7..2ba993d7eab55 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -35,6 +35,7 @@ sea-orm = { version = "0.12.0", features = [ "macros", ] } sync-point = { path = "../../utils/sync-point" } +thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", diff --git a/src/meta/service/src/cloud_service.rs b/src/meta/service/src/cloud_service.rs index 3b490ef7b37d0..2ee28b1427edc 100644 --- a/src/meta/service/src/cloud_service.rs +++ b/src/meta/service/src/cloud_service.rs @@ -22,13 +22,14 @@ use risingwave_connector::source::kafka::private_link::insert_privatelink_broker use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, }; -use risingwave_meta::manager::MetadataManager; +use risingwave_meta::manager::{ConnectionId, MetadataManager}; use risingwave_pb::catalog::connection::Info::PrivateLinkService; use risingwave_pb::cloud_service::cloud_service_server::CloudService; use risingwave_pb::cloud_service::rw_cloud_validate_source_response::{Error, ErrorType}; use risingwave_pb::cloud_service::{ RwCloudValidateSourceRequest, RwCloudValidateSourceResponse, SourceType, }; +use thiserror_ext::AsReport; use tonic::{Request, Response, Status}; use crate::rpc::cloud_provider::AwsEc2Client; @@ -77,8 +78,11 @@ impl CloudService for CloudServiceImpl { // if connection_id provided, check whether endpoint service is available and resolve // broker rewrite map currently only support aws privatelink connection if let Some(connection_id_str) = source_cfg.get("connection.id") { - let connection_id = connection_id_str.parse().map_err(|e| { - Status::invalid_argument(format!("connection.id is not an integer: {}", e)) + let connection_id = connection_id_str.parse::().map_err(|e| { + Status::invalid_argument(format!( + "connection.id is not an integer: {}", + e.as_report() + )) })?; let connection = match &self.metadata_manager { @@ -97,7 +101,7 @@ impl CloudService for CloudServiceImpl { if let Err(e) = connection { return Ok(new_rwc_validate_fail_response( ErrorType::PrivatelinkConnectionNotFound, - e.to_string(), + e.to_report_string(), )); } if let Some(PrivateLinkService(service)) = connection.unwrap().info { @@ -115,7 +119,7 @@ impl CloudService for CloudServiceImpl { Err(e) => { return Ok(new_rwc_validate_fail_response( ErrorType::PrivatelinkUnavailable, - e.to_string(), + 
e.to_report_string(), )); } Ok(false) => { diff --git a/src/meta/service/src/cluster_service.rs b/src/meta/service/src/cluster_service.rs index e94c40fe52719..2d2c9751689a4 100644 --- a/src/meta/service/src/cluster_service.rs +++ b/src/meta/service/src/cluster_service.rs @@ -22,6 +22,7 @@ use risingwave_pb::meta::{ ListAllNodesResponse, UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse, }; +use thiserror_ext::AsReport; use tonic::{Request, Response, Status}; use crate::MetaError; @@ -64,7 +65,7 @@ impl ClusterService for ClusterServiceImpl { return Ok(Response::new(AddWorkerNodeResponse { status: Some(risingwave_pb::common::Status { code: risingwave_pb::common::status::Code::UnknownWorker as i32, - message: format!("{}", e), + message: e.to_report_string(), }), node_id: None, })); diff --git a/src/meta/service/src/heartbeat_service.rs b/src/meta/service/src/heartbeat_service.rs index 7770acc06db59..d2c40b7b4ddb8 100644 --- a/src/meta/service/src/heartbeat_service.rs +++ b/src/meta/service/src/heartbeat_service.rs @@ -16,6 +16,7 @@ use itertools::Itertools; use risingwave_meta::manager::MetadataManager; use risingwave_pb::meta::heartbeat_service_server::HeartbeatService; use risingwave_pb::meta::{HeartbeatRequest, HeartbeatResponse}; +use thiserror_ext::AsReport; use tonic::{Request, Response, Status}; #[derive(Clone)] @@ -58,7 +59,7 @@ impl HeartbeatService for HeartbeatServiceImpl { return Ok(Response::new(HeartbeatResponse { status: Some(risingwave_pb::common::Status { code: risingwave_pb::common::status::Code::UnknownWorker as i32, - message: format!("{}", e), + message: e.to_report_string(), }), })); } diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index a082b723dd124..953bd93b1e756 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -24,6 +24,7 @@ use risingwave_pb::hummock::get_compaction_score_response::PickerInfo; use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService; use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent; use risingwave_pb::hummock::*; +use thiserror_ext::AsReport; use tonic::{Request, Response, Status, Streaming}; use crate::hummock::compaction::selector::ManualCompactionOption; @@ -308,7 +309,7 @@ impl HummockManagerService for HummockServiceImpl { tracing::info!("Full GC results {} SSTs to delete", number); } Err(e) => { - tracing::warn!("Full GC SST failed: {:#?}", e); + tracing::warn!(error = %e.as_report(), "Full GC SST failed"); } } }); diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index 5392e5ef8699a..d6e90f0b5c7d4 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -28,6 +28,7 @@ use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; +use thiserror_ext::AsReport; use tokio::task::JoinHandle; use crate::backup_restore::meta_snapshot_builder; @@ -134,10 +135,10 @@ impl BackupManager { if let Err(e) = self.set_store(new_config.clone()).await { // Retry is driven by periodic system params notification. 
tracing::warn!( - "failed to apply new backup config: url={}, dir={}, {:#?}", - new_config.0, - new_config.1, - e + url = &new_config.0, + dir = &new_config.1, + error = %e.as_report(), + "failed to apply new backup config", ); } } @@ -269,7 +270,7 @@ impl BackupManager { } BackupJobResult::Failed(e) => { self.metrics.job_latency_failure.observe(job_latency); - let message = format!("failed backup job {}: {}", job_id, e); + let message = format!("failed backup job {}: {}", job_id, e.as_report()); tracing::warn!(message); self.latest_job_info .store(Arc::new((job_id, BackupJobStatus::Failed, message))); diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index 084aa8ad7192c..250477ee30410 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -24,6 +24,7 @@ use risingwave_hummock_sdk::version_checkpoint_path; use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_pb::hummock::PbHummockVersionCheckpoint; +use thiserror_ext::AsReport; use crate::backup_restore::restore_impl::v1::{LoaderV1, WriterModelV1ToMetaStoreV1}; use crate::backup_restore::restore_impl::v2::{LoaderV2, WriterModelV2ToMetaStoreV2}; @@ -193,7 +194,7 @@ pub async fn restore(opts: RestoreOpts) -> BackupResult<()> { tracing::info!("command succeeded"); } Err(e) => { - tracing::warn!("command failed: {}", e); + tracing::warn!(error = %e.as_report(), "command failed"); } } result diff --git a/src/meta/src/backup_restore/utils.rs b/src/meta/src/backup_restore/utils.rs index 3e70d4d9aaaef..0e40085a2b97c 100644 --- a/src/meta/src/backup_restore/utils.rs +++ b/src/meta/src/backup_restore/utils.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Duration; +use anyhow::Context; use etcd_client::ConnectOptions; use risingwave_backup::error::BackupResult; use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage}; @@ -74,7 +75,7 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult Ok(MetaStoreBackendImpl::Mem(MemStore::new())), diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index ae75453d8015a..de88c1ae17608 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -35,6 +35,7 @@ use risingwave_pb::stream_plan::{ UpdateMutation, }; use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest}; +use thiserror_ext::AsReport; use uuid::Uuid; use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo}; @@ -857,7 +858,7 @@ impl CommandContext { let table_id = table_fragments.table_id().table_id; tracing::warn!( table_id, - reason=?e, + error = %e.as_report(), "cancel_create_table_procedure failed for CancelStreamingJob", ); // If failed, check that table is not in meta store. 
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index f91262117be16..0884560045398 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -37,6 +37,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::BarrierCompleteResponse; +use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinHandle; @@ -668,7 +669,7 @@ impl GlobalBarrierManager { // back to frontend fail_point!("inject_barrier_err_success"); let fail_node = self.checkpoint_control.barrier_failed(); - tracing::warn!("Failed to complete epoch {}: {:?}", prev_epoch, err); + tracing::warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch"); self.failure_recovery(err, fail_node).await; return; } @@ -693,7 +694,7 @@ impl GlobalBarrierManager { .drain(index..) .chain(self.checkpoint_control.barrier_failed().into_iter()) .collect_vec(); - tracing::warn!("Failed to commit epoch {}: {:?}", prev_epoch, err); + tracing::warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch"); self.failure_recovery(err, fail_nodes).await; } } @@ -728,7 +729,7 @@ impl GlobalBarrierManager { let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch let span = tracing::info_span!( "failure_recovery", - %err, + error = %err.as_report(), prev_epoch = prev_epoch.value().0 ); @@ -741,7 +742,7 @@ impl GlobalBarrierManager { .await; self.context.set_status(BarrierManagerStatus::Running).await; } else { - panic!("failed to execute barrier: {:?}", err); + panic!("failed to execute barrier: {}", err.as_report()); } } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index d9a8b58226456..e0ace5f9678a4 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -16,7 +16,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::TryStreamExt; @@ -31,6 +31,7 @@ use risingwave_pb::stream_plan::AddMutation; use risingwave_pb::stream_service::{ BroadcastActorInfoTableRequest, BuildActorsRequest, ForceStopActorsRequest, UpdateActorsRequest, }; +use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tracing::{debug, warn, Instrument}; @@ -197,9 +198,7 @@ impl GlobalBarrierManagerContext { tokio::spawn(async move { let res: MetaResult<()> = try { tracing::debug!("recovering stream job {}", table.id); - finished - .await - .map_err(|e| anyhow!("failed to finish command: {}", e))?; + finished.await.context("failed to finish command")?; tracing::debug!("finished stream job {}", table.id); // Once notified that job is finished we need to notify frontend. 
@@ -212,8 +211,9 @@ impl GlobalBarrierManagerContext { }; if let Err(e) = res.as_ref() { tracing::error!( - "stream job {} interrupted, will retry after recovery: {e:?}", - table.id + id = table.id, + error = %e.as_report(), + "stream job interrupted, will retry after recovery", ); // NOTE(kwannoel): We should not cleanup stream jobs, // we don't know if it's just due to CN killed, @@ -283,16 +283,15 @@ impl GlobalBarrierManagerContext { tokio::spawn(async move { let res: MetaResult<()> = try { tracing::debug!("recovering stream job {}", id); - finished - .await - .map_err(|e| anyhow!("failed to finish command: {}", e))?; - tracing::debug!("finished stream job {}", id); + finished.await.ok().context("failed to finish command")?; + tracing::debug!(id, "finished stream job"); catalog_controller.finish_streaming_job(id).await?; }; if let Err(e) = &res { tracing::error!( - "stream job {} interrupted, will retry after recovery: {e:?}", - id + id, + error = %e.as_report(), + "stream job interrupted, will retry after recovery", ); // NOTE(kwannoel): We should not cleanup stream jobs, // we don't know if it's just due to CN killed, @@ -354,7 +353,7 @@ impl GlobalBarrierManagerContext { let mut info = if self.env.opts.enable_scale_in_when_recovery { let info = self.resolve_actor_info().await; let scaled = self.scale_actors(&info).await.inspect_err(|err| { - warn!(err = ?err, "scale actors failed"); + warn!(error = %err.as_report(), "scale actors failed"); })?; if scaled { self.resolve_actor_info().await @@ -364,13 +363,13 @@ impl GlobalBarrierManagerContext { } else { // Migrate actors in expired CN to newly joined one. self.migrate_actors().await.inspect_err(|err| { - warn!(err = ?err, "migrate actors failed"); + warn!(error = %err.as_report(), "migrate actors failed"); })? }; // Reset all compute nodes, stop and drop existing actors. self.reset_compute_nodes(&info).await.inspect_err(|err| { - warn!(err = ?err, "reset compute nodes failed"); + warn!(error = %err.as_report(), "reset compute nodes failed"); })?; if scheduled_barriers.pre_apply_drop_scheduled().await { @@ -379,10 +378,10 @@ impl GlobalBarrierManagerContext { // update and build all actors. 
self.update_actors(&info).await.inspect_err(|err| { - warn!(err = ?err, "update actors failed"); + warn!(error = %err.as_report(), "update actors failed"); })?; self.build_actors(&info).await.inspect_err(|err| { - warn!(err = ?err, "build_actors failed"); + warn!(error = %err.as_report(), "build_actors failed"); })?; // get split assignments for all actors @@ -424,14 +423,14 @@ impl GlobalBarrierManagerContext { let res = match await_barrier_complete.await.result { Ok(response) => { if let Err(err) = command_ctx.post_collect().await { - warn!(err = ?err, "post_collect failed"); + warn!(error = %err.as_report(), "post_collect failed"); Err(err) } else { Ok((new_epoch.clone(), response)) } } Err(err) => { - warn!(err = ?err, "inject_barrier failed"); + warn!(error = %err.as_report(), "inject_barrier failed"); Err(err) } }; @@ -674,8 +673,8 @@ impl GlobalBarrierManagerContext { .await { tracing::error!( - "failed to apply reschedule for offline scaling in recovery: {}", - e.to_string() + error = %e.as_report(), + "failed to apply reschedule for offline scaling in recovery", ); mgr.fragment_manager diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index b9661a37d8e83..55c9fce4c4081 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -98,9 +98,9 @@ impl GlobalBarrierManagerContext { } rx.map(move |result| match result { Ok(completion) => completion, - Err(e) => BarrierCompletion { + Err(_e) => BarrierCompletion { prev_epoch, - result: Err(anyhow!("failed to receive barrier completion result: {:?}", e).into()), + result: Err(anyhow!("failed to receive barrier completion result").into()), }, }) } diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index aab3234d620cb..26fd3ea8143ef 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Instant; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use assert_matches::assert_matches; use risingwave_common::catalog::TableId; use risingwave_pb::hummock::HummockSnapshot; @@ -279,20 +279,17 @@ impl BarrierScheduler { for (injected_rx, collect_rx, finish_rx) in contexts { // Wait for this command to be injected, and record the result. - let info = injected_rx - .await - .map_err(|e| anyhow!("failed to inject barrier: {}", e))?; + let info = injected_rx.await.ok().context("failed to inject barrier")?; infos.push(info); // Throw the error if it occurs when collecting this barrier. collect_rx .await - .map_err(|e| anyhow!("failed to collect barrier: {}", e))??; + .ok() + .context("failed to collect barrier")??; // Wait for this command to be finished. 
- finish_rx - .await - .map_err(|e| anyhow!("failed to finish command: {}", e))?; + finish_rx.await.ok().context("failed to finish command")?; } Ok(infos) diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index f8b1490e80062..042993bd5bd4c 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -42,6 +42,7 @@ use sea_orm::{ ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, TransactionTrait, }; +use thiserror_ext::AsReport; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::task::JoinHandle; @@ -268,7 +269,7 @@ impl ClusterController { { Ok(keys) => keys, Err(err) => { - tracing::warn!("Failed to load expire worker info from db: {}", err); + tracing::warn!(error = %err.as_report(), "Failed to load expire worker info from db"); continue; } }; @@ -278,7 +279,7 @@ impl ClusterController { .exec(&inner.db) .await { - tracing::warn!("Failed to delete expire workers from db: {}", err); + tracing::warn!(error = %err.as_report(), "Failed to delete expire workers from db"); continue; } diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 357c4a4cc887a..8640088f30193 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -31,6 +31,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::heartbeat_request; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; +use thiserror_ext::AsReport; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::task::JoinHandle; @@ -400,11 +401,11 @@ impl ClusterManager { } Err(err) => { tracing::warn!( - "Failed to delete expired worker {} {:#?}, current timestamp {}. 
{:?}", + error = %err.as_report(), + "Failed to delete expired worker {} {:#?}, current timestamp {}", worker_id, key, now, - err, ); } } diff --git a/src/meta/src/manager/id.rs b/src/meta/src/manager/id.rs index f8373e160961e..377589886ec6c 100644 --- a/src/meta/src/manager/id.rs +++ b/src/meta/src/manager/id.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use risingwave_common::catalog::{NON_RESERVED_SYS_CATALOG_ID, NON_RESERVED_USER_ID}; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; +use thiserror_ext::AsReport; use tokio::sync::RwLock; use crate::manager::cluster::META_NODE_ID; @@ -57,7 +58,7 @@ impl StoredIdGenerator { let current_id = match res { Ok(value) => memcomparable::from_slice(&value).unwrap(), Err(MetaStoreError::ItemNotFound(_)) => start.unwrap_or(0), - Err(e) => panic!("{:?}", e), + Err(e) => panic!("{}", e.as_report()), }; let next_allocate_id = current_id + ID_PREALLOCATE_INTERVAL; @@ -69,7 +70,7 @@ impl StoredIdGenerator { ) .await { - panic!("{:?}", err) + panic!("{}", err.as_report()); } StoredIdGenerator { diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index 2319ce35d5c8c..3132f662cd765 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -23,6 +23,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{ MetaSnapshot, Relation, RelationGroup, SubscribeResponse, SubscribeType, }; +use thiserror_ext::AsReport; use tokio::sync::mpsc::{self, UnboundedSender}; use tokio::sync::Mutex; use tonic::Status; @@ -265,7 +266,7 @@ impl NotificationManager { let mut core_guard = self.core.lock().await; core_guard.local_senders.retain(|sender| { if let Err(err) = sender.send(notification.clone()) { - tracing::warn!("Failed to notify local subscriber. 
{}", err); + tracing::warn!(error = %err.as_report(), "Failed to notify local subscriber"); return false; } true diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index 24977acc57607..bebef2d307dcc 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -30,6 +30,7 @@ use risingwave_pb::connector_service::coordinate_response::{ use risingwave_pb::connector_service::{ coordinate_request, coordinate_response, CoordinateRequest, CoordinateResponse, SinkMetadata, }; +use thiserror_ext::AsReport; use tokio::sync::mpsc::UnboundedReceiver; use tonic::Status; use tracing::{error, warn}; @@ -62,8 +63,9 @@ impl CoordinatorWorker { Ok(sink) => sink, Err(e) => { error!( - "unable to build sink with param {:?}: {:?}", - first_writer_request.param, e + error = %e.as_report(), + "unable to build sink with param {:?}", + first_writer_request.param ); send_await_with_err_check!( first_writer_request.response_tx, @@ -77,8 +79,9 @@ impl CoordinatorWorker { Ok(coordinator) => coordinator, Err(e) => { error!( - "unable to build coordinator with param {:?}: {:?}", - first_writer_request.param, e + error = %e.as_report(), + "unable to build coordinator with param {:?}", + first_writer_request.param ); send_await_with_err_check!( first_writer_request.response_tx, @@ -149,10 +152,9 @@ impl CoordinatorWorker { Either::Right((Some(Ok(None)), _)) => Err(anyhow!( "one sink writer stream reaches the end before initialize" )), - Either::Right((Some(Err(e)), _)) => Err(anyhow!( - "unable to poll from one sink writer stream: {:?}", - e - )), + Either::Right((Some(Err(e)), _)) => { + Err(anyhow!(e).context("unable to poll from one sink writer stream")) + } Either::Right((None, _)) => unreachable!("request_streams must not be empty"), } } @@ -265,10 +267,8 @@ impl CoordinatorWorker { )); } Err(e) => { - return Err(anyhow!( - "failed to poll from one of the writer request streams: {:?}", - e - )); + return Err(anyhow!(e) + .context("failed to poll from one of the writer request streams")); } }, Either::Right((None, _)) => { @@ -285,17 +285,16 @@ impl CoordinatorWorker { async fn start_coordination(&mut self, mut coordinator: impl SinkCommitCoordinator) { let result: Result<(), ()> = try { coordinator.init().await.map_err(|e| { - error!("failed to initialize coordinator: {:?}", e); + error!(error = %e.as_report(), "failed to initialize coordinator"); })?; loop { let (epoch, metadata_list) = self.collect_all_metadata().await.map_err(|e| { - error!("failed to collect all metadata: {:?}", e); + error!(error = %e.as_report(), "failed to collect all metadata"); })?; // TODO: measure commit time - coordinator - .commit(epoch, metadata_list) - .await - .map_err(|e| error!("failed to commit metadata of epoch {}: {:?}", epoch, e))?; + coordinator.commit(epoch, metadata_list).await.map_err( + |e| error!(epoch, error = %e.as_report(), "failed to commit metadata of epoch"), + )?; self.send_to_all_sink_writers(|| { Ok(CoordinateResponse { diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index 2c1d248565d48..d174b8aca7c59 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -25,6 +25,7 @@ use risingwave_connector::sink::SinkParam; use risingwave_pb::connector_service::coordinate_request::Msg; use 
risingwave_pb::connector_service::{coordinate_request, CoordinateRequest, CoordinateResponse}; use rw_futures_util::pending_on_none; +use thiserror_ext::AsReport; use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::{channel, Receiver, Sender}; @@ -292,14 +293,15 @@ impl ManagerWorker { match join_result { Ok(()) => { info!( - "sink coordinator of {} has gracefully finished", - sink_id.sink_id + id = sink_id.sink_id, + "sink coordinator has gracefully finished", ); } Err(err) => { error!( - "sink coordinator of {} finished with error {:?}", - sink_id.sink_id, err + id = sink_id.sink_id, + error = %err.as_report(), + "sink coordinator finished with error", ); } } diff --git a/src/meta/src/model/notification.rs b/src/meta/src/model/notification.rs index be3784836e5b0..0ea30bd61d7bf 100644 --- a/src/meta/src/model/notification.rs +++ b/src/meta/src/model/notification.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use thiserror_ext::AsReport; + use crate::storage::{MetaStore, MetaStoreError, DEFAULT_COLUMN_FAMILY}; /// `NotificationVersion` records the last sent notification version, this will be stored @@ -31,7 +33,7 @@ impl NotificationVersion { { Ok(byte_vec) => memcomparable::from_slice(&byte_vec).unwrap(), Err(MetaStoreError::ItemNotFound(_)) => 0, - Err(e) => panic!("{:?}", e), + Err(e) => panic!("{}", e.as_report()), }; Self(version) } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 8a017f6aa2e50..e4296f7f403c2 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -57,6 +57,7 @@ use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, MergeNode, PbStreamFragmentGraph, StreamFragmentGraph as StreamFragmentGraphProto, }; +use thiserror_ext::AsReport; use tokio::sync::Semaphore; use tokio::time::sleep; use tracing::log::warn; @@ -779,7 +780,7 @@ impl DdlController { .await; match result { Err(e) => { - tracing::error!(id=stream_job_id, error = ?e, "finish stream job failed") + tracing::error!(id = stream_job_id, error = %e.as_report(), "finish stream job failed") } Ok(_) => { tracing::info!(id = stream_job_id, "finish stream job succeeded") @@ -1079,7 +1080,7 @@ impl DdlController { let job_id = stream_job.id(); tracing::debug!(id = job_id, "creating stream job"); - let result = try { + let result: MetaResult<()> = try { // Add table fragments to meta store with state: `State::Initial`. mgr.fragment_manager .start_create_table_fragments(table_fragments.clone()) @@ -1093,7 +1094,7 @@ impl DdlController { if let Err(e) = result { match stream_job.create_type() { CreateType::Background => { - tracing::error!(id = job_id, error = ?e, "finish stream job failed"); + tracing::error!(id = job_id, error = %e.as_report(), "finish stream job failed"); let should_cancel = match mgr .fragment_manager .select_table_fragments_by_table_id(&job_id.into()) @@ -1428,7 +1429,10 @@ impl DdlController { .await; creating_internal_table_ids.push(table.id); if let Err(e) = result { - tracing::warn!("Failed to cancel create table procedure, perhaps barrier manager has already cleaned it. Reason: {e:#?}"); + tracing::warn!( + error = %e.as_report(), + "Failed to cancel create table procedure, perhaps barrier manager has already cleaned it." 
+ ); } } StreamingJob::Sink(sink, target_table) => { @@ -1450,7 +1454,10 @@ impl DdlController { ) .await; if let Err(e) = result { - tracing::warn!("Failed to cancel create table procedure, perhaps barrier manager has already cleaned it. Reason: {e:#?}"); + tracing::warn!( + error = %e.as_report(), + "Failed to cancel create table procedure, perhaps barrier manager has already cleaned it." + ); } } creating_internal_table_ids.push(table.id); diff --git a/src/meta/src/rpc/election/etcd.rs b/src/meta/src/rpc/election/etcd.rs index 3cfb0bea6eba2..96b16f537356e 100644 --- a/src/meta/src/rpc/election/etcd.rs +++ b/src/meta/src/rpc/election/etcd.rs @@ -18,6 +18,7 @@ use std::time::Duration; use etcd_client::{ConnectOptions, Error, GetOptions, LeaderKey, ResignOptions}; use risingwave_common::bail; +use thiserror_ext::AsReport; use tokio::sync::watch::Receiver; use tokio::sync::{oneshot, watch}; use tokio::time; @@ -118,9 +119,9 @@ impl ElectionClient for EtcdElectionClient { Ok(resp) => resp, Err(e) => { tracing::warn!( - "create lease keeper for {} failed {}", + error = %e.as_report(), + "create lease keeper for {} failed", lease_id, - e.to_string() ); keep_alive_fail_tx.send(()).unwrap(); return; @@ -148,7 +149,7 @@ impl ElectionClient for EtcdElectionClient { _ = ticker.tick(), if !keep_alive_sending => { if let Err(err) = keeper.keep_alive().await { - tracing::debug!("keep alive for lease {} failed {}", lease_id, err); + tracing::debug!(error = %err.as_report(), "keep alive for lease {} failed", lease_id); continue } @@ -179,7 +180,7 @@ impl ElectionClient for EtcdElectionClient { continue; } Err(e) => { - tracing::error!("lease keeper failed {}", e.to_string()); + tracing::error!(error = %e.as_report(), "lease keeper failed"); continue; } }; @@ -264,7 +265,7 @@ impl ElectionClient for EtcdElectionClient { } } Some(Err(e)) => { - tracing::warn!("error {} received from leader observe stream", e.to_string()); + tracing::warn!(error = %e.as_report(), "error received from leader observe stream"); continue } } diff --git a/src/meta/src/rpc/election/sql.rs b/src/meta/src/rpc/election/sql.rs index b076f542a3700..65c3ad613dde0 100644 --- a/src/meta/src/rpc/election/sql.rs +++ b/src/meta/src/rpc/election/sql.rs @@ -20,6 +20,7 @@ use sea_orm::{ ConnectionTrait, DatabaseBackend, DatabaseConnection, FromQueryResult, Statement, TransactionTrait, Value, }; +use thiserror_ext::AsReport; use tokio::sync::watch; use tokio::sync::watch::Receiver; use tokio::time; @@ -614,7 +615,7 @@ where .update_heartbeat(META_ELECTION_KEY, id.as_str()) .await { - tracing::debug!("keep alive for member {} failed {}", id, e); + tracing::debug!(error = %e.as_report(), "keep alive for member {} failed", id); continue } } @@ -669,7 +670,7 @@ where if is_leader { tracing::info!("leader {} resigning", self.id); if let Err(e) = self.driver.resign(META_ELECTION_KEY, self.id.as_str()).await { - tracing::warn!("resign failed {}", e); + tracing::warn!(error = %e.as_report(), "resign failed"); } } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 00a8c18885dcc..2674975488e45 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -42,6 +42,7 @@ use risingwave_pb::stream_plan::{DispatcherType, FragmentTypeFlag, StreamActor, use risingwave_pb::stream_service::{ BroadcastActorInfoTableRequest, BuildActorsRequest, UpdateActorsRequest, }; +use thiserror_ext::AsReport; use tokio::sync::oneshot; use tokio::sync::oneshot::Receiver; use tokio::task::JoinHandle; @@ -1780,13 +1781,8 @@ impl 
ScaleController { .flatten() .cloned() .exactly_one() - .map_err(|e| { - anyhow!( - "Cannot find a single target ParallelUnit for fragment {}: {}", - fragment_id, - e - ) - })?; + .ok() + .with_context(|| format!("Cannot find a single target ParallelUnit for fragment {fragment_id}"))?; target_plan.insert( fragment_id, @@ -2558,7 +2554,7 @@ impl GlobalStreamManager { changed = false; } Err(e) => { - tracing::warn!(error = e.to_string(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs()); + tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs()); ticker.reset(); } } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 9f99aa0b405d8..f3ce3816522b9 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -31,6 +31,7 @@ use risingwave_connector::source::{ use risingwave_pb::catalog::Source; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; use risingwave_rpc_client::ConnectorClient; +use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{oneshot, Mutex}; use tokio::task::JoinHandle; @@ -172,11 +173,11 @@ impl ConnectorSourceWorker

{ _ = interval.tick() => { if self.fail_cnt > MAX_FAIL_CNT { if let Err(e) = self.refresh().await { - tracing::error!("error happened when refresh from connector source worker: {}", e.to_string()); + tracing::error!(error = %e.as_report(), "error happened when refresh from connector source worker"); } } if let Err(e) = self.tick().await { - tracing::error!("error happened when tick from connector source worker: {}", e.to_string()); + tracing::error!(error = %e.as_report(), "error happened when tick from connector source worker"); } } } @@ -289,7 +290,7 @@ impl SourceManagerCore { { Ok(actor_ids) => actor_ids, Err(err) => { - tracing::warn!("Failed to get the actor of the fragment {}, maybe the fragment doesn't exist anymore", err.to_string()); + tracing::warn!(error = %err.as_report(), "Failed to get the actor of the fragment, maybe the fragment doesn't exist anymore"); continue; } }; @@ -709,8 +710,11 @@ impl SourceManager { handle .sync_call_tx .send(tx) - .map_err(|e| anyhow!(e.to_string()))?; - rx.await.map_err(|e| anyhow!(e.to_string()))??; + .ok() + .context("failed to send sync call")?; + rx.await + .ok() + .context("failed to receive sync call response")??; } let splits = handle.discovered_splits().await.unwrap(); @@ -926,8 +930,8 @@ impl SourceManager { let _pause_guard = self.paused.lock().await; if let Err(e) = self.tick().await { tracing::error!( - "error happened while running source manager tick: {}", - e.to_string() + error = %e.as_report(), + "error happened while running source manager tick", ); } } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 28168d3db64d1..1950a27013f3f 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -27,6 +27,7 @@ use risingwave_pb::stream_plan::Dispatcher; use risingwave_pb::stream_service::{ BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, UpdateActorsRequest, }; +use thiserror_ext::AsReport; use tokio::sync::mpsc::Sender; use tokio::sync::{oneshot, Mutex, RwLock}; use tracing::Instrument; @@ -272,7 +273,7 @@ impl GlobalStreamManager { }) .await .inspect_err(|_| { - tracing::warn!("failed to notify failed: {table_id}, err: {err}") + tracing::warn!(error = %err.as_report(), "failed to notify failed: {table_id}") }); } } @@ -601,7 +602,7 @@ impl GlobalStreamManager { .drop_streaming_jobs_impl(streaming_job_ids) .await .inspect_err(|err| { - tracing::error!(error = ?err, "Failed to drop streaming jobs"); + tracing::error!(error = %err.as_report(), "Failed to drop streaming jobs"); }); } } diff --git a/src/risedevtool/Cargo.toml b/src/risedevtool/Cargo.toml index 3b4acc0f9877e..47c5726ff145d 100644 --- a/src/risedevtool/Cargo.toml +++ b/src/risedevtool/Cargo.toml @@ -32,6 +32,7 @@ serde_json = "1" serde_with = "3" serde_yaml = "0.9" tempfile = "3" +thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", diff --git a/src/risedevtool/src/bin/risedev-docslt.rs b/src/risedevtool/src/bin/risedev-docslt.rs index d073ff40f3125..627e7b028861f 100644 --- a/src/risedevtool/src/bin/risedev-docslt.rs +++ b/src/risedevtool/src/bin/risedev-docslt.rs @@ -17,6 +17,7 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use itertools::Itertools; +use thiserror_ext::AsReport; use tracing::*; #[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] @@ -89,7 +90,7 @@ fn main() -> Result<()> { let path = match entry { Ok(path) => path, Err(e) => { - error!("{:?}", e); + error!("{}", 
e.as_report()); continue; } }; diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index cd53cd019ea64..d627a692735c3 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::time::Duration; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use futures::TryStreamExt; use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; use risingwave_common::monitor::connection::{EndpointExt, TcpConfig}; @@ -30,6 +30,7 @@ use risingwave_pb::connector_service::sink_writer_stream_request::{ }; use risingwave_pb::connector_service::sink_writer_stream_response::CommitResponse; use risingwave_pb::connector_service::*; +use thiserror_ext::AsReport; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::{Channel, Endpoint}; use tonic::Streaming; @@ -150,8 +151,9 @@ impl ConnectorClient { Ok(client) => Some(client), Err(e) => { error!( - "invalid connector endpoint {:?}: {:?}", - connector_endpoint, e + endpoint = connector_endpoint, + error = %e.as_report(), + "invalid connector endpoint", ); None } @@ -162,12 +164,7 @@ impl ConnectorClient { #[allow(clippy::unused_async)] pub async fn new(connector_endpoint: &String) -> Result { let endpoint = Endpoint::from_shared(format!("http://{}", connector_endpoint)) - .map_err(|e| { - RpcError::Internal(anyhow!(format!( - "invalid connector endpoint `{}`: {:?}", - &connector_endpoint, e - ))) - })? + .with_context(|| format!("invalid connector endpoint `{}`", connector_endpoint))? .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE) .initial_stream_window_size(STREAM_WINDOW_SIZE) .connect_timeout(Duration::from_secs(5)); diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 17168d94f3ac6..0485465499f5a 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -33,7 +33,7 @@ use std::future::Future; use std::iter::repeat; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use futures::future::try_join_all; use futures::stream::{BoxStream, Peekable}; @@ -46,7 +46,7 @@ use risingwave_pb::meta::heartbeat_request::extra_info; use tokio::sync::mpsc::{channel, Receiver, Sender}; pub mod error; -use error::{Result, RpcError}; +use error::Result; mod compactor_client; mod compute_client; mod connector_client; @@ -120,9 +120,7 @@ where S::new_clients(addr.clone(), self.connection_pool_size as usize), ) .await - .map_err(|e| -> RpcError { - anyhow!("failed to create RPC client to {addr}: {:?}", e).into() - })? + .with_context(|| format!("failed to create RPC client to {addr}"))? 
.choose(&mut rand::thread_rng()) .unwrap() .clone()) diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 2a9d337fd3ae5..812c522e5e033 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::thread; use std::time::{Duration, SystemTime}; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use either::Either; use futures::stream::BoxStream; @@ -86,6 +86,7 @@ use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_pb::user::update_user_request::UpdateField; use risingwave_pb::user::user_service_client::UserServiceClient; use risingwave_pb::user::*; +use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, Receiver, UnboundedSender}; use tokio::sync::oneshot::Sender; use tokio::sync::{mpsc, oneshot, RwLock}; @@ -739,10 +740,10 @@ impl MetaClient { { Ok(Ok(_)) => {} Ok(Err(err)) => { - tracing::warn!("Failed to send_heartbeat: error {}", err); + tracing::warn!(error = %err.as_report(), "Failed to send_heartbeat"); } - Err(err) => { - tracing::warn!("Failed to send_heartbeat: timeout {}", err); + Err(_) => { + tracing::warn!("Failed to send_heartbeat: timeout"); } } } @@ -1422,7 +1423,7 @@ impl HummockMetaClient for MetaClient { .expect("Clock may have gone backwards") .as_millis() as u64, }) - .map_err(|err| RpcError::Internal(anyhow!(err.to_string())))?; + .context("Failed to subscribe compaction event")?; let stream = self .inner @@ -1438,7 +1439,10 @@ impl HummockMetaClient for MetaClient { #[async_trait] impl TelemetryInfoFetcher for MetaClient { async fn fetch_telemetry_info(&self) -> std::result::Result, String> { - let resp = self.get_telemetry_info().await.map_err(|e| e.to_string())?; + let resp = self + .get_telemetry_info() + .await + .map_err(|e| e.to_report_string())?; let tracking_id = resp.get_tracking_id().ok(); Ok(tracking_id.map(|id| id.to_owned())) } @@ -1574,12 +1578,12 @@ impl MetaMemberManagement { } }; if let Err(err) = client { - tracing::warn!("failed to create client from {}: {}", addr, err); + tracing::warn!(%addr, error = %err.as_report(), "failed to create client"); continue; } match client.unwrap().members(MembersRequest {}).await { Err(err) => { - tracing::warn!("failed to fetch members from {}: {}", addr, err); + tracing::warn!(%addr, error = %err.as_report(), "failed to fetch members"); continue; } Ok(resp) => { @@ -1696,7 +1700,7 @@ impl GrpcMetaClient { let tick_result = member_management.refresh_members().await; if let Err(e) = tick_result.as_ref() { - tracing::warn!("refresh meta member client failed {}", e); + tracing::warn!(error = %e.as_report(), "refresh meta member client failed"); } if let Some(sender) = event { @@ -1777,9 +1781,9 @@ impl GrpcMetaClient { } Err(e) => { tracing::warn!( - "Failed to connect to meta server {}, trying again: {}", + error = %e.as_report(), + "Failed to connect to meta server {}, trying again", addr, - e ) } } @@ -1948,7 +1952,7 @@ impl GrpcMetaClient { .is_ok() { if let Ok(Err(e)) = result_receiver.await { - tracing::warn!("force refresh meta client failed {}", e); + tracing::warn!(error = %e.as_report(), "force refresh meta client failed"); } } else { tracing::debug!("skipping the current refresh, somewhere else is already doing it") diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 269e1dd0490fc..31be9d1601294 100644 --- a/src/stream/src/common/table/state_table.rs +++ 
b/src/stream/src/common/table/state_table.rs @@ -60,6 +60,7 @@ use risingwave_storage::store::{ use risingwave_storage::table::merge_sort::merge_sort; use risingwave_storage::table::{KeyedRow, TableDistribution}; use risingwave_storage::StateStore; +use thiserror_ext::AsReport; use tracing::{trace, Instrument}; use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy}; @@ -207,14 +208,14 @@ fn consistent_old_value_op(row_serde: impl ValueRowSerde) -> OpConsistencyLevel let first = match row_serde.deserialize(first) { Ok(rows) => rows, Err(e) => { - error!(err = %e, value = ?first, "fail to deserialize serialized value"); + error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value"); return false; } }; let second = match row_serde.deserialize(second) { Ok(rows) => rows, Err(e) => { - error!(err = %e, value = ?second, "fail to deserialize serialized value"); + error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value"); return false; } }; diff --git a/src/utils/pgwire/src/pg_message.rs b/src/utils/pgwire/src/pg_message.rs index e6211c9a6569b..c69fd6ca5b374 100644 --- a/src/utils/pgwire/src/pg_message.rs +++ b/src/utils/pgwire/src/pg_message.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::ffi::CStr; use std::io::{Error, ErrorKind, IoSlice, Result, Write}; +use anyhow::anyhow; use byteorder::{BigEndian, ByteOrder, NetworkEndian}; /// Part of code learned from . use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -58,7 +59,7 @@ impl FeStartupMessage { Ok(v) => Ok(v.trim_end_matches('\0')), Err(err) => Err(Error::new( ErrorKind::InvalidInput, - format!("Input end error: {}", err), + anyhow!(err).context("Input end error"), )), }?; let mut map = HashMap::new(); @@ -242,12 +243,12 @@ impl FeQueryMessage { Ok(cstr) => cstr.to_str().map_err(|err| { Error::new( ErrorKind::InvalidInput, - format!("Invalid UTF-8 sequence: {}", err), + anyhow!(err).context("Invalid UTF-8 sequence"), ) }), Err(err) => Err(Error::new( ErrorKind::InvalidInput, - format!("Input end error: {}", err), + anyhow!(err).context("Input end error"), )), } } diff --git a/src/utils/pgwire/src/pg_protocol.rs b/src/utils/pgwire/src/pg_protocol.rs index 87eed8e2241f0..83da9f5dc0581 100644 --- a/src/utils/pgwire/src/pg_protocol.rs +++ b/src/utils/pgwire/src/pg_protocol.rs @@ -564,7 +564,9 @@ where ) -> PsqlResult<()> { // Parse sql. 
let stmts = Parser::parse_sql(&sql) - .inspect_err(|e| tracing::error!("failed to parse sql:\n{}:\n{}", sql, e)) + .inspect_err( + |e| tracing::error!(sql = &*sql, error = %e.as_report(), "failed to parse sql"), + ) .map_err(|err| PsqlError::SimpleQueryError(err.into()))?; if stmts.is_empty() { self.stream.write_no_flush(&BeMessage::EmptyQueryResponse)?; @@ -684,7 +686,9 @@ where let stmt = { let stmts = Parser::parse_sql(sql) - .inspect_err(|e| tracing::error!("failed to parse sql:\n{}:\n{}", sql, e)) + .inspect_err( + |e| tracing::error!(sql, error = %e.as_report(), "failed to parse sql"), + ) .map_err(|err| PsqlError::ExtendedPrepareError(err.into()))?; if stmts.len() > 1 { @@ -1039,7 +1043,7 @@ where let ssl = openssl::ssl::Ssl::new(ssl_ctx).unwrap(); let mut stream = tokio_openssl::SslStream::new(ssl, stream).unwrap(); if let Err(e) = Pin::new(&mut stream).accept().await { - tracing::warn!("Unable to set up an ssl connection, reason: {}", e); + tracing::warn!(error = %e.as_report(), "Unable to set up an ssl connection"); let _ = stream.shutdown().await; return Err(e.into()); } @@ -1081,7 +1085,7 @@ where Conn::Unencrypted(s) => s.write_no_flush(message), Conn::Ssl(s) => s.write_no_flush(message), } - .inspect_err(|error| tracing::error!(%error, "flush error")) + .inspect_err(|error| tracing::error!(error = %error.as_report(), "flush error")) } async fn write(&mut self, message: &BeMessage<'_>) -> io::Result<()> { @@ -1096,7 +1100,7 @@ where Conn::Unencrypted(s) => s.flush().await, Conn::Ssl(s) => s.flush().await, } - .inspect_err(|error| tracing::error!(%error, "flush error")) + .inspect_err(|error| tracing::error!(error = %error.as_report(), "flush error")) } async fn ssl(&mut self, ssl_ctx: &SslContextRef) -> PsqlResult>> {
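The change applied throughout this diff is uniform: error values are no longer interpolated with `{}` or `{:?}`; they are either wrapped with `anyhow::Context` so the cause chain is preserved, or rendered through `thiserror_ext::AsReport` (`as_report()` as a structured `tracing` field, `to_report_string()` where an owned `String` is required). A minimal, self-contained sketch of the reporting pattern follows; the `ElectionError`/`ConnectError` types and the crate versions are illustrative assumptions, not taken from this changeset.

// Sketch of the `thiserror_ext::AsReport` pattern adopted above (assumed crate
// versions: thiserror = "1", thiserror-ext = "0.1", tracing = "0.1",
// tracing-subscriber = "0.3").

use thiserror::Error;
use thiserror_ext::AsReport;

// A two-level error chain, so there is a `source()` chain to report.
#[derive(Debug, Error)]
#[error("failed to connect etcd")]
struct ConnectError(#[source] std::io::Error);

#[derive(Debug, Error)]
#[error("election error")]
struct ElectionError(#[source] ConnectError);

fn run_election() -> Result<(), ElectionError> {
    let io = std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "connection refused");
    Err(ElectionError(ConnectError(io)))
}

fn main() {
    tracing_subscriber::fmt::init();

    if let Err(e) = run_election() {
        // `Display` prints only "election error" and `{:?}` dumps the struct;
        // `as_report()` renders the whole cause chain, e.g.
        // "election error: failed to connect etcd: connection refused".
        tracing::warn!(error = %e.as_report(), "election error happened");

        // Owned-string form of the same rendering, for places that need a
        // `String` (e.g. building an RPC response message).
        let message = e.to_report_string();
        println!("{message}");
    }
}

Recording the error as a structured field (`error = %...`) rather than interpolating it into the message also keeps the log message constant, which is the shape the `tracing::warn!(error = %e.as_report(), "...")` call sites above rely on.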