From 06e28e8dec8b2f2f2ff16bd32f4bcfd79efeaa85 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 28 Feb 2024 16:27:25 +0800 Subject: [PATCH 1/2] refactor: do not output suppressed count when it's zero Signed-off-by: Bugen Zhao --- src/common/src/log.rs | 13 +++--- src/connector/src/parser/avro/util.rs | 10 ++--- src/connector/src/parser/mysql.rs | 22 +++++++--- src/connector/src/parser/postgres.rs | 58 +++++++++++++-------------- 4 files changed, 58 insertions(+), 45 deletions(-) diff --git a/src/common/src/log.rs b/src/common/src/log.rs index f462c6c5e13d..e7030fa1883f 100644 --- a/src/common/src/log.rs +++ b/src/common/src/log.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::num::NonZeroU32; +use std::num::{NonZeroU32, NonZeroUsize}; use std::sync::atomic::{AtomicUsize, Ordering}; use governor::Quota; @@ -46,10 +46,13 @@ impl LogSuppresser { /// Check if the log should be suppressed. /// If the log should be suppressed, return `Err(LogSuppressed)`. - /// Otherwise, return `Ok(usize)` with count of suppressed messages before. - pub fn check(&self) -> core::result::Result { + /// Otherwise, return `Ok(Some(..))` with count of suppressed messages since last check, + /// or `Ok(None)` if there's none. + pub fn check(&self) -> core::result::Result, LogSuppressed> { match self.rate_limiter.check() { - Ok(()) => Ok(self.suppressed_count.swap(0, Ordering::Relaxed)), + Ok(()) => Ok(NonZeroUsize::new( + self.suppressed_count.swap(0, Ordering::Relaxed), + )), Err(_) => { self.suppressed_count.fetch_add(1, Ordering::Relaxed); Err(LogSuppressed) @@ -86,7 +89,7 @@ mod tests { }); if let Ok(suppressed_count) = RATE_LIMITER.check() { - println!("failed to foo bar. suppressed_count = {}", suppressed_count); + println!("failed to foo bar. suppressed_count = {:?}", suppressed_count); } } } diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index 958f4c9ca5db..a58ad884fd88 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -97,11 +97,11 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult { LazyLock::new(LogSuppresser::default); if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::warn!( - "RisingWave supports decimal precision up to {}, but got {}. Will truncate. ({} suppressed)", - Decimal::MAX_PRECISION, - suppressed_count, - precision - ); + suppressed_count, + "RisingWave supports decimal precision up to {}, but got {}. Will truncate.", + Decimal::MAX_PRECISION, + precision + ); } } DataType::Decimal diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index 4d0480ab4613..5da838c8ff4f 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -33,8 +33,13 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(v)), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { - tracing::error!("parse column `{}` fail: {} ({} suppressed)", $name, err, sc); + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { + tracing::error!( + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", + ); } None } @@ -45,8 +50,13 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { - tracing::error!("parse column `{}` fail: {} ({} suppressed)", $name, err, sc); + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { + tracing::error!( + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", + ); } None } @@ -106,7 +116,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( suppressed_count, - column_name = name, + column = name, error = %err.as_report(), "parse column failed", ); @@ -125,7 +135,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( suppressed_count, - column_name = name, + column = name, error = %err.as_report(), "parse column failed", ); diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index acfbe5e4ae43..f2c541423eb7 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -39,12 +39,12 @@ macro_rules! handle_list_data_type { } } Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - $name, - err, - sc + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", ); } } @@ -61,12 +61,12 @@ macro_rules! handle_list_data_type { } } Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - $name, - err, - sc + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", ); } } @@ -80,12 +80,12 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(v)), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - $name, - err, - sc + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", ); } None @@ -97,12 +97,12 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - $name, - err, - sc + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", ); } None @@ -147,10 +147,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O match res { Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - suppressed_count = sc, - column_name = name, + suppressed_count, + column = name, error = %err.as_report(), "parse uuid column failed", ); @@ -181,10 +181,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O match res { Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - suppressed_count = sc, - column_name = name, + suppressed_count, + column = name, error = %err.as_report(), "parse column failed", ); @@ -278,10 +278,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O } } Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - suppressed_count = sc, - column_name = name, + suppressed_count, + column = name, error = %err.as_report(), "parse column failed", ); From 356eb25ef5813485f506f04b72da345057887083 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 29 Feb 2024 13:29:34 +0800 Subject: [PATCH 2/2] use tracing in tests Signed-off-by: Bugen Zhao --- src/common/src/log.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/common/src/log.rs b/src/common/src/log.rs index e7030fa1883f..b320a111f491 100644 --- a/src/common/src/log.rs +++ b/src/common/src/log.rs @@ -75,10 +75,16 @@ mod tests { use std::sync::LazyLock; use std::time::Duration; + use tracing_subscriber::util::SubscriberInitExt; + use super::*; #[tokio::test] async fn demo() { + let _logger = tracing_subscriber::fmt::Subscriber::builder() + .with_max_level(tracing::Level::ERROR) + .set_default(); + let mut interval = tokio::time::interval(Duration::from_millis(100)); for _ in 0..100 { interval.tick().await; @@ -89,7 +95,7 @@ mod tests { }); if let Ok(suppressed_count) = RATE_LIMITER.check() { - println!("failed to foo bar. suppressed_count = {:?}", suppressed_count); + tracing::error!(suppressed_count, "failed to foo bar"); } } }