diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 4edba24c54168..e49331590ff69 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -682,7 +682,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                     LazyLock::new(LogSuppresser::default);
                 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                     tracing::error!(
-                        %error,
+                        error = %error.as_report(),
                         split_id = &*msg.split_id,
                         offset = msg.offset,
                         suppressed_count,
diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs
index 7a97771dee8ef..2bc4160e7a263 100644
--- a/src/connector/src/sink/nats.rs
+++ b/src/connector/src/sink/nats.rs
@@ -107,15 +107,12 @@ impl Sink for NatsSink {
                 "Nats sink only support append-only mode"
             )));
         }
-        match self.config.common.build_client().await {
-            Ok(_client) => {}
-            Err(error) => {
-                return Err(SinkError::Nats(anyhow!(
-                    "validate nats sink error: {:?}",
-                    error
-                )));
-            }
-        }
+        let _client = self
+            .config
+            .common
+            .build_client()
+            .await
+            .context("validate nats sink error")?;
         Ok(())
     }

@@ -134,7 +131,7 @@ impl NatsSinkWriter {
                 .common
                 .build_context()
                 .await
-                .map_err(|e| SinkError::Nats(anyhow!("nats sink error: {:?}", e)))?;
+                .map_err(|e| SinkError::Nats(anyhow!(e)))?;
             Ok::<_, SinkError>(Self {
                 config: config.clone(),
                 context,
diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs
index c76da907e45e4..16314d21dbc1e 100644
--- a/src/connector/src/source/kafka/enumerator/client.rs
+++ b/src/connector/src/source/kafka/enumerator/client.rs
@@ -15,7 +15,7 @@
 use std::collections::HashMap;
 use std::time::Duration;

-use anyhow::anyhow;
+use anyhow::{anyhow, Context as _};
 use async_trait::async_trait;
 use rdkafka::consumer::{BaseConsumer, Consumer};
 use rdkafka::error::KafkaResult;
@@ -106,11 +106,11 @@ impl SplitEnumerator for KafkaSplitEnumerator {
     }

     async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
-        let topic_partitions = self.fetch_topic_partition().await.map_err(|e| {
-            anyhow!(format!(
-                "failed to fetch metadata from kafka ({}), error: {}",
-                self.broker_address, e
-            ))
+        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
+            format!(
+                "failed to fetch metadata from kafka ({})",
+                self.broker_address
+            )
         })?;
         let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
         let mut start_offsets = self
@@ -154,11 +154,11 @@ impl KafkaSplitEnumerator {
         expect_start_timestamp_millis: Option<i64>,
         expect_stop_timestamp_millis: Option<i64>,
     ) -> ConnectorResult<Vec<KafkaSplit>> {
-        let topic_partitions = self.fetch_topic_partition().await.map_err(|e| {
-            anyhow!(format!(
-                "failed to fetch metadata from kafka ({}), error: {}",
-                self.broker_address, e
-            ))
+        let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
+            format!(
+                "failed to fetch metadata from kafka ({})",
+                self.broker_address
+            )
         })?;

         // here we are getting the start offset and end offset for each partition with the given
diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs
index 8e0c6c7cc1c53..9ed810dfc933a 100644
--- a/src/connector/src/source/pulsar/source/reader.rs
+++ b/src/connector/src/source/pulsar/source/reader.rs
@@ -31,6 +31,7 @@ use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioE
 use risingwave_common::array::{DataChunk, StreamChunk};
 use risingwave_common::catalog::ROWID_PREFIX;
 use risingwave_common::{bail, ensure};
+use thiserror_ext::AsReport;

 use crate::error::ConnectorResult;
 use crate::parser::ParserConfig;
@@ -370,8 +371,9 @@ impl PulsarIcebergReader {
         #[for_await]
         for msg in self.as_stream_chunk_stream() {
-            let (_chunk, mapping) =
-                msg.inspect_err(|e| tracing::error!("Failed to read message from iceberg: {}", e))?;
+            let (_chunk, mapping) = msg.inspect_err(
+                |e| tracing::error!(error = %e.as_report(), "Failed to read message from iceberg"),
+            )?;
             last_msg_id = mapping.get(self.split.topic.to_string().as_str()).cloned();
         }
diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs
index c32d941a1fcc4..833c9661c3ca1 100644
--- a/src/connector/src/source/reader/reader.rs
+++ b/src/connector/src/source/reader/reader.rs
@@ -24,6 +24,7 @@ use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::catalog::ColumnId;
 use rw_futures_util::select_all;
+use thiserror_ext::AsReport as _;

 use crate::dispatch_source_prop;
 use crate::error::ConnectorResult;
@@ -187,7 +188,7 @@ async fn build_opendal_fs_list_stream<Src: OpendalSource>(lister: OpendalEnumera
                 }
             }
             Err(err) => {
-                tracing::error!("list object fail, err {}", err);
+                tracing::error!(error = %err.as_report(), "list object fail");
                 return Err(err);
             }
         }
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index 4e81ee38f7ca3..eb3d6b3205c4c 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -824,7 +824,7 @@ impl SourceManager {
                     break worker;
                 }
                 Err(e) => {
-                    tracing::warn!("failed to create source worker: {}", e);
+                    tracing::warn!(error = %e.as_report(), "failed to create source worker");
                 }
             }
         };
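
Note on the pattern: every change above converges on two idioms — `anyhow::Context::{context, with_context}` to attach a message while keeping the underlying error in the `source()` chain, and `thiserror_ext::AsReport` to log that whole chain as a structured `tracing` field instead of interpolating the error into the message string. Below is a minimal, self-contained sketch of both idioms, assuming the `anyhow`, `tracing`, and `thiserror-ext` crates; the function name, path, and messages are illustrative, not taken from the RisingWave sources.

    use anyhow::Context as _;
    use thiserror_ext::AsReport as _;

    /// `with_context` wraps the I/O error as the `source()` of a new error,
    /// instead of flattening it into the message string with `{:?}`.
    fn read_config(path: &str) -> anyhow::Result<String> {
        std::fs::read_to_string(path)
            .with_context(|| format!("failed to read config file ({path})"))
    }

    fn main() {
        // `as_report()` renders the full `source()` chain of a
        // `std::error::Error`; a bare `%e` would print only the outermost
        // message and drop the cause. (A tracing subscriber must be
        // installed for the event to actually be emitted anywhere.)
        if let Err(e) = std::fs::File::open("/nonexistent/config.toml") {
            tracing::error!(error = %e.as_report(), "open failed");
        }

        // `anyhow::Error`'s alternate `Display` (`{:#}`) likewise prints the
        // whole context chain built by `with_context`, e.g.
        // "failed to read config file (...): No such file or directory ...".
        if let Err(e) = read_config("/nonexistent/config.toml") {
            eprintln!("{:#}", e);
        }
    }

Keeping the error out of the format string is also why hunks like `%error` → `error = %error.as_report()` read the way they do: the error becomes a dedicated structured field, so log collectors can index it separately from the static message.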