Skip to content

Commit

Permalink
feat(udf): Add deno as UDF language
Browse files Browse the repository at this point in the history
  • Loading branch information
bakjos committed Mar 15, 2024
1 parent 9b84ae0 commit fe66c2a
Show file tree
Hide file tree
Showing 23 changed files with 2,300 additions and 269 deletions.
2,124 changes: 1,872 additions & 252 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ arrow-flight = "50"
arrow-select = "50"
arrow-ord = "50"
arrow-row = "50"
# FIXME: This should be changed to the official crate once it's published.
arrow-udf-deno = { git = "https://github.com/bakjos/arrow-udf.git", rev = "8660631" }
arrow-udf-js = "0.1"
arrow-udf-wasm = { version = "0.2", features = ["build"] }
arrow-udf-python = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "6c32f71" }
Expand Down
6 changes: 6 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ is_release = get_env ENABLE_RELEASE_PROFILE
is_not_release = not ${is_release}
is_dynamic_linking = get_env ENABLE_DYNAMIC_LINKING
is_hummock_trace = get_env ENABLE_HUMMOCK_TRACE
is_deno_udf_enabled = get_env ENABLE_DENO_UDF
is_python_udf_enabled = get_env ENABLE_PYTHON_UDF
if ${is_sanitizer_enabled}
Expand All @@ -60,6 +61,11 @@ else
set_env RISINGWAVE_FEATURE_FLAGS "--features rw-static-link"
end
if ${is_deno_udf_enabled}
flags = get_env RISINGWAVE_FEATURE_FLAGS
set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features embedded-deno-udf"
end
if ${is_python_udf_enabled}
flags = get_env RISINGWAVE_FEATURE_FLAGS
set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features embedded-python-udf"
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ cargo build \
-p risingwave_compaction_test \
-p risingwave_e2e_extended_mode_test \
$RISINGWAVE_FEATURE_FLAGS \
--features embedded-deno-udf \
--features embedded-python-udf \
--profile "$profile"

Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ WORKDIR /risingwave
ENV ENABLE_BUILD_DASHBOARD=1

RUN cargo fetch && \
cargo build -p risingwave_cmd_all --release --features "rw-static-link" --features embedded-python-udf && \
cargo build -p risingwave_cmd_all --release --features "rw-static-link" --features embedded-deno-udf --features embedded-python-udf && \
mkdir -p /risingwave/bin && \
mv /risingwave/target/release/risingwave /risingwave/bin/ && \
mv /risingwave/target/release/risingwave.dwp /risingwave/bin/ && \
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ ENV JAVA_HOME ${JAVA_HOME_PATH}
ENV LD_LIBRARY_PATH ${JAVA_HOME_PATH}/lib/server:${LD_LIBRARY_PATH}

RUN cargo fetch && \
cargo build -p risingwave_cmd_all --release -p risingwave_object_store --features hdfs-backend --features "rw-static-link" --features embedded-python-udf && \
cargo build -p risingwave_cmd_all --release -p risingwave_object_store --features hdfs-backend --features "rw-static-link" --features embedded-deno-udf --features embedded-python-udf && \
mkdir -p /risingwave/bin && \
mv /risingwave/target/release/risingwave /risingwave/bin/ && \
mv /risingwave/target/release/risingwave.dwp /risingwave/bin/ && \
Expand Down
2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ message Function {
// The zstd-compressed binary of the function.
optional bytes compressed_binary = 17;
bool always_retry_on_network_error = 16;
optional string param_name = 18;
optional string param_value = 19;

oneof kind {
ScalarFunction scalar = 11;
Expand Down
6 changes: 6 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ message UserDefinedFunction {
// - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary.
optional bytes compressed_binary = 10;
bool always_retry_on_network_error = 9;

optional string param_name = 11;
optional string param_value = 12;
}

// Additional information for user defined table functions.
Expand All @@ -548,4 +551,7 @@ message UserDefinedTableFunction {
optional string identifier = 6;
optional string body = 7;
optional bytes compressed_binary = 10;
optional string param_name = 11;
optional string param_value = 12;

}
3 changes: 2 additions & 1 deletion src/cmd_all/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ repository = { workspace = true }
[features]
rw-static-link = ["workspace-config/rw-static-link"]
rw-dynamic-link = ["workspace-config/rw-dynamic-link"]
embedded-deno-udf = ["risingwave_expr/embedded-deno-udf"]
embedded-python-udf = ["risingwave_expr/embedded-python-udf"]
default = ["rw-static-link"]
default = ["embedded-deno-udf", "rw-static-link"]

[package.metadata.cargo-machete]
ignored = ["workspace-hack", "workspace-config", "task_stats_alloc"]
Expand Down
3 changes: 3 additions & 0 deletions src/expr/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ ignored = ["workspace-hack", "ctor"]
normal = ["workspace-hack", "ctor"]

[features]
embedded-deno-udf = ["arrow-udf-deno"]
embedded-python-udf = ["arrow-udf-python"]

[dependencies]
anyhow = "1"
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-udf-deno = { workspace = true, optional = true }
arrow-udf-js = { workspace = true }
arrow-udf-python = { workspace = true, optional = true }
arrow-udf-wasm = { workspace = true }
Expand All @@ -38,6 +40,7 @@ easy-ext = "1"
educe = "0.5"
either = "1"
enum-as-inner = "0.6"
futures = "0.3"
futures-async-stream = { workspace = true }
futures-util = "0.3"
itertools = "0.12"
Expand Down
61 changes: 61 additions & 0 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::time::Duration;

use anyhow::{Context, Error};
use arrow_schema::{Field, Fields, Schema};
#[cfg(feature = "embedded-deno-udf")]
use arrow_udf_deno::{CallMode as DenoCallMode, Runtime as DenoRuntime};
use arrow_udf_js::{CallMode as JsCallMode, Runtime as JsRuntime};
#[cfg(feature = "embedded-python-udf")]
use arrow_udf_python::{CallMode as PythonCallMode, Runtime as PythonRuntime};
Expand Down Expand Up @@ -73,6 +75,8 @@ pub enum UdfImpl {
JavaScript(JsRuntime),
#[cfg(feature = "embedded-python-udf")]
Python(PythonRuntime),
#[cfg(feature = "embedded-deno-udf")]
Deno(Arc<DenoRuntime>),
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -130,6 +134,8 @@ impl UserDefinedFunction {
UdfImpl::JavaScript(_) => "javascript",
#[cfg(feature = "embedded-python-udf")]
UdfImpl::Python(_) => "python",
#[cfg(feature = "embedded-deno-udf")]
UdfImpl::Deno(_) => "deno",
UdfImpl::External(_) => "external",
};
let labels: &[&str; 4] = &[addr, language, &self.identifier, fragment_id.as_str()];
Expand All @@ -152,6 +158,10 @@ impl UserDefinedFunction {
UdfImpl::JavaScript(runtime) => runtime.call(&self.identifier, &arrow_input),
#[cfg(feature = "embedded-python-udf")]
UdfImpl::Python(runtime) => runtime.call(&self.identifier, &arrow_input),
#[cfg(feature = "embedded-deno-udf")]
UdfImpl::Deno(runtime) => {
futures::executor::block_on(runtime.call(&self.identifier, arrow_input))
}
UdfImpl::External(client) => {
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let result = if self.always_retry_on_network_error {
Expand Down Expand Up @@ -278,6 +288,57 @@ impl Build for UserDefinedFunction {
)?;
UdfImpl::Python(rt)
}
// Embedded Deno scalar UDF: wrap the user-supplied body in an exported JS function named
// after `identifier` and register it with a Deno runtime.
#[cfg(feature = "embedded-deno-udf")]
"deno" if udf.body.is_some() => {
    // The runtime handle is stored as-is in `UdfImpl::Deno` and registration only needs
    // shared access, so the binding does not have to be `mut`.
    let rt = DenoRuntime::new();

    // Source of the function body: the plain-text body if present, otherwise the
    // zstd-compressed binary decoded as UTF-8.
    // NOTE(review): the arm guard already requires `udf.body.is_some()`, so the
    // compressed-binary fallback looks unreachable here — confirm whether the guard or the
    // fallback is the intended behavior.
    let body = match udf.get_body() {
        Ok(body) => body.clone(),
        Err(_) => match udf.get_compressed_binary() {
            Ok(compressed_binary) => {
                let binary = zstd::stream::decode_all(compressed_binary.as_slice())
                    .context("failed to decompress binary")?;
                String::from_utf8(binary).context("failed to decode binary")?
            }
            Err(_) => {
                bail!("UDF body or compressed binary is required for deno UDF");
            }
        },
    };

    // The function is declared `async` only when the single name/value parameter is
    // exactly `function_type = async`; anything else yields a plain function.
    let is_async = if let (Ok(name), Ok(ty)) = (
        udf.get_param_name().map(|s| s.as_str()),
        udf.get_param_value().map(|s| s.as_str()),
    ) {
        "function_type" == name && "async" == ty
    } else {
        false
    };

    // Only the declaration keyword differs between the sync and async wrappers.
    let declaration = if is_async {
        "export async function"
    } else {
        "export function"
    };
    let body = format!(
        "{} {}({}) {{ {} }}",
        declaration,
        identifier,
        udf.arg_names.join(","),
        body
    );

    // `add_function` is async; it is driven to completion synchronously at build time.
    futures::executor::block_on(rt.add_function(
        identifier,
        arrow_schema::DataType::try_from(&return_type)?,
        DenoCallMode::CalledOnNullInput,
        &body,
    ))?;

    UdfImpl::Deno(rt)
}
#[cfg(not(madsim))]
_ => {
let link = udf.get_link()?;
Expand Down
82 changes: 82 additions & 0 deletions src/expr/core/src/table_function/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::sync::Arc;
use anyhow::Context;
use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema, SchemaRef};
#[cfg(feature = "embedded-deno-udf")]
use arrow_udf_deno::{CallMode as DenoCallMode, Runtime as DenoRuntime};
use arrow_udf_js::{CallMode as JsCallMode, Runtime as JsRuntime};
#[cfg(feature = "embedded-python-udf")]
use arrow_udf_python::{CallMode as PythonCallMode, Runtime as PythonRuntime};
Expand Down Expand Up @@ -78,6 +80,13 @@ impl UdfImpl {
yield res?;
}
}
#[cfg(feature = "embedded-deno-udf")]
UdfImpl::Deno(runtime) => {
let mut iter = runtime.call_table_function(identifier, input, 1024).await?;
while let Some(res) = iter.next().await {
yield res?;
}
}
UdfImpl::Wasm(runtime) => {
for res in runtime.call_table_function(identifier, &input)? {
yield res?;
Expand Down Expand Up @@ -224,6 +233,79 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result<Bo
)?;
UdfImpl::Python(rt)
}

"deno" => {
let rt = DenoRuntime::new();
let body = match udtf.get_body() {
Ok(body) => body.clone(),
Err(_) => match udtf.get_compressed_binary() {
Ok(compressed_binary) => {
let binary = zstd::stream::decode_all(compressed_binary.as_slice())
.context("failed to decompress binary")?;
String::from_utf8(binary).context("failed to decode binary")?
}
Err(_) => {
bail!("UDF body or compressed binary is required for deno UDF");
}
},
};

let function_type = if let (Ok(f), Ok(function_type)) = (
udtf.get_param_name().map(|s| s.as_str()),
udtf.get_param_value().map(|s| s.as_str()),
) {
if f == "function_type" {
function_type
} else {
"generator"
}
} else {
"generator"
};

let body = match function_type {
"async" => {
format!(
"export async function {}({}) {{ {} }}",
identifier,
udtf.arg_names.join(","),
body
)
}
"async_generator" => {
format!(
"export async function* {}({}) {{ {} }}",
identifier,
udtf.arg_names.join(","),
body
)
}
"generator" => {
format!(
"export function* {}({}) {{ {} }}",
identifier,
udtf.arg_names.join(","),
body
)
}
_ => {
format!(
"export function {}({}) {{ {} }}",
identifier,
udtf.arg_names.join(","),
body
)
}
};

futures::executor::block_on(rt.add_function(
identifier,
arrow_schema::DataType::try_from(&return_type)?,
DenoCallMode::CalledOnNullInput,
&body,
))?;
UdfImpl::Deno(rt)
}
// connect to UDF service
_ => {
let link = udtf.get_link()?;
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/function_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct FunctionCatalog {
pub link: Option<String>,
pub compressed_binary: Option<Vec<u8>>,
pub always_retry_on_network_error: bool,
pub param_name: Option<String>,
pub param_value: Option<String>,
}

#[derive(Clone, Display, PartialEq, Eq, Hash, Debug)]
Expand Down Expand Up @@ -72,6 +74,8 @@ impl From<&PbFunction> for FunctionCatalog {
link: prost.link.clone(),
compressed_binary: prost.compressed_binary.clone(),
always_retry_on_network_error: prost.always_retry_on_network_error,
param_name: prost.param_name.clone(),
param_value: prost.param_value.clone(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/expr/table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ impl TableFunction {
identifier: c.identifier.clone(),
body: c.body.clone(),
compressed_binary: c.compressed_binary.clone(),
param_name: c.param_name.clone(),
param_value: c.param_value.clone(),
}),
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/expr/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ impl UserDefinedFunction {
link: udf.link.clone(),
compressed_binary: udf.compressed_binary.clone(),
always_retry_on_network_error: udf.always_retry_on_network_error,
param_name: udf.param_name.clone(),
param_value: udf.param_value.clone(),
};

Ok(Self {
Expand Down Expand Up @@ -96,6 +98,8 @@ impl Expr for UserDefinedFunction {
body: self.catalog.body.clone(),
compressed_binary: self.catalog.compressed_binary.clone(),
always_retry_on_network_error: self.catalog.always_retry_on_network_error,
param_name: self.catalog.param_name.clone(),
param_value: self.catalog.param_value.clone(),
})),
}
}
Expand Down
Loading

0 comments on commit fe66c2a

Please sign in to comment.