Skip to content

Commit

Permalink
feat(source): suppress parser log (#14005)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Dec 18, 2023
1 parent 69f24de commit beabcb2
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 55 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ ethnum = { version = "1", features = ["serde"] }
fixedbitset = { version = "0.4", features = ["std"] }
fs-err = "2"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
governor = { version = "0.6", default-features = false, features = ["std"] }
hex = "0.4.3"
http = "0.2"
humantime = "2.1"
Expand Down
8 changes: 4 additions & 4 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub mod error;
pub mod array;
#[macro_use]
pub mod util;
pub mod acl;
pub mod buffer;
pub mod cache;
pub mod cast;
Expand All @@ -65,17 +66,16 @@ pub mod constants;
pub mod estimate_size;
pub mod field_generator;
pub mod hash;
pub mod log;
pub mod memory;
pub mod metrics;
pub mod monitor;
pub mod row;
pub mod session_config;
pub mod system_param;
pub mod telemetry;
pub mod transaction;

pub mod acl;
pub mod metrics;
pub mod test_utils;
pub mod transaction;
pub mod types;
pub mod vnode_mapping;

Expand Down
93 changes: 93 additions & 0 deletions src/common/src/log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU32;
use std::sync::atomic::{AtomicUsize, Ordering};

use governor::Quota;

type RateLimiter = governor::RateLimiter<
governor::state::NotKeyed,
governor::state::InMemoryState,
governor::clock::MonotonicClock,
>;

/// `LogSuppresser` is a helper to suppress log spamming.
pub struct LogSuppresser {
/// The number of times the log has been suppressed. Will be returned and cleared when the
/// rate limiter allows next log to be printed.
suppressed_count: AtomicUsize,

/// Inner rate limiter.
rate_limiter: RateLimiter,
}

#[derive(Debug)]
pub struct LogSuppressed;

impl LogSuppresser {
pub fn new(rate_limiter: RateLimiter) -> Self {
Self {
suppressed_count: AtomicUsize::new(0),
rate_limiter,
}
}

/// Check if the log should be suppressed.
/// If the log should be suppressed, return `Err(LogSuppressed)`.
/// Otherwise, return `Ok(usize)` with count of suppressed messages before.
pub fn check(&self) -> core::result::Result<usize, LogSuppressed> {
match self.rate_limiter.check() {
Ok(()) => Ok(self.suppressed_count.swap(0, Ordering::Relaxed)),
Err(_) => {
self.suppressed_count.fetch_add(1, Ordering::Relaxed);
Err(LogSuppressed)
}
}
}
}

impl Default for LogSuppresser {
/// Default rate limiter allows 1 log per second.
fn default() -> Self {
Self::new(RateLimiter::direct(Quota::per_second(
NonZeroU32::new(1).unwrap(),
)))
}
}

#[cfg(test)]
mod tests {
use std::sync::LazyLock;
use std::time::Duration;

use super::*;

#[tokio::test]
async fn demo() {
let mut interval = tokio::time::interval(Duration::from_millis(100));
for _ in 0..100 {
interval.tick().await;
static RATE_LIMITER: LazyLock<LogSuppresser> = LazyLock::new(|| {
LogSuppresser::new(RateLimiter::direct(Quota::per_second(
NonZeroU32::new(5).unwrap(),
)))
});

if let Ok(suppressed_count) = RATE_LIMITER.check() {
println!("failed to foo bar. suppressed_count = {}", suppressed_count);
}
}
}
}
12 changes: 10 additions & 2 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use apache_avro::schema::{DecimalSchema, RecordSchema, Schema};
use itertools::Itertools;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DataType, Decimal};
use risingwave_pb::plan_common::ColumnDesc;

Expand Down Expand Up @@ -82,11 +85,16 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result<DataType> {
Schema::Double => DataType::Float64,
Schema::Decimal(DecimalSchema { precision, .. }) => {
if *precision > Decimal::MAX_PRECISION.into() {
tracing::warn!(
"RisingWave supports decimal precision up to {}, but got {}. Will truncate.",
static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(
"RisingWave supports decimal precision up to {}, but got {}. Will truncate. ({} suppressed)",
Decimal::MAX_PRECISION,
suppressed_count,
precision
);
}
}
DataType::Decimal
}
Expand Down
38 changes: 25 additions & 13 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

use auto_enums::auto_enum;
pub use avro::AvroParserConfig;
Expand All @@ -28,6 +29,7 @@ use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, TABLE_NAME_COLUMN_NAME};
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{Datum, Scalar};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::catalog::{
Expand Down Expand Up @@ -328,13 +330,18 @@ impl SourceStreamChunkRowWriter<'_> {
// TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message)
// TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
// see #13105
tracing::warn!(
%error,
split_id = self.row_meta.as_ref().map(|m| m.split_id),
offset = self.row_meta.as_ref().map(|m| m.offset),
column = desc.name,
"failed to parse non-pk column, padding with `NULL`"
);
static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(
%error,
split_id = self.row_meta.as_ref().map(|m| m.split_id),
offset = self.row_meta.as_ref().map(|m| m.offset),
column = desc.name,
suppressed_count,
"failed to parse non-pk column, padding with `NULL`"
);
}
Ok(A::output_for(Datum::None))
}
}
Expand Down Expand Up @@ -598,12 +605,17 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
if let Err(error) = res {
// TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
// see #13105
tracing::error!(
%error,
split_id = &*msg.split_id,
offset = msg.offset,
"failed to parse message, skipping"
);
static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(
%error,
split_id = &*msg.split_id,
offset = msg.offset,
suppressed_count,
"failed to parse message, skipping"
);
}
parser.source_ctx().report_user_source_error(error);
}
}
Expand Down
9 changes: 8 additions & 1 deletion src/connector/src/parser/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use chrono::NaiveDate;
use mysql_async::Row as MysqlRow;
use risingwave_common::catalog::Schema;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{
DataType, Date, Datum, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
Expand Down Expand Up @@ -88,7 +91,11 @@ pub fn mysql_row_to_datums(mysql_row: &mut MysqlRow, schema: &Schema) -> Vec<Dat
| DataType::Int256
| DataType::Serial => {
// Interval, Struct, List, Int256 are not supported
tracing::warn!(rw_field.name, ?rw_field.data_type, "unsupported data type, set to null");
static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(rw_field.name, ?rw_field.data_type, suppressed_count, "unsupported data type, set to null");
}
None
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::str::FromStr;
use std::sync::LazyLock;

use anyhow::anyhow;
use apache_avro::schema::{DecimalSchema, RecordSchema};
Expand All @@ -24,6 +25,7 @@ use num_bigint::{BigInt, Sign};
use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz};
use risingwave_common::error::Result as RwResult;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time};
use risingwave_common::util::iter_util::ZipEqFast;

Expand Down Expand Up @@ -56,7 +58,13 @@ impl<'a> AvroParseOptions<'a> {
self.schema
.map(|schema| avro_extract_field_schema(schema, key))
.transpose()
.map_err(|_err| tracing::error!("extract sub-schema"))
.map_err(|_err| {
static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(suppressed_count, "extract sub-schema");
}
})
.ok()
.flatten()
}
Expand Down
7 changes: 6 additions & 1 deletion src/connector/src/parser/unified/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
// limitations under the License.

use std::str::FromStr;
use std::sync::LazyLock;

use base64::Engine;
use itertools::Itertools;
use num_bigint::{BigInt, Sign};
use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz, str_to_bytea};
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{
DataType, Date, Decimal, Int256, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
Expand Down Expand Up @@ -464,7 +466,10 @@ impl JsonParseOptions {
path: struct_type_info.to_string(), // TODO: this is not good, we should maintain a path stack
};
// TODO: is it possible to unify the logging with the one in `do_action`?
tracing::warn!(%error, "undefined nested field, padding with `NULL`");
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::warn!(%error, suppressed_count, "undefined nested field, padding with `NULL`");
}
&BorrowedValue::Static(simd_json::StaticNode::Null)
});
self.parse(field_value, Some(field_type))
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/parser/unified/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use anyhow::anyhow;
use prost_reflect::{DescriptorPool, DynamicMessage, ReflectMessage};
use risingwave_common::error::ErrorCode::ProtocolError;
use risingwave_common::error::RwError;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::DataType;

use super::{Access, AccessResult};
Expand Down Expand Up @@ -47,7 +48,11 @@ impl Access for ProtobufAccess {
.get_field_by_name(path[0])
.ok_or_else(|| {
let err_msg = format!("protobuf schema don't have field {}", path[0]);
tracing::error!(err_msg);
static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
LazyLock::new(LogSuppresser::default);
if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
tracing::error!(suppressed_count, err_msg);
}
RwError::from(ProtocolError(err_msg))
})
.map_err(|e| AccessError::Other(anyhow!(e)))?;
Expand Down
Loading

0 comments on commit beabcb2

Please sign in to comment.