feat(metrics): add cdc consume lag metrics #13877

Merged · 11 commits · Dec 11, 2023
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

67 changes: 45 additions & 22 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -827,28 +827,6 @@ def section_streaming(outer_panels):
),
],
),
panels.timeseries_rowsps(
"CDC Backfill Snapshot Read Throughput(rows)",
"Total number of rows that have been read from the cdc backfill snapshot",
[
panels.target(
f"rate({table_metric('stream_cdc_backfill_snapshot_read_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_rowsps(
"CDC Backfill Upstream Throughput(rows)",
"Total number of rows that have been output from the cdc backfill upstream",
[
panels.target(
f"rate({table_metric('stream_cdc_backfill_upstream_output_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_count(
"Barrier Number",
"The number of barriers that have been ingested but not completely processed. This metric reflects the "
@@ -965,6 +943,50 @@ def section_streaming(outer_panels):
)
]

def section_streaming_cdc(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"Streaming CDC",
[
panels.timeseries_rowsps(
"CDC Backfill Snapshot Read Throughput(rows)",
"Total number of rows that have been read from the cdc backfill snapshot",
[
panels.target(
f"rate({table_metric('stream_cdc_backfill_snapshot_read_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_rowsps(
"CDC Backfill Upstream Throughput(rows)",
"Total number of rows that have been output from the cdc backfill upstream",
[
panels.target(
f"rate({table_metric('stream_cdc_backfill_upstream_output_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
panels.timeseries_latency_ms(
"CDC Consume Lag Latency",
"Lag between the time a CDC event is consumed by RisingWave and the event time recorded in the upstream database",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('source_cdc_event_lag_duration_milliseconds_bucket')}[$__rate_interval])) by (le, table_name))",
f"lag p{legend}" + " - {{table_name}}",
),
[50, 99, "max"],
),
],
),
],
),
]
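The quantile helper above emits one histogram_quantile target per requested percentile over the cumulative lag buckets. As a rough illustration of what that PromQL computes, here is a minimal Python sketch of bucket-based quantile estimation with linear interpolation; the bucket counts are hypothetical, and the real query also handles the +Inf bucket and per-label grouping:

```python
def histogram_quantile(q: float, buckets: list[tuple[float, int]]) -> float:
    """Estimate a quantile from cumulative histogram buckets.

    buckets: sorted (upper_bound_ms, cumulative_count) pairs; uses
    linear interpolation inside the target bucket, like Prometheus.
    """
    rank = q * buckets[-1][1]
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            # interpolate linearly within this bucket
            frac = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * frac
        prev_bound, prev_count = bound, count
    return buckets[-1][0]

# hypothetical cumulative counts for 1/2/4/8 ms lag buckets
lag_buckets = [(1.0, 10), (2.0, 60), (4.0, 90), (8.0, 100)]
print(histogram_quantile(0.5, lag_buckets))  # 1.8: p50 sits 80% into the 1-2 ms bucket
```

This is why lag quantiles from a histogram are estimates: accuracy is bounded by the bucket boundaries chosen at registration time.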

def section_streaming_actors(outer_panels):
panels = outer_panels.sub_panel()
@@ -3936,6 +3958,7 @@ def section_network_connection(outer_panels):
*section_cluster_node(panels),
*section_recovery_node(panels),
*section_streaming(panels),
*section_streaming_cdc(panels),
*section_streaming_actors(panels),
*section_streaming_actors_tokio(panels),
*section_streaming_exchange(panels),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
@@ -114,13 +115,19 @@ var record = event.value();
committer.markProcessed(event);
continue;
}
// get upstream event time from the "source" field
var sourceStruct = ((Struct) record.value()).getStruct("source");
Collaborator: Will there be any difference between payload.source.ts_ms and payload.ts_ms for direct CDC?

Author: payload.ts_ms is the time at which the connector processed the event, i.e. the connector's process time.

long sourceTsMs =
sourceStruct == null
? System.currentTimeMillis()
: sourceStruct.getInt64("ts_ms");
Collaborator: dumb question: is ts_ms guaranteed to be a UTC-based timestamp?

Author: I think yes, since timestamps in MySQL and PG are stored as UTC.

byte[] payload =
converter.fromConnectData(
record.topic(), record.valueSchema(), record.value());

msgBuilder
.setFullTableName(fullTableName)
.setPayload(new String(payload, StandardCharsets.UTF_8))
.setSourceTsMs(sourceTsMs);
var message = msgBuilder.build();
LOG.debug("record => {}", message.getPayload());
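The Java hunk above reads the upstream event time from the Debezium envelope's source block and falls back to the connector's wall clock when that block is absent. A minimal Python sketch of the same fallback, with hypothetical plain dicts standing in for Kafka Connect Structs:

```python
import time

def source_ts_ms(event: dict) -> int:
    """Return the upstream event time from a Debezium-style envelope.

    Falls back to the current wall clock when the 'source' block is
    absent, as the Java consumer does for events without source info.
    """
    source = event.get("source")
    if source is None:
        return int(time.time() * 1000)
    return source["ts_ms"]
```

With this fallback, events lacking source metadata report a lag of roughly zero rather than failing to parse.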
1 change: 1 addition & 0 deletions proto/connector_service.proto
@@ -159,6 +159,7 @@ message CdcMessage {
string partition = 2;
string offset = 3;
string full_table_name = 4;
int64 source_ts_ms = 5;
}

enum SourceType {
12 changes: 12 additions & 0 deletions src/connector/src/parser/mod.rs
@@ -46,6 +46,7 @@ use self::util::get_kafka_topic;
use crate::common::AwsAuthProps;
use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
extract_source_struct, BoxSourceStream, SourceColumnDesc, SourceColumnType, SourceContext,
SourceContextRef, SourceEncode, SourceFormat, SourceMeta, SourceWithStateStream, SplitId,
@@ -545,6 +546,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
let _ = builder.take(batch_len);
}

let process_time_ms = chrono::Utc::now().timestamp_millis();
for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
@@ -557,6 +559,16 @@
continue;
}

// calculate process_time - event_time lag
if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
let lag_ms = process_time_ms - msg_meta.source_ts_ms;
// report to Prometheus
GLOBAL_SOURCE_METRICS
.direct_cdc_event_lag_latency
.with_label_values(&[&msg_meta.full_table_name])
.observe(lag_ms as f64);
}

split_offset_mapping.insert(msg.split_id.clone(), msg.offset.clone());

let old_op_num = builder.op_num();
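The Rust hunk samples the process time once per batch and subtracts each message's upstream timestamp to get the per-event lag. A small Python sketch of that batch loop, with plain dicts standing in for the parser's message and metadata types:

```python
import time

def observe_batch_lags(batch: list[dict]) -> list[int]:
    """Compute process_time - event_time for each CDC message in a batch.

    Process time is sampled once per batch, mirroring the parser's
    into_chunk_stream loop; messages without CDC metadata are skipped.
    """
    process_time_ms = int(time.time() * 1000)
    lags = []
    for msg in batch:
        meta = msg.get("meta")  # stands in for SourceMeta::DebeziumCdc
        if meta is None:
            continue
        lags.append(process_time_ms - meta["source_ts_ms"])
    return lags
```

In the actual code each lag is then fed to the histogram's observe() with the table name as the label, rather than collected into a list.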
3 changes: 3 additions & 0 deletions src/connector/src/source/cdc/source/message.rs
@@ -20,6 +20,8 @@ use crate::source::SourceMeta;
#[derive(Debug, Clone)]
pub struct DebeziumCdcMeta {
pub full_table_name: String,
// payload.source.ts_ms: the time at which the change event was made in the database
pub source_ts_ms: i64,
}

impl From<CdcMessage> for SourceMessage {
@@ -35,6 +37,7 @@ impl From<CdcMessage> for SourceMessage {
split_id: message.partition.into(),
meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta {
full_table_name: message.full_table_name,
source_ts_ms: message.source_ts_ms,
}),
}
}
15 changes: 14 additions & 1 deletion src/connector/src/source/monitor/metrics.rs
@@ -16,7 +16,9 @@ use std::sync::{Arc, LazyLock};

use prometheus::core::{AtomicI64, AtomicU64, GenericCounterVec, GenericGaugeVec};
use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, Registry,
exponential_buckets, histogram_opts, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, HistogramVec,
Registry,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;

@@ -65,6 +67,8 @@ pub struct SourceMetrics {
pub rdkafka_native_metric: Arc<RdKafkaStats>,

pub connector_source_rows_received: GenericCounterVec<AtomicU64>,

pub direct_cdc_event_lag_latency: HistogramVec,
}

pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
@@ -102,13 +106,22 @@ impl SourceMetrics {
)
.unwrap();

let opts = histogram_opts!(
"source_cdc_event_lag_duration_milliseconds",
"source_cdc_lag_latency",
exponential_buckets(1.0, 2.0, 20).unwrap(), // max bucket ~524s
Collaborator: nit: the max bucket will be 1 * 2^(20-1) ms ≈ 524s.

Author: Thanks for pointing that out.
);
let direct_cdc_event_lag_latency =
register_histogram_vec_with_registry!(opts, &["table_name"], registry).unwrap();

let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
SourceMetrics {
partition_input_count,
partition_input_bytes,
latest_message_id,
rdkafka_native_metric,
connector_source_rows_received,
direct_cdc_event_lag_latency,
}
}
}
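The reviewer's bucket arithmetic is easy to verify: Prometheus' exponential_buckets(start, factor, count) produces count upper bounds start * factor^i, so the largest finite bucket here is 1 * 2^19 ms = 524288 ms, roughly 524s rather than the 1048s the original comment claimed. A quick check in Python:

```python
# reproduce prometheus::exponential_buckets(start=1.0, factor=2.0, count=20)
def exponential_buckets(start: float, factor: float, count: int) -> list[float]:
    return [start * factor**i for i in range(count)]

buckets = exponential_buckets(1.0, 2.0, 20)
print(buckets[0], buckets[-1])  # 1.0 524288.0
print(buckets[-1] / 1000)       # 524.288 seconds
```

Any lag beyond the largest finite bucket still lands in the implicit +Inf bucket, so events slower than ~524s are counted but not resolved further.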