Skip to content

Commit

Permalink
Ok all working again
Browse files Browse the repository at this point in the history
  • Loading branch information
alshdavid committed Jun 25, 2024
1 parent 5505668 commit 5b64045
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 59 deletions.
2 changes: 1 addition & 1 deletion crates/neon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ features = ["sync"]
optional = true

[features]
default = ["napi-8", "futures"]
default = ["napi-8"]

# Enable extracting values by serializing to JSON
serde = ["dep:serde", "dep:serde_json"]
Expand Down
17 changes: 14 additions & 3 deletions crates/neon/src/async_local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,20 @@ pub fn spawn_async_local<'a>(
return Ok(());
}

// The futures executor runs on another thread and will
// use a threadsafe function to call schedule work
// on the JavaScript thread
// The futures executor runs on the main thread thread but
// the waker runs on another thread.
//
// The main thread executor will run the contained futures
// and as soon as they stall (e.g. waiting for a channel, timer, etc),
// the executor will immediately yield back to the JavaScript event loop.
//
// This "parks" the executer, which normally means the thread
// is block - however we cannot do that here so instead, there
// is a sacrificial "waker" thread who's only job is to sleep/wake and
// signal to Nodejs that futures need to be run.
//
// The waker thread notifies the main thread of pending work by
// running the futures executor within a threadsafe function
let env_raw = cx.env().to_raw();

LocalWaker::send(WakerEvent::Init(unsafe {
Expand Down
67 changes: 34 additions & 33 deletions crates/neon/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ pub use crate::types::buffer::lock::Lock;
// use crate::async_local::{root::RootGlobal, spawn_async_local};
#[cfg(feature = "async_local")]
use futures::Future;
#[cfg(feature = "async_local")]
use crate::handle::StaticHandle;

use crate::{
event::TaskBuilder,
Expand Down Expand Up @@ -291,20 +293,16 @@ pub trait Context<'a>: ContextInternal<'a> {
}

#[cfg(feature = "async_local")]
fn execute_async_local<F, Fut>(&mut self, f: F)
fn spawn_local<F, Fut>(&mut self, f: F)
where
Fut: Future<Output = ()>,
F: FnOnce(AsyncContext) -> Fut + 'static,
{
use futures::Future;

let env = self.env();

crate::async_local::spawn_async_local(self, async move {
// let scope = unsafe { HandleScope::new(env.to_raw()) };
let future = f(AsyncContext { env });
future.await;
// drop(scope);
}).unwrap();
}

Expand Down Expand Up @@ -623,29 +621,31 @@ impl<'a> ModuleContext<'a> {
F: Fn(AsyncFunctionContext) -> Fut + 'static + Copy,
V: Value,
{
// let wrapper = JsFunction::new(self, move |mut cx| {
// let mut args = vec![];

// while let Some(arg) = cx.argument_opt(args.len()) {
// let arg = arg.as_value(&mut cx);
// let arg = RootGlobal::new(&mut cx, arg);
// args.push(arg);
// }

// let (deferred, promise) = cx.promise();
// cx.execute_async_local(move |mut cx| async move {
// let acx = AsyncFunctionContext {
// env: cx.env(),
// arguments: args,
// };
// deferred.resolve(&mut cx, f(acx).await.unwrap());
// ()
// });

// Ok(promise)
// })?;

// self.exports.clone().set(self, key, wrapper)?;
use crate::handle::StaticHandle;

let wrapper = JsFunction::new(self, move |mut cx| {
let mut args = vec![];

while let Some(arg) = cx.argument_opt(args.len()) {
let arg = arg.as_value(&mut cx);
let arg = StaticHandle::new(&mut cx, arg)?;
args.push(arg);
}

let (deferred, promise) = cx.promise();
cx.spawn_local(move |mut cx| async move {
let acx = AsyncFunctionContext {
env: cx.env(),
arguments: args,
};
deferred.resolve(&mut cx, f(acx).await.unwrap());
()
});

Ok(promise)
})?;

self.exports.clone().set(self, key, wrapper)?;
Ok(())
}

Expand Down Expand Up @@ -851,16 +851,17 @@ impl<'a> Context<'a> for FunctionContext<'a> {}
#[cfg(feature = "async_local")]
pub struct AsyncFunctionContext {
env: Env,
// arguments: Vec<RootGlobal>,
arguments: Vec<StaticHandle<JsValue>>,
}

#[cfg(feature = "async_local")]
impl<'a> AsyncFunctionContext {
pub fn argument<V: Value>(&mut self, i: usize) -> JsResult<'a, V> {
// let arg = self.arguments.get(i).unwrap().clone();
// let handle = arg.into_inner(self);
// Ok(handle)
todo!()
let arg = self.arguments.get(i).unwrap().clone();
let arg = arg.from_static(self)?;
let value = unsafe { V::from_local(self.env(), arg.to_local()) };
let handle = Handle::new_internal(value);
Ok(handle)
}
}

Expand Down
7 changes: 5 additions & 2 deletions crates/neon/src/handle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ impl<'a, V: Value + 'a> Handle<'a, V> {
}
}

pub fn root_global(self, cx: &mut impl Context<'a>) -> NeonResult<RootGlobal<V>> {
RootGlobal::new(cx, self)
/// Detaches the value from the Nodejs garbage collector
/// and manages the variable lifetime via reference counting.
/// Useful when interacting with a value within async closures
pub fn to_static(self, cx: &mut impl Context<'a>) -> NeonResult<StaticHandle<V>> {
StaticHandle::new(cx, self)
}
}

Expand Down
42 changes: 24 additions & 18 deletions crates/neon/src/handle/root_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,36 @@ use crate::result::JsResult;
use crate::result::NeonResult;
use crate::types::JsFunction;
use crate::types::JsObject;
use crate::types::JsSymbol;

// This creates a rooted object and stores javascript
// values on it as a way to grant any JavaScript value
// a static lifetime

thread_local! {
// Symbol("__neon_cache")
static NEON_CACHE: OnceCell<Root<JsObject>> = OnceCell::default();
}

/// Reference counted JavaScript value with a static lifetime for use in async closures
pub struct RootGlobal<T> {
pub struct StaticHandle<T> {
pub(crate) count: Rc<RefCell<u32>>,
pub(crate) inner: Rc<String>,
pub(crate) inner: Rc<Root<JsSymbol>>,
_p: PhantomData<T>,
}

impl<T: Value> RootGlobal<T> {
impl<T: Value> StaticHandle<T> {
pub(crate) fn new<'a>(
cx: &mut impl Context<'a>,
value: Handle<'a, T>,
) -> NeonResult<RootGlobal<T>> {
) -> NeonResult<StaticHandle<T>> {
Ok(Self {
count: Rc::new(RefCell::new(1)),
inner: Rc::new(set_ref(cx, value)?),
_p: Default::default(),
})
}

pub fn clone<'a>(&self) -> RootGlobal<T> {
pub fn clone(&self) -> StaticHandle<T> {
let mut count = self.count.borrow_mut();
*count += 1;
drop(count);
Expand All @@ -49,16 +53,16 @@ impl<T: Value> RootGlobal<T> {
}
}

pub fn into_inner<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> {
get_ref(cx, &*self.inner)
pub fn from_static<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> {
get_ref(cx, &self.inner)
}

pub fn drop<'a>(&self, cx: &mut impl Context<'a>) -> NeonResult<()> {
let mut count = self.count.borrow_mut();
*count -= 1;

if *count == 0 {
delete_ref(cx, &*self.inner)?
delete_ref(cx, &self.inner)?
}

Ok(())
Expand All @@ -81,41 +85,43 @@ fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> {
Ok(neon_cache.into_inner(cx))
}

fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle<'a, V>) -> NeonResult<String> {
fn set_ref<'a, V: Value>(
cx: &mut impl Context<'a>,
value: Handle<'a, V>,
) -> NeonResult<Root<JsSymbol>> {
let neon_cache = get_cache(cx)?;
// Is this safe?
let key = format!("{:?}", value.to_local());
let symbol = cx.symbol(format!("{:?}", value.to_local())).root(cx);

get_cache(cx)?
.get::<JsFunction, _, _>(cx, "set")?
.call_with(cx)
.this(neon_cache)
.arg(cx.string(&key))
.arg(symbol.clone(cx).into_inner(cx))
.arg(value)
.exec(cx)?;

Ok(key)
Ok(symbol)
}

fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &str) -> JsResult<'a, V> {
fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &Root<JsSymbol>) -> JsResult<'a, V> {
let neon_cache = get_cache(cx)?;

get_cache(cx)?
.get::<JsFunction, _, _>(cx, "get")?
.call_with(cx)
.this(neon_cache)
.arg(cx.string(&key))
.arg(key.clone(cx).into_inner(cx))
.apply(cx)
}

fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &str) -> NeonResult<()> {
fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &Root<JsSymbol>) -> NeonResult<()> {
let neon_cache = get_cache(cx)?;

get_cache(cx)?
.get::<JsFunction, _, _>(cx, "delete")?
.call_with(cx)
.this(neon_cache)
.arg(cx.string(&key))
.arg(key.clone(cx).into_inner(cx))
.exec(cx)?;

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/neon/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#[doc(no_inline)]
pub use crate::{
context::{
CallKind, ComputeContext, Context, ExecuteContext, FunctionContext, ModuleContext,
TaskContext,
AsyncContext, AsyncFunctionContext, CallKind, ComputeContext, Context, ExecuteContext,
FunctionContext, ModuleContext, TaskContext,
},
handle::{Handle, Root},
object::Object,
Expand Down

0 comments on commit 5b64045

Please sign in to comment.