Skip to content

Commit

Permalink
feat(metrics): add metrics to udf (#14090)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 committed Dec 25, 2023
1 parent af650d7 commit bf96a5e
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

103 changes: 100 additions & 3 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,7 @@ def section_streaming(outer_panels):
]



def section_streaming_actors(outer_panels):
panels = outer_panels.sub_panel()
return [
Expand Down Expand Up @@ -2342,7 +2343,8 @@ def section_hummock_write(outer_panels):
[
panels.target(
f"sum(irate({table_metric('state_store_mem_table_spill_counts')}[$__rate_interval])) by ({COMPONENT_LABEL},{NODE_LABEL},table_id)",
"mem table spill table id - {{table_id}} @ {{%s}}" % NODE_LABEL,
"mem table spill table id - {{table_id}} @ {{%s}}"
% NODE_LABEL,
),
],
),
Expand Down Expand Up @@ -3343,6 +3345,7 @@ def section_kafka_native_metrics(outer_panels):
)
]


def section_iceberg_metrics(outer_panels):
panels = outer_panels.sub_panel()
return [
Expand All @@ -3357,7 +3360,7 @@ def section_iceberg_metrics(outer_panels):
f"{metric('iceberg_file_appender_write_qps')}",
"{{executor_id}} @ {{sink_id}}",
),
]
],
),
panels.timeseries_latency(
"Write latency Of Iceberg File Appender",
Expand Down Expand Up @@ -3782,6 +3785,99 @@ def section_network_connection(outer_panels):
]


def section_udf(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"User Defined Function",
[
panels.timeseries_count(
"UDF Calls Count",
"",
[
panels.target(
f"sum(rate({metric('udf_success_count')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_success_count - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(rate({metric('udf_failure_count')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_failure_count - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(rate({metric('udf_success_count')}[$__rate_interval])) by (link, name)",
"udf_success_count - {{link}} {{name}}",
),
panels.target(
f"sum(rate({metric('udf_failure_count')}[$__rate_interval])) by (link, name)",
"udf_failure_count - {{link}} {{name}}",
),
],
),
panels.timeseries_count(
"UDF Input Chunk Rows",
"",
[
panels.target(
f"sum(irate({metric('udf_input_chunk_rows_sum')}[$__rate_interval])) by (link, name) / sum(irate({metric('udf_input_chunk_rows_count')}[$__rate_interval])) by (link, name)",
"udf_input_chunk_rows_avg - {{link}} {{name}}",
),
],
),
panels.timeseries_latency(
"UDF Latency",
"",
[
panels.target(
f"histogram_quantile(0.50, sum(irate({metric('udf_latency_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
"udf_latency_p50 - {{%s}}" % NODE_LABEL,
),
panels.target(
f"histogram_quantile(0.90, sum(irate({metric('udf_latency_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
"udf_latency_p90 - {{%s}}" % NODE_LABEL,
),
panels.target(
f"histogram_quantile(0.99, sum(irate({metric('udf_latency_bucket')}[$__rate_interval])) by (le, {COMPONENT_LABEL}, {NODE_LABEL}))",
"udf_latency_p99 - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(irate({metric('udf_latency_sum')}[$__rate_interval])) / sum(irate({metric('udf_latency_count')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_latency_avg - {{%s}}" % NODE_LABEL,
),
],
),
panels.timeseries_count(
"UDF Throughput (rows)",
"",
[
panels.target(
f"sum(rate({metric('udf_input_rows')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_throughput_rows - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(rate({metric('udf_input_rows')}[$__rate_interval])) by (link, name)",
"udf_throughput_rows - {{link}} {{name}}",
),
],
),
panels.timeseries_bytesps(
"UDF Throughput (bytes)",
"",
[
panels.target(
f"sum(rate({metric('udf_input_bytes')}[$__rate_interval])) by ({COMPONENT_LABEL}, {NODE_LABEL}) / (1024*1024)",
"udf_throughput_bytes - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum(rate({metric('udf_input_bytes')}[$__rate_interval])) by (link, name) / (1024*1024)",
"udf_throughput_bytes - {{link}} {{name}}",
),
],
),
],
)
]


templating_list = []
if dynamic_source_enabled:
templating_list.append(
Expand Down Expand Up @@ -3959,6 +4055,7 @@ def section_network_connection(outer_panels):
*section_sink_metrics(panels),
*section_kafka_native_metrics(panels),
*section_network_connection(panels),
*section_iceberg_metrics(panels)
*section_iceberg_metrics(panels),
*section_udf(panels),
],
).auto_panel_ids()
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ impl UdfExpression {
.expect("failed covert ArrayRef to arrow_array::ArrayRef")
})
.collect();
let opts =
arrow_array::RecordBatchOptions::default().with_row_count(Some(vis.count_ones()));
let opts = arrow_array::RecordBatchOptions::default()
.with_row_count(Some(compacted_chunk.capacity()));
let input = arrow_array::RecordBatch::try_new_with_options(
self.arg_schema.clone(),
compacted_columns,
Expand Down
2 changes: 2 additions & 0 deletions src/udf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
cfg-or-panic = "0.2"
futures-util = "0.3.28"
prometheus = "0.13"
risingwave_common = { workspace = true }
static_assertions = "1"
thiserror = "1"
thiserror-ext = { workspace = true }
Expand Down
42 changes: 40 additions & 2 deletions src/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ use cfg_or_panic::cfg_or_panic;
use futures_util::{stream, Stream, StreamExt, TryStreamExt};
use tonic::transport::Channel;

use crate::metrics::GLOBAL_METRICS;
use crate::{Error, Result};

/// Client for external function service based on Arrow Flight.
#[derive(Debug)]
pub struct ArrowFlightUdfClient {
client: FlightServiceClient<Channel>,
addr: String,
}

// TODO: support UDF in simulation
Expand All @@ -44,7 +46,10 @@ impl ArrowFlightUdfClient {
.connect()
.await?;
let client = FlightServiceClient::new(conn);
Ok(Self { client })
Ok(Self {
client,
addr: addr.into(),
})
}

/// Connect to a UDF service lazily (i.e. only when the first request is sent).
Expand All @@ -54,7 +59,10 @@ impl ArrowFlightUdfClient {
.connect_timeout(Duration::from_secs(5))
.connect_lazy();
let client = FlightServiceClient::new(conn);
Ok(Self { client })
Ok(Self {
client,
addr: addr.into(),
})
}

/// Check if the function is available and the schema is match.
Expand Down Expand Up @@ -99,6 +107,36 @@ impl ArrowFlightUdfClient {

/// Call a function.
pub async fn call(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
let metrics = &*GLOBAL_METRICS;
let labels = &[self.addr.as_str(), id];
metrics
.udf_input_chunk_rows
.with_label_values(labels)
.observe(input.num_rows() as f64);
metrics
.udf_input_rows
.with_label_values(labels)
.inc_by(input.num_rows() as u64);
metrics
.udf_input_bytes
.with_label_values(labels)
.inc_by(input.get_array_memory_size() as u64);
let timer = metrics.udf_latency.with_label_values(labels).start_timer();

let result = self.call_internal(id, input).await;

timer.stop_and_record();
if result.is_ok() {
&metrics.udf_success_count
} else {
&metrics.udf_failure_count
}
.with_label_values(labels)
.inc();
result
}

async fn call_internal(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
let mut output_stream = self.call_stream(id, stream::once(async { input })).await?;
// TODO: support no output
let head = output_stream
Expand Down
2 changes: 2 additions & 0 deletions src/udf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
// limitations under the License.

#![feature(error_generic_member_access)]
#![feature(lazy_cell)]

mod error;
mod external;
mod metrics;

pub use error::{Error, Result};
pub use external::ArrowFlightUdfClient;
101 changes: 101 additions & 0 deletions src/udf/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use prometheus::{
exponential_buckets, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, HistogramVec, IntCounterVec, Registry,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;

/// Monitor metrics for UDF.
#[derive(Debug, Clone)]
pub struct Metrics {
/// Number of successful UDF calls.
pub udf_success_count: IntCounterVec,
/// Number of failed UDF calls.
pub udf_failure_count: IntCounterVec,
/// Input chunk rows of UDF calls.
pub udf_input_chunk_rows: HistogramVec,
/// The latency of UDF calls in seconds.
pub udf_latency: HistogramVec,
/// Total number of input rows of UDF calls.
pub udf_input_rows: IntCounterVec,
/// Total number of input bytes of UDF calls.
pub udf_input_bytes: IntCounterVec,
}

/// Global UDF metrics.
pub static GLOBAL_METRICS: LazyLock<Metrics> =
LazyLock::new(|| Metrics::new(&GLOBAL_METRICS_REGISTRY));

impl Metrics {
fn new(registry: &Registry) -> Self {
let labels = &["link", "name"];
let udf_success_count = register_int_counter_vec_with_registry!(
"udf_success_count",
"Total number of successful UDF calls",
labels,
registry
)
.unwrap();
let udf_failure_count = register_int_counter_vec_with_registry!(
"udf_failure_count",
"Total number of failed UDF calls",
labels,
registry
)
.unwrap();
let udf_input_chunk_rows = register_histogram_vec_with_registry!(
"udf_input_chunk_rows",
"Input chunk rows of UDF calls",
labels,
exponential_buckets(1.0, 2.0, 10).unwrap(), // 1 to 1024
registry
)
.unwrap();
let udf_latency = register_histogram_vec_with_registry!(
"udf_latency",
"The latency(s) of UDF calls",
labels,
exponential_buckets(0.000001, 2.0, 30).unwrap(), // 1us to 1000s
registry
)
.unwrap();
let udf_input_rows = register_int_counter_vec_with_registry!(
"udf_input_rows",
"Total number of input rows of UDF calls",
labels,
registry
)
.unwrap();
let udf_input_bytes = register_int_counter_vec_with_registry!(
"udf_input_bytes",
"Total number of input bytes of UDF calls",
labels,
registry
)
.unwrap();

Metrics {
udf_success_count,
udf_failure_count,
udf_input_chunk_rows,
udf_latency,
udf_input_rows,
udf_input_bytes,
}
}
}

0 comments on commit bf96a5e

Please sign in to comment.