Skip to content

Commit

Permalink
feat(udf): add metric of UDF memory usage (#16922)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
Co-authored-by: xxchan <[email protected]>
  • Loading branch information
wangrunji0408 and xxchan authored Jun 3, 2024
1 parent 144c7cc commit da28570
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 49 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ arrow-flight = "50"
arrow-select = "50"
arrow-ord = "50"
arrow-row = "50"
arrow-udf-js = "0.3"
arrow-udf-js = "0.3.1"
arrow-udf-js-deno = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "fa36365" }
arrow-udf-wasm = { version = "0.2.2", features = ["build"] }
arrow-udf-python = "0.2"
Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -4365,6 +4365,20 @@ def section_udf(outer_panels):
),
],
),
panels.timeseries_bytes(
"UDF Memory Usage (bytes)",
"Currently only embedded JS UDF supports this. Others will always show 0.",
[
panels.target(
f"sum({metric('udf_memory_usage')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"udf_memory_usage - {{%s}}" % NODE_LABEL,
),
panels.target(
f"sum({metric('udf_memory_usage')}) by (name, fragment_id)",
"udf_memory_usage - {{name}} {{fragment_id}}",
),
],
),
],
)
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion src/common/metrics/src/guarded_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ macro_rules! register_guarded_histogram_vec_with_registry {
$REGISTRY
}
}};
($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $BUCKETS:expr, $REGISTRY:expr $(,)?) => {{
$crate::register_guarded_histogram_vec_with_registry! {
{prometheus::histogram_opts!($NAME, $HELP, $BUCKETS)},
$LABELS_NAMES,
$REGISTRY
}
}};
($HOPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
let inner = prometheus::HistogramVec::new($HOPTS, $LABELS_NAMES);
inner.and_then(|inner| {
Expand Down Expand Up @@ -330,7 +337,7 @@ pub struct LabelGuardedMetric<T, const N: usize> {
_guard: Arc<LabelGuard<N>>,
}

impl<T: MetricVecBuilder, const N: usize> Debug for LabelGuardedMetric<T, N> {
impl<T, const N: usize> Debug for LabelGuardedMetric<T, N> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LabelGuardedMetric").finish()
}
Expand Down
114 changes: 71 additions & 43 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, LazyLock};

use anyhow::Context;
use arrow_schema::{Fields, Schema, SchemaRef};
use await_tree::InstrumentAwait;
use prometheus::{
exponential_buckets, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, Histogram, HistogramVec, IntCounter, IntCounterVec,
Registry,
};
use prometheus::{exponential_buckets, Registry};
use risingwave_common::array::arrow::{FromArrow, ToArrow, UdfArrowConvert};
use risingwave_common::array::{Array, ArrayRef, DataChunk};
use risingwave_common::metrics::*;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
Expand Down Expand Up @@ -114,6 +112,10 @@ impl UserDefinedFunction {
&self.metrics.failure_count
}
.inc();
// update memory usage
self.metrics
.memory_usage_bytes
.set(self.runtime.memory_usage() as i64);

let arrow_output = arrow_output_result?;

Expand Down Expand Up @@ -198,16 +200,15 @@ impl Build for UserDefinedFunction {
.try_collect::<Fields>()?,
));

// batch query does not have a fragment_id
let fragment_id = FRAGMENT_ID::try_with(ToOwned::to_owned)
.unwrap_or(0)
.to_string();
let labels: &[&str; 4] = &[
udf.link.as_deref().unwrap_or(""),
let metrics = GLOBAL_METRICS.with_label_values(
link.unwrap_or(""),
language,
identifier,
fragment_id.as_str(),
];
// batch query does not have a fragment_id
&FRAGMENT_ID::try_with(ToOwned::to_owned)
.unwrap_or(0)
.to_string(),
);

Ok(Self {
children: udf.children.iter().map(build_child).try_collect()?,
Expand All @@ -217,7 +218,7 @@ impl Build for UserDefinedFunction {
runtime,
arrow_convert,
span: format!("udf_call({})", identifier).into(),
metrics: GLOBAL_METRICS.with_label_values(labels),
metrics,
})
}
}
Expand All @@ -226,39 +227,43 @@ impl Build for UserDefinedFunction {
#[derive(Debug, Clone)]
struct MetricsVec {
/// Number of successful UDF calls.
success_count: IntCounterVec,
success_count: LabelGuardedIntCounterVec<4>,
/// Number of failed UDF calls.
failure_count: IntCounterVec,
failure_count: LabelGuardedIntCounterVec<4>,
/// Total number of retried UDF calls.
retry_count: IntCounterVec,
retry_count: LabelGuardedIntCounterVec<4>,
/// Input chunk rows of UDF calls.
input_chunk_rows: HistogramVec,
input_chunk_rows: LabelGuardedHistogramVec<4>,
/// The latency of UDF calls in seconds.
latency: HistogramVec,
latency: LabelGuardedHistogramVec<4>,
/// Total number of input rows of UDF calls.
input_rows: IntCounterVec,
input_rows: LabelGuardedIntCounterVec<4>,
/// Total number of input bytes of UDF calls.
input_bytes: IntCounterVec,
input_bytes: LabelGuardedIntCounterVec<4>,
/// Total memory usage of UDF runtime in bytes.
memory_usage_bytes: LabelGuardedIntGaugeVec<5>,
}

/// Monitor metrics for UDF.
#[derive(Debug, Clone)]
struct Metrics {
/// Number of successful UDF calls.
success_count: IntCounter,
success_count: LabelGuardedIntCounter<4>,
/// Number of failed UDF calls.
failure_count: IntCounter,
failure_count: LabelGuardedIntCounter<4>,
/// Total number of retried UDF calls.
#[allow(dead_code)]
retry_count: IntCounter,
retry_count: LabelGuardedIntCounter<4>,
/// Input chunk rows of UDF calls.
input_chunk_rows: Histogram,
input_chunk_rows: LabelGuardedHistogram<4>,
/// The latency of UDF calls in seconds.
latency: Histogram,
latency: LabelGuardedHistogram<4>,
/// Total number of input rows of UDF calls.
input_rows: IntCounter,
input_rows: LabelGuardedIntCounter<4>,
/// Total number of input bytes of UDF calls.
input_bytes: IntCounter,
input_bytes: LabelGuardedIntCounter<4>,
/// Total memory usage of UDF runtime in bytes.
memory_usage_bytes: LabelGuardedIntGauge<5>,
}

/// Global UDF metrics.
Expand All @@ -268,57 +273,65 @@ static GLOBAL_METRICS: LazyLock<MetricsVec> =
impl MetricsVec {
fn new(registry: &Registry) -> Self {
let labels = &["link", "language", "name", "fragment_id"];
let success_count = register_int_counter_vec_with_registry!(
let labels5 = &["link", "language", "name", "fragment_id", "instance_id"];
let success_count = register_guarded_int_counter_vec_with_registry!(
"udf_success_count",
"Total number of successful UDF calls",
labels,
registry
)
.unwrap();
let failure_count = register_int_counter_vec_with_registry!(
let failure_count = register_guarded_int_counter_vec_with_registry!(
"udf_failure_count",
"Total number of failed UDF calls",
labels,
registry
)
.unwrap();
let retry_count = register_int_counter_vec_with_registry!(
let retry_count = register_guarded_int_counter_vec_with_registry!(
"udf_retry_count",
"Total number of retried UDF calls",
labels,
registry
)
.unwrap();
let input_chunk_rows = register_histogram_vec_with_registry!(
let input_chunk_rows = register_guarded_histogram_vec_with_registry!(
"udf_input_chunk_rows",
"Input chunk rows of UDF calls",
labels,
exponential_buckets(1.0, 2.0, 10).unwrap(), // 1 to 1024
registry
)
.unwrap();
let latency = register_histogram_vec_with_registry!(
let latency = register_guarded_histogram_vec_with_registry!(
"udf_latency",
"The latency(s) of UDF calls",
labels,
exponential_buckets(0.000001, 2.0, 30).unwrap(), // 1us to 1000s
registry
)
.unwrap();
let input_rows = register_int_counter_vec_with_registry!(
let input_rows = register_guarded_int_counter_vec_with_registry!(
"udf_input_rows",
"Total number of input rows of UDF calls",
labels,
registry
)
.unwrap();
let input_bytes = register_int_counter_vec_with_registry!(
let input_bytes = register_guarded_int_counter_vec_with_registry!(
"udf_input_bytes",
"Total number of input bytes of UDF calls",
labels,
registry
)
.unwrap();
let memory_usage_bytes = register_guarded_int_gauge_vec_with_registry!(
"udf_memory_usage",
"Total memory usage of UDF runtime in bytes",
labels5,
registry
)
.unwrap();

MetricsVec {
success_count,
Expand All @@ -328,18 +341,33 @@ impl MetricsVec {
latency,
input_rows,
input_bytes,
memory_usage_bytes,
}
}

fn with_label_values(&self, values: &[&str; 4]) -> Metrics {
fn with_label_values(
&self,
link: &str,
language: &str,
identifier: &str,
fragment_id: &str,
) -> Metrics {
// generate an unique id for each instance
static NEXT_INSTANCE_ID: AtomicU64 = AtomicU64::new(0);
let instance_id = NEXT_INSTANCE_ID.fetch_add(1, Ordering::Relaxed).to_string();

let labels = &[link, language, identifier, fragment_id];
let labels5 = &[link, language, identifier, fragment_id, &instance_id];

Metrics {
success_count: self.success_count.with_label_values(values),
failure_count: self.failure_count.with_label_values(values),
retry_count: self.retry_count.with_label_values(values),
input_chunk_rows: self.input_chunk_rows.with_label_values(values),
latency: self.latency.with_label_values(values),
input_rows: self.input_rows.with_label_values(values),
input_bytes: self.input_bytes.with_label_values(values),
success_count: self.success_count.with_guarded_label_values(labels),
failure_count: self.failure_count.with_guarded_label_values(labels),
retry_count: self.retry_count.with_guarded_label_values(labels),
input_chunk_rows: self.input_chunk_rows.with_guarded_label_values(labels),
latency: self.latency.with_guarded_label_values(labels),
input_rows: self.input_rows.with_guarded_label_values(labels),
input_bytes: self.input_bytes.with_guarded_label_values(labels),
memory_usage_bytes: self.memory_usage_bytes.with_guarded_label_values(labels5),
}
}
}
7 changes: 7 additions & 0 deletions src/expr/core/src/sig/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,11 @@ pub trait UdfImpl: std::fmt::Debug + Send + Sync {
fn is_legacy(&self) -> bool {
false
}

/// Return the memory size consumed by UDF runtime in bytes.
///
/// If not available, return 0.
fn memory_usage(&self) -> usize {
0
}
}
4 changes: 4 additions & 0 deletions src/expr/impl/src/udf/quickjs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,8 @@ impl UdfImpl for QuickJsFunction {
fn call_agg_finish(&self, state: &ArrayRef) -> Result<ArrayRef> {
self.runtime.finish(&self.identifier, state)
}

fn memory_usage(&self) -> usize {
self.runtime.memory_usage().malloc_size as usize
}
}

0 comments on commit da28570

Please sign in to comment.