Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udf): add udf metrics to embeded udf, refactor external udf metrics. #15506

Merged
merged 4 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 53 additions & 16 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, LazyLock, Weak};
use std::time::Duration;

use anyhow::Context;
use anyhow::{Context, Error};
use arrow_schema::{Field, Fields, Schema};
use arrow_udf_js::{CallMode as JsCallMode, Runtime as JsRuntime};
#[cfg(feature = "embedded-python-udf")]
Expand All @@ -32,6 +32,7 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::expr_context::FRAGMENT_ID;
use risingwave_pb::expr::ExprNode;
use risingwave_udf::metrics::GLOBAL_METRICS;
use risingwave_udf::ArrowFlightUdfClient;
use thiserror_ext::AsReport;

Expand Down Expand Up @@ -110,34 +111,59 @@ impl UserDefinedFunction {
// this will drop invisible rows
let arrow_input = arrow_array::RecordBatch::try_from(input)?;

let arrow_output: arrow_array::RecordBatch = match &self.imp {
UdfImpl::Wasm(runtime) => runtime.call(&self.identifier, &arrow_input)?,
UdfImpl::JavaScript(runtime) => runtime.call(&self.identifier, &arrow_input)?,
// metrics
let metrics = &*GLOBAL_METRICS;
// batch query does not have a fragment_id
let fragment_id = FRAGMENT_ID::try_with(ToOwned::to_owned)
.unwrap_or(0)
.to_string();
let addr = match &self.imp {
UdfImpl::External(client) => client.get_addr(),
_ => "",
};
let language = match &self.imp {
UdfImpl::Wasm(_) => "wasm",
UdfImpl::JavaScript(_) => "javascript",
#[cfg(feature = "embedded-python-udf")]
UdfImpl::Python(runtime) => runtime.call(&self.identifier, &arrow_input)?,
UdfImpl::External(client) => {
// batch query does not have a fragment_id
let fragment_id = FRAGMENT_ID::try_with(ToOwned::to_owned).unwrap_or(0);
UdfImpl::Python(_) => "python",
UdfImpl::External(_) => "external",
};
let labels: &[&str; 4] = &[addr, language, &self.identifier, fragment_id.as_str()];
metrics
.udf_input_chunk_rows
.with_label_values(labels)
.observe(arrow_input.num_rows() as f64);
metrics
.udf_input_rows
.with_label_values(labels)
.inc_by(arrow_input.num_rows() as u64);
metrics
.udf_input_bytes
.with_label_values(labels)
.inc_by(arrow_input.get_array_memory_size() as u64);
let timer = metrics.udf_latency.with_label_values(labels).start_timer();

let arrow_output_result: Result<arrow_array::RecordBatch, Error> = match &self.imp {
UdfImpl::Wasm(runtime) => runtime.call(&self.identifier, &arrow_input),
UdfImpl::JavaScript(runtime) => runtime.call(&self.identifier, &arrow_input),
#[cfg(feature = "embedded-python-udf")]
UdfImpl::Python(runtime) => runtime.call(&self.identifier, &arrow_input),
UdfImpl::External(client) => {
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let result = if self.always_retry_on_network_error {
client
.call_with_always_retry_on_network_error(
&self.identifier,
arrow_input,
fragment_id,
)
.call_with_always_retry_on_network_error(&self.identifier, arrow_input)
.instrument_await(self.span.clone())
.await
} else {
let result = if disable_retry_count != 0 {
client
.call(&self.identifier, arrow_input, fragment_id)
.call(&self.identifier, arrow_input)
.instrument_await(self.span.clone())
.await
} else {
client
.call_with_retry(&self.identifier, arrow_input, fragment_id)
.call_with_retry(&self.identifier, arrow_input)
.instrument_await(self.span.clone())
.await
};
Expand All @@ -158,9 +184,20 @@ impl UserDefinedFunction {
}
result
};
result?
result.map_err(|e| e.into())
}
};
timer.stop_and_record();
if arrow_output_result.is_ok() {
&metrics.udf_success_count
} else {
&metrics.udf_failure_count
}
.with_label_values(labels)
.inc();

let arrow_output = arrow_output_result?;

if arrow_output.num_rows() != input.cardinality() {
bail!(
"UDF returned {} rows, but expected {}",
Expand Down
4 changes: 2 additions & 2 deletions src/expr/udf/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() {
.unwrap();

let output = client
.call("gcd", input2, 0)
.call("gcd", input2)
.await
.expect("failed to call function");

Expand All @@ -68,7 +68,7 @@ async fn main() {
.unwrap();

let output = client
.call("gcd3", input3, 0)
.call("gcd3", input3)
.await
.expect("failed to call function");

Expand Down
53 changes: 9 additions & 44 deletions src/expr/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use thiserror_ext::AsReport;
use tokio::time::Duration as TokioDuration;
use tonic::transport::Channel;

use crate::metrics::GLOBAL_METRICS;
use crate::{Error, Result};

// Interval between two successive probes of the UDF DNS.
Expand Down Expand Up @@ -144,40 +143,8 @@ impl ArrowFlightUdfClient {
}

/// Call a function.
pub async fn call(
&self,
id: &str,
input: RecordBatch,
fragment_id: u32,
) -> Result<RecordBatch> {
let metrics = &*GLOBAL_METRICS;
let fragment_id_str = fragment_id.to_string();
let labels = &[self.addr.as_str(), id, fragment_id_str.as_str()];
metrics
.udf_input_chunk_rows
.with_label_values(labels)
.observe(input.num_rows() as f64);
metrics
.udf_input_rows
.with_label_values(labels)
.inc_by(input.num_rows() as u64);
metrics
.udf_input_bytes
.with_label_values(labels)
.inc_by(input.get_array_memory_size() as u64);
let timer = metrics.udf_latency.with_label_values(labels).start_timer();

let result = self.call_internal(id, input).await;

timer.stop_and_record();
if result.is_ok() {
&metrics.udf_success_count
} else {
&metrics.udf_failure_count
}
.with_label_values(labels)
.inc();
result
pub async fn call(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
self.call_internal(id, input).await
}

async fn call_internal(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
Expand All @@ -195,15 +162,10 @@ impl ArrowFlightUdfClient {
}

/// Call a function, retry up to 5 times / 3s if connection is broken.
pub async fn call_with_retry(
&self,
id: &str,
input: RecordBatch,
fragment_id: u32,
) -> Result<RecordBatch> {
pub async fn call_with_retry(&self, id: &str, input: RecordBatch) -> Result<RecordBatch> {
let mut backoff = Duration::from_millis(100);
for i in 0..5 {
match self.call(id, input.clone(), fragment_id).await {
match self.call(id, input.clone()).await {
Err(err) if err.is_connection_error() && i != 4 => {
tracing::error!(error = %err.as_report(), "UDF connection error. retry...");
}
Expand All @@ -220,11 +182,10 @@ impl ArrowFlightUdfClient {
&self,
id: &str,
input: RecordBatch,
fragment_id: u32,
) -> Result<RecordBatch> {
let mut backoff = Duration::from_millis(100);
loop {
match self.call(id, input.clone(), fragment_id).await {
match self.call(id, input.clone()).await {
Err(err) if err.is_tonic_error() => {
tracing::error!(error = %err.as_report(), "UDF tonic error. retry...");
}
Expand Down Expand Up @@ -278,6 +239,10 @@ impl ArrowFlightUdfClient {
stream.map_err(|e| e.into()),
))
}

pub fn get_addr(&self) -> &str {
&self.addr
}
}

/// Check if two list of data types match, ignoring field names.
Expand Down
3 changes: 2 additions & 1 deletion src/expr/udf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

mod error;
mod external;
mod metrics;
pub mod metrics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this module can be moved to expr/core now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me hold this first, I am thinking if we add some specific for in-flight udf, the metrics mod should better stay here. After solve this problem, I can move it in other PR.


pub use error::{Error, Result};
pub use external::ArrowFlightUdfClient;
pub use metrics::GLOBAL_METRICS;
2 changes: 1 addition & 1 deletion src/expr/udf/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub static GLOBAL_METRICS: LazyLock<Metrics> =

impl Metrics {
fn new(registry: &Registry) -> Self {
let labels = &["link", "name", "fragment_id"];
let labels = &["link", "language", "name", "fragment_id"];
let udf_success_count = register_int_counter_vec_with_registry!(
"udf_success_count",
"Total number of successful UDF calls",
Expand Down
Loading