Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udf): support inlined Rust UDF #14903

Merged
merged 4 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 91 additions & 88 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ arrow-flight = "50"
arrow-select = "50"
arrow-ord = "50"
arrow-row = "50"
arrow-udf-js = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "7ba1c22" }
arrow-udf-wasm = "0.1"
arrow-udf-js = "0.1"
arrow-udf-wasm = { version = "0.1.2", features = ["build"] }
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" }
arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
Expand Down Expand Up @@ -282,8 +282,6 @@ tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev =
futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev = "05b33b4" }
# patch: unlimit 4MB message size for grpc client
etcd-client = { git = "https://github.com/risingwavelabs/etcd-client.git", rev = "4e84d40" }
# need binding on aarch64-unknown-linux-gnu, waiting for new release
rquickjs-sys = { git = "https://github.com/DelSkayn/rquickjs.git", rev = "60696e8" }

[workspace.metadata.dylint]
libraries = [{ path = "./lints" }]
1 change: 1 addition & 0 deletions ci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ RUN curl -sSL https://install.python-poetry.org | python3 -

# add required rustup components
RUN rustup component add rustfmt llvm-tools-preview clippy
RUN rustup target add wasm32-wasi

ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse

Expand Down
2 changes: 1 addition & 1 deletion ci/build-ci-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cat ../rust-toolchain
# shellcheck disable=SC2155

# REMEMBER TO ALSO UPDATE ci/docker-compose.yml
export BUILD_ENV_VERSION=v20240124_1
export BUILD_ENV_VERSION=v20240204

export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}"

Expand Down
10 changes: 5 additions & 5 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ services:
retries: 5

source-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
depends_on:
- mysql
- db
Expand All @@ -81,7 +81,7 @@ services:
- ..:/risingwave

sink-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
depends_on:
- mysql
- db
Expand All @@ -93,12 +93,12 @@ services:
- ..:/risingwave

rw-build-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
volumes:
- ..:/risingwave

ci-flamegraph-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
# NOTE(kwannoel): This is used in order to permit
# syscalls for `nperf` (perf_event_open),
# so it can do CPU profiling.
Expand All @@ -109,7 +109,7 @@ services:
- ..:/risingwave

regress-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
depends_on:
db:
condition: service_healthy
Expand Down
1 change: 0 additions & 1 deletion ci/scripts/build-other.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ source ci/scripts/common.sh

echo "--- Build Rust UDF"
cd e2e_test/udf/wasm
rustup target add wasm32-wasi
cargo build --release
cd ../../..

Expand Down
3 changes: 2 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ COPY rust-toolchain rust-toolchain
RUN rustup self update \
&& rustup set profile minimal \
&& rustup show \
&& rustup component add rustfmt
&& rustup component add rustfmt \
&& rustup target add wasm32-wasi

RUN cargo install flamegraph
# TODO: cargo-chef doesn't work well now, because we update Cargo.lock very often.
Expand Down
3 changes: 2 additions & 1 deletion docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ RUN unzip dashboard-artifact.zip && mv risingwave-dashboard-artifact /risingwave
RUN rustup self update \
&& rustup set profile minimal \
&& rustup show \
&& rustup component add rustfmt
&& rustup component add rustfmt \
&& rustup target add wasm32-wasi

RUN cargo fetch

Expand Down
21 changes: 21 additions & 0 deletions e2e_test/udf/wasm_udf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,24 @@ drop function jsonb_access;

statement ok
drop function series;

# inlined rust function
statement ok
create function gcd(int, int) returns int language rust as $$
fn gcd(mut a: i32, mut b: i32) -> i32 {
while b != 0 {
let t = b;
b = a % b;
a = t;
}
a
}
$$;

query I
select gcd(25, 15);
----
5

statement ok
drop function gcd;
2 changes: 1 addition & 1 deletion src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl Build for UserDefinedFunction {

let identifier = udf.get_identifier()?;
let imp = match udf.language.as_str() {
"wasm" => {
"wasm" | "rust" => {
let link = udf.get_link()?;
// Use `block_in_place` as an escape hatch to run async code here in sync context.
// Calling `block_on` directly will panic.
Expand Down
70 changes: 54 additions & 16 deletions src/frontend/src/handler/create_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use risingwave_expr::expr::get_or_create_wasm_runtime;
use risingwave_object_store::object::{build_remote_object_store, ObjectStoreConfig};
use risingwave_pb::catalog::function::{Kind, ScalarFunction, TableFunction};
use risingwave_pb::catalog::Function;
use risingwave_sqlparser::ast::{
CreateFunctionBody, FunctionDefinition, ObjectName, OperateFunctionArg,
};
use risingwave_sqlparser::ast::{CreateFunctionBody, ObjectName, OperateFunctionArg};
use risingwave_storage::monitor::ObjectStoreMetrics;
use risingwave_udf::ArrowFlightUdfClient;

Expand Down Expand Up @@ -54,7 +52,7 @@ pub async fn handle_create_function(
Some(lang) => {
let lang = lang.real_value().to_lowercase();
match &*lang {
"python" | "java" | "wasm" | "javascript" => lang,
"python" | "java" | "wasm" | "rust" | "javascript" => lang,
_ => {
return Err(ErrorCode::InvalidParameterValue(format!(
"language {} is not supported",
Expand Down Expand Up @@ -134,12 +132,12 @@ pub async fn handle_create_function(
)
.into());
};
let Some(FunctionDefinition::SingleQuotedDef(id)) = params.as_ else {
let Some(as_) = params.as_ else {
return Err(
ErrorCode::InvalidParameterValue("AS must be specified".to_string()).into(),
);
};
identifier = id;
identifier = as_.into_string();

// check UDF server
{
Expand Down Expand Up @@ -173,16 +171,56 @@ pub async fn handle_create_function(
}
"javascript" => {
identifier = function_name.to_string();
body = Some(match params.as_ {
Some(FunctionDefinition::SingleQuotedDef(s)) => s,
Some(FunctionDefinition::DoubleDollarDef(s)) => s,
_ => {
return Err(ErrorCode::InvalidParameterValue(
"AS must be specified".to_string(),
)
.into())
}
});
body = Some(
params
.as_
.ok_or_else(|| ErrorCode::InvalidParameterValue("AS must be specified".into()))?
.into_string(),
);
}
"rust" => {
identifier = wasm_identifier(
&function_name,
&arg_types,
&return_type,
matches!(kind, Kind::Table(_)),
);
if params.using.is_some() {
return Err(ErrorCode::InvalidParameterValue(
"USING is not supported for rust function".to_string(),
)
.into());
}
let function_body = params
.as_
.ok_or_else(|| ErrorCode::InvalidParameterValue("AS must be specified".into()))?
.into_string();
let script = format!("#[arrow_udf::function(\"{identifier}\")]\n{function_body}");
body = Some(function_body.clone());

let wasm_binary =
tokio::task::spawn_blocking(move || arrow_udf_wasm::build::build("", &script))
.await?
.context("failed to build rust function")?;

// below is the same as `wasm` language
let runtime = arrow_udf_wasm::Runtime::new(&wasm_binary)?;
check_wasm_function(&runtime, &identifier)?;

let system_params = session.env().meta_client().get_system_params().await?;
let object_name = format!("{:?}.wasm", md5::compute(&wasm_binary));
upload_wasm_binary(
system_params.wasm_storage_url(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we still going to deprecate the external storage for WASM binaries? I'm not sure if a Rust WASM binary fits the metadata store. 😕

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by "yes"? 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will soon move WASM binaries from object store to meta store. I think meta store is capable given that we will move to SQL backend soon and binaries usually won't be larger than 10MB. 😄

&object_name,
wasm_binary.into(),
)
.await?;

link = Some(format!(
"{}/{}",
system_params.wasm_storage_url(),
object_name
));
}
"wasm" => {
identifier = wasm_identifier(
Expand Down
18 changes: 18 additions & 0 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2642,6 +2642,24 @@ impl fmt::Display for FunctionDefinition {
}
}

impl FunctionDefinition {
/// Returns the function definition as a string slice.
pub fn as_str(&self) -> &str {
match self {
FunctionDefinition::SingleQuotedDef(s) => s,
FunctionDefinition::DoubleDollarDef(s) => s,
}
}

/// Returns the function definition as a string.
pub fn into_string(self) -> String {
match self {
FunctionDefinition::SingleQuotedDef(s) => s,
FunctionDefinition::DoubleDollarDef(s) => s,
}
}
}

/// Return types of a function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
Expand Down
Loading