Skip to content

Commit

Permalink
Revert "refactor(connector): migrate cdc source metric from connector…
Browse files Browse the repository at this point in the history
… to compute (#12283)"

This reverts commit 827ed5e.
  • Loading branch information
StrikeW committed Sep 24, 2023
1 parent f42d3db commit f400b52
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 20 deletions.
8 changes: 0 additions & 8 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,6 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
snapshot_done: self.snapshot_done,
};

let source_id = get_event_stream_request.source_id.to_string();
let source_type = get_event_stream_request.source_type.to_string();

std::thread::spawn(move || {
let mut env = JVM
.as_ref()
Expand Down Expand Up @@ -166,11 +163,6 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {

while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await {
tracing::debug!("receive events {:?}", events.len());
self.source_ctx
.metrics
.connector_source_rows_received
.with_label_values(&[&source_type, &source_id])
.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
}
Expand Down
12 changes: 0 additions & 12 deletions src/connector/src/source/monitor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ pub struct SourceMetrics {
/// Report latest message id
pub latest_message_id: GenericGaugeVec<AtomicI64>,
pub rdkafka_native_metric: Arc<RdKafkaStats>,

pub connector_source_rows_received: GenericCounterVec<AtomicU64>,
}

pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
Expand Down Expand Up @@ -105,23 +103,13 @@ impl SourceMetrics {
registry,
)
.unwrap();

let connector_source_rows_received = register_int_counter_vec_with_registry!(
"connector_source_rows_received",
"Number of rows received by source",
&["source_type", "source_id"],
registry
)
.unwrap();

let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
SourceMetrics {
partition_input_count,
partition_input_bytes,
user_source_error_count,
latest_message_id,
rdkafka_native_metric,
connector_source_rows_received,
}
}
}
Expand Down

0 comments on commit f400b52

Please sign in to comment.