Skip to content

Commit

Permalink
maybe
Browse files Browse the repository at this point in the history
  • Loading branch information
zakstucke committed Nov 30, 2024
1 parent e599285 commit 4e798cb
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 15 deletions.
30 changes: 17 additions & 13 deletions reactive_graph/src/computed/async_derived/arc_async_derived.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,12 @@ macro_rules! spawn_derived {
(Some(value), Some(inner), Some(wakers), Some(loading)) => {
let owner = inner.read().or_poisoned().owner.clone();

loading.store(true, Ordering::Relaxed);

if first_run.is_none() {
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_dirty();
}
inner.write().or_poisoned().state = AsyncDerivedState::Notifying;
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_dirty();
}
inner.write().or_poisoned().state = AsyncDerivedState::Clean;

// generate new Future
let fut = initial_fut.take().unwrap_or_else(|| {
Expand Down Expand Up @@ -360,6 +359,8 @@ macro_rules! spawn_derived {
ready_tx
});

loading.store(true, Ordering::Relaxed);

let (this_version, suspense_ids) = {
let mut guard = inner.write().or_poisoned();
guard.version += 1;
Expand Down Expand Up @@ -407,15 +408,18 @@ impl<T: 'static> ArcAsyncDerived<T> {
loading: Arc<AtomicBool>,
ready_tx: Option<oneshot::Sender<()>>,
) {
*value.write().await = Some(new_value);
Self::notify_subs(&wakers, &inner, &loading, ready_tx);
let mut guard = value.write().await;
let is_first_value = guard.is_none();
*guard = Some(new_value);
Self::notify_subs(&wakers, &inner, &loading, ready_tx, is_first_value);
}

fn notify_subs(
wakers: &Arc<RwLock<Vec<Waker>>>,
inner: &Arc<RwLock<ArcAsyncDerivedInner>>,
loading: &Arc<AtomicBool>,
ready_tx: Option<oneshot::Sender<()>>,
notify_sync_subs: bool,
) {
loading.store(false, Ordering::Relaxed);

Expand All @@ -430,8 +434,10 @@ impl<T: 'static> ArcAsyncDerived<T> {
}

// notify reactive subscribers that we're not loading any more
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_dirty();
if notify_sync_subs {
for sub in (&inner.read().or_poisoned().subscribers).into_iter() {
sub.mark_dirty();
}
}

// notify async .awaiters
Expand Down Expand Up @@ -589,9 +595,7 @@ impl<T: 'static> ReadUntracked for ArcAsyncDerived<T> {
type Value = ReadGuard<Option<T>, AsyncPlain<Option<T>>>;

fn try_read_untracked(&self) -> Option<Self::Value> {
if self.loading.load(Ordering::Relaxed)
|| self.value.blocking_read().is_none()
{
if self.value.blocking_read().is_none() {
if let Some(suspense_context) = use_context::<SuspenseContext>() {
let handle = suspense_context.task_id();
let ready = SpecialNonReactiveFuture::new(self.ready());
Expand All @@ -612,7 +616,7 @@ impl<T: 'static> ReadUntracked for ArcAsyncDerived<T> {

impl<T: 'static> Notify for ArcAsyncDerived<T> {
fn notify(&self) {
Self::notify_subs(&self.wakers, &self.inner, &self.loading, None);
Self::notify_subs(&self.wakers, &self.inner, &self.loading, None, true);
}
}

Expand Down
42 changes: 40 additions & 2 deletions reactive_graph/tests/async_derived.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use any_spawner::Executor;
use reactive_graph::{
computed::{ArcAsyncDerived, AsyncDerived},
owner::Owner,
effect::Effect,
owner::{Owner, StoredValue},
signal::RwSignal,
traits::{Get, Read, Set, With, WithUntracked},
traits::{Get, GetValue, Read, Set, With, WithUntracked, WriteValue},
};
use std::future::pending;

Expand Down Expand Up @@ -137,3 +138,40 @@ async fn async_derived_with_initial() {
signal2.set(1);
assert_eq!(derived.await, 2);
}

#[tokio::test]
async fn only_notifies_once_per_run() {
_ = Executor::init_tokio();
let owner = Owner::new();
owner.set();

let signal = RwSignal::new(0);
let derived1 = AsyncDerived::new(move || async move {
Executor::tick().await;
signal.get() + 1
});
let derived2 = AsyncDerived::new(move || async move {
let value = derived1.await;
value * 2
});

let effect_runs = StoredValue::new(0);
Effect::new_isomorphic(move |_| {
*effect_runs.write_value() += 1;
println!("{:?}", derived2.get());
});

Executor::tick().await;
assert_eq!(derived2.await, 2);
assert_eq!(effect_runs.get_value(), 1);

signal.set(2);
Executor::tick().await;
assert_eq!(derived2.await, 6);
assert_eq!(effect_runs.get_value(), 2);

signal.set(3);
Executor::tick().await;
assert_eq!(derived2.await, 8);
assert_eq!(effect_runs.get_value(), 3);
}

0 comments on commit 4e798cb

Please sign in to comment.