Skip to content

Commit

Permalink
add tinygo example & turn on wasi support
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jul 13, 2023
1 parent 3ccaf0e commit 237d240
Show file tree
Hide file tree
Showing 13 changed files with 688 additions and 51 deletions.
355 changes: 345 additions & 10 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion src/expr/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ impl UdfExpression {
arrow_array::RecordBatch::try_new_with_options(self.arg_schema.clone(), columns, &opts)
.expect("failed to build record batch");
let output: arrow_array::RecordBatch = match &self.imp {
UdfImpl::Wasm { component } => component.eval(input)?,
UdfImpl::Wasm { component } => {
component
.eval(input)
.instrument_await(self.span.clone())
.await?
}
UdfImpl::External { client, identifier } => {
client
.call(identifier, input)
Expand Down
1 change: 1 addition & 0 deletions src/udf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
tonic = { version = "0.2", package = "madsim-tonic" }
tracing = "0.1"
wasmtime = { version = "10", features = ["component-model"] }
wasmtime-wasi = { version = "10" }
67 changes: 54 additions & 13 deletions src/udf/src/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

#![expect(dead_code)]

use std::sync::{Arc, Mutex};
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{parse_remote_object_store, ObjectStoreImpl};
use tokio::sync::Mutex;
use tracing::debug;
use wasmtime::component::{Component, Linker};
use wasmtime::{Config, Store};
Expand All @@ -28,7 +29,8 @@ pub mod component {
mod bindgen {
wasmtime::component::bindgen!({
world: "udf",
path: "wit/udf.wit"
path: "wit/udf.wit",
async: true // required for wasi
});
}
pub use bindgen::{EvalErrno, RecordBatch as WasmRecordBatch, Schema, Udf};
Expand All @@ -37,7 +39,28 @@ pub mod component {
/// Host state
///
/// Currently this is only a placeholder. No states.
struct WasmState {}
struct WasmState {
wasi_ctx: wasmtime_wasi::preview2::WasiCtx,
table: wasmtime_wasi::preview2::Table,
}

impl wasmtime_wasi::preview2::WasiView for WasmState {
fn table(&self) -> &wasmtime_wasi::preview2::Table {
&self.table
}

fn table_mut(&mut self) -> &mut wasmtime_wasi::preview2::Table {
&mut self.table
}

fn ctx(&self) -> &wasmtime_wasi::preview2::WasiCtx {
&self.wasi_ctx
}

fn ctx_mut(&mut self) -> &mut wasmtime_wasi::preview2::WasiCtx {
&mut self.wasi_ctx
}
}

type ArrowResult<T> = std::result::Result<T, arrow_schema::ArrowError>;
type WasmtimeResult<T> = std::result::Result<T, wasmtime::Error>;
Expand Down Expand Up @@ -78,14 +101,19 @@ mod convert {
}

impl InstantiatedComponent {
pub fn eval(&self, input: arrow_array::RecordBatch) -> WasmUdfResult<arrow_array::RecordBatch> {
pub async fn eval(
&self,
input: arrow_array::RecordBatch,
) -> WasmUdfResult<arrow_array::RecordBatch> {
// let input_schema = self.bindings.call_input_schema(&mut self.store)?;
// let output_schema = self.bindings.call_output_schema(&mut self.store)?;

let input = to_wasm_batch(input)?;
// TODO: Use tokio Mutex to use it across the await here. Does it make sense?
let result = self
.bindings
.call_eval(&mut *self.store.lock().unwrap(), &input)??;
.call_eval(&mut *self.store.lock().await, &input)
.await??;
let result = from_wasm_batch(&result)?;
let Some((record_batch,))= result.collect_tuple() else {
return Err(WasmUdfError::Encoding("should return only one record batch in IPC buffer".to_string()));
Expand All @@ -108,7 +136,8 @@ impl WasmEngine {
// Is this expensive?
let mut config = Config::new();
config.wasm_component_model(true);
config.async_support(false);
// required for wasi
config.async_support(true);

Self {
engine: wasmtime::Engine::new(&config).expect("failed to create wasm engine"),
Expand Down Expand Up @@ -150,17 +179,23 @@ impl WasmEngine {
);

// check the component can be instantiated
let linker = Linker::new(&self.engine);
let mut linker = Linker::new(&self.engine);
// A Store is intended to be a short-lived object in a program. No form of GC is
// implemented at this time so once an instance is created within a Store it will not be
// deallocated until the Store itself is dropped. This makes Store unsuitable for
// creating an unbounded number of instances in it because Store will never release this
// memory. It's recommended to have a Store correspond roughly to the lifetime of a
// "main instance" that an embedding is interested in executing.

// So this is cheap?
let mut store = Store::new(&self.engine, WasmState {});
let (_bindings, _instance) = component::Udf::instantiate(&mut store, &component, &linker)?;
// So creating a Store is cheap?
let mut table = wasmtime_wasi::preview2::Table::new();
let wasi_ctx = wasmtime_wasi::preview2::WasiCtxBuilder::new()
.inherit_stdio() // this is needed for println to work
.build(&mut table)?;
let mut store = Store::new(&self.engine, WasmState { table, wasi_ctx });
wasmtime_wasi::preview2::wasi::command::add_to_linker(&mut linker)?;
let (_bindings, _instance) =
component::Udf::instantiate_async(&mut store, &component, &linker).await?;

object_store
.upload(&compiled_path(identifier), serialized.into())
Expand Down Expand Up @@ -189,9 +224,15 @@ impl WasmEngine {
Component::deserialize(&self.engine, serialized_component)?
};

let linker = Linker::new(&self.engine);
let mut store = Store::new(&self.engine, WasmState {});
let (bindings, instance) = component::Udf::instantiate(&mut store, &component, &linker)?;
let mut linker = Linker::new(&self.engine);
let mut table = wasmtime_wasi::preview2::Table::new();
let wasi_ctx = wasmtime_wasi::preview2::WasiCtxBuilder::new()
.inherit_stdio() // this is needed for println to work
.build(&mut table)?;
let mut store = Store::new(&self.engine, WasmState { table, wasi_ctx });
wasmtime_wasi::preview2::wasi::command::add_to_linker(&mut linker)?;
let (bindings, instance) =
component::Udf::instantiate_async(&mut store, &component, &linker).await?;

Ok(InstantiatedComponent {
store: Arc::new(Mutex::new(store)),
Expand Down
3 changes: 2 additions & 1 deletion src/udf/wit_example/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.sql
*.wasm
Cargo.lock
Cargo.lock
tinygo/gen
54 changes: 54 additions & 0 deletions src/udf/wit_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# WASM UDF examples

TODO:
- [ ] error handing
- [ ] schema validation

## Required tools

- [wasm-tools](https://github.com/bytecodealliance/wasm-tools): to create WASM component from WASM module.
- [wit-bindgen](https://github.com/bytecodealliance/wit-bindgen) CLI: to generate guest code from WIT file. (Not required for Rust guest)

```
cargo install [email protected]
cargo install [email protected]
```

> **Note**
>
> WASM component model IS NOT stable and may change. Please use the version specified above.
## Examples for different guest languages

Refer to each language's directory for an example. Some additional notes are listed below.
Generally you will just need to copy the `wit` directory and follow the examples.

It's not guaranteed to work if you used different versions of toolchains and project dependencies.

### Rust

nothing special

### Golang

#### TinyGo

[TinyGo](https://tinygo.org/getting-started/install/) is an alternative Go compiler for small devices. It also supports WASM.

tested under
```
> tinygo version
tinygo version 0.28.1 darwin/amd64 (using go version go1.20.2 and LLVM version 15.0.0)
```

- TinyGo cannot compile the lz4 package ([Add support for reading Go assembly files by aykevl · Pull Request #3103 · tinygo-org/tinygo](https://github.com/tinygo-org/tinygo/pull/3103)), which is used by Arrow. Can workaround by using the forked version of arrow, which removed lz4.

```
replace github.com/apache/arrow/go/v13 => github.com/xxchan/arrow/go/v13 v13.0.0-20230713134335-45002b4934f9
```

#### TODO: Go 1.21

(requires full wasi_snapshot_preview1)
- [Go 1.21 Release Notes - The Go Programming Language](https://tip.golang.org/doc/go1.21)
- [all: add GOOS=wasip1 GOARCH=wasm port · Issue #58141 · golang/go](https://github.com/golang/go/issues/58141)
100 changes: 79 additions & 21 deletions src/udf/wit_example/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,94 @@

set -e

path=$(dirname "$0")
cd "$path"
# usage: ./build.sh --lang [rust|go] [--rebuild]

rustup target add wasm32-unknown-unknown
while [[ $# -gt 0 ]]; do
key="$1"

profile=release
if [ "$profile" == "dev" ]; then
target_dir=debug
else
target_dir=$profile
fi
case $key in
--lang)
lang="$2"
shift # past argument
shift # past value
;;
--rebuild)
rebuild="true"
shift # past argument
;;
*) # unknown option
shift # past argument
;;
esac
done

cargo build --target=wasm32-unknown-unknown --profile "${profile}"
if [ -z "$lang" ]; then
echo "Please specify --lang [rust|go]"
exit 1
fi

if [ "$(wasm-tools -V)" != "wasm-tools 1.0.35" ]; then
echo "wasm-tools 1.0.35 is required"
exit 1
fi

wasm-tools component new ./target/wasm32-unknown-unknown/"${target_dir}"/wit_example.wasm \
-o wit_example.wasm
wasm-tools validate wit_example.wasm --features component-model
path=$(dirname "$0")

function build_rust() {
echo "--- Build Rust guest component"

cd "$path/rust"

rustup target add wasm32-unknown-unknown

profile=release
if [ "$profile" == "dev" ]; then
target_dir=debug
else
target_dir=$profile
fi

# WASI
cargo build --target=wasm32-unknown-unknown --profile "${profile}"
mv ./target/wasm32-unknown-unknown/"${target_dir}"/my_udf.wasm ../my_udf.rust.wasm

# # if file not found, download from
# if [ ! -f wasi_snapshot_preview1.reactor.wasm ]; then
# wget https://github.com/bytecodealliance/wasmtime/releases/download/v10.0.1/wasi_snapshot_preview1.reactor.wasm
# fi
# wasm-tools component new ./target/wasm/wasm32-wasi/debug/wasm_component.wasm \
# -o wasm_component_wasi.wasm \
# --adapt wasi_snapshot_preview1=./wasi_snapshot_preview1.reactor.wasm
cd ..
}

function build_go() {
echo "--- Build TinyGo guest component"
# Note: TinyGo will rebuild the whole binary every time and it's slow.

cd "$path/tinygo"
go generate # generate bindings for Go
tinygo build -target=wasi -o my_udf.go.wasm my_udf.go
wasm-tools component embed ../../wit my_udf.go.wasm -o my_udf.go.wasm

mv ./my_udf.go.wasm ..
cd ..
}

# if the file "my_udf.$lang.wasm" does not exist, or --rebuild is specified, rebuild it
if [ ! -f "my_udf.$lang.wasm" ] || [ "$rebuild" == "true" ]; then
if [ "$lang" == "rust" ]; then
build_rust
elif [ "$lang" == "go" ]; then
build_go
else
echo "Unknown language: $lang"
exit 1
fi
else
echo "my_udf.$lang.wasm exists, skip building"
fi



# (WASI adaptor) if file not found, download from
if [ ! -f wasi_snapshot_preview1.reactor.wasm ]; then
wget https://github.com/bytecodealliance/wasmtime/releases/download/v10.0.1/wasi_snapshot_preview1.reactor.wasm
fi

echo wasm-tools component new "my_udf.$lang.wasm" -o my_udf.component.wasm
wasm-tools component new "my_udf.$lang.wasm" -o my_udf.component.wasm \
--adapt wasi_snapshot_preview1=./wasi_snapshot_preview1.reactor.wasm
wasm-tools validate my_udf.component.wasm --features component-model
10 changes: 7 additions & 3 deletions src/udf/wit_example/create.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ set -e
path=$(dirname "$0")
cd "$path"

./build.sh
if [ ! -f "./my_udf.component.wasm" ]; then
echo "my_udf.component.wasm not found, please run ./build.sh first"
exit 1
fi

echo "size of wasm: $(stat -f "%z" wit_example.wasm) bytes"
encoded=$(base64 -i wit_example.wasm)
echo "size of wasm: $(stat -f "%z" my_udf.component.wasm) bytes"
encoded=$(base64 -i my_udf.component.wasm)
echo "size of encoded wasm: ${#encoded} bytes"
# debug: 23557258
# release: 12457072

psql -h localhost -p 4566 -d dev -U root -c "DROP FUNCTION IF EXISTS is_positive;"
sql="CREATE FUNCTION is_positive (x bigint) RETURNS BOOL LANGUAGE wasm_v1 USING BASE64 '$encoded';"
echo "$sql" > create.sql
psql -h localhost -p 4566 -d dev -U root -f ./create.sql
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "wit_example"
name = "my_udf"
version = "0.1.0"
edition = "2021"
homepage = "https://github.com/risingwavelabs/risingwave"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ wit_bindgen::generate!({
// optional, since there's only one world. We make it explicit here.
world: "udf",
// path is relative to Cargo.toml
path:"../wit"
path:"../../wit"
});

// Define a custom type and implement the generated `Udf` trait for it which
Expand Down
19 changes: 19 additions & 0 deletions src/udf/wit_example/tinygo/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module github.com/my_account/my_udf

go 1.20

require github.com/apache/arrow/go/v13 v13.0.0-20230712165359-085a0baf7868

require (
github.com/goccy/go-json v0.10.0 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
)

replace github.com/apache/arrow/go/v13 => github.com/xxchan/arrow/go/v13 v13.0.0-20230713134335-45002b4934f9
Loading

0 comments on commit 237d240

Please sign in to comment.