From 4e798cbfcfec205dfd690d312c01b9561ae605d7 Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Sat, 30 Nov 2024 18:30:44 +0200 Subject: [PATCH] maybe --- .../async_derived/arc_async_derived.rs | 30 +++++++------ reactive_graph/tests/async_derived.rs | 42 ++++++++++++++++++- 2 files changed, 57 insertions(+), 15 deletions(-) diff --git a/reactive_graph/src/computed/async_derived/arc_async_derived.rs b/reactive_graph/src/computed/async_derived/arc_async_derived.rs index aaf1e6914c..420c87572e 100644 --- a/reactive_graph/src/computed/async_derived/arc_async_derived.rs +++ b/reactive_graph/src/computed/async_derived/arc_async_derived.rs @@ -325,13 +325,12 @@ macro_rules! spawn_derived { (Some(value), Some(inner), Some(wakers), Some(loading)) => { let owner = inner.read().or_poisoned().owner.clone(); - loading.store(true, Ordering::Relaxed); - if first_run.is_none() { - for sub in (&inner.read().or_poisoned().subscribers).into_iter() { - sub.mark_dirty(); - } + inner.write().or_poisoned().state = AsyncDerivedState::Notifying; + for sub in (&inner.read().or_poisoned().subscribers).into_iter() { + sub.mark_dirty(); } + inner.write().or_poisoned().state = AsyncDerivedState::Clean; // generate new Future let fut = initial_fut.take().unwrap_or_else(|| { @@ -360,6 +359,8 @@ macro_rules! spawn_derived { ready_tx }); + loading.store(true, Ordering::Relaxed); + let (this_version, suspense_ids) = { let mut guard = inner.write().or_poisoned(); guard.version += 1; @@ -407,8 +408,10 @@ impl ArcAsyncDerived { loading: Arc, ready_tx: Option>, ) { - *value.write().await = Some(new_value); - Self::notify_subs(&wakers, &inner, &loading, ready_tx); + let mut guard = value.write().await; + let is_first_value = guard.is_none(); + *guard = Some(new_value); + Self::notify_subs(&wakers, &inner, &loading, ready_tx, is_first_value); } fn notify_subs( @@ -416,6 +419,7 @@ impl ArcAsyncDerived { inner: &Arc>, loading: &Arc, ready_tx: Option>, + notify_sync_subs: bool, ) { loading.store(false, Ordering::Relaxed); @@ -430,8 +434,10 @@ impl ArcAsyncDerived { } // notify reactive subscribers that we're not loading any more - for sub in (&inner.read().or_poisoned().subscribers).into_iter() { - sub.mark_dirty(); + if notify_sync_subs { + for sub in (&inner.read().or_poisoned().subscribers).into_iter() { + sub.mark_dirty(); + } } // notify async .awaiters @@ -589,9 +595,7 @@ impl ReadUntracked for ArcAsyncDerived { type Value = ReadGuard, AsyncPlain>>; fn try_read_untracked(&self) -> Option { - if self.loading.load(Ordering::Relaxed) - || self.value.blocking_read().is_none() - { + if self.value.blocking_read().is_none() { if let Some(suspense_context) = use_context::() { let handle = suspense_context.task_id(); let ready = SpecialNonReactiveFuture::new(self.ready()); @@ -612,7 +616,7 @@ impl ReadUntracked for ArcAsyncDerived { impl Notify for ArcAsyncDerived { fn notify(&self) { - Self::notify_subs(&self.wakers, &self.inner, &self.loading, None); + Self::notify_subs(&self.wakers, &self.inner, &self.loading, None, true); } } diff --git a/reactive_graph/tests/async_derived.rs b/reactive_graph/tests/async_derived.rs index 27205074dc..9a2928078c 100644 --- a/reactive_graph/tests/async_derived.rs +++ b/reactive_graph/tests/async_derived.rs @@ -1,9 +1,10 @@ use any_spawner::Executor; use reactive_graph::{ computed::{ArcAsyncDerived, AsyncDerived}, - owner::Owner, + effect::Effect, + owner::{Owner, StoredValue}, signal::RwSignal, - traits::{Get, Read, Set, With, WithUntracked}, + traits::{Get, GetValue, Read, Set, With, WithUntracked, WriteValue}, }; use std::future::pending; @@ -137,3 +138,40 @@ async fn async_derived_with_initial() { signal2.set(1); assert_eq!(derived.await, 2); } + +#[tokio::test] +async fn only_notifies_once_per_run() { + _ = Executor::init_tokio(); + let owner = Owner::new(); + owner.set(); + + let signal = RwSignal::new(0); + let derived1 = AsyncDerived::new(move || async move { + Executor::tick().await; + signal.get() + 1 + }); + let derived2 = AsyncDerived::new(move || async move { + let value = derived1.await; + value * 2 + }); + + let effect_runs = StoredValue::new(0); + Effect::new_isomorphic(move |_| { + *effect_runs.write_value() += 1; + println!("{:?}", derived2.get()); + }); + + Executor::tick().await; + assert_eq!(derived2.await, 2); + assert_eq!(effect_runs.get_value(), 1); + + signal.set(2); + Executor::tick().await; + assert_eq!(derived2.await, 6); + assert_eq!(effect_runs.get_value(), 2); + + signal.set(3); + Executor::tick().await; + assert_eq!(derived2.await, 8); + assert_eq!(effect_runs.get_value(), 3); +}