Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udf): add metric of UDF memory usage #16922

Merged
merged 10 commits into from
Jun 3, 2024
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ arrow-flight = "50"
arrow-select = "50"
arrow-ord = "50"
arrow-row = "50"
arrow-udf-js = "0.3"
arrow-udf-js = "0.3.1"
arrow-udf-js-deno = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "fa36365" }
arrow-udf-wasm = { version = "0.2.2", features = ["build"] }
arrow-udf-python = "0.2"
Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -4268,6 +4268,20 @@ def section_udf(outer_panels):
),
],
),
panels.timeseries_bytes(
"UDF Memory Usage (bytes)",
"",
wangrunji0408 marked this conversation as resolved.
Show resolved Hide resolved
[
panels.target(
f"sum({metric('udf_memory_usage')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_memory_usage - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum({metric('udf_memory_usage')}) by (name, fragment_id)",
"udf_memory_usage - {{name}} {{fragment_id}}",
),
],
),
],
)
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion src/common/metrics/src/guarded_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ macro_rules! register_guarded_histogram_vec_with_registry {
$REGISTRY
}
}};
($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $BUCKETS:expr, $REGISTRY:expr $(,)?) => {{
$crate::register_guarded_histogram_vec_with_registry! {
{prometheus::histogram_opts!($NAME, $HELP, $BUCKETS)},
$LABELS_NAMES,
$REGISTRY
}
}};
($HOPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
let inner = prometheus::HistogramVec::new($HOPTS, $LABELS_NAMES);
inner.and_then(|inner| {
Expand Down Expand Up @@ -330,7 +337,7 @@ pub struct LabelGuardedMetric<T, const N: usize> {
_guard: Arc<LabelGuard<N>>,
}

impl<T: MetricVecBuilder, const N: usize> Debug for LabelGuardedMetric<T, N> {
impl<T, const N: usize> Debug for LabelGuardedMetric<T, N> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LabelGuardedMetric").finish()
}
Expand Down
114 changes: 71 additions & 43 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, LazyLock};

use anyhow::Context;
use arrow_schema::{Fields, Schema, SchemaRef};
use await_tree::InstrumentAwait;
use prometheus::{
exponential_buckets, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, Histogram, HistogramVec, IntCounter, IntCounterVec,
Registry,
};
use prometheus::{exponential_buckets, Registry};
use risingwave_common::array::arrow::{FromArrow, ToArrow, UdfArrowConvert};
use risingwave_common::array::{Array, ArrayRef, DataChunk};
use risingwave_common::metrics::*;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
Expand Down Expand Up @@ -114,6 +112,10 @@ impl UserDefinedFunction {
&self.metrics.failure_count
}
.inc();
// update memory usage
self.metrics
.memory_usage_bytes
.set(self.runtime.memory_usage() as i64);

let arrow_output = arrow_output_result?;

Expand Down Expand Up @@ -198,16 +200,15 @@ impl Build for UserDefinedFunction {
.try_collect::<Fields>()?,
));

// batch query does not have a fragment_id
let fragment_id = FRAGMENT_ID::try_with(ToOwned::to_owned)
.unwrap_or(0)
.to_string();
let labels: &[&str; 4] = &[
udf.link.as_deref().unwrap_or(""),
let metrics = GLOBAL_METRICS.with_label_values(
link.unwrap_or(""),
language,
identifier,
fragment_id.as_str(),
];
// batch query does not have a fragment_id
&FRAGMENT_ID::try_with(ToOwned::to_owned)
.unwrap_or(0)
.to_string(),
);

Ok(Self {
children: udf.children.iter().map(build_child).try_collect()?,
Expand All @@ -217,7 +218,7 @@ impl Build for UserDefinedFunction {
runtime,
arrow_convert,
span: format!("udf_call({})", identifier).into(),
metrics: GLOBAL_METRICS.with_label_values(labels),
metrics,
})
}
}
Expand All @@ -226,39 +227,43 @@ impl Build for UserDefinedFunction {
#[derive(Debug, Clone)]
struct MetricsVec {
/// Number of successful UDF calls.
success_count: IntCounterVec,
success_count: LabelGuardedIntCounterVec<4>,
/// Number of failed UDF calls.
failure_count: IntCounterVec,
failure_count: LabelGuardedIntCounterVec<4>,
/// Total number of retried UDF calls.
retry_count: IntCounterVec,
retry_count: LabelGuardedIntCounterVec<4>,
/// Input chunk rows of UDF calls.
input_chunk_rows: HistogramVec,
input_chunk_rows: LabelGuardedHistogramVec<4>,
/// The latency of UDF calls in seconds.
latency: HistogramVec,
latency: LabelGuardedHistogramVec<4>,
/// Total number of input rows of UDF calls.
input_rows: IntCounterVec,
input_rows: LabelGuardedIntCounterVec<4>,
/// Total number of input bytes of UDF calls.
input_bytes: IntCounterVec,
input_bytes: LabelGuardedIntCounterVec<4>,
/// Total memory usage of UDF runtime in bytes.
memory_usage_bytes: LabelGuardedIntGaugeVec<5>,
}

/// Monitor metrics for UDF.
#[derive(Debug, Clone)]
struct Metrics {
/// Number of successful UDF calls.
success_count: IntCounter,
success_count: LabelGuardedIntCounter<4>,
/// Number of failed UDF calls.
failure_count: IntCounter,
failure_count: LabelGuardedIntCounter<4>,
/// Total number of retried UDF calls.
#[allow(dead_code)]
retry_count: IntCounter,
retry_count: LabelGuardedIntCounter<4>,
/// Input chunk rows of UDF calls.
input_chunk_rows: Histogram,
input_chunk_rows: LabelGuardedHistogram<4>,
/// The latency of UDF calls in seconds.
latency: Histogram,
latency: LabelGuardedHistogram<4>,
/// Total number of input rows of UDF calls.
input_rows: IntCounter,
input_rows: LabelGuardedIntCounter<4>,
/// Total number of input bytes of UDF calls.
input_bytes: IntCounter,
input_bytes: LabelGuardedIntCounter<4>,
/// Total memory usage of UDF runtime in bytes.
memory_usage_bytes: LabelGuardedIntGauge<5>,
}

/// Global UDF metrics.
Expand All @@ -268,57 +273,65 @@ static GLOBAL_METRICS: LazyLock<MetricsVec> =
impl MetricsVec {
fn new(registry: &Registry) -> Self {
let labels = &["link", "language", "name", "fragment_id"];
let success_count = register_int_counter_vec_with_registry!(
let labels5 = &["link", "language", "name", "fragment_id", "instance_id"];
let success_count = register_guarded_int_counter_vec_with_registry!(
"udf_success_count",
"Total number of successful UDF calls",
labels,
registry
)
.unwrap();
let failure_count = register_int_counter_vec_with_registry!(
let failure_count = register_guarded_int_counter_vec_with_registry!(
"udf_failure_count",
"Total number of failed UDF calls",
labels,
registry
)
.unwrap();
let retry_count = register_int_counter_vec_with_registry!(
let retry_count = register_guarded_int_counter_vec_with_registry!(
"udf_retry_count",
"Total number of retried UDF calls",
labels,
registry
)
.unwrap();
let input_chunk_rows = register_histogram_vec_with_registry!(
let input_chunk_rows = register_guarded_histogram_vec_with_registry!(
"udf_input_chunk_rows",
"Input chunk rows of UDF calls",
labels,
exponential_buckets(1.0, 2.0, 10).unwrap(), // 1 to 1024
registry
)
.unwrap();
let latency = register_histogram_vec_with_registry!(
let latency = register_guarded_histogram_vec_with_registry!(
"udf_latency",
"The latency(s) of UDF calls",
labels,
exponential_buckets(0.000001, 2.0, 30).unwrap(), // 1us to 1000s
registry
)
.unwrap();
let input_rows = register_int_counter_vec_with_registry!(
let input_rows = register_guarded_int_counter_vec_with_registry!(
"udf_input_rows",
"Total number of input rows of UDF calls",
labels,
registry
)
.unwrap();
let input_bytes = register_int_counter_vec_with_registry!(
let input_bytes = register_guarded_int_counter_vec_with_registry!(
"udf_input_bytes",
"Total number of input bytes of UDF calls",
labels,
registry
)
.unwrap();
let memory_usage_bytes = register_guarded_int_gauge_vec_with_registry!(
"udf_memory_usage",
"Total memory usage of UDF runtime in bytes",
labels5,
registry
)
.unwrap();

MetricsVec {
success_count,
Expand All @@ -328,18 +341,33 @@ impl MetricsVec {
latency,
input_rows,
input_bytes,
memory_usage_bytes,
}
}

fn with_label_values(&self, values: &[&str; 4]) -> Metrics {
fn with_label_values(
&self,
link: &str,
language: &str,
identifier: &str,
fragment_id: &str,
) -> Metrics {
// generate an unique id for each instance
static NEXT_INSTANCE_ID: AtomicU64 = AtomicU64::new(0);
let instance_id = NEXT_INSTANCE_ID.fetch_add(1, Ordering::Relaxed).to_string();
Comment on lines +355 to +357
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The memory usage is a gauge. So we need to distinguish each instance's metric by a different label, otherwise the value will be overridden rather than accumulated.


let labels = &[link, language, identifier, fragment_id];
let labels5 = &[link, language, identifier, fragment_id, &instance_id];

Metrics {
success_count: self.success_count.with_label_values(values),
failure_count: self.failure_count.with_label_values(values),
retry_count: self.retry_count.with_label_values(values),
input_chunk_rows: self.input_chunk_rows.with_label_values(values),
latency: self.latency.with_label_values(values),
input_rows: self.input_rows.with_label_values(values),
input_bytes: self.input_bytes.with_label_values(values),
success_count: self.success_count.with_guarded_label_values(labels),
failure_count: self.failure_count.with_guarded_label_values(labels),
retry_count: self.retry_count.with_guarded_label_values(labels),
input_chunk_rows: self.input_chunk_rows.with_guarded_label_values(labels),
latency: self.latency.with_guarded_label_values(labels),
input_rows: self.input_rows.with_guarded_label_values(labels),
input_bytes: self.input_bytes.with_guarded_label_values(labels),
memory_usage_bytes: self.memory_usage_bytes.with_guarded_label_values(labels5),
}
}
}
7 changes: 7 additions & 0 deletions src/expr/core/src/sig/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,11 @@ pub trait UdfImpl: std::fmt::Debug + Send + Sync {
fn is_legacy(&self) -> bool {
false
}

/// Return the memory size consumed by UDF runtime in bytes.
///
/// If not available, return 0.
fn memory_usage(&self) -> usize {
0
}
}
4 changes: 4 additions & 0 deletions src/expr/impl/src/udf/quickjs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,8 @@ impl UdfImpl for QuickJsFunction {
fn call_agg_finish(&self, state: &ArrayRef) -> Result<ArrayRef> {
self.runtime.finish(&self.identifier, state)
}

fn memory_usage(&self) -> usize {
self.runtime.memory_usage().malloc_size as usize
}
}
Loading