From 49ebdaaa3793236d5b2caafead438e481bb1126e Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 2 Feb 2024 17:12:05 +0800 Subject: [PATCH] fix some Signed-off-by: Bugen Zhao --- src/connector/src/lib.rs | 1 - src/connector/src/parser/mod.rs | 3 ++- src/connector/src/parser/mysql.rs | 21 ++++++++------- src/connector/src/parser/postgres.rs | 17 ++++++------ src/connector/src/parser/unified/json.rs | 3 ++- src/connector/src/sink/clickhouse.rs | 7 ++--- src/connector/src/sink/deltalake.rs | 22 +++++++-------- src/connector/src/sink/doris.rs | 27 ++++++++++--------- .../src/sink/doris_starrocks_connector.rs | 9 +++---- src/connector/src/sink/encoder/avro.rs | 3 ++- src/connector/src/sink/encoder/json.rs | 10 ++++--- src/connector/src/sink/iceberg/mod.rs | 8 +++--- 12 files changed, 69 insertions(+), 62 deletions(-) diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index ae478b9c100ee..821c91e166b1e 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -33,7 +33,6 @@ #![feature(error_generic_member_access)] #![feature(register_tool)] #![register_tool(rw)] -#![allow(rw::format_error)] // TODO(error-handling): need further refactoring use std::time::Duration; diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index be0bf9b8d6ca8..439659cfa56fe 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -36,6 +36,7 @@ use risingwave_pb::catalog::{ SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo, }; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; +use thiserror_ext::AsReport; use self::avro::AvroAccessBuilder; use self::bytes_parser::BytesAccessBuilder; @@ -412,7 +413,7 @@ impl SourceStreamChunkRowWriter<'_> { LazyLock::new(LogSuppresser::default); if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::warn!( - %error, + error = %error.as_report(), split_id = self.row_meta.as_ref().map(|m| m.split_id), offset = self.row_meta.as_ref().map(|m| m.offset), column = desc.name, diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index 6e40a6326dc66..4d0480ab46130 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -23,6 +23,7 @@ use risingwave_common::types::{ DataType, Date, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz, }; use rust_decimal::Decimal as RustDecimal; +use thiserror_ext::AsReport; static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); @@ -102,12 +103,12 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne ScalarImpl::from(Timestamptz::from_micros(v.timestamp_micros())) }), Err(err) => { - if let Ok(suppressed) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column `{}` fail: {} ({} suppressed)", - name, - err, - suppressed + suppressed_count, + column_name = name, + error = %err.as_report(), + "parse column failed", ); } None @@ -121,12 +122,12 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne match res { Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())), Err(err) => { - if let Ok(suppressed) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column `{}` fail: {} ({} suppressed)", - name, - err, - suppressed + suppressed_count, + column_name = name, + error = %err.as_report(), + "parse column failed", ); } None diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index 0823fa5579557..fe1906614698c 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -23,6 +23,7 @@ use risingwave_common::types::{ Timestamptz, }; use rust_decimal::Decimal as RustDecimal; +use thiserror_ext::AsReport; static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); @@ -159,10 +160,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O Err(err) => { if let Ok(sc) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - name, - err, - sc + suppressed_count = sc, + column_name = name, + error = %err.as_report(), + "parse column failed", ); } None @@ -256,10 +257,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O Err(err) => { if let Ok(sc) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - name, - err, - sc + suppressed_count = sc, + column_name = name, + error = %err.as_report(), + "parse column failed", ); } } diff --git a/src/connector/src/parser/unified/json.rs b/src/connector/src/parser/unified/json.rs index 1990c26ca3ee4..a765f333ef314 100644 --- a/src/connector/src/parser/unified/json.rs +++ b/src/connector/src/parser/unified/json.rs @@ -29,6 +29,7 @@ use simd_json::prelude::{ TypedValue, ValueAsContainer, ValueAsScalar, ValueObjectAccess, ValueTryAsScalar, }; use simd_json::{BorrowedValue, ValueType}; +use thiserror_ext::AsReport; use super::{Access, AccessError, AccessResult}; use crate::parser::common::json_object_get_case_insensitive; @@ -468,7 +469,7 @@ impl JsonParseOptions { // TODO: is it possible to unify the logging with the one in `do_action`? static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::warn!(%error, suppressed_count, "undefined nested field, padding with `NULL`"); + tracing::warn!(error = %error.as_report(), suppressed_count, "undefined nested field, padding with `NULL`"); } &BorrowedValue::Static(simd_json::StaticNode::Null) }); diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 123739f4a5618..2a9a2e5a39eb6 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -26,6 +26,7 @@ use serde::ser::{SerializeSeq, SerializeStruct}; use serde::Serialize; use serde_derive::Deserialize; use serde_with::serde_as; +use thiserror_ext::AsReport; use with_options::WithOptions; use super::{DummySinkCommitCoordinator, SinkWriterParam}; @@ -436,7 +437,7 @@ impl ClickHouseSinkWriter { .next() .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? .parse::() - .map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))? + .map_err(|e| SinkError::ClickHouse(e.to_report_string()))? } else { 0_u8 }; @@ -455,7 +456,7 @@ impl ClickHouseSinkWriter { .first() .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? .parse::() - .map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?; + .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?; if length > 38 { return Err(SinkError::ClickHouse( @@ -467,7 +468,7 @@ impl ClickHouseSinkWriter { .last() .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? .parse::() - .map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?; + .map_err(|e| SinkError::ClickHouse(e.to_report_string()))?; (length, scale) } else { (0_u8, 0_u8) diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index c092b9c995870..0b9252aca468a 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType}; use deltalake::protocol::{DeltaOperation, SaveMode}; @@ -26,6 +26,7 @@ use deltalake::table::builder::s3_storage_options::{ use deltalake::writer::{DeltaWriter, RecordBatchWriter}; use deltalake::DeltaTable; use risingwave_common::array::{to_deltalake_record_batch_with_schema, StreamChunk}; +use risingwave_common::bail; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; @@ -369,7 +370,8 @@ impl DeltaLakeSinkWriter { async fn write(&mut self, chunk: StreamChunk) -> Result<()> { let a = to_deltalake_record_batch_with_schema(self.dl_schema.clone(), &chunk) - .map_err(|err| SinkError::DeltaLake(anyhow!("convert record batch error: {}", err)))?; + .context("convert record batch error") + .map_err(SinkError::DeltaLake)?; self.writer.write(a).await?; Ok(()) } @@ -381,7 +383,8 @@ fn convert_schema(schema: &StructType) -> Result TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata { type Error = SinkError; fn try_from(value: &'a DeltaLakeWriteResult) -> std::prelude::v1::Result { - let metadata = serde_json::to_vec(&value.adds).map_err(|e| -> SinkError { - anyhow!("Can't serialized deltalake sink metadata: {}", e).into() - })?; + let metadata = + serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?; Ok(SinkMetadata { metadata: Some(Serialized(SerializedMetadata { metadata })), }) @@ -496,13 +498,11 @@ impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata { impl DeltaLakeWriteResult { fn try_from(value: &SinkMetadata) -> Result { if let Some(Serialized(v)) = &value.metadata { - let adds = - serde_json::from_slice::>(&v.metadata).map_err(|e| -> SinkError { - anyhow!("Can't deserialize deltalake sink metadata: {}", e).into() - })?; + let adds = serde_json::from_slice::>(&v.metadata) + .context("Can't deserialize deltalake sink metadata")?; Ok(DeltaLakeWriteResult { adds }) } else { - Err(anyhow!("Can't create deltalake sink write result from empty data!").into()) + bail!("Can't create deltalake sink write result from empty data!") } } } diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index dbe87b4cb51ca..caf478934b3d4 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use base64::engine::general_purpose; use base64::Engine; @@ -31,6 +31,7 @@ use serde::Deserialize; use serde_derive::Serialize; use serde_json::Value; use serde_with::serde_as; +use thiserror_ext::AsReport; use with_options::WithOptions; use super::doris_starrocks_connector::{ @@ -326,8 +327,9 @@ impl DorisSinkWriter { DORIS_DELETE_SIGN.to_string(), Value::String("0".to_string()), ); - let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { + SinkError::Doris(format!("Json derialize error: {}", e.as_report())) + })?; self.client .as_mut() .ok_or_else(|| { @@ -342,8 +344,9 @@ impl DorisSinkWriter { DORIS_DELETE_SIGN.to_string(), Value::String("1".to_string()), ); - let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { + SinkError::Doris(format!("Json derialize error: {}", e.as_report())) + })?; self.client .as_mut() .ok_or_else(|| { @@ -359,8 +362,9 @@ impl DorisSinkWriter { DORIS_DELETE_SIGN.to_string(), Value::String("0".to_string()), ); - let row_json_string = serde_json::to_string(&row_json_value) - .map_err(|e| SinkError::Doris(format!("Json derialize error {:?}", e)))?; + let row_json_string = serde_json::to_string(&row_json_value).map_err(|e| { + SinkError::Doris(format!("Json derialize error: {}", e.as_report())) + })?; self.client .as_mut() .ok_or_else(|| { @@ -471,12 +475,9 @@ impl DorisSchemaClient { } else { raw_bytes }; - let schema: DorisSchema = serde_json::from_str(&json_data).map_err(|err| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "Can't get schema from json {:?}", - err - )) - })?; + let schema: DorisSchema = serde_json::from_str(&json_data) + .context("Can't get schema from json") + .map_err(SinkError::DorisStarrocksConnect)?; Ok(schema) } } diff --git a/src/connector/src/sink/doris_starrocks_connector.rs b/src/connector/src/sink/doris_starrocks_connector.rs index 550572a2b4bcc..ce019dd186005 100644 --- a/src/connector/src/sink/doris_starrocks_connector.rs +++ b/src/connector/src/sink/doris_starrocks_connector.rs @@ -16,6 +16,7 @@ use core::mem; use core::time::Duration; use std::collections::HashMap; +use anyhow::Context; use base64::engine::general_purpose; use base64::Engine; use bytes::{BufMut, Bytes, BytesMut}; @@ -196,12 +197,8 @@ impl InserterInnerBuilder { )) })? .to_str() - .map_err(|err| { - SinkError::DorisStarrocksConnect(anyhow::anyhow!( - "Can't get doris BE url in header {:?}", - err - )) - })? + .context("Can't get doris BE url in header") + .map_err(SinkError::DorisStarrocksConnect)? } else { return Err(SinkError::DorisStarrocksConnect(anyhow::anyhow!( "Can't get doris BE url", diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index d63ab69951b08..924beb281eda7 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -20,6 +20,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::row::Row; use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, StructType}; use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast}; +use thiserror_ext::AsReport; use super::{FieldEncodeError, Result as SinkResult, RowEncoder, SerTo}; @@ -134,7 +135,7 @@ impl SerTo> for AvroEncoded { ))); }; let raw = apache_avro::to_avro_datum(&self.schema, self.value) - .map_err(|e| crate::sink::SinkError::Encode(e.to_string()))?; + .map_err(|e| crate::sink::SinkError::Encode(e.to_report_string()))?; let mut buf = Vec::with_capacity(1 + 4 + raw.len()); buf.put_u8(0); buf.put_i32(schema_id); diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index b4d2de84c0069..22c41a18c002a 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; +use anyhow::Context; use base64::engine::general_purpose; use base64::Engine as _; use chrono::{Datelike, NaiveDateTime, Timelike}; @@ -26,6 +27,7 @@ use risingwave_common::row::Row; use risingwave_common::types::{DataType, DatumRef, Decimal, JsonbVal, ScalarRefImpl, ToText}; use risingwave_common::util::iter_util::ZipEqDebug; use serde_json::{json, Map, Value}; +use thiserror_ext::AsReport; use super::{ CustomJsonType, DateHandlingMode, KafkaConnectParams, KafkaConnectParamsRef, Result, @@ -134,7 +136,7 @@ impl RowEncoder for JsonEncoder { self.time_handling_mode, &self.custom_json_type, ) - .map_err(|e| SinkError::Encode(e.to_string()))?; + .map_err(|e| SinkError::Encode(e.to_report_string()))?; mappings.insert(key, value); } @@ -311,9 +313,9 @@ fn datum_to_json_object( )?; map.insert(sub_field.name.clone(), value); } - Value::String(serde_json::to_string(&map).map_err(|err| { - ArrayError::internal(format!("Json to string err{:?}", err)) - })?) + Value::String( + serde_json::to_string(&map).context("failed to serialize into JSON")?, + ) } CustomJsonType::Es | CustomJsonType::None => { let mut map = Map::with_capacity(st.len()); diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 35288a8809882..8336792587726 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -21,7 +21,7 @@ use std::fmt::Debug; use std::ops::Deref; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use arrow_schema::{ DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, }; @@ -384,14 +384,16 @@ async fn create_catalog(config: &IcebergConfig) -> Result { pub async fn create_table(config: &IcebergConfig) -> Result { let catalog = create_catalog(config) .await - .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?; + .context("Unable to load iceberg catalog") + .map_err(SinkError::Iceberg)?; let table_id = TableIdentifier::new( vec![config.database_name.as_str()] .into_iter() .chain(config.table_name.split('.')), ) - .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?; + .context("Unable to parse table name") + .map_err(SinkError::Iceberg)?; catalog .load_table(&table_id)