feat: support iceberg metrics (#13259)
Co-authored-by: zenotme <[email protected]>
ZENOTME and zenotme authored Nov 7, 2023
1 parent 4fdbade commit 2906017
Showing 8 changed files with 146 additions and 18 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -113,7 +113,7 @@ criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.4.0" }
tonic-build = { package = "madsim-tonic-build", version = "0.4.2" }
prost = { version = "0.12" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "5a5202d1f3502f0cf82041044b0427434da59adc" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "5cdcdffd24f4624a0a43f92c5f368988169a799b", features = ["prometheus"] }
arrow-array = "48"
arrow-cast = "48"
arrow-schema = "48"
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -3343,6 +3343,43 @@ def section_kafka_native_metrics(outer_panels):
)
]

def section_iceberg_metrics(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"Iceberg Sink Metrics",
[
panels.timeseries_count(
"Write Qps Of Iceberg File Appender",
"iceberg file appender write qps",
[
panels.target(
f"{metric('iceberg_file_appender_write_qps')}",
"{{executor_id}} @ {{sink_id}}",
),
]
),
panels.timeseries_latency(
"Write latency Of Iceberg File Appender",
"",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('iceberg_file_appender_write_latency_bucket')}[$__rate_interval])) by (le, sink_id))",
f"p{legend}" + " @ {{sink_id}}",
),
[50, 99, "max"],
),
panels.target(
f"sum by(le, sink_id)(rate({metric('iceberg_file_appender_write_latency_sum')}[$__rate_interval])) / sum by(le, type, job, instance) (rate({metric('iceberg_file_appender_write_latency_count')}[$__rate_interval]))",
"avg @ {{sink_id}}",
),
],
),
]
)
]


def section_memory_manager(outer_panels):
panels = outer_panels.sub_panel()
@@ -3922,5 +3959,6 @@ def section_network_connection(outer_panels):
*section_sink_metrics(panels),
*section_kafka_native_metrics(panels),
*section_network_connection(panels),
*section_iceberg_metrics(panels)
],
).auto_panel_ids()
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

80 changes: 66 additions & 14 deletions src/connector/src/sink/iceberg.rs
@@ -15,6 +15,7 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::anyhow;
@@ -23,7 +24,8 @@ use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef};
use async_trait::async_trait;
use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE};
use icelake::io::file_writer::DeltaWriterResult;
use icelake::io::EmptyLayer;
use icelake::io::prometheus::{FileAppenderMetrics, WriterPrometheusLayer};
use icelake::io::{ChainedFileAppenderLayer, EmptyLayer, RollingWriter};
use icelake::transaction::Transaction;
use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile};
use icelake::{Table, TableIdentifier};
@@ -338,9 +340,9 @@ impl Sink for IcebergSink {
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
let table = self.create_table().await?;
let inner = if let Some(unique_column_ids) = &self.unique_column_ids {
IcebergWriter::new_upsert(table, unique_column_ids.clone()).await?
IcebergWriter::new_upsert(table, unique_column_ids.clone(), &writer_param).await?
} else {
IcebergWriter::new_append_only(table).await?
IcebergWriter::new_append_only(table, &writer_param).await?
};
Ok(CoordinatedSinkWriter::new(
writer_param
@@ -379,15 +381,37 @@ enum IcebergWriterEnum {
}

impl IcebergWriter {
pub async fn new_append_only(table: Table) -> Result<Self> {
fn metrics_layer(write_param: &SinkWriterParam) -> WriterPrometheusLayer {
let iceberg_metrics = FileAppenderMetrics::new(
write_param
.sink_metrics
.iceberg_file_appender_write_qps
.deref()
.clone(),
write_param
.sink_metrics
.iceberg_file_appender_write_latency
.deref()
.clone(),
);
WriterPrometheusLayer::new(iceberg_metrics)
}

pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result<Self> {
let metrics_layer = Self::metrics_layer(writer_param);
Ok(Self(IcebergWriterEnum::AppendOnly(
AppendOnlyWriter::new(table).await?,
AppendOnlyWriter::new(table, metrics_layer).await?,
)))
}

pub async fn new_upsert(table: Table, unique_column_ids: Vec<usize>) -> Result<Self> {
pub async fn new_upsert(
table: Table,
unique_column_ids: Vec<usize>,
writer_param: &SinkWriterParam,
) -> Result<Self> {
let metrics_layer = Self::metrics_layer(writer_param);
Ok(Self(IcebergWriterEnum::Upsert(
UpsertWriter::new(table, unique_column_ids).await?,
UpsertWriter::new(table, unique_column_ids, metrics_layer).await?,
)))
}
}
@@ -436,12 +460,15 @@ impl SinkWriter for IcebergWriter {

struct AppendOnlyWriter {
table: Table,
writer: icelake::io::task_writer::TaskWriter<EmptyLayer>,
writer: icelake::io::task_writer::TaskWriter<
ChainedFileAppenderLayer<EmptyLayer, RollingWriter, WriterPrometheusLayer>,
>,
schema: SchemaRef,
metrics_layer: WriterPrometheusLayer,
}

impl AppendOnlyWriter {
pub async fn new(table: Table) -> Result<Self> {
pub async fn new(table: Table, metrics_layer: WriterPrometheusLayer) -> Result<Self> {
let schema = Arc::new(
table
.current_table_metadata()
@@ -456,11 +483,13 @@ impl AppendOnlyWriter {
writer: table
.writer_builder()
.await?
.with_file_appender_layer(metrics_layer.clone())
.build_task_writer()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
table,
schema,
metrics_layer,
})
}

@@ -487,6 +516,7 @@ impl AppendOnlyWriter {
self.table
.writer_builder()
.await?
.with_file_appender_layer(self.metrics_layer.clone())
.build_task_writer()
.await
.map_err(|err| SinkError::Iceberg(anyhow!(err)))?,
@@ -514,7 +544,11 @@ enum UpsertWriterInner {
}

impl UpsertWriter {
pub async fn new(table: Table, unique_column_ids: Vec<usize>) -> Result<Self> {
pub async fn new(
table: Table,
unique_column_ids: Vec<usize>,
metrics_layer: WriterPrometheusLayer,
) -> Result<Self> {
let schema = Arc::new(
table
.current_table_metadata()
@@ -529,10 +563,11 @@ impl UpsertWriter {
table,
partition_splitter,
unique_column_ids,
metrics_layer,
))
} else {
UpsertWriterInner::Unpartition(
UnpartitionDeltaWriter::new(table, unique_column_ids).await?,
UnpartitionDeltaWriter::new(table, unique_column_ids, metrics_layer).await?,
)
};
Ok(Self {
@@ -611,18 +646,27 @@ impl UpsertWriter {

struct UnpartitionDeltaWriter {
table: Table,
writer: icelake::io::file_writer::EqualityDeltaWriter<EmptyLayer>,
writer: icelake::io::file_writer::EqualityDeltaWriter<
ChainedFileAppenderLayer<EmptyLayer, RollingWriter, WriterPrometheusLayer>,
>,
metrics_layer: WriterPrometheusLayer,
unique_column_ids: Vec<usize>,
}

impl UnpartitionDeltaWriter {
pub async fn new(table: Table, unique_column_ids: Vec<usize>) -> Result<Self> {
pub async fn new(
table: Table,
unique_column_ids: Vec<usize>,
metrics_layer: WriterPrometheusLayer,
) -> Result<Self> {
Ok(Self {
writer: table
.writer_builder()
.await?
.with_file_appender_layer(metrics_layer.clone())
.build_equality_delta_writer(unique_column_ids.clone())
.await?,
metrics_layer,
table,
unique_column_ids,
})
@@ -644,6 +688,7 @@ impl UnpartitionDeltaWriter {
self.table
.writer_builder()
.await?
.with_file_appender_layer(self.metrics_layer.clone())
.build_equality_delta_writer(self.unique_column_ids.clone())
.await?,
);
@@ -655,21 +700,26 @@ struct PartitionDeltaWriter {
table: Table,
writers: HashMap<
icelake::types::PartitionKey,
icelake::io::file_writer::EqualityDeltaWriter<EmptyLayer>,
icelake::io::file_writer::EqualityDeltaWriter<
ChainedFileAppenderLayer<EmptyLayer, RollingWriter, WriterPrometheusLayer>,
>,
>,
partition_splitter: icelake::types::PartitionSplitter,
unique_column_ids: Vec<usize>,
metrics_layer: WriterPrometheusLayer,
}

impl PartitionDeltaWriter {
pub fn new(
table: Table,
partition_splitter: icelake::types::PartitionSplitter,
unique_column_ids: Vec<usize>,
metrics_layer: WriterPrometheusLayer,
) -> Self {
Self {
table,
writers: HashMap::new(),
metrics_layer,
partition_splitter,
unique_column_ids,
}
@@ -684,6 +734,7 @@ impl PartitionDeltaWriter {
self.table
.writer_builder()
.await?
.with_file_appender_layer(self.metrics_layer.clone())
.build_equality_delta_writer(self.unique_column_ids.clone())
.await?,
)
@@ -705,6 +756,7 @@ impl PartitionDeltaWriter {
self.table
.writer_builder()
.await?
.with_file_appender_layer(self.metrics_layer.clone())
.build_equality_delta_writer(self.unique_column_ids.clone())
.await?,
)
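Note on the iceberg.rs changes above: they all follow a single pattern — derive one WriterPrometheusLayer from the two new sink metrics, then pass a clone of it to every icelake writer builder (task writers and equality-delta writers alike) so that each file-appender write is counted and timed per sink. The condensed sketch below assembles only the calls visible in this diff; the free-standing function, its plain prometheus parameter types, and the assumption that FileAppenderMetrics::new takes the counter and histogram by value are illustrative, not code from the commit.

use anyhow::anyhow;
use icelake::io::prometheus::{FileAppenderMetrics, WriterPrometheusLayer};
use icelake::Table;
use prometheus::{Histogram, IntCounter};

// Sketch: wrap the sink's qps counter and latency histogram into an icelake
// metrics layer and attach it when building a writer. In the actual commit the
// metrics come from SinkWriterParam::sink_metrics via deref().clone().
async fn build_instrumented_task_writer(
    table: &Table,
    write_qps: IntCounter,
    write_latency: Histogram,
) -> anyhow::Result<()> {
    let layer = WriterPrometheusLayer::new(FileAppenderMetrics::new(write_qps, write_latency));
    let _writer = table
        .writer_builder()
        .await
        .map_err(|err| anyhow!(err))?
        // The same layer.clone() is threaded into build_task_writer and
        // build_equality_delta_writer wherever a writer is (re)built above.
        .with_file_appender_layer(layer.clone())
        .build_task_writer()
        .await
        .map_err(|err| anyhow!(err))?;
    Ok(())
}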
5 changes: 5 additions & 0 deletions src/connector/src/sink/mod.rs
@@ -225,6 +225,9 @@ pub struct SinkMetrics {
pub log_store_write_rows: LabelGuardedIntCounter<3>,
pub log_store_latest_read_epoch: LabelGuardedIntGauge<3>,
pub log_store_read_rows: LabelGuardedIntCounter<3>,

pub iceberg_file_appender_write_qps: LabelGuardedIntCounter<2>,
pub iceberg_file_appender_write_latency: LabelGuardedHistogram<2>,
}

impl SinkMetrics {
@@ -237,6 +240,8 @@ impl SinkMetrics {
log_store_latest_read_epoch: LabelGuardedIntGauge::test_int_gauge(),
log_store_write_rows: LabelGuardedIntCounter::test_int_counter(),
log_store_read_rows: LabelGuardedIntCounter::test_int_counter(),
iceberg_file_appender_write_qps: LabelGuardedIntCounter::test_int_counter(),
iceberg_file_appender_write_latency: LabelGuardedHistogram::test_histogram(),
}
}
}
32 changes: 32 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
@@ -149,6 +149,10 @@ pub struct StreamingMetrics {
pub kv_log_store_storage_read_count: LabelGuardedIntCounterVec<4>,
pub kv_log_store_storage_read_size: LabelGuardedIntCounterVec<4>,

// Sink iceberg metrics
pub iceberg_file_appender_write_qps: LabelGuardedIntCounterVec<2>,
pub iceberg_file_appender_write_latency: LabelGuardedHistogramVec<2>,

// Memory management
// FIXME(yuhao): use u64 here
pub lru_current_watermark_time_ms: IntGauge,
@@ -901,6 +905,22 @@ impl StreamingMetrics {
)
.unwrap();

let iceberg_file_appender_write_qps = register_guarded_int_counter_vec_with_registry!(
"iceberg_file_appender_write_qps",
"The qps of iceberg file appender write",
&["executor_id", "sink_id"],
registry
)
.unwrap();

let iceberg_file_appender_write_latency = register_guarded_histogram_vec_with_registry!(
"iceberg_file_appender_write_latency",
"The latency of iceberg file appender write",
&["executor_id", "sink_id"],
registry
)
.unwrap();

Self {
level,
executor_row_count,
@@ -977,6 +997,8 @@ impl StreamingMetrics {
kv_log_store_storage_write_size,
kv_log_store_storage_read_count,
kv_log_store_storage_read_size,
iceberg_file_appender_write_qps,
iceberg_file_appender_write_latency,
lru_current_watermark_time_ms,
lru_physical_now_ms,
lru_runtime_loop_count,
@@ -1024,6 +1046,14 @@ impl StreamingMetrics {
let log_store_write_rows = self.log_store_write_rows.with_label_values(&label_list);
let log_store_read_rows = self.log_store_read_rows.with_label_values(&label_list);

let label_list = [identity, sink_id_str];
let iceberg_file_appender_write_qps = self
.iceberg_file_appender_write_qps
.with_label_values(&label_list);
let iceberg_file_appender_write_latency = self
.iceberg_file_appender_write_latency
.with_label_values(&label_list);

SinkMetrics {
sink_commit_duration_metrics,
connector_sink_rows_received,
Expand All @@ -1032,6 +1062,8 @@ impl StreamingMetrics {
log_store_write_rows,
log_store_latest_read_epoch,
log_store_read_rows,
iceberg_file_appender_write_qps,
iceberg_file_appender_write_latency,
}
}
}
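For context on how the vec metrics above become the per-sink counters the Iceberg writer consumes: StreamingMetrics registers each metric once with ["executor_id", "sink_id"] labels, the with_label_values block resolves one child per sink, and that child lands in SinkMetrics and finally in the icelake prometheus layer. The stand-alone sketch below mirrors that flow with plain prometheus types; the register_guarded_* macros and LabelGuarded* wrappers in the diff are RisingWave-internal, so the IntCounterVec/HistogramVec stand-ins and the example label values are assumptions made to keep the snippet self-contained.

use prometheus::{
    histogram_opts, register_histogram_vec_with_registry,
    register_int_counter_vec_with_registry, Registry,
};

fn main() {
    let registry = Registry::new();

    // Stand-ins for the label-guarded vecs registered in StreamingMetrics above,
    // with the same metric names, help strings, and label dimensions.
    let qps_vec = register_int_counter_vec_with_registry!(
        "iceberg_file_appender_write_qps",
        "The qps of iceberg file appender write",
        &["executor_id", "sink_id"],
        registry
    )
    .unwrap();
    let latency_vec = register_histogram_vec_with_registry!(
        histogram_opts!(
            "iceberg_file_appender_write_latency",
            "The latency of iceberg file appender write"
        ),
        &["executor_id", "sink_id"],
        registry
    )
    .unwrap();

    // One child per (executor identity, sink id) pair, as in the
    // with_label_values calls above; these children end up inside SinkMetrics.
    let qps = qps_vec.with_label_values(&["executor-1", "7"]);
    let latency = latency_vec.with_label_values(&["executor-1", "7"]);

    // The icelake prometheus layer drives these on every file-appender write.
    qps.inc();
    latency.observe(0.003);
}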
