Skip to content

Commit

Permalink
feat(metrics): add cdc consume lag metrics (#13877)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Dec 11, 2023
1 parent 5d7f327 commit ddbd74b
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 26 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

67 changes: 45 additions & 22 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,28 +827,6 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_rowsps(
"CDC Backfill Snapshot Read Throughput(rows)",
"Total number of rows that have been read from the cdc backfill snapshot",
[
panels.target(
f"rate({table_metric('stream_cdc_backfill_snapshot_read_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_rowsps(
"CDC Backfill Upstream Throughput(rows)",
"Total number of rows that have been output from the cdc backfill upstream",
[
panels.target(
f"rate({table_metric('stream_cdc_backfill_upstream_output_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_count(
"Barrier Number",
"The number of barriers that have been ingested but not completely processed. This metric reflects the "
Expand Down Expand Up @@ -965,6 +943,50 @@ def section_streaming(outer_panels):
)
]

def section_streaming_cdc(outer_panels):
    """Collapsed dashboard row for CDC-specific streaming metrics:
    backfill snapshot/upstream throughput and the consume-lag histogram."""
    panels = outer_panels.sub_panel()

    # Rows/s read from the upstream table snapshot during CDC backfill.
    snapshot_read_throughput = panels.timeseries_rowsps(
        "CDC Backfill Snapshot Read Throughput(rows)",
        "Total number of rows that have been read from the cdc backfill snapshot",
        [
            panels.target(
                f"rate({table_metric('stream_cdc_backfill_snapshot_read_row_count')}[$__rate_interval])",
                "table_id={{table_id}} actor={{actor_id}} @ {{%s}}" % NODE_LABEL,
            ),
        ],
    )

    # Rows/s emitted from the upstream (change-log) side of CDC backfill.
    upstream_throughput = panels.timeseries_rowsps(
        "CDC Backfill Upstream Throughput(rows)",
        "Total number of rows that have been output from the cdc backfill upstream",
        [
            panels.target(
                f"rate({table_metric('stream_cdc_backfill_upstream_output_row_count')}[$__rate_interval])",
                "table_id={{table_id}} actor={{actor_id}} @ {{%s}}" % NODE_LABEL,
            ),
        ],
    )

    # p50/p99/max of (process time - event time) per table, from the
    # source_cdc_event_lag_duration_milliseconds histogram.
    consume_lag = panels.timeseries_latency_ms(
        "CDC Consume Lag Latency",
        "",
        [
            *quantile(
                # param renamed from `quantile` to avoid shadowing the helper
                lambda q, legend: panels.target(
                    f"histogram_quantile({q}, sum(rate({metric('source_cdc_event_lag_duration_milliseconds_bucket')}[$__rate_interval])) by (le, table_name))",
                    f"lag p{legend}" + " - {{table_name}}",
                ),
                [50, 99, "max"],
            ),
        ],
    )

    return [
        outer_panels.row_collapsed(
            "Streaming CDC",
            [snapshot_read_throughput, upstream_throughput, consume_lag],
        ),
    ]

def section_streaming_actors(outer_panels):
panels = outer_panels.sub_panel()
Expand Down Expand Up @@ -3936,6 +3958,7 @@ def section_network_connection(outer_panels):
*section_cluster_node(panels),
*section_recovery_node(panels),
*section_streaming(panels),
*section_streaming_cdc(panels),
*section_streaming_actors(panels),
*section_streaming_actors_tokio(panels),
*section_streaming_exchange(panels),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -114,13 +115,19 @@ var record = event.value();
committer.markProcessed(event);
continue;
}
// get upstream event time from the "source" field
var sourceStruct = ((Struct) record.value()).getStruct("source");
long sourceTsMs =
sourceStruct == null
? System.currentTimeMillis()
: sourceStruct.getInt64("ts_ms");
byte[] payload =
converter.fromConnectData(
record.topic(), record.valueSchema(), record.value());

msgBuilder
.setFullTableName(fullTableName)
.setPayload(new String(payload, StandardCharsets.UTF_8))
.setSourceTsMs(sourceTsMs)
.build();
var message = msgBuilder.build();
LOG.debug("record => {}", message.getPayload());
Expand Down
1 change: 1 addition & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ message CdcMessage {
string partition = 2;
string offset = 3;
string full_table_name = 4;
int64 source_ts_ms = 5;
}

enum SourceType {
Expand Down
12 changes: 12 additions & 0 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use self::util::get_kafka_topic;
use crate::common::AwsAuthProps;
use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId,
Expand Down Expand Up @@ -545,6 +546,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
let _ = builder.take(batch_len);
}

let process_time_ms = chrono::Utc::now().timestamp_millis();
for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
Expand All @@ -557,6 +559,16 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
continue;
}

// calculate process_time - event_time lag
if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
let lag_ms = process_time_ms - msg_meta.source_ts_ms;
// report to prometheus
GLOBAL_SOURCE_METRICS
.direct_cdc_event_lag_latency
.with_label_values(&[&msg_meta.full_table_name])
.observe(lag_ms as f64);
}

split_offset_mapping.insert(msg.split_id.clone(), msg.offset.clone());

let old_op_num = builder.op_num();
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/source/cdc/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::source::SourceMeta;
/// Per-message metadata attached to a Debezium CDC source message
/// (carried as `SourceMeta::DebeziumCdc`).
#[derive(Debug, Clone)]
pub struct DebeziumCdcMeta {
    /// Full name of the upstream table this change event belongs to,
    /// copied from `CdcMessage::full_table_name`; used as the
    /// `table_name` label when reporting CDC consume-lag metrics.
    pub full_table_name: String,
    /// Extracted from `payload.source.ts_ms`, the time that the change
    /// event was made in the database (epoch milliseconds).
    pub source_ts_ms: i64,
}

impl From<CdcMessage> for SourceMessage {
Expand All @@ -35,6 +37,7 @@ impl From<CdcMessage> for SourceMessage {
split_id: message.partition.into(),
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: message.full_table_name,
source_ts_ms: message.source_ts_ms,
}),
}
}
Expand Down
15 changes: 14 additions & 1 deletion src/connector/src/source/monitor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use std::sync::{Arc, LazyLock};

use prometheus::core::{AtomicI64, AtomicU64, GenericCounterVec, GenericGaugeVec};
use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, Registry,
exponential_buckets, histogram_opts, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, HistogramVec,
Registry,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;

Expand Down Expand Up @@ -65,6 +67,8 @@ pub struct SourceMetrics {
pub rdkafka_native_metric: Arc<RdKafkaStats>,

pub connector_source_rows_received: GenericCounterVec<AtomicU64>,

pub direct_cdc_event_lag_latency: HistogramVec,
}

pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
Expand Down Expand Up @@ -102,13 +106,22 @@ impl SourceMetrics {
)
.unwrap();

let opts = histogram_opts!(
"source_cdc_event_lag_duration_milliseconds",
"source_cdc_lag_latency",
exponential_buckets(1.0, 2.0, 21).unwrap(), // max 1048s
);
let direct_cdc_event_lag_latency =
register_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();

let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
SourceMetrics {
partition_input_count,
partition_input_bytes,
latest_message_id,
rdkafka_native_metric,
connector_source_rows_received,
direct_cdc_event_lag_latency,
}
}
}
Expand Down

0 comments on commit ddbd74b

Please sign in to comment.