diff --git a/src/common/src/log.rs b/src/common/src/log.rs index f462c6c5e13da..b320a111f4915 100644 --- a/src/common/src/log.rs +++ b/src/common/src/log.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::num::NonZeroU32; +use std::num::{NonZeroU32, NonZeroUsize}; use std::sync::atomic::{AtomicUsize, Ordering}; use governor::Quota; @@ -46,10 +46,13 @@ impl LogSuppresser { /// Check if the log should be suppressed. /// If the log should be suppressed, return `Err(LogSuppressed)`. - /// Otherwise, return `Ok(usize)` with count of suppressed messages before. - pub fn check(&self) -> core::result::Result { + /// Otherwise, return `Ok(Some(..))` with count of suppressed messages since last check, + /// or `Ok(None)` if there's none. + pub fn check(&self) -> core::result::Result, LogSuppressed> { match self.rate_limiter.check() { - Ok(()) => Ok(self.suppressed_count.swap(0, Ordering::Relaxed)), + Ok(()) => Ok(NonZeroUsize::new( + self.suppressed_count.swap(0, Ordering::Relaxed), + )), Err(_) => { self.suppressed_count.fetch_add(1, Ordering::Relaxed); Err(LogSuppressed) @@ -72,10 +75,16 @@ mod tests { use std::sync::LazyLock; use std::time::Duration; + use tracing_subscriber::util::SubscriberInitExt; + use super::*; #[tokio::test] async fn demo() { + let _logger = tracing_subscriber::fmt::Subscriber::builder() + .with_max_level(tracing::Level::ERROR) + .set_default(); + let mut interval = tokio::time::interval(Duration::from_millis(100)); for _ in 0..100 { interval.tick().await; @@ -86,7 +95,7 @@ mod tests { }); if let Ok(suppressed_count) = RATE_LIMITER.check() { - println!("failed to foo bar. suppressed_count = {}", suppressed_count); + tracing::error!(suppressed_count, "failed to foo bar"); } } } diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index 958f4c9ca5db5..a58ad884fd886 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -97,11 +97,11 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult { LazyLock::new(LogSuppresser::default); if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::warn!( - "RisingWave supports decimal precision up to {}, but got {}. Will truncate. ({} suppressed)", - Decimal::MAX_PRECISION, - suppressed_count, - precision - ); + suppressed_count, + "RisingWave supports decimal precision up to {}, but got {}. Will truncate.", + Decimal::MAX_PRECISION, + precision + ); } } DataType::Decimal diff --git a/src/connector/src/parser/mysql.rs b/src/connector/src/parser/mysql.rs index 4d0480ab46130..5da838c8ff4fb 100644 --- a/src/connector/src/parser/mysql.rs +++ b/src/connector/src/parser/mysql.rs @@ -33,8 +33,13 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(v)), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { - tracing::error!("parse column `{}` fail: {} ({} suppressed)", $name, err, sc); + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { + tracing::error!( + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", + ); } None } @@ -45,8 +50,13 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { - tracing::error!("parse column `{}` fail: {} ({} suppressed)", $name, err, sc); + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { + tracing::error!( + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", + ); } None } @@ -106,7 +116,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( suppressed_count, - column_name = name, + column = name, error = %err.as_report(), "parse column failed", ); @@ -125,7 +135,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( suppressed_count, - column_name = name, + column = name, error = %err.as_report(), "parse column failed", ); diff --git a/src/connector/src/parser/postgres.rs b/src/connector/src/parser/postgres.rs index acfbe5e4ae435..f2c541423eb71 100644 --- a/src/connector/src/parser/postgres.rs +++ b/src/connector/src/parser/postgres.rs @@ -39,12 +39,12 @@ macro_rules! handle_list_data_type { } } Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - $name, - err, - sc + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", ); } } @@ -61,12 +61,12 @@ macro_rules! handle_list_data_type { } } Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - $name, - err, - sc + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", ); } } @@ -80,12 +80,12 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(v)), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - $name, - err, - sc + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", ); } None @@ -97,12 +97,12 @@ macro_rules! handle_data_type { match res { Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - "parse column \"{}\" fail: {} ({} suppressed)", - $name, - err, - sc + column = $name, + error = %err.as_report(), + suppressed_count, + "parse column failed", ); } None @@ -147,10 +147,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O match res { Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - suppressed_count = sc, - column_name = name, + suppressed_count, + column = name, error = %err.as_report(), "parse uuid column failed", ); @@ -181,10 +181,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O match res { Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())), Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - suppressed_count = sc, - column_name = name, + suppressed_count, + column = name, error = %err.as_report(), "parse column failed", ); @@ -278,10 +278,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O } } Err(err) => { - if let Ok(sc) = LOG_SUPPERSSER.check() { + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { tracing::error!( - suppressed_count = sc, - column_name = name, + suppressed_count, + column = name, error = %err.as_report(), "parse column failed", );