Skip to content

Commit

Permalink
feat(metrics): add metrics to udf (#14090)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <wangrunji0408@163.com>
wangrunji0408 authored Dec 21, 2023
1 parent 09b3c3c commit 95466f5
Showing 9 changed files with 263 additions and 22 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

128 changes: 112 additions & 16 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
@@ -943,6 +943,7 @@ def section_streaming(outer_panels):
)
]


def section_streaming_cdc(outer_panels):
panels = outer_panels.sub_panel()
return [
@@ -957,7 +958,7 @@ def section_streaming_cdc(outer_panels):
f"rate({table_metric('stream_cdc_backfill_snapshot_read_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
),
],
),
panels.timeseries_rowsps(
@@ -968,7 +969,7 @@ def section_streaming_cdc(outer_panels):
f"rate({table_metric('stream_cdc_backfill_upstream_output_row_count')}[$__rate_interval])",
"table_id={{table_id}} actor={{actor_id}} @ {{%s}}"
% NODE_LABEL,
),
),
],
),
panels.timeseries_latency_ms(
@@ -979,7 +980,7 @@ def section_streaming_cdc(outer_panels):
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('source_cdc_event_lag_duration_milliseconds_bucket')}[$__rate_interval])) by (le, table_name))",
f"lag p{legend}" + " - {{table_name}}",
),
),
[50, 99, "max"],
),
],
@@ -988,6 +989,7 @@ def section_streaming_cdc(outer_panels):
),
]


def section_streaming_actors(outer_panels):
panels = outer_panels.sub_panel()
return [
@@ -1124,7 +1126,6 @@ def section_streaming_actors(outer_panels):
f"rate({table_metric('stream_over_window_cache_miss_count')}[$__rate_interval])",
"cache miss count - table {{table_id}} actor {{actor_id}}",
),

panels.target(
f"sum(rate({table_metric('stream_over_window_range_cache_lookup_count')}[$__rate_interval])) by (table_id, fragment_id)",
"partition range cache lookup count - table {{table_id}} fragment {{fragment_id}}",
@@ -1403,7 +1404,6 @@ def section_streaming_actors(outer_panels):
f"{metric('stream_over_window_cached_entry_count')}",
"over window cached count | table {{table_id}} actor {{actor_id}}",
),

panels.target(
f"sum({metric('stream_over_window_range_cache_entry_count')}) by (table_id, fragment_id)",
"over window partition range cache entry count | table {{table_id}} fragment {{fragment_id}}",
@@ -2390,7 +2390,8 @@ def section_hummock_write(outer_panels):
[
panels.target(
f"sum(irate({table_metric('state_store_mem_table_spill_counts')}[$__rate_interval])) by ({COMPONENT_LABEL},{NODE_LABEL},table_id)",
"mem table spill table id - {{table_id}} @ {{%s}}" % NODE_LABEL,
"mem table spill table id - {{table_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
@@ -3391,6 +3392,7 @@ def section_kafka_native_metrics(outer_panels):
)
]


def section_iceberg_metrics(outer_panels):
panels = outer_panels.sub_panel()
return [
@@ -3403,9 +3405,9 @@ def section_iceberg_metrics(outer_panels):
[
panels.target(
f"{metric('iceberg_write_qps')}",
"{{executor_id}} @ {{sink_id}}",
"{{executor_id}} @ {{sink_id}}",
),
]
],
),
panels.timeseries_latency(
"Write Latency Of Iceberg Writer",
@@ -3430,31 +3432,31 @@ def section_iceberg_metrics(outer_panels):
[
panels.target(
f"{metric('iceberg_rolling_unfushed_data_file')}",
"{{executor_id}} @ {{sink_id}}",
"{{executor_id}} @ {{sink_id}}",
),
]
],
),
panels.timeseries_count(
"Iceberg position delete cache num",
"",
[
panels.target(
f"{metric('iceberg_position_delete_cache_num')}",
"{{executor_id}} @ {{sink_id}}",
"{{executor_id}} @ {{sink_id}}",
),
]
],
),
panels.timeseries_count(
"Iceberg partition num",
"",
[
panels.target(
f"{metric('iceberg_partition_num')}",
"{{executor_id}} @ {{sink_id}}",
"{{executor_id}} @ {{sink_id}}",
),
]
],
),
]
],
)
]

@@ -3880,6 +3882,99 @@ def section_network_connection(outer_panels):
]


def section_udf(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"User Defined Function",
[
panels.timeseries_count(
"UDF Calls Count",
"",
[
panels.target(
f"sum(rate({metric('udf_success_count')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_success_count - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(rate({metric('udf_failure_count')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_failure_count - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(rate({metric('udf_success_count')}[$__rate_interval])) by (link, name)",
"udf_success_count - {{link}} {{name}}",
),
panels.target(
f"sum(rate({metric('udf_failure_count')}[$__rate_interval])) by (link, name)",
"udf_failure_count - {{link}} {{name}}",
),
],
),
panels.timeseries_count(
"UDF Input Chunk Rows",
"",
[
panels.target(
f"sum(irate({metric('udf_input_chunk_rows_sum')}[$__rate_interval])) by (link, name) / sum(irate({metric('udf_input_chunk_rows_count')}[$__rate_interval])) by (link, name)",
"udf_input_chunk_rows_avg - {{link}} {{name}}",
),
],
),
panels.timeseries_latency(
"UDF Latency",
"",
[
panels.target(
f"histogram_quantile(0.50, sum(irate({metric('udf_latency_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
"udf_latency_p50 - {{%s}}" % NODE_LABEL,
),
panels.target(
f"histogram_quantile(0.90, sum(irate({metric('udf_latency_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
"udf_latency_p90 - {{%s}}" % NODE_LABEL,
),
panels.target(
f"histogram_quantile(0.99, sum(irate({metric('udf_latency_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
"udf_latency_p99 - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(irate({metric('udf_latency_sum')}[$__rate_interval])) / sum(irate({metric('udf_latency_count')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_latency_avg - {{%s}}" % NODE_LABEL,
),
],
),
panels.timeseries_count(
"UDF Throughput (rows)",
"",
[
panels.target(
f"sum(rate({metric('udf_input_rows')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_throughput_rows - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(rate({metric('udf_input_rows')}[$__rate_interval])) by (link, name)",
"udf_throughput_rows - {{link}} {{name}}",
),
],
),
panels.timeseries_bytesps(
"UDF Throughput (bytes)",
"",
[
panels.target(
f"sum(rate({metric('udf_input_bytes')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL}) / (1024*1024)",
"udf_throughput_bytes - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(rate({metric('udf_input_bytes')}[$__rate_interval])) by (link, name) / (1024*1024)",
"udf_throughput_bytes - {{link}} {{name}}",
),
],
),
],
)
]


templating_list = []
if dynamic_source_enabled:
templating_list.append(
@@ -4058,6 +4153,7 @@ def section_network_connection(outer_panels):
*section_sink_metrics(panels),
*section_kafka_native_metrics(panels),
*section_network_connection(panels),
*section_iceberg_metrics(panels)
*section_iceberg_metrics(panels),
*section_udf(panels),
],
).auto_panel_ids()
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
@@ -99,8 +99,8 @@ impl UdfExpression {
.expect("failed covert ArrayRef to arrow_array::ArrayRef")
})
.collect();
let opts =
arrow_array::RecordBatchOptions::default().with_row_count(Some(vis.count_ones()));
let opts = arrow_array::RecordBatchOptions::default()
.with_row_count(Some(compacted_chunk.capacity()));
let input = arrow_array::RecordBatch::try_new_with_options(
self.arg_schema.clone(),
compacted_columns,
2 changes: 2 additions & 0 deletions src/expr/udf/Cargo.toml
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
cfg-or-panic = "0.2"
futures-util = "0.3.28"
prometheus = "0.13"
risingwave_common = { workspace = true }
static_assertions = "1"
thiserror = "1"
thiserror-ext = { workspace = true }
42 changes: 40 additions & 2 deletions src/expr/udf/src/external.rs
Original file line number Diff line number Diff line change
@@ -26,12 +26,14 @@ use futures_util::{stream, Stream, StreamExt, TryStreamExt};
use thiserror_ext::AsReport;
use tonic::transport::Channel;

use crate::metrics::GLOBAL_METRICS;
use crate::{Error, Result};

/// Client for external function service based on Arrow Flight.
#[derive(Debug)]
pub struct ArrowFlightUdfClient {
client: FlightServiceClient<Channel>,
addr: String,
}

// TODO: support UDF in simulation
@@ -45,7 +47,10 @@ impl ArrowFlightUdfClient {
.connect()
.await?;
let client = FlightServiceClient::new(conn);
Ok(Self { client })
Ok(Self {
client,
addr: addr.into(),
})
}

/// Connect to a UDF service lazily (i.e. only when the first request is sent).
@@ -55,7 +60,10 @@ impl ArrowFlightUdfClient {
.connect_timeout(Duration::from_secs(5))
.connect_lazy();
let client = FlightServiceClient::new(conn);
Ok(Self { client })
Ok(Self {
client,
addr: addr.into(),
})
}

/// Check if the function is available and the schema is match.
@@ -101,6 +109,36 @@ impl ArrowFlightUdfClient {

/// Call a function.
pub async fn call(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
let metrics = &*GLOBAL_METRICS;
let labels = &[self.addr.as_str(), id];
metrics
.udf_input_chunk_rows
.with_label_values(labels)
.observe(input.num_rows() as f64);
metrics
.udf_input_rows
.with_label_values(labels)
.inc_by(input.num_rows() as u64);
metrics
.udf_input_bytes
.with_label_values(labels)
.inc_by(input.get_array_memory_size() as u64);
let timer = metrics.udf_latency.with_label_values(labels).start_timer();

let result = self.call_internal(id, input).await;

timer.stop_and_record();
if result.is_ok() {
&metrics.udf_success_count
} else {
&metrics.udf_failure_count
}
.with_label_values(labels)
.inc();
result
}

async fn call_internal(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
let mut output_stream = self.call_stream(id, stream::once(async { input })).await?;
// TODO: support no output
let head = output_stream
2 changes: 2 additions & 0 deletions src/expr/udf/src/lib.rs
Original file line number Diff line number Diff line change
@@ -13,9 +13,11 @@
// limitations under the License.

#![feature(error_generic_member_access)]
#![feature(lazy_cell)]

mod error;
mod external;
mod metrics;

pub use error::{Error, Result};
pub use external::ArrowFlightUdfClient;
101 changes: 101 additions & 0 deletions src/expr/udf/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use prometheus::{
exponential_buckets, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, HistogramVec, IntCounterVec, Registry,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;

/// Monitor metrics for UDF.
#[derive(Debug, Clone)]
pub struct Metrics {
/// Number of successful UDF calls.
pub udf_success_count: IntCounterVec,
/// Number of failed UDF calls.
pub udf_failure_count: IntCounterVec,
/// Input chunk rows of UDF calls.
pub udf_input_chunk_rows: HistogramVec,
/// The latency of UDF calls in seconds.
pub udf_latency: HistogramVec,
/// Total number of input rows of UDF calls.
pub udf_input_rows: IntCounterVec,
/// Total number of input bytes of UDF calls.
pub udf_input_bytes: IntCounterVec,
}

/// Global UDF metrics.
pub static GLOBAL_METRICS: LazyLock<Metrics> =
LazyLock::new(|| Metrics::new(&GLOBAL_METRICS_REGISTRY));

impl Metrics {
fn new(registry: &Registry) -> Self {
let labels = &["link", "name"];
let udf_success_count = register_int_counter_vec_with_registry!(
"udf_success_count",
"Total number of successful UDF calls",
labels,
registry
)
.unwrap();
let udf_failure_count = register_int_counter_vec_with_registry!(
"udf_failure_count",
"Total number of failed UDF calls",
labels,
registry
)
.unwrap();
let udf_input_chunk_rows = register_histogram_vec_with_registry!(
"udf_input_chunk_rows",
"Input chunk rows of UDF calls",
labels,
exponential_buckets(1.0, 2.0, 10).unwrap(), // 1 to 1024
registry
)
.unwrap();
let udf_latency = register_histogram_vec_with_registry!(
"udf_latency",
"The latency(s) of UDF calls",
labels,
exponential_buckets(0.000001, 2.0, 30).unwrap(), // 1us to 1000s
registry
)
.unwrap();
let udf_input_rows = register_int_counter_vec_with_registry!(
"udf_input_rows",
"Total number of input rows of UDF calls",
labels,
registry
)
.unwrap();
let udf_input_bytes = register_int_counter_vec_with_registry!(
"udf_input_bytes",
"Total number of input bytes of UDF calls",
labels,
registry
)
.unwrap();

Metrics {
udf_success_count,
udf_failure_count,
udf_input_chunk_rows,
udf_latency,
udf_input_rows,
udf_input_bytes,
}
}
}

0 comments on commit 95466f5

Please sign in to comment.