Skip to content

Commit

Permalink
fix error formatting
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Feb 23, 2024
1 parent cb420cb commit b967c56
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
%error,
error = %error.as_report(),
split_id = &*msg.split_id,
offset = msg.offset,
suppressed_count,
Expand Down
17 changes: 7 additions & 10 deletions src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,12 @@ impl Sink for NatsSink {
"Nats sink only support append-only mode"
)));
}
match self.config.common.build_client().await {
Ok(_client) => {}
Err(error) => {
return Err(SinkError::Nats(anyhow!(
"validate nats sink error: {:?}",
error
)));
}
}
let _client = self
.config
.common
.build_client()
.await
.context("validate nats sink error")?;
Ok(())
}

Expand All @@ -134,7 +131,7 @@ impl NatsSinkWriter {
.common
.build_context()
.await
.map_err(|e| SinkError::Nats(anyhow!("nats sink error: {:?}", e)))?;
.map_err(|e| SinkError::Nats(anyhow!(e)))?;
Ok::<_, SinkError>(Self {
config: config.clone(),
context,
Expand Down
22 changes: 11 additions & 11 deletions src/connector/src/source/kafka/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::time::Duration;

use anyhow::anyhow;
use anyhow::{anyhow, Context as _};
use async_trait::async_trait;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
Expand Down Expand Up @@ -106,11 +106,11 @@ impl SplitEnumerator for KafkaSplitEnumerator {
}

async fn list_splits(&mut self) -> ConnectorResult<Vec<KafkaSplit>> {
let topic_partitions = self.fetch_topic_partition().await.map_err(|e| {
anyhow!(format!(
"failed to fetch metadata from kafka ({}), error: {}",
self.broker_address, e
))
let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
format!(
"failed to fetch metadata from kafka ({})",
self.broker_address
)
})?;
let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
let mut start_offsets = self
Expand Down Expand Up @@ -154,11 +154,11 @@ impl KafkaSplitEnumerator {
expect_start_timestamp_millis: Option<i64>,
expect_stop_timestamp_millis: Option<i64>,
) -> ConnectorResult<Vec<KafkaSplit>> {
let topic_partitions = self.fetch_topic_partition().await.map_err(|e| {
anyhow!(format!(
"failed to fetch metadata from kafka ({}), error: {}",
self.broker_address, e
))
let topic_partitions = self.fetch_topic_partition().await.with_context(|| {
format!(
"failed to fetch metadata from kafka ({})",
self.broker_address
)
})?;

// here we are getting the start offset and end offset for each partition with the given
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/source/pulsar/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioE
use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::catalog::ROWID_PREFIX;
use risingwave_common::{bail, ensure};
use thiserror_ext::AsReport;

use crate::error::ConnectorResult;
use crate::parser::ParserConfig;
Expand Down Expand Up @@ -370,8 +371,9 @@ impl PulsarIcebergReader {

#[for_await]
for msg in self.as_stream_chunk_stream() {
let (_chunk, mapping) =
msg.inspect_err(|e| tracing::error!("Failed to read message from iceberg: {}", e))?;
let (_chunk, mapping) = msg.inspect_err(
|e| tracing::error!(error = %e.as_report(), "Failed to read message from iceberg"),
)?;
last_msg_id = mapping.get(self.split.topic.to_string().as_str()).cloned();
}

Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::ColumnId;
use rw_futures_util::select_all;
use thiserror_ext::AsReport as _;

use crate::dispatch_source_prop;
use crate::error::ConnectorResult;
Expand Down Expand Up @@ -187,7 +188,7 @@ async fn build_opendal_fs_list_stream<Src: OpendalSource>(lister: OpendalEnumera
}
}
Err(err) => {
tracing::error!("list object fail, err {}", err);
tracing::error!(error = %err.as_report(), "list object fail");
return Err(err);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/source_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ impl SourceManager {
break worker;
}
Err(e) => {
tracing::warn!("failed to create source worker: {}", e);
tracing::warn!(error = %e.as_report(), "failed to create source worker");
}
}
};
Expand Down

0 comments on commit b967c56

Please sign in to comment.