Skip to content

Commit

Permalink
feat(udf): add udf metrics to embeded udf, refactor external udf metr…
Browse files Browse the repository at this point in the history
…ics. (#15506)
  • Loading branch information
yufansong authored Mar 7, 2024
1 parent 1a9e6e4 commit 3a2e574
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 64 deletions.
69 changes: 53 additions & 16 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::Context;
use anyhow::{Context, Error};
use arrow_schema::{Field, Fields, Schema};
use arrow_udf_js::{CallMode as JsCallMode, Runtime as JsRuntime};
#[cfg(feature = "embedded-python-udf")]
Expand All @@ -32,6 +32,7 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::expr_context::FRAGMENT_ID;
use risingwave_pb::expr::ExprNode;
use risingwave_udf::metrics::GLOBAL_METRICS;
use risingwave_udf::ArrowFlightUdfClient;
use thiserror_ext::AsReport;

Expand Down Expand Up @@ -110,34 +111,59 @@ impl UserDefinedFunction {
// this will drop invisible rows
let arrow_input = arrow_array::RecordBatch::try_from(input)?;

let arrow_output: arrow_array::RecordBatch = match &self.imp {
UdfImpl::Wasm(runtime) => runtime.call(&self.identifier, &arrow_input)?,
UdfImpl::JavaScript(runtime) => runtime.call(&self.identifier, &arrow_input)?,
// metrics
let metrics = &*GLOBAL_METRICS;
// batch query does not have a fragment_id
let fragment_id = FRAGMENT_ID::try_with(ToOwned::to_owned)
.unwrap_or(0)
.to_string();
let addr = match &self.imp {
UdfImpl::External(client) => client.get_addr(),
_ => "",
};
let language = match &self.imp {
UdfImpl::Wasm(_) => "wasm",
UdfImpl::JavaScript(_) => "javascript",
#[cfg(feature = "embedded-python-udf")]
UdfImpl::Python(runtime) => runtime.call(&self.identifier, &arrow_input)?,
UdfImpl::External(client) => {
// batch query does not have a fragment_id
let fragment_id = FRAGMENT_ID::try_with(ToOwned::to_owned).unwrap_or(0);
UdfImpl::Python(_) => "python",
UdfImpl::External(_) => "external",
};
let labels: &[&str; 4] = &[addr, language, &self.identifier, fragment_id.as_str()];
metrics
.udf_input_chunk_rows
.with_label_values(labels)
.observe(arrow_input.num_rows() as f64);
metrics
.udf_input_rows
.with_label_values(labels)
.inc_by(arrow_input.num_rows() as u64);
metrics
.udf_input_bytes
.with_label_values(labels)
.inc_by(arrow_input.get_array_memory_size() as u64);
let timer = metrics.udf_latency.with_label_values(labels).start_timer();

let arrow_output_result: Result<arrow_array::RecordBatch, Error> = match &self.imp {
UdfImpl::Wasm(runtime) => runtime.call(&self.identifier, &arrow_input),
UdfImpl::JavaScript(runtime) => runtime.call(&self.identifier, &arrow_input),
#[cfg(feature = "embedded-python-udf")]
UdfImpl::Python(runtime) => runtime.call(&self.identifier, &arrow_input),
UdfImpl::External(client) => {
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let result = if self.always_retry_on_network_error {
client
.call_with_always_retry_on_network_error(
&self.identifier,
arrow_input,
fragment_id,
)
.call_with_always_retry_on_network_error(&self.identifier, arrow_input)
.instrument_await(self.span.clone())
.await
} else {
let result = if disable_retry_count != 0 {
client
.call(&self.identifier, arrow_input, fragment_id)
.call(&self.identifier, arrow_input)
.instrument_await(self.span.clone())
.await
} else {
client
.call_with_retry(&self.identifier, arrow_input, fragment_id)
.call_with_retry(&self.identifier, arrow_input)
.instrument_await(self.span.clone())
.await
};
Expand All @@ -158,9 +184,20 @@ impl UserDefinedFunction {
}
result
};
result?
result.map_err(|e| e.into())
}
};
timer.stop_and_record();
if arrow_output_result.is_ok() {
&metrics.udf_success_count
} else {
&metrics.udf_failure_count
}
.with_label_values(labels)
.inc();

let arrow_output = arrow_output_result?;

if arrow_output.num_rows() != input.cardinality() {
bail!(
"UDF returned {} rows, but expected {}",
Expand Down
4 changes: 2 additions & 2 deletions src/expr/udf/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() {
.unwrap();

let output = client
.call("gcd", input2, 0)
.call("gcd", input2)
.await
.expect("failed to call function");

Expand All @@ -68,7 +68,7 @@ async fn main() {
.unwrap();

let output = client
.call("gcd3", input3, 0)
.call("gcd3", input3)
.await
.expect("failed to call function");

Expand Down
53 changes: 9 additions & 44 deletions src/expr/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use thiserror_ext::AsReport;
use tokio::time::Duration as TokioDuration;
use tonic::transport::Channel;

use crate::metrics::GLOBAL_METRICS;
use crate::{Error, Result};

// Interval between two successive probes of the UDF DNS.
Expand Down Expand Up @@ -144,40 +143,8 @@ impl ArrowFlightUdfClient {
}

/// Call a function.
pub async fn call(
&self,
id: &str,
input: RecordBatch,
fragment_id: u32,
) -> Result<RecordBatch> {
let metrics = &*GLOBAL_METRICS;
let fragment_id_str = fragment_id.to_string();
let labels = &[self.addr.as_str(), id, fragment_id_str.as_str()];
metrics
.udf_input_chunk_rows
.with_label_values(labels)
.observe(input.num_rows() as f64);
metrics
.udf_input_rows
.with_label_values(labels)
.inc_by(input.num_rows() as u64);
metrics
.udf_input_bytes
.with_label_values(labels)
.inc_by(input.get_array_memory_size() as u64);
let timer = metrics.udf_latency.with_label_values(labels).start_timer();

let result = self.call_internal(id, input).await;

timer.stop_and_record();
if result.is_ok() {
&metrics.udf_success_count
} else {
&metrics.udf_failure_count
}
.with_label_values(labels)
.inc();
result
pub async fn call(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
self.call_internal(id, input).await
}

async fn call_internal(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
Expand All @@ -195,15 +162,10 @@ impl ArrowFlightUdfClient {
}

/// Call a function, retry up to 5 times / 3s if connection is broken.
pub async fn call_with_retry(
&self,
id: &str,
input: RecordBatch,
fragment_id: u32,
) -> Result<RecordBatch> {
pub async fn call_with_retry(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
let mut backoff = Duration::from_millis(100);
for i in 0..5 {
match self.call(id, input.clone(), fragment_id).await {
match self.call(id, input.clone()).await {
Err(err) if err.is_connection_error() && i != 4 => {
tracing::error!(error = %err.as_report(), "UDF connection error. retry...");
}
Expand All @@ -220,11 +182,10 @@ impl ArrowFlightUdfClient {
&self,
id: &str,
input: RecordBatch,
fragment_id: u32,
) -> Result<RecordBatch> {
let mut backoff = Duration::from_millis(100);
loop {
match self.call(id, input.clone(), fragment_id).await {
match self.call(id, input.clone()).await {
Err(err) if err.is_tonic_error() => {
tracing::error!(error = %err.as_report(), "UDF tonic error. retry...");
}
Expand Down Expand Up @@ -278,6 +239,10 @@ impl ArrowFlightUdfClient {
stream.map_err(|e| e.into()),
))
}

pub fn get_addr(&self) -> &str {
&self.addr
}
}

/// Check if two list of data types match, ignoring field names.
Expand Down
3 changes: 2 additions & 1 deletion src/expr/udf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

mod error;
mod external;
mod metrics;
pub mod metrics;

pub use error::{Error, Result};
pub use external::ArrowFlightUdfClient;
pub use metrics::GLOBAL_METRICS;
2 changes: 1 addition & 1 deletion src/expr/udf/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub static GLOBAL_METRICS: LazyLock<Metrics> =

impl Metrics {
fn new(registry: &Registry) -> Self {
let labels = &["link", "name", "fragment_id"];
let labels = &["link", "language", "name", "fragment_id"];
let udf_success_count = register_int_counter_vec_with_registry!(
"udf_success_count",
"Total number of successful UDF calls",
Expand Down

0 comments on commit 3a2e574

Please sign in to comment.