Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scoped Futures #1761

Merged
merged 10 commits into from
Sep 28, 2023
1 change: 1 addition & 0 deletions leptos_reactive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ web-sys = { version = "0.3", optional = true, features = [
cfg-if = "1"
indexmap = "2"
self_cell = "1.0.0"
pin-project = "1"

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }
Expand Down
7 changes: 5 additions & 2 deletions leptos_reactive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,11 @@ pub use resource::*;
use runtime::*;
pub use runtime::{
as_child_of_current_owner, batch, create_runtime, current_runtime,
on_cleanup, run_as_child, set_current_runtime, untrack,
untrack_with_diagnostics, with_current_owner, with_owner, Owner, RuntimeId,
on_cleanup, run_as_child, set_current_runtime,
spawn_local_with_current_owner, spawn_local_with_owner,
try_spawn_local_with_current_owner, try_spawn_local_with_owner,
try_with_owner, untrack, untrack_with_diagnostics, with_current_owner,
with_owner, Owner, RuntimeId, ScopedFuture,
};
pub use selector::*;
pub use serialization::*;
Expand Down
182 changes: 169 additions & 13 deletions leptos_reactive/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use cfg_if::cfg_if;
use core::hash::BuildHasherDefault;
use futures::stream::FuturesUnordered;
use indexmap::IndexSet;
use pin_project::pin_project;
use rustc_hash::{FxHashMap, FxHasher};
use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
use std::{
Expand All @@ -23,6 +24,7 @@ use std::{
marker::PhantomData,
pin::Pin,
rc::Rc,
task::Poll,
};

pub(crate) type PinnedFuture<T> = Pin<Box<dyn Future<Output = T>>>;
Expand Down Expand Up @@ -817,25 +819,39 @@ where
///
/// ## Panics
/// Panics if there is no current reactive runtime.
pub fn with_owner<T>(owner: Owner, f: impl FnOnce() -> T + 'static) -> T
where
T: 'static,
{
pub fn with_owner<T>(owner: Owner, f: impl FnOnce() -> T) -> T {
try_with_owner(owner, f)
.expect("runtime/scope should be alive when with_owner runs")
}

/// Runs the given code with the given reactive owner.
pub fn try_with_owner<T>(owner: Owner, f: impl FnOnce() -> T) -> Option<T> {
with_runtime(|runtime| {
let prev_observer = runtime.observer.take();
let prev_owner = runtime.owner.take();
runtime
.nodes
.try_borrow()
.map(|nodes| nodes.contains_key(owner.0))
.map(|scope_exists| {
scope_exists.then(|| {
let prev_observer = runtime.observer.take();
let prev_owner = runtime.owner.take();

runtime.owner.set(Some(owner.0));
runtime.observer.set(Some(owner.0));
runtime.owner.set(Some(owner.0));
runtime.observer.set(Some(owner.0));

let v = f();
let v = f();

runtime.observer.set(prev_observer);
runtime.owner.set(prev_owner);
runtime.observer.set(prev_observer);
runtime.owner.set(prev_owner);

v
v
})
})
.ok()
.flatten()
})
.expect("runtime should be alive when with_owner runs")
.ok()
.flatten()
}

/// Runs the given function as a child of the current Owner, once.
Expand Down Expand Up @@ -1469,3 +1485,143 @@ pub fn untrack<T>(f: impl FnOnce() -> T) -> T {
pub fn untrack_with_diagnostics<T>(f: impl FnOnce() -> T) -> T {
Runtime::current().untrack(f, true)
}

/// Allows running a future that has access to a given scope.
#[pin_project]
pub struct ScopedFuture<Fut: Future> {
owner: Owner,
#[pin]
future: Fut,
}

impl<Fut: Future + 'static> Future for ScopedFuture<Fut> {
type Output = Option<Fut::Output>;

fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
// TODO: we need to think about how to make this
// not panic for scopes that have been cleaned up...
// or perhaps we can force the scope to not be cleaned
// up until all futures that have a handle to them are
// dropped...

let this = self.project();

if let Some(poll) = try_with_owner(*this.owner, || this.future.poll(cx))
{
match poll {
Poll::Ready(res) => Poll::Ready(Some(res)),
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
}
}
}

impl<Fut: Future> ScopedFuture<Fut> {
/// Creates a new future that will have access to the `[Owner]`'s
/// scope context.
pub fn new(owner: Owner, fut: Fut) -> Self {
Self { owner, future: fut }
}

/// Runs the future in the current [`Owner`]'s scope context.
///
/// # Panics
/// Panics if there is no current [`Owner`] context available.
#[track_caller]
pub fn new_current(fut: Fut) -> Self {
Self {
owner: Owner::current().expect(
"`ScopedFuture::new_current()` to be called within an `Owner` \
context",
),
future: fut,
}
}
}

/// Runs a future that has access to the provided [`Owner`]'s
/// scope context.
pub fn spawn_local_with_owner(
owner: Owner,
fut: impl Future<Output = ()> + 'static,
) {
let scoped_future = ScopedFuture::new(owner, fut);

crate::spawn_local(async move {
if scoped_future.await.is_none() {
// TODO: should we warn here?
// /* warning message */
}
});
}

/// Runs a future that has access to the provided [`Owner`]'s
/// scope context.
///
/// # Panics
/// Panics if there is no [`Owner`] context available.
#[track_caller]
pub fn spawn_local_with_current_owner(fut: impl Future<Output = ()> + 'static) {
let scoped_future = ScopedFuture::new_current(fut);

crate::spawn_local(async move {
if scoped_future.await.is_none() {
// TODO: should we warn here?
// /* warning message */
}
});
}

/// Runs a future that has access to the provided [`Owner`]'s
/// scope context.
///
/// Since futures run in the background, it is possible that
/// the scope has been cleaned up since the future started running.
/// If this happens, the future will not be completed.
///
/// The `on_cancelled` callback can be used to notify you that the
/// future was cancelled.
pub fn try_spawn_local_with_owner(
owner: Owner,
fut: impl Future<Output = ()> + 'static,
on_cancelled: impl FnOnce() + 'static,
) {
let scoped_future = ScopedFuture::new(owner, fut);

crate::spawn_local(async move {
if scoped_future.await.is_none() {
on_cancelled();
}
});
}

/// Runs a future that has access to the provided [`Owner`]'s
/// scope context.
///
/// Since futures run in the background, it is possible that
/// the scope has been cleaned up since the future started running.
/// If this happens, the future will not be completed.
///
/// The `on_cancelled` callback can be used to notify you that the
/// future was cancelled.
///
/// # Panics
/// Panics if there is no [`Owner`] context available.
#[track_caller]
pub fn try_spawn_local_with_current_owner(
fut: impl Future<Output = ()> + 'static,
on_cancelled: impl FnOnce() + 'static,
) {
let scoped_future = ScopedFuture::new_current(fut);

crate::spawn_local(async move {
if scoped_future.await.is_none() {
on_cancelled();
}
});
}
Loading