Skip to content

Commit

Permalink
Wasm udf now work again, with the latest changes
Browse files Browse the repository at this point in the history
  • Loading branch information
tachyonicbytes committed Oct 6, 2023
1 parent 9817235 commit f08b891
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions dozer-sql/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ndarray = { version = "0.15", optional = true }
half = { version = "2.3.1", optional = true }
like = "0.3.1"
jsonpath = { path = "../jsonpath" }
wasmtime = { version = "9.0.4", optional = true }

[dev-dependencies]
proptest = "1.2.0"
Expand All @@ -22,3 +23,4 @@ proptest = "1.2.0"
bigdecimal = ["dep:bigdecimal", "sqlparser/bigdecimal"]
python = ["dozer-types/python-auto-initialize"]
onnx = ["dep:ort", "dep:ndarray", "dep:half"]
wasm = ["wasmtime"]
13 changes: 6 additions & 7 deletions dozer-sql/expression/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,26 +887,25 @@ impl ExpressionBuilder {
function: &Function,
schema: &Schema,
udfs: &[UdfConfig],
) -> Result<Expression, PipelineError> {
) -> Result<Expression, Error> {
// First, get the wasm function defined by name.
// Then, transfer the wasm function to Expression::WasmUDF
use dozer_types::types::FieldType;
use PipelineError::InvalidQuery;
use crate::wasm_udf::WasmError::MissingReturnType;


let args = function
.args
.iter()
.map(|argument| self.parse_sql_function_arg(false, argument, schema, udfs))
.collect::<Result<Vec<_>, PipelineError>>()?;
.collect::<Result<Vec<_>, Error>>()?;

let return_type = {
let ident = function
.return_type
.as_ref()
.ok_or_else(|| InvalidQuery("Wasm UDF must have a return type. The syntax is: function_name<return_type>(arguments)".to_string()))?;
.ok_or_else(|| MissingReturnType).unwrap();

FieldType::try_from(ident.value.as_str())
.map_err(|e| InvalidQuery(format!("Failed to parse Wasm UDF return type: {e}")))?
FieldType::try_from(ident.value.as_str()).unwrap()
};

Ok(Expression::WasmUDF {
Expand Down
6 changes: 3 additions & 3 deletions dozer-sql/expression/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::Range;

use dozer_types::{
thiserror::{self, Error},
types::{Field, FieldType}, models::udf_config::UdfType,
types::{Field, FieldType},
};
use sqlparser::ast::{
BinaryOperator, DataType, DateTimeField, Expr, FunctionArg, Ident, UnaryOperator,
Expand Down Expand Up @@ -102,9 +102,9 @@ pub enum Error {

#[cfg(feature = "wasm")]
#[error("WASM UDF error: {0}")]
Wasm(#[from] crate::wasm_udf::Error),
Wasm(#[from] crate::wasm_udf::WasmError),
#[cfg(not(feature = "wasm"))]
#[error("WASM UDF is not enabled")]
#[error("WASM UDF is not enabled here")]
WasmNotEnabled,

#[error("Unsupported UDF type")]
Expand Down
2 changes: 1 addition & 1 deletion dozer-sql/expression/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ impl Expression {
return_type,
..
} => {
use crate::pipeline::expression::wasm_udf::evaluate_wasm_udf;
use crate::wasm_udf::evaluate_wasm_udf;
evaluate_wasm_udf(schema, name, module, args, return_type, record)
}
Expression::UnaryOperator { operator, arg } => operator.evaluate(schema, arg, record),
Expand Down
2 changes: 2 additions & 0 deletions dozer-sql/expression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub mod scalar;
mod onnx;
#[cfg(feature = "python")]
mod python_udf;
#[cfg(feature = "wasm")]
mod wasm_udf;

pub use num_traits;
pub use sqlparser;
Expand Down
32 changes: 16 additions & 16 deletions dozer-sql/expression/src/wasm_udf.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use crate::pipeline::errors::PipelineError;
use crate::pipeline::errors::PipelineError::UnsupportedSqlError;
use crate::pipeline::errors::UnsupportedSqlError::GenericError;
use crate::pipeline::expression::execution::Expression;

use dozer_types::ordered_float::OrderedFloat;
use dozer_types::types::{Field, FieldType, Record, Schema};
use dozer_types::thiserror::{self, Error};
use crate::error::Error;

use wasmtime::*;

pub enum Error {
use crate::execution::Expression;

#[derive(Debug, Error)]
pub enum WasmError {
#[error(
"Wasm UDF must have a return type. The syntax is: function_name<return_type>(arguments)"
)]
MissingReturnType,
#[error("wasmtime error: {0}")]
WasmTime(#[from] wasmtime::Error),
#[error("Type is not yet supported in Wasm")]
WasmUnsupportedType(String)
}


Expand All @@ -25,13 +27,13 @@ pub fn evaluate_wasm_udf(
args: &[Expression],
return_type: &FieldType,
record: &Record,
) -> Result<Field, PipelineError> {
let values = args
) -> Result<Field, Error> {
let input_values = args
.iter()
.map(|arg| arg.evaluate(record, schema))
.collect::<Result<Vec<_>, PipelineError>>()?;
.collect::<Result<Vec<_>, Error>>()?;

let values2: Vec<Val> = values
let values: Vec<Val> = input_values
.iter()
.map(|field| match field {
Field::Int(value) => Val::I64(*value),
Expand All @@ -42,17 +44,17 @@ pub fn evaluate_wasm_udf(

let engine = Engine::default();

let module = Module::from_file(&engine, module)?;
let module = Module::from_file(&engine, module).unwrap();
let mut store = Store::new(&engine, 4);
let instance = Instance::new(&mut store, &module, &[])?;
let instance = Instance::new(&mut store, &module, &[]).unwrap();

let wasm_udf_func = instance
.get_func(&mut store, name)
.expect("export wasn't a function");
let mut results: [Val; 1] = [Val::I64(0)];

// match wasm_udf_func.call(&mut store, &[Val::I64(9)], &mut results) {
match wasm_udf_func.call(&mut store, &values2, &mut results) {
match wasm_udf_func.call(&mut store, &values, &mut results) {
Ok(()) => {}
Err(trap) => {
panic!(
Expand All @@ -78,9 +80,7 @@ pub fn evaluate_wasm_udf(
| FieldType::Point
| FieldType::Duration
| FieldType::Json => {
return Err(UnsupportedSqlError(GenericError(
"Unsupported return type for wasm udf".to_string(),
)))
return Err(Error::UnsupportedUdfType)
}
})
}
27 changes: 12 additions & 15 deletions dozer-sql/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ use std::fmt::{Display, Formatter};

use super::utils::serialize::DeserializationError;

#[cfg(feature = "wasm")]
use wasmtime;

#[derive(Debug, Clone)]
pub struct FieldTypes {
types: Vec<FieldType>,
Expand Down Expand Up @@ -57,12 +54,12 @@ pub enum PipelineError {
#[error("Duplicate INTO table name found: {0:?}")]
DuplicateIntoClause(String),

#[cfg(feature = "wasm")]
#[error("Wasm Error: {0}")]
WasmErr(wasmtime::Error),
#[cfg(not(feature = "wasm"))]
#[error("Wasm UDF feature is not enabled")]
WasmNotEnabled,
// #[cfg(feature = "wasm")]
// #[error("Wasm Error: {0}")]
// WasmErr(wasmtime::Error),
// #[cfg(not(feature = "wasm"))]
// #[error("Wasm UDF feature is not enabled")]
// WasmNotEnabled,

// Error forwarding
#[error("Internal type error: {0}")]
Expand Down Expand Up @@ -141,12 +138,12 @@ pub enum PipelineError {
ProcessorAlreadyExists(String),
}

#[cfg(feature = "wasm")]
impl From<wasmtime::Error> for PipelineError {
fn from(wasm_err: wasmtime::Error) -> Self {
PipelineError::WasmErr(wasm_err)
}
}
// #[cfg(feature = "wasm")]
// impl From<wasmtime::Error> for PipelineError {
// fn from(wasm_err: wasmtime::Error) -> Self {
// PipelineError::WasmErr(wasm_err)
// }
// }

#[derive(Error, Debug)]
pub enum UnsupportedSqlError {
Expand Down

0 comments on commit f08b891

Please sign in to comment.