fix: minor refactor: remove SourceInfo #15563

Merged
merged 12 commits on Mar 11, 2024
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

42 changes: 10 additions & 32 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -1676,53 +1676,31 @@ def section_streaming_errors(outer_panels):
[
panels.timeseries_count(
"Compute Errors by Type",
"",
"Errors that happened during computation. Check the logs for detailed error message.",
[
panels.target(
f"sum({metric('user_compute_error_count')}) by (error_type, error_msg, fragment_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})",
),
panels.target(
f"sum({metric('user_compute_error')}) by (error_type, error_msg, fragment_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})",
f"sum({metric('user_compute_error')}) by (error_type, executor_name, fragment_id)",
"{{error_type}} @ {{executor_name}} (fragment_id={{fragment_id}})",
),
],
),
panels.timeseries_count(
"Source Errors by Type",
"",
[
panels.target(
f"sum({metric('user_source_error_count')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
),
panels.target(
f"sum({metric('user_source_error')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
),
],
),
panels.timeseries_count(
"Source Reader Errors by Type",
"",
"Errors that happened during source data ingestion. Check the logs for detailed error message.",
[
panels.target(
f"sum({metric('user_source_reader_error_count')}) by (error_type, error_msg, actor_id, source_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: actor_id={{actor_id}}, source_id={{source_id}})",
),
panels.target(
f"sum({metric('user_source_reader_error')}) by (error_type, error_msg, actor_id, source_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: actor_id={{actor_id}}, source_id={{source_id}})",
f"sum({metric('user_source_error')}) by (error_type, source_id, source_name, fragment_id)",
"{{error_type}} @ {{source_name}} (source_id={{source_id}} fragment_id={{fragment_id}})",
),
],
),
panels.timeseries_count(
"Sink by Connector",
"",
"Sink Errors by Type",
"Errors that happened during data sink out. Check the logs for detailed error message.",
[
panels.target(
f"sum({metric('user_sink_error')}) by (connector_name, executor_id, error_msg)",
"{{connector_name}}: {{error_msg}} ({{executor_id}})",
f"sum({metric('user_sink_error')}) by (error_type, sink_id, sink_name, fragment_id)",
"{{error_type}} @ {{sink_name}} (sink_id={{sink_id}} fragment_id={{fragment_id}})",
),
],
),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -183,12 +183,16 @@ def section_overview(panels):
"Errors in the system group by type",
[
panels.target(
f"sum({metric('user_compute_error_count')}) by (error_type, error_msg, fragment_id, executor_name)",
"compute error {{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})",
f"sum({metric('user_compute_error')}) by (error_type, executor_name, fragment_id)",
"{{error_type}} @ {{executor_name}} (fragment_id={{fragment_id}})",
),
panels.target(
f"sum({metric('user_source_error_count')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
"parse error {{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
f"sum({metric('user_source_error')}) by (error_type, source_id, source_name, fragment_id)",
"{{error_type}} @ {{source_name}} (source_id={{source_id}} fragment_id={{fragment_id}})",
),
panels.target(
f"sum({metric('user_sink_error')}) by (error_type, sink_id, sink_name, fragment_id)",
"{{error_type}} @ {{sink_name}} (sink_id={{sink_id}} fragment_id={{fragment_id}})",
),
panels.target(
f"{metric('source_status_is_up')} == 0",
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions proto/connector_service.proto
@@ -27,6 +27,7 @@ message SinkParam {
string db_name = 5;
string sink_from_name = 6;
catalog.SinkFormatDesc format_desc = 7;
string sink_name = 8;
}

enum SinkPayloadFormat {
2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
@@ -175,7 +175,7 @@ impl SourceExecutor {
self.source_ctrl_opts.clone(),
None,
ConnectorProperties::default(),
"NA".to_owned(), // FIXME: source name was not passed in batch plan
"NA".to_owned(), // source name was not passed in batch plan
));
let stream = self
.source
1 change: 1 addition & 0 deletions src/bench/sink_bench/main.rs
@@ -457,6 +457,7 @@ async fn main() {
.unwrap();
let sink_param = SinkParam {
sink_id: SinkId::new(1),
sink_name: cfg.sink.clone(),
properties,
columns: table_schema.get_sink_schema(),
downstream_pk: table_schema.pk_indices,
44 changes: 0 additions & 44 deletions src/common/src/error.rs
@@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::fmt::{Debug, Display, Formatter};
use std::time::{Duration, SystemTime};

use thiserror::Error;
use thiserror_ext::Macro;
@@ -24,8 +22,6 @@ pub mod v2 {
pub use risingwave_error::*;
}

const ERROR_SUPPRESSOR_RESET_DURATION: Duration = Duration::from_millis(60 * 60 * 1000); // 1h

pub trait Error = std::error::Error + Send + Sync + 'static;
pub type BoxedError = Box<dyn Error>;

@@ -183,46 +179,6 @@ macro_rules! bail {
};
}

#[derive(Debug)]
pub struct ErrorSuppressor {
max_unique: usize,
unique: HashSet<String>,
last_reset_time: SystemTime,
}

impl ErrorSuppressor {
pub fn new(max_unique: usize) -> Self {
Self {
max_unique,
last_reset_time: SystemTime::now(),
unique: Default::default(),
}
}

pub fn suppress_error(&mut self, error: &str) -> bool {
self.try_reset();
if self.unique.contains(error) {
false
} else if self.unique.len() < self.max_unique {
self.unique.insert(error.to_string());
false
} else {
// We have exceeded the capacity.
true
}
}

pub fn max(&self) -> usize {
self.max_unique
}

fn try_reset(&mut self) {
if self.last_reset_time.elapsed().unwrap() >= ERROR_SUPPRESSOR_RESET_DURATION {
*self = Self::new(self.max_unique)
}
}
}

#[cfg(test)]
mod tests {
use std::convert::Into;
47 changes: 11 additions & 36 deletions src/common/src/metrics/error_metrics.rs
@@ -80,13 +80,15 @@ impl<const N: usize> ErrorMetric<N> {

pub type ErrorMetricRef<const N: usize> = Arc<ErrorMetric<N>>;

/// Metrics for counting errors in the system.
/// The detailed error messages are not supposed to be stored in the metrics, but in the logs.
///
/// Please avoid adding new error metrics here. Instead, introduce new `error_type` for new errors.
#[derive(Clone)]
pub struct ErrorMetrics {
pub user_sink_error: ErrorMetricRef<3>,
pub user_compute_error: ErrorMetricRef<4>,
pub user_source_reader_error: ErrorMetricRef<5>,
pub user_source_error: ErrorMetricRef<5>,
pub cdc_source_error: ErrorMetricRef<3>,
pub user_sink_error: ErrorMetricRef<4>,
pub user_compute_error: ErrorMetricRef<3>,
pub user_source_error: ErrorMetricRef<4>,
}

impl ErrorMetrics {
@@ -95,40 +97,17 @@ impl ErrorMetrics {
user_sink_error: Arc::new(ErrorMetric::new(
"user_sink_error",
"Sink errors in the system, queryable by tags",
&["connector_name", "executor_id", "error_msg"],
&["error_type", "sink_id", "sink_name", "fragment_id"],
)),
user_compute_error: Arc::new(ErrorMetric::new(
"user_compute_error",
"Compute errors in the system, queryable by tags",
&["error_type", "error_msg", "executor_name", "fragment_id"],
)),
user_source_reader_error: Arc::new(ErrorMetric::new(
"user_source_reader_error",
"Source reader error count",
&[
"error_type",
"error_msg",
"executor_name",
"actor_id",
"source_id",
],
&["error_type", "executor_name", "fragment_id"],
)),
user_source_error: Arc::new(ErrorMetric::new(
"user_source_error_count",
"user_source_error",
"Source errors in the system, queryable by tags",
&[
"error_type",
"error_msg",
"executor_name",
"fragment_id",
"table_id",
],
)),
// cdc source is singleton, so we use source_id to identify the connector
cdc_source_error: Arc::new(ErrorMetric::new(
"cdc_source_error",
"CDC source errors in the system, queryable by tags",
&["connector_name", "source_id", "error_msg"],
&["error_type", "source_id", "source_name", "fragment_id"],
)),
}
}
Expand All @@ -137,19 +116,15 @@ impl ErrorMetrics {
vec![
&self.user_sink_error.desc,
&self.user_compute_error.desc,
&self.user_source_reader_error.desc,
&self.user_source_error.desc,
&self.cdc_source_error.desc,
]
}

fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
vec![
self.user_sink_error.collect(),
self.user_compute_error.collect(),
self.user_source_reader_error.collect(),
self.user_source_error.collect(),
self.cdc_source_error.collect(),
]
}
}
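The label sets above are fixed-arity, which is why `ErrorMetric` carries the label count as a const generic: `ErrorMetricRef<4>` for the sink and source metrics, `ErrorMetricRef<3>` for compute errors. A minimal sketch of such a wrapper, assuming a plain Prometheus counter vec underneath; the names below are illustrative only and the real `ErrorMetric` in this file may differ in its internals:

```rust
use std::sync::Arc;

use prometheus::{IntCounterVec, Opts, Registry};

/// Sketch of a counter vec whose label arity is fixed at compile time, so every
/// `report` call must supply exactly `N` label values.
pub struct LabeledErrorCounter<const N: usize> {
    counter: IntCounterVec,
}

impl<const N: usize> LabeledErrorCounter<N> {
    pub fn new(name: &str, help: &str, label_names: &[&str; N], registry: &Registry) -> Self {
        let counter = IntCounterVec::new(Opts::new(name, help), label_names).unwrap();
        registry.register(Box::new(counter.clone())).unwrap();
        Self { counter }
    }

    /// Increment the series identified by the given label values, e.g.
    /// `["source_parser", "2", "my_source", "7"]` for
    /// `(error_type, source_id, source_name, fragment_id)`.
    pub fn report(&self, label_values: [String; N]) {
        let refs: Vec<&str> = label_values.iter().map(String::as_str).collect();
        self.counter.with_label_values(&refs).inc();
    }
}

pub type LabeledErrorCounterRef<const N: usize> = Arc<LabeledErrorCounter<N>>;
```

Passing the wrong number of labels then becomes a compile error rather than a runtime mismatch, which is useful now that each metric's label set is part of its contract with the dashboards.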
6 changes: 4 additions & 2 deletions src/connector/src/parser/debezium/debezium_parser.rs
@@ -219,8 +219,10 @@ mod tests {
}),
protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
};
let mut source_ctx = SourceContext::default();
source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default());
let source_ctx = SourceContext {
connector_props: ConnectorProperties::PostgresCdc(Box::default()),
..Default::default()
};
let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx))
.await
.unwrap();
24 changes: 15 additions & 9 deletions src/connector/src/parser/mod.rs
@@ -29,6 +29,7 @@ use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
use risingwave_common::bail;
use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, TABLE_NAME_COLUMN_NAME};
use risingwave_common::log::LogSuppresser;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{Datum, Scalar, ScalarImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::tracing::InstrumentStream;
@@ -588,7 +589,8 @@ impl<P: ByteStreamSourceParser> P {
///
/// A [`ChunkSourceStream`] which is a stream of parsed messages.
pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream {
let source_info = self.source_ctx().source_info.clone();
let actor_id = self.source_ctx().actor_id;
let source_id = self.source_ctx().source_id.table_id();

// Ensure chunk size is smaller than rate limit
let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit {
@@ -599,13 +601,8 @@

// The parser stream will be long-lived. We use `instrument_with` here to create
// a new span for the polling of each chunk.
into_chunk_stream(self, data_stream).instrument_with(move || {
tracing::info_span!(
"source_parse_chunk",
actor_id = source_info.actor_id,
source_id = source_info.source_id.table_id()
)
})
into_chunk_stream(self, data_stream)
.instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
}
}

@@ -712,7 +709,16 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
"failed to parse message, skipping"
);
}
parser.source_ctx().report_user_source_error(&error);

// report to error metrics
let context = parser.source_ctx();
GLOBAL_ERROR_METRICS.user_source_error.report([
// TODO(eric): output ConnectorError's variant as label
"source_parser".to_owned(),
context.source_id.to_string(),
context.source_name.clone(),
context.fragment_id.to_string(),
]);
}
}

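The same `GLOBAL_ERROR_METRICS` pattern applies to the reworked compute-side metric, whose labels are now `(error_type, executor_name, fragment_id)`. A hedged sketch of what a call site could look like; the helper name, the error classification, and how the context fields are obtained are assumptions, not code from this PR:

```rust
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;

/// Hypothetical executor-side helper: the full error goes to the logs, while the
/// metric only gets low-cardinality labels (no error message).
fn report_compute_error(executor_name: &str, fragment_id: u32, error: &dyn std::error::Error) {
    tracing::error!(error = %error, executor_name, fragment_id, "compute error");
    GLOBAL_ERROR_METRICS.user_compute_error.report([
        "expression_eval".to_owned(), // error_type; a real call site would classify the error
        executor_name.to_owned(),
        fragment_id.to_string(),
    ]);
}
```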
12 changes: 8 additions & 4 deletions src/connector/src/parser/plain_parser.rs
@@ -192,8 +192,10 @@ mod tests {
.map(|c| SourceColumnDesc::from(&c.column_desc))
.collect::<Vec<_>>();

let mut source_ctx = SourceContext::default();
source_ctx.connector_props = ConnectorProperties::PostgresCdc(Box::default());
let source_ctx = SourceContext {
connector_props: ConnectorProperties::PostgresCdc(Box::default()),
..Default::default()
};
let source_ctx = Arc::new(source_ctx);
// format plain encode json parser
let parser = PlainParser::new(
@@ -343,8 +345,10 @@
.collect::<Vec<_>>();

// format plain encode json parser
let mut source_ctx = SourceContext::default();
source_ctx.connector_props = ConnectorProperties::MysqlCdc(Box::default());
let source_ctx = SourceContext {
connector_props: ConnectorProperties::MysqlCdc(Box::default()),
..Default::default()
};
let mut parser = PlainParser::new(
SpecificParserConfig::DEFAULT_PLAIN_JSON,
columns.clone(),
10 changes: 10 additions & 0 deletions src/connector/src/sink/mod.rs
@@ -146,12 +146,19 @@ pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
pub sink_id: SinkId,
pub sink_name: String,
pub properties: HashMap<String, String>,
pub columns: Vec<ColumnDesc>,
pub downstream_pk: Vec<usize>,
pub sink_type: SinkType,
pub format_desc: Option<SinkFormatDesc>,
pub db_name: String,

/// - For `CREATE SINK ... FROM ...`, the name of the source table.
/// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
///
/// See also `gen_sink_plan`.
// TODO(eric): Why need these 2 fields (db_name and sink_from_name)?
pub sink_from_name: String,
}

@@ -171,6 +178,7 @@ impl SinkParam {
};
Self {
sink_id: SinkId::from(pb_param.sink_id),
sink_name: pb_param.sink_name,
properties: pb_param.properties,
columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
downstream_pk: table_schema
@@ -190,6 +198,7 @@
pub fn to_proto(&self) -> PbSinkParam {
PbSinkParam {
sink_id: self.sink_id.sink_id,
sink_name: self.sink_name.clone(),
properties: self.properties.clone(),
table_schema: Some(TableSchema {
columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
@@ -217,6 +226,7 @@ impl From<SinkCatalog> for SinkParam {
.collect();
Self {
sink_id: sink_catalog.id,
sink_name: sink_catalog.name,
properties: sink_catalog.properties,
columns,
downstream_pk: sink_catalog.downstream_pk,
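With `sink_name` now part of `SinkParam`, a sink writer has everything it needs to tag the reworked `user_sink_error` metric, whose labels are `(error_type, sink_id, sink_name, fragment_id)`. A rough sketch under the same caveats as above; the helper and the way `fragment_id` reaches it are assumptions:

```rust
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_connector::sink::SinkParam;

/// Hypothetical error path in a sink writer: the detailed message is logged,
/// only coarse labels are reported to the metric.
fn report_sink_error(param: &SinkParam, fragment_id: u32, error: &dyn std::error::Error) {
    tracing::error!(error = %error, sink_name = %param.sink_name, "sink write failed");
    GLOBAL_ERROR_METRICS.user_sink_error.report([
        "sink_writer".to_owned(),          // error_type; a real call site would be more specific
        param.sink_id.sink_id.to_string(), // sink_id
        param.sink_name.clone(),           // sink_name
        fragment_id.to_string(),           // fragment_id
    ]);
}
```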