Commit

fix: make guarded label metrics correctly drop (#18454) (#18530)
github-actions[bot] authored Sep 13, 2024
1 parent 0b78a01 commit 870790c
Showing 9 changed files with 155 additions and 128 deletions.
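Every change in this commit follows the same pattern: instead of calling with_guarded_label_values(..) once per message or event, which creates a transient guarded handle that is dropped immediately after use, the handle is created once and cached, either in a HashMap keyed by the label that varies (table name, split id, partition) or as a long-lived local or struct field. As the guarded-metrics design appears to work, a labeled series stays registered only while at least one handle to it is alive, so per-event handles make the series churn instead of dropping exactly once when its owner goes away, on top of paying a label lookup on the hot path. The sketch below is a minimal, self-contained model of the pattern; GuardedCounter and the free with_guarded_label_values function are hypothetical stand-ins, not RisingWave's actual LabelGuardedMetric API.

use std::collections::HashMap;

// Hypothetical stand-in for a label-guarded metric handle: while at least one
// handle for a label set is alive, the labeled series stays registered; when
// the last handle drops, the series is removed from the registry.
struct GuardedCounter {
    labels: Vec<String>,
    value: u64,
}

impl GuardedCounter {
    fn inc_by(&mut self, n: u64) {
        self.value += n;
    }
}

impl Drop for GuardedCounter {
    fn drop(&mut self) {
        // In the real metrics layer this is where the labeled series would be
        // deregistered once no handles remain.
        println!("dropping series for labels {:?} (value = {})", self.labels, self.value);
    }
}

fn with_guarded_label_values(labels: &[&str]) -> GuardedCounter {
    GuardedCounter {
        labels: labels.iter().map(|s| s.to_string()).collect(),
        value: 0,
    }
}

fn main() {
    // Anti-pattern removed by this commit: a fresh handle per event, dropped
    // right after use, so the labeled series churns on every message.
    for table in ["db.t1", "db.t1", "db.t2"] {
        let mut m = with_guarded_label_values(&[table]);
        m.inc_by(1); // handle dropped at the end of each iteration
    }

    // Pattern introduced by this commit: cache one handle per label set so it
    // lives as long as the surrounding stream, reader, or enumerator.
    let mut cache: HashMap<String, GuardedCounter> = HashMap::new();
    for table in ["db.t1", "db.t1", "db.t2"] {
        cache
            .entry(table.to_string())
            .or_insert_with(|| with_guarded_label_values(&[table]))
            .inc_by(1);
    }
    // All cached handles drop together when `cache` goes out of scope.
}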
15 changes: 11 additions & 4 deletions src/connector/src/parser/mod.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

@@ -710,6 +711,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
len: usize,
}
let mut current_transaction = None;
let mut direct_cdc_event_lag_latency_metrics = HashMap::new();

#[for_await]
for batch in data_stream {
@@ -759,10 +761,15 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
let lag_ms = process_time_ms - msg_meta.source_ts_ms;
// report to prometheus
GLOBAL_SOURCE_METRICS
.direct_cdc_event_lag_latency
.with_guarded_label_values(&[&msg_meta.full_table_name])
.observe(lag_ms as f64);
let full_table_name = msg_meta.full_table_name.clone();
let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
.entry(full_table_name)
.or_insert_with(|| {
GLOBAL_SOURCE_METRICS
.direct_cdc_event_lag_latency
.with_guarded_label_values(&[&msg_meta.full_table_name])
});
direct_cdc_event_lag_latency.observe(lag_ms as f64);
}

let old_len = builder.len();
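A small detail in the hunk above: the map owns its keys, so full_table_name is cloned for the entry(..) call, while the or_insert_with closure can still borrow the original string from msg_meta and no second clone is needed. A compact sketch of that shape, with hypothetical stand-in types as in the sketch near the top of the page:

use std::collections::HashMap;

// Hypothetical stand-ins, as in the earlier sketch.
struct Meta {
    full_table_name: String,
}
struct GuardedHistogram;
impl GuardedHistogram {
    fn observe(&mut self, _v: f64) {}
}
fn with_guarded_label_values(_labels: &[&str]) -> GuardedHistogram {
    GuardedHistogram
}

fn main() {
    let mut lag_metrics: HashMap<String, GuardedHistogram> = HashMap::new();
    let msg_meta = Meta {
        full_table_name: "db.orders".to_string(),
    };
    let lag_ms: i64 = 12;

    // The map owns its keys, so the key is cloned once; the closure only
    // borrows `msg_meta`, so no clone is needed inside it.
    let full_table_name = msg_meta.full_table_name.clone();
    lag_metrics
        .entry(full_table_name)
        .or_insert_with(|| with_guarded_label_values(&[msg_meta.full_table_name.as_str()]))
        .observe(lag_ms as f64);
}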
8 changes: 4 additions & 4 deletions src/connector/src/source/cdc/source/reader.rs
@@ -213,15 +213,15 @@ impl<T: CdcSourceTypeTrait> CdcSplitReader<T> {
let mut rx = self.rx;
let source_id = self.source_id.to_string();
let metrics = self.source_ctx.metrics.clone();
let connector_source_rows_received_metrics = metrics
.connector_source_rows_received
.with_guarded_label_values(&[source_type.as_str_name(), &source_id]);

while let Some(result) = rx.recv().await {
match result {
Ok(GetEventStreamResponse { events, .. }) => {
tracing::trace!("receive {} cdc events ", events.len());
metrics
.connector_source_rows_received
.with_guarded_label_values(&[source_type.as_str_name(), &source_id])
.inc_by(events.len() as u64);
connector_source_rows_received_metrics.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
}
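In this CDC reader the label values (source type and source id) are fixed for the reader's whole lifetime, so no map is needed: the guarded handle is simply hoisted out of the receive loop and created once. The datagen, OpenDAL, and S3 readers further down apply the same hoisting. A minimal sketch of the shape, again with a hypothetical GuardedCounter standing in for the real guarded metric:

// Hypothetical guarded handle; see the fuller model near the top of the page.
struct GuardedCounter(u64);
impl GuardedCounter {
    fn inc_by(&mut self, n: u64) {
        self.0 += n;
    }
}
fn with_guarded_label_values(_labels: &[&str]) -> GuardedCounter {
    GuardedCounter(0)
}

fn consume(source_type: &str, source_id: &str, batches: Vec<Vec<String>>) {
    // Created once, before the receive loop, so the labeled series lives as
    // long as the reader instead of being re-created per batch of events.
    let mut rows_received = with_guarded_label_values(&[source_type, source_id]);
    for events in batches {
        rows_received.inc_by(events.len() as u64);
        // ... convert events into source messages and yield them ...
    }
}

fn main() {
    consume("MYSQL", "42", vec![vec!["ev1".into(), "ev2".into()], vec!["ev3".into()]]);
}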
53 changes: 33 additions & 20 deletions src/connector/src/source/common.rs
@@ -11,6 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;

use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
@@ -33,45 +34,57 @@ pub(crate) async fn into_chunk_stream(
let source_id = source_ctx.source_id.to_string();
let source_name = source_ctx.source_name.to_string();
let metrics = source_ctx.metrics.clone();
let mut partition_input_count = HashMap::new();
let mut partition_bytes_count = HashMap::new();

// add metrics to the data stream
let data_stream = data_stream
.inspect_ok(move |data_batch| {
let mut by_split_id = std::collections::HashMap::new();

for msg in data_batch {
let split_id: String = msg.split_id.as_ref().to_string();
by_split_id
.entry(msg.split_id.as_ref())
.entry(split_id.clone())
.or_insert_with(Vec::new)
.push(msg);
partition_input_count
.entry(split_id.clone())
.or_insert_with(|| {
metrics.partition_input_count.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id.clone(),
&source_name,
&fragment_id,
])
});
partition_bytes_count
.entry(split_id.clone())
.or_insert_with(|| {
metrics.partition_input_bytes.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
])
});
}

for (split_id, msgs) in by_split_id {
metrics
.partition_input_count
.with_guarded_label_values(&[
&actor_id,
&source_id,
split_id,
&source_name,
&fragment_id,
])
partition_input_count
.get_mut(&split_id)
.unwrap()
.inc_by(msgs.len() as u64);

let sum_bytes = msgs
.iter()
.flat_map(|msg| msg.payload.as_ref().map(|p| p.len() as u64))
.sum();

metrics
.partition_input_bytes
.with_guarded_label_values(&[
&actor_id,
&source_id,
split_id,
&source_name,
&fragment_id,
])
partition_bytes_count
.get_mut(&split_id)
.unwrap()
.inc_by(sum_bytes);
}
})
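In common.rs the caches are declared outside the stream adapter and moved into the inspect_ok closure, so the per-split handles accumulate across batches and live as long as the stream itself. The sketch below models that with a plain FnMut closure instead of futures::TryStreamExt::inspect_ok, to stay dependency-free; the handle type is again a hypothetical stand-in:

use std::collections::HashMap;

// Hypothetical guarded handle, as in the earlier sketches.
struct GuardedCounter(u64);
impl GuardedCounter {
    fn inc_by(&mut self, n: u64) {
        self.0 += n;
    }
}
fn with_guarded_label_values(_labels: &[&str]) -> GuardedCounter {
    GuardedCounter(0)
}

fn main() {
    let actor_id = "1".to_string();
    // The cache lives outside the closure and is moved into it, so a handle
    // created for a split persists across batches for the stream's lifetime.
    let mut per_split_count: HashMap<String, GuardedCounter> = HashMap::new();
    let mut observe_batch = move |batch: &[(String, usize)]| {
        for (split_id, rows) in batch {
            per_split_count
                .entry(split_id.clone())
                .or_insert_with(|| with_guarded_label_values(&[actor_id.as_str(), split_id.as_str()]))
                .inc_by(*rows as u64);
        }
    };
    // Stand-in for `data_stream.inspect_ok(...)`: invoke the closure per batch.
    observe_batch(&[("split-0".into(), 3), ("split-1".into(), 2)]);
    observe_batch(&[("split-0".into(), 5)]);
}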
31 changes: 21 additions & 10 deletions src/connector/src/source/datagen/source/reader.rs
@@ -18,6 +18,7 @@ use anyhow::Context;
use async_trait::async_trait;
use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty};
use risingwave_common_estimate_size::EstimateSize;
use thiserror_ext::AsReport;

use super::generator::DatagenEventGenerator;
@@ -156,20 +157,30 @@ impl SplitReader for DatagenSplitReader {
let source_name = self.source_ctx.source_name.to_string();
let split_id = self.split_id.to_string();
let metrics = self.source_ctx.metrics.clone();
let partition_input_count_metric =
metrics.partition_input_count.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
]);
let partition_input_bytes_metric =
metrics.partition_input_bytes.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
]);

spawn_data_generation_stream(
self.generator
.into_native_stream()
.inspect_ok(move |stream_chunk| {
metrics
.partition_input_count
.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
])
.inc_by(stream_chunk.cardinality() as u64);
partition_input_count_metric.inc_by(stream_chunk.cardinality() as u64);
partition_input_bytes_metric
.inc_by(stream_chunk.estimated_size() as u64);
}),
BUFFER_SIZE,
)
@@ -176,6 +176,16 @@ impl<Src: OpendalSource> OpendalReader<Src> {
let mut offset: usize = split.offset;
let mut batch_size: usize = 0;
let mut batch = Vec::new();
let partition_input_bytes_metrics = source_ctx
.metrics
.partition_input_bytes
.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
]);
let stream = ReaderStream::with_capacity(buf_reader, STREAM_READER_CAPACITY);
#[for_await]
for read in stream {
@@ -193,34 +203,14 @@
batch.push(msg);

if batch.len() >= max_chunk_size {
source_ctx
.metrics
.partition_input_bytes
.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
])
.inc_by(batch_size as u64);
partition_input_bytes_metrics.inc_by(batch_size as u64);
let yield_batch = std::mem::take(&mut batch);
batch_size = 0;
yield yield_batch;
}
}
if !batch.is_empty() {
source_ctx
.metrics
.partition_input_bytes
.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
])
.inc_by(batch_size as u64);
partition_input_bytes_metrics.inc_by(batch_size as u64);
yield batch;
}
}
34 changes: 12 additions & 22 deletions src/connector/src/source/filesystem/s3/source/reader.rs
@@ -106,6 +106,16 @@ impl S3FileReader {
let mut offset: usize = split.offset;
let mut batch_size: usize = 0;
let mut batch = Vec::new();
let partition_input_bytes_metrics = source_ctx
.metrics
.partition_input_bytes
.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
]);
#[for_await]
for read in stream {
let bytes = read?;
@@ -121,34 +131,14 @@
batch_size += len;
batch.push(msg);
if batch.len() >= max_chunk_size {
source_ctx
.metrics
.partition_input_bytes
.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
])
.inc_by(batch_size as u64);
partition_input_bytes_metrics.inc_by(batch_size as u64);
let yield_batch = std::mem::take(&mut batch);
batch_size = 0;
yield yield_batch;
}
}
if !batch.is_empty() {
source_ctx
.metrics
.partition_input_bytes
.with_guarded_label_values(&[
&actor_id,
&source_id,
&split_id,
&source_name,
&fragment_id,
])
.inc_by(batch_size as u64);
partition_input_bytes_metrics.inc_by(batch_size as u64);
yield batch;
}
}
32 changes: 22 additions & 10 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -17,10 +17,12 @@ use std::time::Duration;

use anyhow::{anyhow, Context};
use async_trait::async_trait;
use prometheus::core::{AtomicI64, GenericGauge};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::{Offset, TopicPartitionList};
use risingwave_common::bail;
use risingwave_common::metrics::LabelGuardedMetric;

use crate::error::ConnectorResult;
use crate::source::base::SplitEnumerator;
@@ -49,6 +51,7 @@ pub struct KafkaSplitEnumerator {
stop_offset: KafkaEnumeratorOffset,

sync_call_timeout: Duration,
high_watermark_metrics: HashMap<i32, LabelGuardedMetric<GenericGauge<AtomicI64>, 2>>,
}

impl KafkaSplitEnumerator {}
@@ -124,6 +127,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
start_offset: scan_start_offset,
stop_offset: KafkaEnumeratorOffset::None,
sync_call_timeout: properties.common.sync_call_timeout,
high_watermark_metrics: HashMap::new(),
})
}

@@ -160,7 +164,10 @@
}

impl KafkaSplitEnumerator {
async fn get_watermarks(&self, partitions: &[i32]) -> KafkaResult<HashMap<i32, (i64, i64)>> {
async fn get_watermarks(
&mut self,
partitions: &[i32],
) -> KafkaResult<HashMap<i32, (i64, i64)>> {
let mut map = HashMap::new();
for partition in partitions {
let (low, high) = self
@@ -357,15 +364,20 @@
}

#[inline]
fn report_high_watermark(&self, partition: i32, offset: i64) {
self.context
.metrics
.high_watermark
.with_guarded_label_values(&[
&self.context.info.source_id.to_string(),
&partition.to_string(),
])
.set(offset);
fn report_high_watermark(&mut self, partition: i32, offset: i64) {
let high_watermark_metrics =
self.high_watermark_metrics
.entry(partition)
.or_insert_with(|| {
self.context
.metrics
.high_watermark
.with_guarded_label_values(&[
&self.context.info.source_id.to_string(),
&partition.to_string(),
])
});
high_watermark_metrics.set(offset);
}

pub async fn check_reachability(&self) -> bool {
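The Kafka enumerator takes the third variant of the pattern: the cache becomes a struct field (high_watermark_metrics, one guarded gauge per partition), which is also why get_watermarks and report_high_watermark change from &self to &mut self. A minimal sketch of that shape, with a hypothetical GuardedGauge standing in for LabelGuardedMetric<GenericGauge<AtomicI64>, 2>:

use std::collections::HashMap;

// Hypothetical guarded gauge handle, standing in for
// LabelGuardedMetric<GenericGauge<AtomicI64>, 2>.
struct GuardedGauge(i64);
impl GuardedGauge {
    fn set(&mut self, v: i64) {
        self.0 = v;
    }
}
fn with_guarded_label_values(_labels: &[&str]) -> GuardedGauge {
    GuardedGauge(0)
}

struct Enumerator {
    source_id: u32,
    // One cached handle per partition, owned by the enumerator itself, so the
    // labeled series drop exactly when the enumerator is dropped.
    high_watermark_metrics: HashMap<i32, GuardedGauge>,
}

impl Enumerator {
    // Caching into a field is what forces `&mut self` here.
    fn report_high_watermark(&mut self, partition: i32, offset: i64) {
        let source_id = self.source_id.to_string();
        self.high_watermark_metrics
            .entry(partition)
            .or_insert_with(|| {
                let partition_label = partition.to_string();
                with_guarded_label_values(&[source_id.as_str(), partition_label.as_str()])
            })
            .set(offset);
    }
}

fn main() {
    let mut e = Enumerator {
        source_id: 1,
        high_watermark_metrics: HashMap::new(),
    };
    e.report_high_watermark(0, 42);
    e.report_high_watermark(0, 43); // reuses the cached handle for partition 0
}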