diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index 5da838c8ff4f..cfc7f1fe76e6 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -25,6 +25,8 @@ use risingwave_common::types::{ use rust_decimal::Decimal as RustDecimal; use thiserror_ext::AsReport; +use crate::parser::util::log_error; + static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); macro_rules! handle_data_type { @@ -33,14 +35,7 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(v)), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - column = $name, - error = %err.as_report(), - suppressed_count, - "parse column failed", - ); - } + log_error!($name, err, "parse column failed"); None } } @@ -50,14 +45,7 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - column = $name, - error = %err.as_report(), - suppressed_count, - "parse column failed", - ); - } + log_error!($name, err, "parse column failed"); None } } @@ -113,14 +101,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne ScalarImpl::from(Timestamptz::from_micros(v.timestamp_micros())) }), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - suppressed_count, - column = name, - error = %err.as_report(), - "parse column failed", - ); - } + log_error!(name, err, "parse column failed"); None } } @@ -132,14 +113,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne match res { Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - suppressed_count, - column = name, - error = %err.as_report(), - "parse column failed", - ); - } + log_error!(name, err, "parse column failed"); None } } diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index 0f0f6b4badb4..efeb27b1ee4f 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -29,6 +29,8 @@ use rust_decimal::Decimal as RustDecimal; use thiserror_ext::AsReport; use tokio_postgres::types::{to_sql_checked, FromSql, IsNull, Kind, ToSql, Type}; +use crate::parser::util::log_error; + static LOG_SUPPERSSER: LazyLock = LazyLock::new(LogSuppresser::default); macro_rules! handle_list_data_type { @@ -42,14 +44,7 @@ macro_rules! handle_list_data_type { } } Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - column = $name, - error = %err.as_report(), - suppressed_count, - "parse column failed", - ); - } + log_error!($name, err, "parse column failed"); } } }; @@ -64,14 +59,7 @@ macro_rules! handle_list_data_type { } } Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - column = $name, - error = %err.as_report(), - suppressed_count, - "parse column failed", - ); - } + log_error!($name, err, "parse column failed"); } } }; @@ -83,14 +71,7 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(v)), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - column = $name, - error = %err.as_report(), - suppressed_count, - "parse column failed", - ); - } + log_error!($name, err, "parse column failed"); None } } @@ -100,14 +81,7 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - column = $name, - error = %err.as_report(), - suppressed_count, - "parse column failed", - ); - } + log_error!($name, err, "parse column failed"); None } } @@ -149,16 +123,9 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O // Note: It's only used to map the numeric type in upstream Postgres to RisingWave's rw_int256. let res = row.try_get::<_, Option>(i); match res { - Ok(val) => pg_numeric_to_rw_int256(val), + Ok(val) => pg_numeric_to_rw_int256(val, name), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - column = name, - error = %err.as_report(), - suppressed_count, - "parse numeric column as pg_numeric failed", - ); - } + log_error!(name, err, "parse numeric column as pg_numeric failed"); None } } @@ -170,14 +137,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O match res { Ok(val) => val.map(|v| ScalarImpl::from(v.0)), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - suppressed_count, - column = name, - error = %err.as_report(), - "parse enum column failed", - ); - } + log_error!(name, err, "parse enum column failed"); None } } @@ -189,14 +149,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O match res { Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - suppressed_count, - column = name, - error = %err.as_report(), - "parse uuid column failed", - ); - } + log_error!(name, err, "parse uuid column failed"); None } } @@ -210,14 +163,11 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O match res { Ok(val) => pg_numeric_to_varchar(val), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - column = name, - error = %err.as_report(), - suppressed_count, - "parse numeric column as pg_numeric failed", - ); - } + log_error!( + name, + err, + "parse numeric column as pg_numeric failed" + ); None } } @@ -245,14 +195,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O match res { Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - suppressed_count, - column = name, - error = %err.as_report(), - "parse column failed", - ); - } + log_error!(name, err, "parse column failed"); None } } @@ -279,14 +222,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O } } Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - suppressed_count, - column = name, - error = %err.as_report(), - "parse enum column failed", - ); - } + log_error!(name, err, "parse enum column failed"); } } } else { @@ -331,15 +267,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O } } Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() - { - tracing::error!( - suppressed_count, - column = name, - error = %err.as_report(), - "parse uuid column failed", - ); - } + log_error!(name, err, "parse uuid column failed"); } }; } @@ -356,15 +284,11 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O } } Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() - { - tracing::error!( - suppressed_count, - column = name, - error = %err.as_report(), - "parse numeric list column as pg_numeric list failed", - ); - } + log_error!( + name, + err, + "parse numeric list column as pg_numeric list failed" + ); } }; } @@ -429,14 +353,7 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O } } Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - suppressed_count, - column = name, - error = %err.as_report(), - "parse column failed", - ); - } + log_error!(name, err, "parse column failed"); } } } @@ -446,19 +363,19 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O Ok(val) => { if let Some(v) = val { v.into_iter().for_each(|val| { - builder.append(pg_numeric_to_rw_int256(Some(val))) + builder.append(pg_numeric_to_rw_int256( + Some(val), + name, + )) }); } } Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - suppressed_count, - column = name, - error = %err.as_report(), - "parse numeric list column as pg_numeric list failed", - ); - } + log_error!( + name, + err, + "parse numeric list column as pg_numeric list failed" + ); } }; } @@ -484,18 +401,12 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O OwnedRow::new(datums) } -fn pg_numeric_to_rw_int256(val: Option) -> Option { +fn pg_numeric_to_rw_int256(val: Option, name: &str) -> Option { let string = pg_numeric_to_string(val)?; match Int256::from_str(string.as_str()) { Ok(num) => Some(ScalarImpl::from(num)), Err(err) => { - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::error!( - error = %err.as_report(), - suppressed_count, - "parse numeric string as rw_int256 failed", - ); - } + log_error!(name, err, "parse numeric string as rw_int256 failed"); None } } diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 57091d701fef..27eb7a8144bd 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -25,6 +25,20 @@ use crate::connector_common::AwsAuthProps; use crate::error::ConnectorResult; use crate::source::SourceMeta; +macro_rules! log_error { + ($name:expr, $err:expr, $message:expr) => { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { + tracing::error!( + column = $name, + error = %$err.as_report(), + suppressed_count, + $message, + ); + } + }; +} +pub(crate) use log_error; + /// get kafka topic name pub(super) fn get_kafka_topic(props: &HashMap) -> ConnectorResult<&String> { const KAFKA_TOPIC_KEY1: &str = "kafka.topic";