From cef18462433eb48d53ace310c3266478c72906a5 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 13 Sep 2023 20:04:55 +0800 Subject: [PATCH 1/2] migrate cdc source metric from connector to compute --- src/connector/src/source/cdc/source/reader.rs | 7 +++++++ src/connector/src/source/monitor/metrics.rs | 12 ++++++++++++ 2 files changed, 19 insertions(+) diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index f85367d32e5bf..330902fec6fec 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -131,6 +131,9 @@ impl CommonSplitReader for CdcSplitReader { snapshot_done: self.snapshot_done, }; + let source_id = get_event_stream_request.source_id.to_string(); + let source_type = get_event_stream_request.source_type.to_string(); + std::thread::spawn(move || { let mut env = JVM .as_ref() @@ -163,6 +166,10 @@ impl CommonSplitReader for CdcSplitReader { while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await { tracing::debug!("receive events {:?}", events.len()); + self.source_ctx.metrics.connector_source_rows_received.with_label_values(&[ + &source_type, + &source_id, + ]).inc_by(events.len() as u64); let msgs = events.into_iter().map(SourceMessage::from).collect_vec(); yield msgs; } diff --git a/src/connector/src/source/monitor/metrics.rs b/src/connector/src/source/monitor/metrics.rs index c6ea9998e55e4..fa3e836993c4f 100644 --- a/src/connector/src/source/monitor/metrics.rs +++ b/src/connector/src/source/monitor/metrics.rs @@ -62,6 +62,8 @@ pub struct SourceMetrics { /// Report latest message id pub latest_message_id: GenericGaugeVec, pub rdkafka_native_metric: Arc, + + pub connector_source_rows_received: GenericCounterVec, } pub static GLOBAL_SOURCE_METRICS: LazyLock = @@ -103,6 +105,15 @@ impl SourceMetrics { registry, ) .unwrap(); + + let connector_source_rows_received = register_int_counter_vec_with_registry!( + "connector_source_rows_received", + "Number of rows received by source", + &["source_type", "source_id"], + registry + ) + .unwrap(); + let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone())); SourceMetrics { partition_input_count, @@ -110,6 +121,7 @@ impl SourceMetrics { user_source_error_count, latest_message_id, rdkafka_native_metric, + connector_source_rows_received, } } } From 8b1ff31956708f4fd930ddae046ac70088c5607f Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 14 Sep 2023 13:11:07 +0800 Subject: [PATCH 2/2] fmt --- src/connector/src/source/cdc/source/reader.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 330902fec6fec..d4b20a86d7a2e 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -166,10 +166,11 @@ impl CommonSplitReader for CdcSplitReader { while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await { tracing::debug!("receive events {:?}", events.len()); - self.source_ctx.metrics.connector_source_rows_received.with_label_values(&[ - &source_type, - &source_id, - ]).inc_by(events.len() as u64); + self.source_ctx + .metrics + .connector_source_rows_received + .with_label_values(&[&source_type, &source_id]) + .inc_by(events.len() as u64); let msgs = events.into_iter().map(SourceMessage::from).collect_vec(); yield msgs; }