Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: do not output suppressed count when it's zero #15333

Merged
merged 3 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/common/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU32;
use std::num::{NonZeroU32, NonZeroUsize};
use std::sync::atomic::{AtomicUsize, Ordering};

use governor::Quota;
Expand Down Expand Up @@ -46,10 +46,13 @@ impl LogSuppresser {

/// Check if the log should be suppressed.
/// If the log should be suppressed, return `Err(LogSuppressed)`.
/// Otherwise, return `Ok(usize)` with count of suppressed messages before.
pub fn check(&self) -> core::result::Result<usize, LogSuppressed> {
/// Otherwise, return `Ok(Some(..))` with count of suppressed messages since last check,
/// or `Ok(None)` if there's none.
pub fn check(&self) -> core::result::Result<Option<NonZeroUsize>, LogSuppressed> {
match self.rate_limiter.check() {
Ok(()) => Ok(self.suppressed_count.swap(0, Ordering::Relaxed)),
Ok(()) => Ok(NonZeroUsize::new(
self.suppressed_count.swap(0, Ordering::Relaxed),
)),
Err(_) => {
self.suppressed_count.fetch_add(1, Ordering::Relaxed);
Err(LogSuppressed)
Expand All @@ -72,10 +75,16 @@ mod tests {
use std::sync::LazyLock;
use std::time::Duration;

use tracing_subscriber::util::SubscriberInitExt;

use super::*;

#[tokio::test]
async fn demo() {
let _logger = tracing_subscriber::fmt::Subscriber::builder()
.with_max_level(tracing::Level::ERROR)
.set_default();

let mut interval = tokio::time::interval(Duration::from_millis(100));
for _ in 0..100 {
interval.tick().await;
Expand All @@ -86,7 +95,7 @@ mod tests {
});

if let Ok(suppressed_count) = RATE_LIMITER.check() {
println!("failed to foo bar. suppressed_count = {}", suppressed_count);
tracing::error!(suppressed_count, "failed to foo bar");
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ fn avro_type_mapping(schema: &Schema) -> ConnectorResult<DataType> {
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(
"RisingWave supports decimal precision up to {}, but got {}. Will truncate. ({} suppressed)",
Decimal::MAX_PRECISION,
suppressed_count,
precision
);
suppressed_count,
"RisingWave supports decimal precision up to {}, but got {}. Will truncate.",
Decimal::MAX_PRECISION,
precision
);
}
}
DataType::Decimal
Expand Down
22 changes: 16 additions & 6 deletions src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v)),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!("parse column `{}` fail: {} ({} suppressed)", $name, err, sc);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
None
}
Expand All @@ -45,8 +50,13 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
tracing::error!("parse column `{}` fail: {} ({} suppressed)", $name, err, sc);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
None
}
Expand Down Expand Up @@ -106,7 +116,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column_name = name,
column = name,
error = %err.as_report(),
"parse column failed",
);
Expand All @@ -125,7 +135,7 @@ pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> Owne
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count,
column_name = name,
column = name,
error = %err.as_report(),
"parse column failed",
);
Expand Down
58 changes: 29 additions & 29 deletions src/connector/src/parser/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ macro_rules! handle_list_data_type {
}
}
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
$name,
err,
sc
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
}
Expand All @@ -61,12 +61,12 @@ macro_rules! handle_list_data_type {
}
}
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
$name,
err,
sc
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
}
Expand All @@ -80,12 +80,12 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v)),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
$name,
err,
sc
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
None
Expand All @@ -97,12 +97,12 @@ macro_rules! handle_data_type {
match res {
Ok(val) => val.map(|v| ScalarImpl::from(<$rw_type>::from(v))),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
"parse column \"{}\" fail: {} ({} suppressed)",
$name,
err,
sc
column = $name,
error = %err.as_report(),
suppressed_count,
"parse column failed",
);
}
None
Expand Down Expand Up @@ -147,10 +147,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.to_string())),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count = sc,
column_name = name,
suppressed_count,
column = name,
error = %err.as_report(),
"parse uuid column failed",
);
Expand Down Expand Up @@ -181,10 +181,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
match res {
Ok(val) => val.map(|v| ScalarImpl::from(v.into_boxed_slice())),
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count = sc,
column_name = name,
suppressed_count,
column = name,
error = %err.as_report(),
"parse column failed",
);
Expand Down Expand Up @@ -278,10 +278,10 @@ pub fn postgres_row_to_owned_row(row: tokio_postgres::Row, schema: &Schema) -> O
}
}
Err(err) => {
if let Ok(sc) = LOG_SUPPERSSER.check() {
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
suppressed_count = sc,
column_name = name,
suppressed_count,
column = name,
error = %err.as_report(),
"parse column failed",
);
Expand Down
Loading