diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index d4b20a86d7a2e..f85367d32e5bf 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -131,9 +131,6 @@ impl CommonSplitReader for CdcSplitReader { snapshot_done: self.snapshot_done, }; - let source_id = get_event_stream_request.source_id.to_string(); - let source_type = get_event_stream_request.source_type.to_string(); - std::thread::spawn(move || { let mut env = JVM .as_ref() @@ -166,11 +163,6 @@ impl CommonSplitReader for CdcSplitReader { while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await { tracing::debug!("receive events {:?}", events.len()); - self.source_ctx - .metrics - .connector_source_rows_received - .with_label_values(&[&source_type, &source_id]) - .inc_by(events.len() as u64); let msgs = events.into_iter().map(SourceMessage::from).collect_vec(); yield msgs; } diff --git a/src/connector/src/source/monitor/metrics.rs b/src/connector/src/source/monitor/metrics.rs index fa3e836993c4f..c6ea9998e55e4 100644 --- a/src/connector/src/source/monitor/metrics.rs +++ b/src/connector/src/source/monitor/metrics.rs @@ -62,8 +62,6 @@ pub struct SourceMetrics { /// Report latest message id pub latest_message_id: GenericGaugeVec, pub rdkafka_native_metric: Arc, - - pub connector_source_rows_received: GenericCounterVec, } pub static GLOBAL_SOURCE_METRICS: LazyLock = @@ -105,15 +103,6 @@ impl SourceMetrics { registry, ) .unwrap(); - - let connector_source_rows_received = register_int_counter_vec_with_registry!( - "connector_source_rows_received", - "Number of rows received by source", - &["source_type", "source_id"], - registry - ) - .unwrap(); - let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone())); SourceMetrics { partition_input_count, @@ -121,7 +110,6 @@ impl SourceMetrics { user_source_error_count, latest_message_id, rdkafka_native_metric, - connector_source_rows_received, } } }