From 870790cad2604f1d0172c56eb2ba6c4206e988cb Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com>
Date: Fri, 13 Sep 2024 17:58:46 +0800
Subject: [PATCH] fix: make guarded label metrics correctly drop (#18454) (#18530)

---
 src/connector/src/parser/mod.rs               | 15 ++++--
 src/connector/src/source/cdc/source/reader.rs |  8 +--
 src/connector/src/source/common.rs            | 53 ++++++++++++-------
 .../src/source/datagen/source/reader.rs       | 31 +++++++----
 .../opendal_source/opendal_reader.rs          | 34 +++++-------
 .../src/source/filesystem/s3/source/reader.rs | 34 +++++-------
 .../src/source/kafka/enumerator/client.rs     | 32 +++++++----
 .../src/source/kafka/source/reader.rs         | 37 +++++++------
 .../src/source/nexmark/source/reader.rs       | 39 +++++++-------
 9 files changed, 155 insertions(+), 128 deletions(-)

diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 4b14654bf518d..32a1825697c43 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::LazyLock;
 
@@ -710,6 +711,7 @@ async fn into_chunk_stream_inner(
         len: usize,
     }
     let mut current_transaction = None;
+    let mut direct_cdc_event_lag_latency_metrics = HashMap::new();
 
     #[for_await]
     for batch in data_stream {
@@ -759,10 +761,15 @@
                 if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
                     let lag_ms = process_time_ms - msg_meta.source_ts_ms;
                     // report to promethus
-                    GLOBAL_SOURCE_METRICS
-                        .direct_cdc_event_lag_latency
-                        .with_guarded_label_values(&[&msg_meta.full_table_name])
-                        .observe(lag_ms as f64);
+                    let full_table_name = msg_meta.full_table_name.clone();
+                    let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
+                        .entry(full_table_name)
+                        .or_insert_with(|| {
+                            GLOBAL_SOURCE_METRICS
+                                .direct_cdc_event_lag_latency
+                                .with_guarded_label_values(&[&msg_meta.full_table_name])
+                        });
+                    direct_cdc_event_lag_latency.observe(lag_ms as f64);
                 }
 
                 let old_len = builder.len();
diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs
index b29ef1312bbd9..e2fc405cd6297 100644
--- a/src/connector/src/source/cdc/source/reader.rs
+++ b/src/connector/src/source/cdc/source/reader.rs
@@ -213,15 +213,15 @@ impl CdcSplitReader {
         let mut rx = self.rx;
         let source_id = self.source_id.to_string();
         let metrics = self.source_ctx.metrics.clone();
+        let connector_source_rows_received_metrics = metrics
+            .connector_source_rows_received
+            .with_guarded_label_values(&[source_type.as_str_name(), &source_id]);
 
         while let Some(result) = rx.recv().await {
             match result {
                 Ok(GetEventStreamResponse { events, .. }) => {
                     tracing::trace!("receive {} cdc events ", events.len());
-                    metrics
-                        .connector_source_rows_received
-                        .with_guarded_label_values(&[source_type.as_str_name(), &source_id])
-                        .inc_by(events.len() as u64);
+                    connector_source_rows_received_metrics.inc_by(events.len() as u64);
                     let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
                     yield msgs;
                 }
diff --git a/src/connector/src/source/common.rs b/src/connector/src/source/common.rs
index 3acb85a87150e..80aacff2899c7 100644
--- a/src/connector/src/source/common.rs
+++ b/src/connector/src/source/common.rs
@@ -11,6 +11,7 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use futures::{Stream, StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
 
@@ -33,6 +34,8 @@ pub(crate) async fn into_chunk_stream(
     let source_id = source_ctx.source_id.to_string();
     let source_name = source_ctx.source_name.to_string();
     let metrics = source_ctx.metrics.clone();
+    let mut partition_input_count = HashMap::new();
+    let mut partition_bytes_count = HashMap::new();
 
     // add metrics to the data stream
     let data_stream = data_stream
@@ -40,22 +43,38 @@
             let mut by_split_id = std::collections::HashMap::new();
 
             for msg in data_batch {
+                let split_id: String = msg.split_id.as_ref().to_string();
                 by_split_id
-                    .entry(msg.split_id.as_ref())
+                    .entry(split_id.clone())
                     .or_insert_with(Vec::new)
                     .push(msg);
+                partition_input_count
+                    .entry(split_id.clone())
+                    .or_insert_with(|| {
+                        metrics.partition_input_count.with_guarded_label_values(&[
+                            &actor_id,
+                            &source_id,
+                            &split_id.clone(),
+                            &source_name,
+                            &fragment_id,
+                        ])
+                    });
+                partition_bytes_count
+                    .entry(split_id.clone())
+                    .or_insert_with(|| {
+                        metrics.partition_input_bytes.with_guarded_label_values(&[
+                            &actor_id,
+                            &source_id,
+                            &split_id,
+                            &source_name,
+                            &fragment_id,
+                        ])
+                    });
             }
-
             for (split_id, msgs) in by_split_id {
-                metrics
-                    .partition_input_count
-                    .with_guarded_label_values(&[
-                        &actor_id,
-                        &source_id,
-                        split_id,
-                        &source_name,
-                        &fragment_id,
-                    ])
+                partition_input_count
+                    .get_mut(&split_id)
+                    .unwrap()
                     .inc_by(msgs.len() as u64);
 
                 let sum_bytes = msgs
@@ -63,15 +82,9 @@
                     .flat_map(|msg| msg.payload.as_ref().map(|p| p.len() as u64))
                     .sum();
 
-                metrics
-                    .partition_input_bytes
-                    .with_guarded_label_values(&[
-                        &actor_id,
-                        &source_id,
-                        split_id,
-                        &source_name,
-                        &fragment_id,
-                    ])
+                partition_bytes_count
+                    .get_mut(&split_id)
+                    .unwrap()
                     .inc_by(sum_bytes);
             }
         })
diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs
index e6c6db6af5a71..33c0c4ea29261 100644
--- a/src/connector/src/source/datagen/source/reader.rs
+++ b/src/connector/src/source/datagen/source/reader.rs
@@ -18,6 +18,7 @@ use anyhow::Context;
 use async_trait::async_trait;
 use futures::{Stream, StreamExt, TryStreamExt};
 use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty};
+use risingwave_common_estimate_size::EstimateSize;
 use thiserror_ext::AsReport;
 
 use super::generator::DatagenEventGenerator;
@@ -156,20 +157,30 @@ impl SplitReader for DatagenSplitReader {
         let source_name = self.source_ctx.source_name.to_string();
         let split_id = self.split_id.to_string();
         let metrics = self.source_ctx.metrics.clone();
+        let partition_input_count_metric =
+            metrics.partition_input_count.with_guarded_label_values(&[
+                &actor_id,
+                &source_id,
+                &split_id,
+                &source_name,
+                &fragment_id,
+            ]);
+        let partition_input_bytes_metric =
+            metrics.partition_input_bytes.with_guarded_label_values(&[
+                &actor_id,
+                &source_id,
+                &split_id,
+                &source_name,
+                &fragment_id,
+            ]);
+
         spawn_data_generation_stream(
             self.generator
                 .into_native_stream()
                 .inspect_ok(move |stream_chunk| {
-                    metrics
-                        .partition_input_count
-                        .with_guarded_label_values(&[
-                            &actor_id,
-                            &source_id,
-                            &split_id,
-                            &source_name,
-                            &fragment_id,
-                        ])
-                        .inc_by(stream_chunk.cardinality() as u64);
+                    partition_input_count_metric.inc_by(stream_chunk.cardinality() as u64);
+                    partition_input_bytes_metric
+                        .inc_by(stream_chunk.estimated_size() as u64);
                 }),
             BUFFER_SIZE,
         )
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
index 5757452d2b4cd..1cfc9c1355167 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -176,6 +176,16 @@ impl OpendalReader {
         let mut offset: usize = split.offset;
         let mut batch_size: usize = 0;
         let mut batch = Vec::new();
+        let partition_input_bytes_metrics = source_ctx
+            .metrics
+            .partition_input_bytes
+            .with_guarded_label_values(&[
+                &actor_id,
+                &source_id,
+                &split_id,
+                &source_name,
+                &fragment_id,
+            ]);
         let stream = ReaderStream::with_capacity(buf_reader, STREAM_READER_CAPACITY);
         #[for_await]
         for read in stream {
@@ -193,34 +203,14 @@
             batch.push(msg);
             if batch.len() >= max_chunk_size {
-                source_ctx
-                    .metrics
-                    .partition_input_bytes
-                    .with_guarded_label_values(&[
-                        &actor_id,
-                        &source_id,
-                        &split_id,
-                        &source_name,
-                        &fragment_id,
-                    ])
-                    .inc_by(batch_size as u64);
+                partition_input_bytes_metrics.inc_by(batch_size as u64);
                 let yield_batch = std::mem::take(&mut batch);
                 batch_size = 0;
                 yield yield_batch;
             }
         }
 
         if !batch.is_empty() {
-            source_ctx
-                .metrics
-                .partition_input_bytes
-                .with_guarded_label_values(&[
-                    &actor_id,
-                    &source_id,
-                    &split_id,
-                    &source_name,
-                    &fragment_id,
-                ])
-                .inc_by(batch_size as u64);
+            partition_input_bytes_metrics.inc_by(batch_size as u64);
             yield batch;
         }
     }
diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs
index 7e02102686d00..910c98c1a5dae 100644
--- a/src/connector/src/source/filesystem/s3/source/reader.rs
+++ b/src/connector/src/source/filesystem/s3/source/reader.rs
@@ -106,6 +106,16 @@ impl S3FileReader {
         let mut offset: usize = split.offset;
         let mut batch_size: usize = 0;
         let mut batch = Vec::new();
+        let partition_input_bytes_metrics = source_ctx
+            .metrics
+            .partition_input_bytes
+            .with_guarded_label_values(&[
+                &actor_id,
+                &source_id,
+                &split_id,
+                &source_name,
+                &fragment_id,
+            ]);
         #[for_await]
         for read in stream {
             let bytes = read?;
@@ -121,34 +131,14 @@
             batch_size += len;
             batch.push(msg);
             if batch.len() >= max_chunk_size {
-                source_ctx
-                    .metrics
-                    .partition_input_bytes
-                    .with_guarded_label_values(&[
-                        &actor_id,
-                        &source_id,
-                        &split_id,
-                        &source_name,
-                        &fragment_id,
-                    ])
-                    .inc_by(batch_size as u64);
+                partition_input_bytes_metrics.inc_by(batch_size as u64);
                 let yield_batch = std::mem::take(&mut batch);
                 batch_size = 0;
                 yield yield_batch;
            }
        }
        if !batch.is_empty() {
-            source_ctx
-                .metrics
-                .partition_input_bytes
-                .with_guarded_label_values(&[
-                    &actor_id,
-                    &source_id,
-                    &split_id,
-                    &source_name,
-                    &fragment_id,
-                ])
-                .inc_by(batch_size as u64);
+            partition_input_bytes_metrics.inc_by(batch_size as u64);
            yield batch;
        }
    }
diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs
index ff007076c1338..194a8bed92645 100644
--- a/src/connector/src/source/kafka/enumerator/client.rs
+++ b/src/connector/src/source/kafka/enumerator/client.rs
@@ -17,10 +17,12 @@ use std::time::Duration;
 
 use anyhow::{anyhow, Context};
 use async_trait::async_trait;
+use prometheus::core::{AtomicI64, GenericGauge};
 use rdkafka::consumer::{BaseConsumer, Consumer};
 use rdkafka::error::KafkaResult;
 use rdkafka::{Offset, TopicPartitionList};
 use risingwave_common::bail;
+use risingwave_common::metrics::LabelGuardedMetric;
 
 use crate::error::ConnectorResult;
 use crate::source::base::SplitEnumerator;
@@ -49,6 +51,7 @@ pub struct KafkaSplitEnumerator {
     stop_offset: KafkaEnumeratorOffset,
 
     sync_call_timeout: Duration,
+    high_watermark_metrics: HashMap<i32, LabelGuardedMetric<GenericGauge<AtomicI64>, 2>>,
 }
 
 impl KafkaSplitEnumerator {}
@@ -124,6 +127,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
             start_offset: scan_start_offset,
             stop_offset: KafkaEnumeratorOffset::None,
             sync_call_timeout: properties.common.sync_call_timeout,
+            high_watermark_metrics: HashMap::new(),
         })
     }
 
@@ -160,7 +164,10 @@ impl SplitEnumerator for KafkaSplitEnumerator {
 }
 
 impl KafkaSplitEnumerator {
-    async fn get_watermarks(&self, partitions: &[i32]) -> KafkaResult<HashMap<i32, (i64, i64)>> {
+    async fn get_watermarks(
+        &mut self,
+        partitions: &[i32],
+    ) -> KafkaResult<HashMap<i32, (i64, i64)>> {
         let mut map = HashMap::new();
         for partition in partitions {
             let (low, high) = self
@@ -357,15 +364,20 @@ impl KafkaSplitEnumerator {
     }
 
     #[inline]
-    fn report_high_watermark(&self, partition: i32, offset: i64) {
-        self.context
-            .metrics
-            .high_watermark
-            .with_guarded_label_values(&[
-                &self.context.info.source_id.to_string(),
-                &partition.to_string(),
-            ])
-            .set(offset);
+    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
+        let high_watermark_metrics =
+            self.high_watermark_metrics
+                .entry(partition)
+                .or_insert_with(|| {
+                    self.context
+                        .metrics
+                        .high_watermark
+                        .with_guarded_label_values(&[
+                            &self.context.info.source_id.to_string(),
+                            &partition.to_string(),
+                        ])
+                });
+        high_watermark_metrics.set(offset);
     }
 
     pub async fn check_reachability(&self) -> bool {
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index 5ace1820b4249..a97d34b408750 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -21,10 +21,12 @@ use anyhow::Context;
 use async_trait::async_trait;
 use futures::StreamExt;
 use futures_async_stream::try_stream;
+use prometheus::core::{AtomicI64, GenericGauge};
 use rdkafka::config::RDKafkaLogLevel;
 use rdkafka::consumer::{Consumer, StreamConsumer};
 use rdkafka::error::KafkaError;
 use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
+use risingwave_common::metrics::LabelGuardedMetric;
 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 
 use crate::error::ConnectorResult as Result;
@@ -157,21 +159,6 @@ impl SplitReader for KafkaSplitReader {
     }
 }
 
-impl KafkaSplitReader {
-    fn report_latest_message_id(&self, split_id: &str, offset: i64) {
-        self.source_ctx
-            .metrics
-            .latest_message_id
-            .with_guarded_label_values(&[
-                // source name is not available here
-                &self.source_ctx.source_id.to_string(),
-                &self.source_ctx.actor_id.to_string(),
-                split_id,
-            ])
-            .set(offset);
-    }
-}
-
 impl KafkaSplitReader {
     #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
     async fn into_data_stream(self) {
@@ -208,6 +195,11 @@ impl KafkaSplitReader {
                 )
             });
 
+        let mut latest_message_id_metrics: HashMap<
+            String,
+            LabelGuardedMetric<GenericGauge<AtomicI64>, 3>,
+        > = HashMap::new();
+
         #[for_await]
         'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
             let msgs: Vec<_> = msgs
@@ -222,7 +214,20 @@ impl KafkaSplitReader {
 
             for (partition, offset) in split_msg_offsets {
                 let split_id = partition.to_string();
-                self.report_latest_message_id(&split_id, offset);
+                latest_message_id_metrics
+                    .entry(split_id.clone())
+                    .or_insert_with(|| {
+                        self.source_ctx
+                            .metrics
+                            .latest_message_id
+                            .with_guarded_label_values(&[
+                                // source name is not available here
+                                &self.source_ctx.source_id.to_string(),
+                                &self.source_ctx.actor_id.to_string(),
+                                &split_id,
+                            ])
+                    })
+                    .set(offset);
             }
 
             for msg in msgs {
diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs
index ebcbc0b0aaf32..aea85c5c551cf 100644
--- a/src/connector/src/source/nexmark/source/reader.rs
+++ b/src/connector/src/source/nexmark/source/reader.rs
@@ -115,31 +115,30 @@ impl SplitReader for NexmarkSplitReader {
         let split_id = self.split_id.clone();
         let metrics = self.source_ctx.metrics.clone();
 
+        let partition_input_count_metric =
+            metrics.partition_input_count.with_guarded_label_values(&[
+                &actor_id,
+                &source_id,
+                &split_id,
+                &source_name,
+                &fragment_id,
+            ]);
+        let partition_input_bytes_metric =
+            metrics.partition_input_bytes.with_guarded_label_values(&[
+                &actor_id,
+                &source_id,
+                &split_id,
+                &source_name,
+                &fragment_id,
+            ]);
+
         // Will buffer at most 4 event chunks.
         const BUFFER_SIZE: usize = 4;
         spawn_data_generation_stream(
             self.into_native_stream()
                 .inspect_ok(move |chunk: &StreamChunk| {
-                    metrics
-                        .partition_input_count
-                        .with_guarded_label_values(&[
-                            &actor_id,
-                            &source_id,
-                            &split_id,
-                            &source_name,
-                            &fragment_id,
-                        ])
-                        .inc_by(chunk.cardinality() as u64);
-                    metrics
-                        .partition_input_bytes
-                        .with_guarded_label_values(&[
-                            &actor_id,
-                            &source_id,
-                            &split_id,
-                            &source_name,
-                            &fragment_id,
-                        ])
-                        .inc_by(chunk.estimated_size() as u64);
+                    partition_input_count_metric.inc_by(chunk.cardinality() as u64);
+                    partition_input_bytes_metric.inc_by(chunk.estimated_size() as u64);
                 }),
             BUFFER_SIZE,
         )
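
Every hunk in this patch applies the same pattern: hoist with_guarded_label_values out of the
per-message path, keep the returned handle alive for the lifetime of the reader (either as a local
binding or in a HashMap keyed by the label value), and call inc_by/observe/set on that cached
handle. The sketch below is illustrative only and is not part of the commit: it shows the caching
shape with the plain prometheus crate rather than RisingWave's guarded metric vecs, and the metric
name, label set, and SourceMessage stand-in are assumptions made for this example.

// Illustrative sketch (not from the patch): resolve a labeled counter once per
// split and reuse the handle, instead of re-resolving labels for every message.
use std::collections::HashMap;

use prometheus::core::{AtomicU64, GenericCounter};
use prometheus::{IntCounterVec, Opts, Registry};

// Hypothetical stand-in for a connector source message.
struct SourceMessage {
    split_id: String,
    payload: Vec<u8>,
}

fn main() -> prometheus::Result<()> {
    let registry = Registry::new();
    let partition_input_bytes = IntCounterVec::new(
        Opts::new("partition_input_bytes", "bytes read per split"),
        &["split_id"],
    )?;
    registry.register(Box::new(partition_input_bytes.clone()))?;

    // One handle per split, created lazily on first use and reused afterwards.
    // With a guarded metric vec, the cached handle also keeps the label guard
    // alive, so the labeled series is dropped together with this map (i.e. with
    // the reader) rather than after every single observation.
    let mut per_split: HashMap<String, GenericCounter<AtomicU64>> = HashMap::new();

    let batch = vec![
        SourceMessage { split_id: "0".into(), payload: vec![0; 16] },
        SourceMessage { split_id: "1".into(), payload: vec![0; 8] },
        SourceMessage { split_id: "0".into(), payload: vec![0; 4] },
    ];

    for msg in &batch {
        let counter = per_split
            .entry(msg.split_id.clone())
            .or_insert_with(|| partition_input_bytes.with_label_values(&[msg.split_id.as_str()]));
        counter.inc_by(msg.payload.len() as u64);
    }

    Ok(())
}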