Skip to content

Commit

Permalink
advance a little
Browse files Browse the repository at this point in the history
  • Loading branch information
tessi committed Jun 3, 2023
1 parent f639b48 commit 65b17d1
Show file tree
Hide file tree
Showing 9 changed files with 616 additions and 216 deletions.
221 changes: 194 additions & 27 deletions native/wasmex/Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions native/wasmex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ path = "src/lib.rs"
crate-type = ["dylib"]

[dependencies]
rustler = "0.28.0"
# anyhow = "1.0.71"
once_cell = "1.17.1"
rand = "0.8.5"
rustler = "0.28.0"
tokio = { version = "1.28", features = ["full"] }
wasi-common = "9.0.1"
wasmtime = "9.0.1"
wasmtime-wasi = "9.0.1"
wasi-common = "9.0.1"
wiggle = "9.0.1"
wat = "1.0.65"
wiggle = "9.0.1"
3 changes: 2 additions & 1 deletion native/wasmex/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn new(config: ExEngineConfig) -> Result<ResourceArc<EngineResource>, rustle
Ok(resource)
}

#[rustler::nif(name = "engine_precompile_module")]
#[rustler::nif(name = "engine_precompile_module", schedule = "DirtyCpu")]
pub fn precompile_module<'a>(
env: rustler::Env<'a>,
engine_resource: ResourceArc<EngineResource>,
Expand Down Expand Up @@ -68,6 +68,7 @@ pub(crate) fn engine_config(engine_config: ExEngineConfig) -> Config {
config.consume_fuel(engine_config.consume_fuel);
config.wasm_backtrace_details(backtrace_details);
config.cranelift_opt_level(cranelift_opt_level);
config.async_support(true);
config
}

Expand Down
182 changes: 94 additions & 88 deletions native/wasmex/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use rustler::{
resource::ResourceArc, types::tuple, Atom, Encoder, Error, ListIterator, MapIterator, OwnedEnv,
Term,
};
use wasmtime::{Caller, Extern, FuncType, Linker, Val, ValType};
use wasmtime::{Extern, FuncType, Linker, Val, ValType};
use wiggle::anyhow::{self, anyhow};

use crate::{
Expand Down Expand Up @@ -103,99 +103,105 @@ fn link_imported_function(

let signature = FuncType::new(params_signature, results_signature.clone());
linker
.func_new(
.func_new_async(
&namespace_name.clone(),
&import_name.clone(),
signature,
move |mut caller: Caller<'_, StoreData>,
params: &[Val],
results: &mut [Val]|
-> Result<(), anyhow::Error> {
let callback_token = ResourceArc::new(CallbackTokenResource {
token: CallbackToken {
continue_signal: Condvar::new(),
return_types: results_signature.clone(),
return_values: Mutex::new(None),
},
});

let memory = match caller.get_export("memory") {
Some(Extern::Memory(mem)) => mem,
_ => return Err(anyhow!("failed to find host memory")),
};

let caller_token = set_caller(caller);

let mut msg_env = OwnedEnv::new();
msg_env.send_and_clear(&pid.clone(), |env| {
let mut callback_params: Vec<Term> = Vec::with_capacity(params.len());
for value in params {
callback_params.push(match value {
Val::I32(i) => i.encode(env),
Val::I64(i) => i.encode(env),
Val::F32(i) => f32::from_bits(*i).encode(env),
Val::F64(i) => f64::from_bits(*i).encode(env),
// encoding V128 is not yet supported by rustler
Val::V128(_) => {
(atoms::error(), "unable_to_convert_v128_type").encode(env)
}
Val::ExternRef(_) => {
(atoms::error(), "unable_to_convert_extern_ref_type").encode(env)
}
Val::FuncRef(_) => {
(atoms::error(), "unable_to_convert_func_ref_type").encode(env)
}
})
}
// Callback context will contain memory (plus maybe globals, tables etc later).
// This will allow Elixir callback to operate on these objects.
let callback_context = Term::map_new(env);

let memory_resource = ResourceArc::new(MemoryResource {
inner: Mutex::new(memory),
move |mut caller, params, results| {
let pid = pid.clone();
let results_signature = results_signature.clone();
let namespace_name = namespace_name.clone();
let import_name = import_name.clone();

Box::new(async move {
let callback_token = ResourceArc::new(CallbackTokenResource {
token: CallbackToken {
continue_signal: Condvar::new(),
return_types: results_signature.clone(),
return_values: Mutex::new(None),
},
});
let callback_context = Term::map_put(
callback_context,
atoms::memory().encode(env),
memory_resource.encode(env),
)
.unwrap();

let caller_resource = ResourceArc::new(StoreOrCallerResource {
inner: Mutex::new(StoreOrCaller::Caller(caller_token)),


let memory = match caller.get_export("memory") {
Some(Extern::Memory(mem)) => mem,
_ => return Err(anyhow!("failed to find host memory")),
};

let caller_token = set_caller(caller);

let mut msg_env = OwnedEnv::new();
msg_env.send_and_clear(&pid, |env| {
let mut callback_params: Vec<Term> = Vec::with_capacity(params.len());
for value in params {
callback_params.push(match value {
Val::I32(i) => i.encode(env),
Val::I64(i) => i.encode(env),
Val::F32(i) => f32::from_bits(*i).encode(env),
Val::F64(i) => f64::from_bits(*i).encode(env),
// encoding V128 is not yet supported by rustler
Val::V128(_) => {
(atoms::error(), "unable_to_convert_v128_type").encode(env)
}
Val::ExternRef(_) => {
(atoms::error(), "unable_to_convert_extern_ref_type")
.encode(env)
}
Val::FuncRef(_) => {
(atoms::error(), "unable_to_convert_func_ref_type").encode(env)
}
})
}
// Callback context will contain memory (plus maybe globals, tables etc later).
// This will allow Elixir callback to operate on these objects.
let callback_context = Term::map_new(env);

let memory_resource = ResourceArc::new(MemoryResource {
inner: Mutex::new(memory),
});
let callback_context = Term::map_put(
callback_context,
atoms::memory().encode(env),
memory_resource.encode(env),
)
.unwrap();

let caller_resource = ResourceArc::new(StoreOrCallerResource {
inner: Mutex::new(StoreOrCaller::Caller(caller_token)),
});

let callback_context = Term::map_put(
callback_context,
atoms::caller().encode(env),
caller_resource.encode(env),
)
.unwrap();
(
atoms::invoke_callback(),
namespace_name.clone(),
import_name.clone(),
callback_context,
callback_params,
callback_token.clone(),
)
.encode(env)
});

let callback_context = Term::map_put(
callback_context,
atoms::caller().encode(env),
caller_resource.encode(env),
)
.unwrap();
(
atoms::invoke_callback(),
namespace_name.clone(),
import_name.clone(),
callback_context,
callback_params,
callback_token.clone(),
)
.encode(env)
});

// Wait for the thread to start up - `receive_callback_result` is responsible for that.
let mut result = callback_token.token.return_values.lock().unwrap();
while result.is_none() {
result = callback_token.token.continue_signal.wait(result).unwrap();
}
remove_caller(caller_token);

let result: &(bool, Vec<WasmValue>) = result
.as_ref()
.expect("expect callback token to contain a result");
match result {
(true, return_values) => write_results(results, return_values),
(false, _) => Err(anyhow!("the elixir callback threw an exception")),
}
// Wait for the thread to start up - `receive_callback_result` is responsible for that.
let mut result = callback_token.token.return_values.lock().unwrap();
while result.is_none() {
result = callback_token.token.continue_signal.wait(result).unwrap();
}
remove_caller(caller_token);

let result: &(bool, Vec<WasmValue>) = result
.as_ref()
.expect("expect callback token to contain a result");
match result {
(true, return_values) => write_results(results, return_values),
(false, _) => Err(anyhow!("the elixir callback threw an exception")),
}
})
},
)
.map_err(|err| Error::Term(Box::new(err.to_string())))?;
Expand Down
89 changes: 63 additions & 26 deletions native/wasmex/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use rustler::{
types::ListIterator,
Encoder, Env as RustlerEnv, Error, MapIterator, NifResult, Term,
};
use std::ops::Deref;
use std::sync::Mutex;
use std::thread;

use wasmtime::{Instance, Linker, Module, Val, ValType};

Expand All @@ -18,6 +18,7 @@ use crate::{
module::ModuleResource,
printable_term_type::PrintableTermType,
store::{StoreData, StoreOrCaller, StoreOrCallerResource},
task
};

pub struct InstanceResource {
Expand All @@ -31,35 +32,71 @@ pub struct InstanceResource {
// * module (ModuleResource): the compiled Wasm module
// * imports (map): a map defining eventual instance imports, may be empty if there are none.
// structure: %{namespace_name: %{import_name: {:fn, param_types, result_types, captured_function}}}
#[rustler::nif(name = "instance_new")]
pub fn new(
env: rustler::Env,
store_or_caller_resource: ResourceArc<StoreOrCallerResource>,
module_resource: ResourceArc<ModuleResource>,
imports: MapIterator,
) -> Result<ResourceArc<InstanceResource>, rustler::Error> {
let module = module_resource.inner.lock().map_err(|e| {
rustler::Error::Term(Box::new(format!(
"Could not unlock module resource as the mutex was poisoned: {e}"
)))
})?;
let store_or_caller: &mut StoreOrCaller =
&mut *(store_or_caller_resource.inner.lock().map_err(|e| {
rustler::Error::Term(Box::new(format!(
"Could not unlock store_or_caller resource as the mutex was poisoned: {e}"
)))
})?);
) -> Result<(), rustler::Error> {
// TODO: pass pid as parameter instead of hardcoding it
let pid = env.pid();
// create erlang environment for the thread
let mut thread_env = OwnedEnv::new();

task::spawn(async move {
let module = module_resource.deref().inner.lock().map_err(|e| {
let message = Box::new(format!(
"Could not unlock module resource as the mutex was poisoned: {e}"
));
(atoms::error(), message.encode(env)).encode(env)
});
let store_or_caller: &mut StoreOrCaller =
&mut *(store_or_caller_resource.deref().inner.lock().map_err(|e| {
rustler::Error::Term(Box::new(format!(
"Could not unlock store_or_caller resource as the mutex was poisoned: {e}"
)))
})?);

let result = match link_and_create_instance(store_or_caller, &module, imports).await {
Ok(instance) => {
let resource = ResourceArc::new(InstanceResource {
inner: Mutex::new(instance),
});
make_tuple(
env,
&[
atoms::ok().encode(env),
resource.encode(env),
],
)
},
Err(_) => todo!(),
};

thread_env.send_and_clear(&pid, |thread_env| {

// TODO: pass in forward_term as param
let forward_term = atoms::returned_function_call().encode(thread_env);

make_tuple(
thread_env,
&[
// TODO: use a custom atom
atoms::returned_function_call().encode(thread_env),
result,
forward_term,
],
)
});

let instance = link_and_create_instance(store_or_caller, &module, imports)?;
let resource = ResourceArc::new(InstanceResource {
inner: Mutex::new(instance),
});
Ok(resource)
Ok(())
}

fn link_and_create_instance(
async fn link_and_create_instance(
store_or_caller: &mut StoreOrCaller,
module: &Module,
imports: MapIterator,
imports: MapIterator<'_>,
) -> Result<Instance, Error> {
let mut linker = Linker::new(store_or_caller.engine());
if let Some(_wasi_ctx) = &store_or_caller.data().wasi {
Expand All @@ -69,8 +106,8 @@ fn link_and_create_instance(
}
link_imports(&mut linker, imports)?;
linker
.instantiate(store_or_caller, module)
.map_err(|err| Error::Term(Box::new(err.to_string())))
.instantiate_async(store_or_caller, module)
.await.map_err(|err| Error::Term(Box::new(err.to_string())))
}

#[rustler::nif(name = "instance_function_export_exists")]
Expand All @@ -95,7 +132,7 @@ pub fn function_export_exists(
Ok(result)
}

#[rustler::nif(name = "instance_call_exported_function", schedule = "DirtyCpu")]
#[rustler::nif(name = "instance_call_exported_function")]
pub fn call_exported_function(
env: rustler::Env,
store_or_caller_resource: ResourceArc<StoreOrCallerResource>,
Expand All @@ -111,7 +148,7 @@ pub fn call_exported_function(
let function_params = thread_env.save(params);
let from = thread_env.save(from);

thread::spawn(move || {
task::spawn(async move {
thread_env.send_and_clear(&pid, |thread_env| {
execute_function(
thread_env,
Expand Down Expand Up @@ -143,8 +180,8 @@ fn execute_function(
Ok(vec) => vec,
Err(_) => return make_error_tuple(&thread_env, "could not load 'function params'", from),
};
let instance: Instance = *(instance_resource.inner.lock().unwrap());
let mut store_or_caller = store_or_caller_resource.inner.lock().unwrap();
let instance: Instance = *(instance_resource.deref().inner.lock().unwrap());
let mut store_or_caller = store_or_caller_resource.deref().inner.lock().unwrap();
let function_result = functions::find(&instance, &mut store_or_caller, &function_name);
let function = match function_result {
Some(func) => func,
Expand Down
Loading

0 comments on commit 65b17d1

Please sign in to comment.