From 3615a15c3fb9c188325746105e6c7861bb2329aa Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Sat, 23 Nov 2024 23:47:23 +0200 Subject: [PATCH 1/5] Hydration fix --- .../async_derived/arc_async_derived.rs | 64 +++++++++++++------ 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/reactive_graph/src/computed/async_derived/arc_async_derived.rs b/reactive_graph/src/computed/async_derived/arc_async_derived.rs index ecb747299e..95071e0dad 100644 --- a/reactive_graph/src/computed/async_derived/arc_async_derived.rs +++ b/reactive_graph/src/computed/async_derived/arc_async_derived.rs @@ -311,6 +311,8 @@ macro_rules! spawn_derived { let inner = Arc::downgrade(&this.inner); let wakers = Arc::downgrade(&this.wakers); let loading = Arc::downgrade(&this.loading); + #[cfg(debug_assertions)] + let defined_at = this.defined_at; let fut = async move { while rx.next().await.is_some() { let update_if_necessary = if $should_track { @@ -323,8 +325,29 @@ macro_rules! spawn_derived { if update_if_necessary || first_run.is_some() { match (value.upgrade(), inner.upgrade(), wakers.upgrade(), loading.upgrade()) { (Some(value), Some(inner), Some(wakers), Some(loading)) => { + let this = ArcAsyncDerived { + #[cfg(debug_assertions)] + defined_at, + value, + wakers, + inner, + loading, + }; + + let owner = this.inner.read().or_poisoned().owner.clone(); + + this.loading.store(true, Ordering::Relaxed); + + if first_run.is_none() { + owner.with(|| { + let _ = this.try_read_untracked(); + for sub in (&this.inner.read().or_poisoned().subscribers).into_iter() { + sub.mark_dirty(); + } + }); + } + // generate new Future - let owner = inner.read().or_poisoned().owner.clone(); let fut = initial_fut.take().unwrap_or_else(|| { let fut = if $should_track { owner.with_cleanup(|| { @@ -342,6 +365,8 @@ macro_rules! spawn_derived { Box::pin(fut) }); + this.loading.store(true, Ordering::Relaxed); + // register with global transition listener, if any let ready_tx = first_run.take().unwrap_or_else(|| { let (ready_tx, ready_rx) = oneshot::channel(); @@ -351,11 +376,8 @@ macro_rules! spawn_derived { ready_tx }); - // generate and assign new value - loading.store(true, Ordering::Relaxed); - let (this_version, suspense_ids) = { - let mut guard = inner.write().or_poisoned(); + let mut guard = this.inner.write().or_poisoned(); guard.version += 1; let version = guard.version; let suspense_ids = mem::take(&mut guard.suspenses) @@ -369,10 +391,10 @@ macro_rules! spawn_derived { drop(suspense_ids); - let latest_version = inner.read().or_poisoned().version; + let latest_version = this.inner.read().or_poisoned().version; if latest_version == this_version { - Self::set_inner_value(new_value, value, wakers, inner, loading, Some(ready_tx)).await; + Self::set_inner_value(new_value, this.value, this.wakers, this.inner, this.loading, Some(ready_tx)).await; } } _ => break, @@ -583,18 +605,22 @@ impl ReadUntracked for ArcAsyncDerived { type Value = ReadGuard, AsyncPlain>>; fn try_read_untracked(&self) -> Option { - if let Some(suspense_context) = use_context::() { - let handle = suspense_context.task_id(); - let ready = SpecialNonReactiveFuture::new(self.ready()); - crate::spawn(async move { - ready.await; - drop(handle); - }); - self.inner - .write() - .or_poisoned() - .suspenses - .push(suspense_context); + if self.loading.load(Ordering::Relaxed) + || self.value.blocking_read().is_none() + { + if let Some(suspense_context) = use_context::() { + let handle = suspense_context.task_id(); + let ready = SpecialNonReactiveFuture::new(self.ready()); + crate::spawn(async move { + ready.await; + drop(handle); + }); + self.inner + .write() + .or_poisoned() + .suspenses + .push(suspense_context); + } } AsyncPlain::try_new(&self.value).map(ReadGuard::new) } From c911affc14177241ecb7619a480c5c125ceb2ac8 Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Sat, 23 Nov 2024 23:57:17 +0200 Subject: [PATCH 2/5] remove duplicate loading.set(true) --- reactive_graph/src/computed/async_derived/arc_async_derived.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/reactive_graph/src/computed/async_derived/arc_async_derived.rs b/reactive_graph/src/computed/async_derived/arc_async_derived.rs index 95071e0dad..24c694e654 100644 --- a/reactive_graph/src/computed/async_derived/arc_async_derived.rs +++ b/reactive_graph/src/computed/async_derived/arc_async_derived.rs @@ -365,8 +365,6 @@ macro_rules! spawn_derived { Box::pin(fut) }); - this.loading.store(true, Ordering::Relaxed); - // register with global transition listener, if any let ready_tx = first_run.take().unwrap_or_else(|| { let (ready_tx, ready_rx) = oneshot::channel(); From 9e6ea0573f9f75162a908cfd8235b27b2c0723fa Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Sun, 24 Nov 2024 00:11:05 +0200 Subject: [PATCH 3/5] Remove extra unnecessary read --- .../src/computed/async_derived/arc_async_derived.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/reactive_graph/src/computed/async_derived/arc_async_derived.rs b/reactive_graph/src/computed/async_derived/arc_async_derived.rs index 24c694e654..4be4c0e6f3 100644 --- a/reactive_graph/src/computed/async_derived/arc_async_derived.rs +++ b/reactive_graph/src/computed/async_derived/arc_async_derived.rs @@ -339,12 +339,9 @@ macro_rules! spawn_derived { this.loading.store(true, Ordering::Relaxed); if first_run.is_none() { - owner.with(|| { - let _ = this.try_read_untracked(); - for sub in (&this.inner.read().or_poisoned().subscribers).into_iter() { - sub.mark_dirty(); - } - }); + for sub in (&this.inner.read().or_poisoned().subscribers).into_iter() { + sub.mark_dirty(); + } } // generate new Future From e59928506179672f092cfb97ac51331188f03487 Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Sun, 24 Nov 2024 00:14:45 +0200 Subject: [PATCH 4/5] Remove this now no longer needed --- .../async_derived/arc_async_derived.rs | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/reactive_graph/src/computed/async_derived/arc_async_derived.rs b/reactive_graph/src/computed/async_derived/arc_async_derived.rs index 4be4c0e6f3..aaf1e6914c 100644 --- a/reactive_graph/src/computed/async_derived/arc_async_derived.rs +++ b/reactive_graph/src/computed/async_derived/arc_async_derived.rs @@ -311,8 +311,6 @@ macro_rules! spawn_derived { let inner = Arc::downgrade(&this.inner); let wakers = Arc::downgrade(&this.wakers); let loading = Arc::downgrade(&this.loading); - #[cfg(debug_assertions)] - let defined_at = this.defined_at; let fut = async move { while rx.next().await.is_some() { let update_if_necessary = if $should_track { @@ -325,21 +323,12 @@ macro_rules! spawn_derived { if update_if_necessary || first_run.is_some() { match (value.upgrade(), inner.upgrade(), wakers.upgrade(), loading.upgrade()) { (Some(value), Some(inner), Some(wakers), Some(loading)) => { - let this = ArcAsyncDerived { - #[cfg(debug_assertions)] - defined_at, - value, - wakers, - inner, - loading, - }; - - let owner = this.inner.read().or_poisoned().owner.clone(); + let owner = inner.read().or_poisoned().owner.clone(); - this.loading.store(true, Ordering::Relaxed); + loading.store(true, Ordering::Relaxed); if first_run.is_none() { - for sub in (&this.inner.read().or_poisoned().subscribers).into_iter() { + for sub in (&inner.read().or_poisoned().subscribers).into_iter() { sub.mark_dirty(); } } @@ -372,7 +361,7 @@ macro_rules! spawn_derived { }); let (this_version, suspense_ids) = { - let mut guard = this.inner.write().or_poisoned(); + let mut guard = inner.write().or_poisoned(); guard.version += 1; let version = guard.version; let suspense_ids = mem::take(&mut guard.suspenses) @@ -386,10 +375,10 @@ macro_rules! spawn_derived { drop(suspense_ids); - let latest_version = this.inner.read().or_poisoned().version; + let latest_version = inner.read().or_poisoned().version; if latest_version == this_version { - Self::set_inner_value(new_value, this.value, this.wakers, this.inner, this.loading, Some(ready_tx)).await; + Self::set_inner_value(new_value, value, wakers, inner, loading, Some(ready_tx)).await; } } _ => break, From 4e798cbfcfec205dfd690d312c01b9561ae605d7 Mon Sep 17 00:00:00 2001 From: Zak Stucke Date: Sat, 30 Nov 2024 18:30:44 +0200 Subject: [PATCH 5/5] maybe --- .../async_derived/arc_async_derived.rs | 30 +++++++------ reactive_graph/tests/async_derived.rs | 42 ++++++++++++++++++- 2 files changed, 57 insertions(+), 15 deletions(-) diff --git a/reactive_graph/src/computed/async_derived/arc_async_derived.rs b/reactive_graph/src/computed/async_derived/arc_async_derived.rs index aaf1e6914c..420c87572e 100644 --- a/reactive_graph/src/computed/async_derived/arc_async_derived.rs +++ b/reactive_graph/src/computed/async_derived/arc_async_derived.rs @@ -325,13 +325,12 @@ macro_rules! spawn_derived { (Some(value), Some(inner), Some(wakers), Some(loading)) => { let owner = inner.read().or_poisoned().owner.clone(); - loading.store(true, Ordering::Relaxed); - if first_run.is_none() { - for sub in (&inner.read().or_poisoned().subscribers).into_iter() { - sub.mark_dirty(); - } + inner.write().or_poisoned().state = AsyncDerivedState::Notifying; + for sub in (&inner.read().or_poisoned().subscribers).into_iter() { + sub.mark_dirty(); } + inner.write().or_poisoned().state = AsyncDerivedState::Clean; // generate new Future let fut = initial_fut.take().unwrap_or_else(|| { @@ -360,6 +359,8 @@ macro_rules! spawn_derived { ready_tx }); + loading.store(true, Ordering::Relaxed); + let (this_version, suspense_ids) = { let mut guard = inner.write().or_poisoned(); guard.version += 1; @@ -407,8 +408,10 @@ impl ArcAsyncDerived { loading: Arc, ready_tx: Option>, ) { - *value.write().await = Some(new_value); - Self::notify_subs(&wakers, &inner, &loading, ready_tx); + let mut guard = value.write().await; + let is_first_value = guard.is_none(); + *guard = Some(new_value); + Self::notify_subs(&wakers, &inner, &loading, ready_tx, is_first_value); } fn notify_subs( @@ -416,6 +419,7 @@ impl ArcAsyncDerived { inner: &Arc>, loading: &Arc, ready_tx: Option>, + notify_sync_subs: bool, ) { loading.store(false, Ordering::Relaxed); @@ -430,8 +434,10 @@ impl ArcAsyncDerived { } // notify reactive subscribers that we're not loading any more - for sub in (&inner.read().or_poisoned().subscribers).into_iter() { - sub.mark_dirty(); + if notify_sync_subs { + for sub in (&inner.read().or_poisoned().subscribers).into_iter() { + sub.mark_dirty(); + } } // notify async .awaiters @@ -589,9 +595,7 @@ impl ReadUntracked for ArcAsyncDerived { type Value = ReadGuard, AsyncPlain>>; fn try_read_untracked(&self) -> Option { - if self.loading.load(Ordering::Relaxed) - || self.value.blocking_read().is_none() - { + if self.value.blocking_read().is_none() { if let Some(suspense_context) = use_context::() { let handle = suspense_context.task_id(); let ready = SpecialNonReactiveFuture::new(self.ready()); @@ -612,7 +616,7 @@ impl ReadUntracked for ArcAsyncDerived { impl Notify for ArcAsyncDerived { fn notify(&self) { - Self::notify_subs(&self.wakers, &self.inner, &self.loading, None); + Self::notify_subs(&self.wakers, &self.inner, &self.loading, None, true); } } diff --git a/reactive_graph/tests/async_derived.rs b/reactive_graph/tests/async_derived.rs index 27205074dc..9a2928078c 100644 --- a/reactive_graph/tests/async_derived.rs +++ b/reactive_graph/tests/async_derived.rs @@ -1,9 +1,10 @@ use any_spawner::Executor; use reactive_graph::{ computed::{ArcAsyncDerived, AsyncDerived}, - owner::Owner, + effect::Effect, + owner::{Owner, StoredValue}, signal::RwSignal, - traits::{Get, Read, Set, With, WithUntracked}, + traits::{Get, GetValue, Read, Set, With, WithUntracked, WriteValue}, }; use std::future::pending; @@ -137,3 +138,40 @@ async fn async_derived_with_initial() { signal2.set(1); assert_eq!(derived.await, 2); } + +#[tokio::test] +async fn only_notifies_once_per_run() { + _ = Executor::init_tokio(); + let owner = Owner::new(); + owner.set(); + + let signal = RwSignal::new(0); + let derived1 = AsyncDerived::new(move || async move { + Executor::tick().await; + signal.get() + 1 + }); + let derived2 = AsyncDerived::new(move || async move { + let value = derived1.await; + value * 2 + }); + + let effect_runs = StoredValue::new(0); + Effect::new_isomorphic(move |_| { + *effect_runs.write_value() += 1; + println!("{:?}", derived2.get()); + }); + + Executor::tick().await; + assert_eq!(derived2.await, 2); + assert_eq!(effect_runs.get_value(), 1); + + signal.set(2); + Executor::tick().await; + assert_eq!(derived2.await, 6); + assert_eq!(effect_runs.get_value(), 2); + + signal.set(3); + Executor::tick().await; + assert_eq!(derived2.await, 8); + assert_eq!(effect_runs.get_value(), 3); +}