From 67ae88a86e5e130dcc4804cefc555aabeec139cc Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 7 Feb 2024 10:52:10 +0800 Subject: [PATCH] refactor(error): clean-up direct error formatting (part 5, done) (#14956) Signed-off-by: Bugen Zhao --- src/connector/src/lib.rs | 1 - src/connector/src/parser/mod.rs | 3 +- src/connector/src/parser/mysql.rs | 21 ++++++------ src/connector/src/parser/postgres.rs | 17 +++++----- src/connector/src/parser/unified/json.rs | 3 +- src/connector/src/sink/clickhouse.rs | 7 ++-- src/connector/src/sink/deltalake.rs | 22 ++++++------- src/connector/src/sink/doris.rs | 27 ++++++++-------- .../src/sink/doris_starrocks_connector.rs | 9 ++---- src/connector/src/sink/encoder/avro.rs | 3 +- src/connector/src/sink/encoder/json.rs | 10 +++--- src/connector/src/sink/iceberg/mod.rs | 29 +++++++++-------- src/connector/src/sink/kafka.rs | 5 +-- src/connector/src/sink/mod.rs | 5 +-- src/connector/src/sink/nats.rs | 8 +++-- src/connector/src/sink/redis.rs | 2 +- src/connector/src/sink/remote.rs | 11 ++++--- src/connector/src/sink/starrocks.rs | 15 ++++++--- src/connector/src/source/base.rs | 8 ++--- .../src/source/cdc/enumerator/mod.rs | 4 +-- src/connector/src/source/cdc/external/mod.rs | 18 +++++------ .../src/source/cdc/external/postgres.rs | 21 +++++------- src/connector/src/source/cdc/source/reader.rs | 3 +- src/connector/src/source/cdc/split.rs | 32 +++++++++---------- .../src/source/datagen/source/reader.rs | 16 +++++----- .../src/source/filesystem/s3/source/reader.rs | 6 +--- .../source/google_pubsub/enumerator/client.rs | 10 +++--- .../src/source/google_pubsub/source/reader.rs | 4 +-- .../src/source/kafka/source/reader.rs | 4 +-- 29 files changed, 165 insertions(+), 159 deletions(-) diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index ae478b9c100ee..821c91e166b1e 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -33,7 +33,6 @@ #![feature(error_generic_member_access)] #![feature(register_tool)] #![register_tool(rw)] -#![allow(rw::format_error)] // TODO(error-handling): need further refactoring use std::time::Duration; diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index aa36c610a4d03..c5b470db966ab 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -36,6 +36,7 @@ use risingwave_pb::catalog::{ SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo, }; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; +use thiserror_ext::AsReport; use self::avro::AvroAccessBuilder; use self::bytes_parser::BytesAccessBuilder; @@ -412,7 +413,7 @@ impl SourceStreamChunkRowWriter<'_> { LazyLock::new(LogSuppresser::default); if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::warn!( - %error, + error = %error.as_report(), split_id = self.row_meta.as_ref().map(|m| m.split_id), offset = self.row_meta.as_ref().map(|m| m.offset), column = desc.name, diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index 6e40a6326dc66..4d0480ab46130 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -23,6 +23,7 @@ use risingwave_common::types::{ DataType, Date, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, }; use rust_decimal::Decimal as RustDecimal; +use thiserror_ext::AsReport; static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); @@ -102,12 +103,12 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne ScalarImpl::from(Timestamptz::from_micros(v.timestamp_micros())) }), Err(err) => { - if let Ok(suppressed) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column `{}` fail: {} ({} suppressed)", - name, - err, - suppressed + suppressed_count, + column_name = name, + error = %err.as_report(), + "parse column failed", ); } None @@ -121,12 +122,12 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne match res { Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())), Err(err) => { - if let Ok(suppressed) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column `{}` fail: {} ({} suppressed)", - name, - err, - suppressed + suppressed_count, + column_name = name, + error = %err.as_report(), + "parse column failed", ); } None diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index 0823fa5579557..fe1906614698c 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -23,6 +23,7 @@ use risingwave_common::types::{ Timestamptz, }; use rust_decimal::Decimal as RustDecimal; +use thiserror_ext::AsReport; static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); @@ -159,10 +160,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O Err(err) => { if let Ok(sc) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - name, - err, - sc + suppressed_count = sc, + column_name = name, + error = %err.as_report(), + "parse column failed", ); } None @@ -256,10 +257,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O Err(err) => { if let Ok(sc) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - name, - err, - sc + suppressed_count = sc, + column_name = name, + error = %err.as_report(), + "parse column failed", ); } } diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 1990c26ca3ee4..a765f333ef314 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -29,6 +29,7 @@ use simd_json::prelude::{ TypedValue, ValueAsContainer, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar, }; use simd_json::{BorrowedValue, ValueType}; +use thiserror_ext::AsReport; use super::{Access, AccessError, AccessResult}; use crate::parser::common::json_object_get_case_insensitive; @@ -468,7 +469,7 @@ impl JsonParseOptions { // TODO: is it possible to unify the logging with the one in `do_action`? static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::warn!(%error, suppressed_count, "undefined nested field, padding with `NULL`"); + tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`"); } &BorrowedValue::Static(simd_json::StaticNode::Null) }); diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 123739f4a5618..2a9a2e5a39eb6 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -26,6 +26,7 @@ use serde::ser::{SerializeSeq, SerializeStruct}; use serde::Serialize; use serde_derive::Deserialize; use serde_with::serde_as; +use thiserror_ext::AsReport; use with_options::WithOptions; use super::{DummySinkCommitCoordinator, SinkWriterParam}; @@ -436,7 +437,7 @@ impl ClickHouseSinkWriter { .next() .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? .parse::() - .map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))? + .map_err(|e| SinkError::ClickHouse(e.to_report_string()))? } else { 0_u8 }; @@ -455,7 +456,7 @@ impl ClickHouseSinkWriter { .first() .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? .parse::() - .map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?; + .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?; if length > 38 { return Err(SinkError::ClickHouse( @@ -467,7 +468,7 @@ impl ClickHouseSinkWriter { .last() .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? .parse::() - .map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?; + .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?; (length, scale) } else { (0_u8, 0_u8) diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index c092b9c995870..0b9252aca468a 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType}; use deltalake::protocol::{DeltaOperation, SaveMode}; @@ -26,6 +26,7 @@ use deltalake::table::builder::s3_storage_options::{ use deltalake::writer::{DeltaWriter, RecordBatchWriter}; use deltalake::DeltaTable; use risingwave_common::array::{to_deltalake_record_batch_with_schema, StreamChunk}; +use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; @@ -369,7 +370,8 @@ impl DeltaLakeSinkWriter { async fn write(&mut self, chunk: StreamChunk) -> Result<()> { let a = to_deltalake_record_batch_with_schema(self.dl_schema.clone(), &chunk) - .map_err(|err| SinkError::DeltaLake(anyhow!("convert record batch error: {}", err)))?; + .context("convert record batch error") + .map_err(SinkError::DeltaLake)?; self.writer.write(a).await?; Ok(()) } @@ -381,7 +383,8 @@ fn convert_schema(schema: &StructType) -> Result TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata { type Error = SinkError; fn try_from(value: &'a DeltaLakeWriteResult) -> std::prelude::v1::Result { - let metadata = serde_json::to_vec(&value.adds).map_err(|e| -> SinkError { - anyhow!("Can't serialized deltalake sink metadata: {}", e).into() - })?; + let metadata = + serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?; Ok(SinkMetadata { metadata: Some(Serialized(SerializedMetadata { metadata })), }) @@ -496,13 +498,11 @@ impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata { impl DeltaLakeWriteResult { fn try_from(value: &SinkMetadata) -> Result { if let Some(Serialized(v)) = &value.metadata { - let adds = - serde_json::from_slice::>(&v.metadata).map_err(|e| -> SinkError { - anyhow!("Can't deserialize deltalake sink metadata: {}", e).into() - })?; + let adds = serde_json::from_slice::>(&v.metadata) + .context("Can't deserialize deltalake sink metadata")?; Ok(DeltaLakeWriteResult { adds }) } else { - Err(anyhow!("Can't create deltalake sink write result from empty data!").into()) + bail!("Can't create deltalake sink write result from empty data!") } } } diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index dbe87b4cb51ca..caf478934b3d4 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use base64::engine::general_purpose; use base64::Engine; @@ -31,6 +31,7 @@ use serde::Deserialize; use serde_derive::Serialize; use serde_json::Value; use serde_with::serde_as; +use thiserror_ext::AsReport; use with_options::WithOptions; use super::doris_starrocks_connector::{ @@ -326,8 +327,9 @@ impl DorisSinkWriter { DORIS_DELETE_SIGN.to_string(), Value::String("0".to_string()), ); - let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { + SinkError::Doris(format!("Json derialize error: {}", e.as_report())) + })?; self.client .as_mut() .ok_or_else(|| { @@ -342,8 +344,9 @@ impl DorisSinkWriter { DORIS_DELETE_SIGN.to_string(), Value::String("1".to_string()), ); - let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { + SinkError::Doris(format!("Json derialize error: {}", e.as_report())) + })?; self.client .as_mut() .ok_or_else(|| { @@ -359,8 +362,9 @@ impl DorisSinkWriter { DORIS_DELETE_SIGN.to_string(), Value::String("0".to_string()), ); - let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { + SinkError::Doris(format!("Json derialize error: {}", e.as_report())) + })?; self.client .as_mut() .ok_or_else(|| { @@ -471,12 +475,9 @@ impl DorisSchemaClient { } else { raw_bytes }; - let schema: DorisSchema = serde_json::from_str(&json_data).map_err(|err| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "Can't get schema from json {:?}", - err - )) - })?; + let schema: DorisSchema = serde_json::from_str(&json_data) + .context("Can't get schema from json") + .map_err(SinkError::DorisStarrocksConnect)?; Ok(schema) } } diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 550572a2b4bcc..ce019dd186005 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -16,6 +16,7 @@ use core::mem; use core::time::Duration; use std::collections::HashMap; +use anyhow::Context; use base64::engine::general_purpose; use base64::Engine; use bytes::{BufMut, Bytes, BytesMut}; @@ -196,12 +197,8 @@ impl InserterInnerBuilder { )) })? .to_str() - .map_err(|err| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "Can't get doris BE url in header {:?}", - err - )) - })? + .context("Can't get doris BE url in header") + .map_err(SinkError::DorisStarrocksConnect)? } else { return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( "Can't get doris BE url", diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index d63ab69951b08..924beb281eda7 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -20,6 +20,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::row::Row; use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, StructType}; use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast}; +use thiserror_ext::AsReport; use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo}; @@ -134,7 +135,7 @@ impl SerTo> for AvroEncoded { ))); }; let raw = apache_avro::to_avro_datum(&self.schema, self.value) - .map_err(|e| crate::sink::SinkError::Encode(e.to_string()))?; + .map_err(|e| crate::sink::SinkError::Encode(e.to_report_string()))?; let mut buf = Vec::with_capacity(1 + 4 + raw.len()); buf.put_u8(0); buf.put_i32(schema_id); diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index b4d2de84c0069..22c41a18c002a 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; +use anyhow::Context; use base64::engine::general_purpose; use base64::Engine as _; use chrono::{Datelike, NaiveDateTime, Timelike}; @@ -26,6 +27,7 @@ use risingwave_common::row::Row; use risingwave_common::types::{DataType, DatumRef, Decimal, JsonbVal, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_json::{json, Map, Value}; +use thiserror_ext::AsReport; use super::{ CustomJsonType, DateHandlingMode, KafkaConnectParams, KafkaConnectParamsRef, Result, @@ -134,7 +136,7 @@ impl RowEncoder for JsonEncoder { self.time_handling_mode, &self.custom_json_type, ) - .map_err(|e| SinkError::Encode(e.to_string()))?; + .map_err(|e| SinkError::Encode(e.to_report_string()))?; mappings.insert(key, value); } @@ -311,9 +313,9 @@ fn datum_to_json_object( )?; map.insert(sub_field.name.clone(), value); } - Value::String(serde_json::to_string(&map).map_err(|err| { - ArrayError::internal(format!("Json to string err{:?}", err)) - })?) + Value::String( + serde_json::to_string(&map).context("failed to serialize into JSON")?, + ) } CustomJsonType::Es | CustomJsonType::None => { let mut map = Map::with_capacity(st.len()); diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 6b759845373fa..68c5654533a64 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -21,7 +21,7 @@ use std::fmt::Debug; use std::ops::Deref; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use arrow_schema::{ DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, }; @@ -40,6 +40,7 @@ use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile}; use icelake::{Table, TableIdentifier}; use itertools::Itertools; use risingwave_common::array::{to_iceberg_record_batch_with_schema, Op, StreamChunk}; +use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; @@ -47,6 +48,7 @@ use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; use serde::de; use serde_derive::Deserialize; +use thiserror_ext::AsReport; use url::Url; use with_options::WithOptions; @@ -384,14 +386,14 @@ impl IcebergConfig { let catalog = self .create_catalog() .await - .map_err(|e| anyhow!("Unable to load iceberg catalog: {e}"))?; + .context("Unable to load iceberg catalog")?; let table_id = TableIdentifier::new( vec![self.database_name.as_str()] .into_iter() .chain(self.table_name.split('.')), ) - .map_err(|e| anyhow!("Unable to parse table name: {e}"))?; + .context("Unable to parse table name")?; catalog .load_table(&table_id) @@ -803,12 +805,12 @@ impl WriteResult { fn try_from(value: &SinkMetadata, partition_type: &Any) -> Result { if let Some(Serialized(v)) = &value.metadata { let mut values = if let serde_json::Value::Object(v) = - serde_json::from_slice::(&v.metadata).map_err( - |e| -> SinkError { anyhow!("Can't parse iceberg sink metadata: {}", e).into() }, - )? { + serde_json::from_slice::(&v.metadata) + .context("Can't parse iceberg sink metadata")? + { v } else { - return Err(anyhow!("iceberg sink metadata should be a object").into()); + bail!("iceberg sink metadata should be a object"); }; let data_files: Vec; @@ -833,7 +835,7 @@ impl WriteResult { .into_iter() .map(|value| data_file_from_json(value, partition_type.clone())) .collect::, icelake::Error>>() - .map_err(|e| anyhow!("Failed to parse data file from json: {}", e))?; + .context("Failed to parse data file from json")?; } else { return Err(anyhow!("icberg sink metadata should have data_files object").into()); } @@ -858,7 +860,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata { .cloned() .map(data_file_to_json) .collect::, icelake::Error>>() - .map_err(|e| anyhow!("Can't serialize data files to json: {}", e))?, + .context("Can't serialize data files to json")?, ); let json_delete_files = serde_json::Value::Array( value @@ -867,7 +869,7 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata { .cloned() .map(data_file_to_json) .collect::, icelake::Error>>() - .map_err(|e| anyhow!("Can't serialize data files to json: {}", e))?, + .context("Can't serialize data files to json")?, ); let json_value = serde_json::Value::Object( vec![ @@ -879,9 +881,8 @@ impl<'a> TryFrom<&'a WriteResult> for SinkMetadata { ); Ok(SinkMetadata { metadata: Some(Serialized(SerializedMetadata { - metadata: serde_json::to_vec(&json_value).map_err(|e| -> SinkError { - anyhow!("Can't serialized iceberg sink metadata: {}", e).into() - })?, + metadata: serde_json::to_vec(&json_value) + .context("Can't serialize iceberg sink metadata")?, })), }) } @@ -916,7 +917,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter { txn.append_delete_file(s.delete_files); }); txn.commit().await.map_err(|err| { - tracing::error!(?err, "Failed to commit iceberg table"); + tracing::error!(error = %err.as_report(), "Failed to commit iceberg table"); SinkError::Iceberg(anyhow!(err)) })?; diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 25681125f9069..6d5407578b29d 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -29,6 +29,7 @@ use risingwave_common::catalog::Schema; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use strum_macros::{Display, EnumString}; +use thiserror_ext::AsReport; use with_options::WithOptions; use super::catalog::{SinkFormat, SinkFormatDesc}; @@ -478,10 +479,10 @@ impl<'w> KafkaPayloadWriter<'w> { // We can retry for another round after sleeping for sometime Err((e, rec)) => { tracing::warn!( - "producing message (key {:?}) to topic {} failed, err {:?}.", + error = %e.as_report(), + "producing message (key {:?}) to topic {} failed", rec.key.map(|k| k.to_bytes()), rec.topic, - e ); record = rec; match e { diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 26e946fa06d5e..fc6712f17604b 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -56,6 +56,7 @@ use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema}; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::MetaClient; use thiserror::Error; +use thiserror_ext::AsReport; pub use tracing; use self::catalog::{SinkFormatDesc, SinkType}; @@ -547,7 +548,7 @@ impl From for SinkError { impl From for SinkError { fn from(value: ClickHouseError) -> Self { - SinkError::ClickHouse(format!("{}", value)) + SinkError::ClickHouse(value.to_report_string()) } } @@ -559,6 +560,6 @@ impl From for SinkError { impl From for SinkError { fn from(value: RedisError) -> Self { - SinkError::Redis(format!("{}", value)) + SinkError::Redis(value.to_report_string()) } } diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 9f906b49fbd21..7a97771dee8ef 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -14,7 +14,7 @@ use core::fmt::Debug; use std::collections::HashMap; -use anyhow::anyhow; +use anyhow::{anyhow, Context as _}; use async_nats::jetstream::context::Context; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; @@ -159,13 +159,15 @@ impl NatsSinkWriter { self.context .publish(self.config.common.subject.clone(), item.into()) .await - .map_err(|e| SinkError::Nats(anyhow!("nats sink error: {:?}", e)))?; + .context("nats sink error") + .map_err(SinkError::Nats)?; } Ok::<_, SinkError>(()) }, ) .await - .map_err(|e| SinkError::Nats(anyhow!("nats sink error: {:?}", e))) + .context("nats sink error") + .map_err(SinkError::Nats) } } diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 344201981fd5c..d79d67e4adc2e 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -63,7 +63,7 @@ impl RedisConfig { pub fn from_hashmap(properties: HashMap) -> Result { let config = serde_json::from_value::(serde_json::to_value(properties).unwrap()) - .map_err(|e| SinkError::Config(anyhow!("{:?}", e)))?; + .map_err(|e| SinkError::Config(anyhow!(e)))?; Ok(config) } } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 943dd46a565b2..dfc3bed0e372c 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -48,6 +48,7 @@ use risingwave_rpc_client::{ DEFAULT_BUFFER_SIZE, }; use rw_futures_util::drop_either_future; +use thiserror_ext::AsReport; use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender}; use tokio::task::spawn_blocking; @@ -766,11 +767,13 @@ impl EmbeddedConnectorClient { let jvm = self.jvm; std::thread::spawn(move || { - let mut env = match jvm.attach_current_thread() { + let mut env = match jvm + .attach_current_thread() + .context("failed to attach current thread") + { Ok(env) => env, Err(e) => { - let _ = response_tx - .blocking_send(Err(anyhow!("failed to attach current thread: {:?}", e))); + let _ = response_tx.blocking_send(Err(e)); return; } }; @@ -789,7 +792,7 @@ impl EmbeddedConnectorClient { tracing::info!("end of jni call {}::{}", class_name, method_name); } Err(e) => { - tracing::error!("jni call error: {:?}", e); + tracing::error!(error = %e.as_report(), "jni call error"); } }; }); diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index d1c1f97f6a60c..4c9460abc431d 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -29,6 +29,7 @@ use serde::Deserialize; use serde_derive::Serialize; use serde_json::Value; use serde_with::serde_as; +use thiserror_ext::AsReport; use with_options::WithOptions; use super::doris_starrocks_connector::{ @@ -322,13 +323,17 @@ impl StarrocksSinkWriter { .first() .ok_or_else(|| SinkError::Starrocks("must have next".to_string()))? .parse::() - .map_err(|e| SinkError::Starrocks(format!("starrocks sink error {}", e)))?; + .map_err(|e| { + SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report())) + })?; let scale = decimal_all .last() .ok_or_else(|| SinkError::Starrocks("must have next".to_string()))? .parse::() - .map_err(|e| SinkError::Starrocks(format!("starrocks sink error {}", e)))?; + .map_err(|e| { + SinkError::Starrocks(format!("starrocks sink error: {}", e.as_report())) + })?; decimal_map.insert(name.to_string(), (length, scale)); } } @@ -394,7 +399,7 @@ impl StarrocksSinkWriter { Value::String("0".to_string()), ); let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { - SinkError::Starrocks(format!("Json derialize error {:?}", e)) + SinkError::Starrocks(format!("Json derialize error: {}", e.as_report())) })?; self.client .as_mut() @@ -411,7 +416,7 @@ impl StarrocksSinkWriter { Value::String("1".to_string()), ); let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { - SinkError::Starrocks(format!("Json derialize error {:?}", e)) + SinkError::Starrocks(format!("Json derialize error: {}", e.as_report())) })?; self.client .as_mut() @@ -429,7 +434,7 @@ impl StarrocksSinkWriter { Value::String("0".to_string()), ); let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { - SinkError::Starrocks(format!("Json derialize error {:?}", e)) + SinkError::Starrocks(format!("Json derialize error: {}", e.as_report())) })?; self.client .as_mut() diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index b0e556c52b3be..5b909a2738f3c 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -85,8 +85,8 @@ pub trait UnknownFields { impl TryFromHashmap for P { fn try_from_hashmap(props: HashMap, deny_unknown_fields: bool) -> Result { - let json_value = serde_json::to_value(props).map_err(|e| anyhow!(e))?; - let res = serde_json::from_value::

(json_value).map_err(|e| anyhow!(e.to_string()))?; + let json_value = serde_json::to_value(props)?; + let res = serde_json::from_value::

(json_value)?; if !deny_unknown_fields || res.unknown_fields().is_empty() { Ok(res) @@ -310,8 +310,8 @@ pub fn extract_source_struct(info: &PbStreamSourceInfo) -> Result }; return Ok(SourceStruct::new(format, encode)); } - let source_format = info.get_format().map_err(|e| anyhow!("{e:?}"))?; - let source_encode = info.get_row_encode().map_err(|e| anyhow!("{e:?}"))?; + let source_format = info.get_format()?; + let source_encode = info.get_row_encode()?; let (format, encode) = match (source_format, source_encode) { (PbFormatType::Plain, PbEncodeType::Json) => (SourceFormat::Plain, SourceEncode::Json), (PbFormatType::Plain, PbEncodeType::Protobuf) => { diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs index 1664640eef03f..58bc42e537578 100644 --- a/src/connector/src/source/cdc/enumerator/mod.rs +++ b/src/connector/src/source/cdc/enumerator/mod.rs @@ -16,7 +16,7 @@ use std::marker::PhantomData; use std::ops::Deref; use std::str::FromStr; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use itertools::Itertools; use prost::Message; @@ -111,7 +111,7 @@ where ) }) .await - .map_err(|e| anyhow!("failed to validate source: {:?}", e))??; + .context("failed to validate source")??; tracing::debug!("validate cdc source properties success"); Ok(Self { diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 1d0c0e3974404..78c6c714e2bc6 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -17,7 +17,7 @@ mod postgres; use std::collections::HashMap; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use futures::stream::BoxStream; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; @@ -191,19 +191,18 @@ pub struct DebeziumSourceOffset { impl MySqlOffset { pub fn parse_debezium_offset(offset: &str) -> ConnectorResult { - let dbz_offset: DebeziumOffset = serde_json::from_str(offset).map_err(|e| { - ConnectorError::Internal(anyhow!("invalid upstream offset: {}, error: {}", offset, e)) - })?; + let dbz_offset: DebeziumOffset = serde_json::from_str(offset) + .with_context(|| format!("invalid upstream offset: {}", offset))?; Ok(Self { filename: dbz_offset .source_offset .file - .ok_or_else(|| anyhow!("binlog file not found in offset"))?, + .context("binlog file not found in offset")?, position: dbz_offset .source_offset .pos - .ok_or_else(|| anyhow!("binlog position not found in offset"))?, + .context("binlog position not found in offset")?, }) } } @@ -268,7 +267,8 @@ impl ExternalTableReader for MySqlExternalTableReader { let row = rs .iter_mut() .exactly_one() - .map_err(|e| ConnectorError::Internal(anyhow!("read binlog error: {}", e)))?; + .ok() + .context("expect exactly one row when reading binlog offset")?; Ok(CdcOffset::MySql(MySqlOffset { filename: row.take("File").unwrap(), @@ -296,9 +296,7 @@ impl MySqlExternalTableReader { let config = serde_json::from_value::( serde_json::to_value(with_properties).unwrap(), ) - .map_err(|e| { - ConnectorError::Config(anyhow!("fail to extract mysql connector properties: {}", e)) - })?; + .context("failed to extract mysql connector properties")?; let database_url = format!( "mysql://{}:{}@{}:{}/{}", diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 036e62abfe129..f8f0c9d402347 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -15,7 +15,7 @@ use std::cmp::Ordering; use std::collections::HashMap; -use anyhow::anyhow; +use anyhow::Context; use futures::stream::BoxStream; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; @@ -24,6 +24,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::DatumRef; use serde_derive::{Deserialize, Serialize}; +use thiserror_ext::AsReport; use tokio_postgres::types::PgLsn; use tokio_postgres::NoTls; @@ -51,19 +52,18 @@ impl PartialOrd for PostgresOffset { impl PostgresOffset { pub fn parse_debezium_offset(offset: &str) -> ConnectorResult { - let dbz_offset: DebeziumOffset = serde_json::from_str(offset).map_err(|e| { - ConnectorError::Internal(anyhow!("invalid upstream offset: {}, error: {}", offset, e)) - })?; + let dbz_offset: DebeziumOffset = serde_json::from_str(offset) + .with_context(|| format!("invalid upstream offset: {}", offset))?; Ok(Self { txid: dbz_offset .source_offset .txid - .ok_or_else(|| anyhow!("invalid postgres txid"))?, + .context("invalid postgres txid")?, lsn: dbz_offset .source_offset .lsn - .ok_or_else(|| anyhow!("invalid postgres lsn"))?, + .context("invalid postgres lsn")?, }) } } @@ -125,12 +125,7 @@ impl PostgresExternalTableReader { let config = serde_json::from_value::( serde_json::to_value(properties).unwrap(), ) - .map_err(|e| { - ConnectorError::Config(anyhow!( - "fail to extract postgres connector properties: {}", - e - )) - })?; + .context("failed to extract postgres connector properties")?; let database_url = format!( "postgresql://{}:{}@{}:{}/{}", @@ -141,7 +136,7 @@ impl PostgresExternalTableReader { tokio::spawn(async move { if let Err(e) = connection.await { - tracing::error!("connection error: {}", e); + tracing::error!(error = %e.as_report(), "postgres connection error"); } }); diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 19f7ca55cd302..c21d579df7778 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -26,6 +26,7 @@ use risingwave_jni_core::{call_static_method, JniReceiverType, JniSenderType}; use risingwave_pb::connector_service::{ GetEventStreamRequest, GetEventStreamResponse, SourceCommonParam, }; +use thiserror_ext::AsReport; use tokio::sync::mpsc; use crate::parser::ParserConfig; @@ -137,7 +138,7 @@ impl SplitReader for CdcSplitReader { tracing::info!(?source_id, "end of jni call runJniDbzSourceThread"); } Err(e) => { - tracing::error!(?source_id, "jni call error: {:?}", e); + tracing::error!(?source_id, error = %e.as_report(), "jni call error"); } } }); diff --git a/src/connector/src/source/cdc/split.rs b/src/connector/src/source/cdc/split.rs index 4c46b27be75e8..a7357d231b78b 100644 --- a/src/connector/src/source/cdc/split.rs +++ b/src/connector/src/source/cdc/split.rs @@ -14,7 +14,7 @@ use std::marker::PhantomData; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; @@ -66,14 +66,13 @@ impl MySqlCdcSplit { pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { let mut snapshot_done = self.inner.snapshot_done; if !snapshot_done { - let dbz_offset: DebeziumOffset = serde_json::from_str(&start_offset).map_err(|e| { - anyhow!( - "invalid mysql offset: {}, error: {}, split: {}", - start_offset, - e, - self.inner.split_id - ) - })?; + let dbz_offset: DebeziumOffset = + serde_json::from_str(&start_offset).with_context(|| { + format!( + "invalid mysql offset: {}, split: {}", + start_offset, self.inner.split_id + ) + })?; // heartbeat event should not update the `snapshot_done` flag if !dbz_offset.is_heartbeat { @@ -106,14 +105,13 @@ impl PostgresCdcSplit { pub fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> { let mut snapshot_done = self.inner.snapshot_done; if !snapshot_done { - let dbz_offset: DebeziumOffset = serde_json::from_str(&start_offset).map_err(|e| { - anyhow!( - "invalid postgres offset: {}, error: {}, split: {}", - start_offset, - e, - self.inner.split_id - ) - })?; + let dbz_offset: DebeziumOffset = + serde_json::from_str(&start_offset).with_context(|| { + format!( + "invalid postgres offset: {}, split: {}", + start_offset, self.inner.split_id + ) + })?; // heartbeat event should not update the `snapshot_done` flag if !dbz_offset.is_heartbeat { diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index 2bef27ef95fb0..2e1b5f7917261 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -14,10 +14,11 @@ use std::collections::HashMap; -use anyhow::{anyhow, Result}; +use anyhow::{Context, Result}; use async_trait::async_trait; use futures::{Stream, StreamExt, TryStreamExt}; use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty}; +use thiserror_ext::AsReport; use super::generator::DatagenEventGenerator; use crate::parser::{EncodingProperties, ParserConfig, ProtocolProperties}; @@ -209,9 +210,9 @@ fn generator_from_data_type( Ok(seed) => seed ^ split_index, Err(e) => { tracing::warn!( - "cannot parse {:?} to u64 due to {:?}, will use {:?} as random seed", + error = %e.as_report(), + "cannot parse {:?} to u64, will use {:?} as random seed", seed, - e, split_index ); split_index @@ -230,11 +231,10 @@ fn generator_from_data_type( .map(|s| s.to_lowercase()); let basetime = match fields_option_map.get(format!("fields.{}.basetime", name).as_str()) { - Some(base) => { - Some(chrono::DateTime::parse_from_rfc3339(base).map_err(|e| { - anyhow!("cannot parse {:?} to rfc3339 due to {:?}", base, e) - })?) - } + Some(base) => Some( + chrono::DateTime::parse_from_rfc3339(base) + .with_context(|| format!("cannot parse `{base}` to rfc3339"))?, + ), None => None, }; diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs index b8e7a2a71b0cd..884f1d19062ce 100644 --- a/src/connector/src/source/filesystem/s3/source/reader.rs +++ b/src/connector/src/source/filesystem/s3/source/reader.rs @@ -85,11 +85,7 @@ impl S3FileReader { return Ok(()); } Err(e) => { - return Err(anyhow!( - "S3 GetObject from {} error: {}", - bucket_name, - e.to_string() - )); + return Err(anyhow!(e).context(format!("S3 GetObject from {bucket_name} error"))); } }; diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index 01809a3c773b0..bc1d9d078b66a 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::{anyhow, bail}; +use anyhow::{bail, Context}; use async_trait::async_trait; use chrono::{TimeZone, Utc}; use google_cloud_pubsub::client::{Client, ClientConfig}; @@ -49,13 +49,13 @@ impl SplitEnumerator for PubsubSplitEnumerator { let config = ClientConfig::default().with_auth().await?; let client = Client::new(config) .await - .map_err(|e| anyhow!("error initializing pubsub client: {:?}", e))?; + .context("error initializing pubsub client")?; let sub = client.subscription(&subscription); if !sub .exists(None) .await - .map_err(|e| anyhow!("error checking subscription validity: {:?}", e))? + .context("error checking subscription validity")? { bail!("subscription {} does not exist", &subscription) } @@ -76,7 +76,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { (Some(start_offset), None) => { let ts = start_offset .parse::() - .map_err(|e| anyhow!("error parsing start_offset: {:?}", e)) + .context("error parsing start_offset") .map(|nanos| Utc.timestamp_nanos(nanos).into())?; Some(SeekTo::Timestamp(ts)) } @@ -89,7 +89,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { if let Some(seek_to) = seek_to { sub.seek(seek_to, None) .await - .map_err(|e| anyhow!("error seeking subscription: {:?}", e))?; + .context("error seeking subscription")?; } Ok(Self { diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs index d18fcb0be258b..fd5fab15ed10b 100644 --- a/src/connector/src/source/google_pubsub/source/reader.rs +++ b/src/connector/src/source/google_pubsub/source/reader.rs @@ -135,12 +135,12 @@ impl SplitReader for PubsubSplitReader { .as_str() .parse::() .map(|nanos| Utc.timestamp_nanos(nanos)) - .map_err(|e| anyhow!("error parsing offset: {:?}", e))?; + .context("error parsing offset")?; subscription .seek(SeekTo::Timestamp(timestamp.into()), None) .await - .map_err(|e| anyhow!("error seeking to pubsub offset: {:?}", e))?; + .context("error seeking to pubsub offset")?; } let stop_offset = if let Some(ref offset) = split.stop_offset { diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 691590e361cde..bb8e70471282f 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::mem::swap; use std::time::Duration; -use anyhow::{anyhow, Result}; +use anyhow::{Context, Result}; use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; @@ -98,7 +98,7 @@ impl SplitReader for KafkaSplitReader { .set_log_level(RDKafkaLogLevel::Info) .create_with_context(client_ctx) .await - .map_err(|e| anyhow!("failed to create kafka consumer: {}", e))?; + .context("failed to create kafka consumer")?; let mut tpl = TopicPartitionList::with_capacity(splits.len());