From e78ce7e6b96cae15b7c936ddda535ccd371ea363 Mon Sep 17 00:00:00 2001 From: Greg Johnston Date: Sat, 1 Apr 2023 11:25:00 -0400 Subject: [PATCH] feat: `create_blocking_resource` (#752) --- leptos_dom/src/ssr.rs | 83 ++++++---- leptos_dom/src/ssr_in_order.rs | 255 ++++++++++++++++++++----------- leptos_reactive/src/hydration.rs | 22 ++- leptos_reactive/src/resource.rs | 105 ++++++++++++- leptos_reactive/src/scope.rs | 52 +++++-- leptos_reactive/src/suspense.rs | 23 ++- 6 files changed, 387 insertions(+), 153 deletions(-) diff --git a/leptos_dom/src/ssr.rs b/leptos_dom/src/ssr.rs index ed347259c2..4e0baf00ad 100644 --- a/leptos_dom/src/ssr.rs +++ b/leptos_dom/src/ssr.rs @@ -133,7 +133,7 @@ pub fn render_to_stream_with_prefix_undisposed_with_context( let runtime = create_runtime(); let ( - (shell, prefix, pending_resources, pending_fragments, serializers), + (shell, pending_resources, pending_fragments, serializers), scope, disposer, ) = run_scope_undisposed(runtime, { @@ -146,57 +146,44 @@ pub fn render_to_stream_with_prefix_undisposed_with_context( let resources = cx.pending_resources(); let pending_resources = serde_json::to_string(&resources).unwrap(); - let prefix = prefix(cx); ( shell, - prefix, pending_resources, cx.pending_fragments(), cx.serialization_resolvers(), ) } }); + let cx = Scope { runtime, id: scope }; + let blocking_fragments = FuturesUnordered::new(); let fragments = FuturesUnordered::new(); - for (fragment_id, (fut, _)) in pending_fragments { - fragments.push(async move { (fragment_id, fut.await) }) + + for (fragment_id, data) in pending_fragments { + if data.should_block { + blocking_fragments + .push(async move { (fragment_id, data.out_of_order.await) }); + } else { + fragments + .push(async move { (fragment_id, data.out_of_order.await) }); + } } // resources and fragments // stream HTML for each as it resolves - // TODO can remove id_before_suspense entirely now - let fragments = fragments.map(|(fragment_id, html)| { - format!( - r#" - - - "# - ) - }); + let fragments = fragments_to_chunks(fragments); // stream data for each Resource as it resolves let resources = render_serializers(serializers); // HTML for the view function and script to store resources let stream = futures::stream::once(async move { + let mut blocking = String::new(); + let mut blocking_fragments = fragments_to_chunks(blocking_fragments); + while let Some(fragment) = blocking_fragments.next().await { + blocking.push_str(&fragment); + } + let prefix = prefix(cx); format!( r#" {prefix} @@ -206,6 +193,7 @@ pub fn render_to_stream_with_prefix_undisposed_with_context( __LEPTOS_RESOLVED_RESOURCES = new Map(); __LEPTOS_RESOURCE_RESOLVERS = new Map(); + {blocking} "# ) }) @@ -222,6 +210,37 @@ pub fn render_to_stream_with_prefix_undisposed_with_context( (stream, runtime, scope) } +fn fragments_to_chunks( + fragments: impl Stream, +) -> impl Stream { + fragments.map(|(fragment_id, html)| { + format!( + r#" + + + "# + ) + }) +} + impl View { /// Consumes the node and renders it into an HTML string. pub fn render_to_string(self, _cx: Scope) -> Cow<'static, str> { diff --git a/leptos_dom/src/ssr_in_order.rs b/leptos_dom/src/ssr_in_order.rs index fcf75e3a75..ea3cd40f7e 100644 --- a/leptos_dom/src/ssr_in_order.rs +++ b/leptos_dom/src/ssr_in_order.rs @@ -15,7 +15,7 @@ use leptos_reactive::{ create_runtime, run_scope_undisposed, suspense::StreamChunk, RuntimeId, Scope, ScopeId, }; -use std::borrow::Cow; +use std::{borrow::Cow, collections::VecDeque}; /// Renders a view to HTML, waiting to return until all `async` [Resource](leptos_reactive::Resource)s /// loaded in `` elements have finished loading. @@ -80,29 +80,48 @@ pub fn render_to_stream_in_order_with_prefix_undisposed_with_context( // create the runtime let runtime = create_runtime(); - let ((chunks, prefix, pending_resources, serializers), scope_id, disposer) = - run_scope_undisposed(runtime, |cx| { - // add additional context - additional_context(cx); + let ( + ( + blocking_fragments_ready, + chunks, + prefix, + pending_resources, + serializers, + ), + scope_id, + disposer, + ) = run_scope_undisposed(runtime, |cx| { + // add additional context + additional_context(cx); - // render view and return chunks - let view = view(cx); + // render view and return chunks + let view = view(cx); - let prefix = prefix(cx); - ( - view.into_stream_chunks(cx), - prefix, - serde_json::to_string(&cx.pending_resources()).unwrap(), - cx.serialization_resolvers(), - ) - }); + ( + cx.blocking_fragments_ready(), + view.into_stream_chunks(cx), + prefix, + serde_json::to_string(&cx.pending_resources()).unwrap(), + cx.serialization_resolvers(), + ) + }); + let cx = Scope { + runtime, + id: scope_id, + }; let (tx, rx) = futures::channel::mpsc::unbounded(); + let (prefix_tx, prefix_rx) = futures::channel::oneshot::channel(); leptos_reactive::spawn_local(async move { - handle_chunks(tx, chunks).await; + blocking_fragments_ready.await; + let remaining_chunks = handle_blocking_chunks(tx.clone(), chunks).await; + let prefix = prefix(cx); + prefix_tx.send(prefix).expect("to send prefix"); + handle_chunks(tx, remaining_chunks).await; }); let stream = futures::stream::once(async move { + let prefix = prefix_rx.await.expect("to receive prefix"); format!( r#" {prefix} @@ -126,18 +145,61 @@ pub fn render_to_stream_in_order_with_prefix_undisposed_with_context( } #[async_recursion(?Send)] -async fn handle_chunks(tx: UnboundedSender, chunks: Vec) { +async fn handle_blocking_chunks( + tx: UnboundedSender, + mut queued_chunks: VecDeque, +) -> VecDeque { + let mut buffer = String::new(); + while let Some(chunk) = queued_chunks.pop_front() { + match chunk { + StreamChunk::Sync(sync) => buffer.push_str(&sync), + StreamChunk::Async { + chunks, + should_block, + } => { + if should_block { + // add static HTML before the Suspense and stream it down + tx.unbounded_send(std::mem::take(&mut buffer)) + .expect("failed to send async HTML chunk"); + + // send the inner stream + let suspended = chunks.await; + handle_blocking_chunks(tx.clone(), suspended).await; + } else { + // TODO: should probably first check if there are any *other* blocking chunks + queued_chunks.push_front(StreamChunk::Async { + chunks, + should_block: false, + }); + break; + } + } + } + } + + // send final sync chunk + tx.unbounded_send(std::mem::take(&mut buffer)) + .expect("failed to send final HTML chunk"); + + queued_chunks +} + +#[async_recursion(?Send)] +async fn handle_chunks( + tx: UnboundedSender, + chunks: VecDeque, +) { let mut buffer = String::new(); for chunk in chunks { match chunk { StreamChunk::Sync(sync) => buffer.push_str(&sync), - StreamChunk::Async(suspended) => { + StreamChunk::Async { chunks, .. } => { // add static HTML before the Suspense and stream it down tx.unbounded_send(std::mem::take(&mut buffer)) .expect("failed to send async HTML chunk"); // send the inner stream - let suspended = suspended.await; + let suspended = chunks.await; handle_chunks(tx.clone(), suspended).await; } } @@ -149,8 +211,8 @@ async fn handle_chunks(tx: UnboundedSender, chunks: Vec) { impl View { /// Renders the view into a set of HTML chunks that can be streamed. - pub fn into_stream_chunks(self, cx: Scope) -> Vec { - let mut chunks = Vec::new(); + pub fn into_stream_chunks(self, cx: Scope) -> VecDeque { + let mut chunks = VecDeque::new(); self.into_stream_chunks_helper(cx, &mut chunks); chunks } @@ -158,37 +220,42 @@ impl View { fn into_stream_chunks_helper( self, cx: Scope, - chunks: &mut Vec, + chunks: &mut VecDeque, ) { match self { View::Suspense(id, _) => { let id = id.to_string(); - if let Some((_, fragment)) = cx.take_pending_fragment(&id) { - chunks.push(StreamChunk::Async(fragment)); + if let Some(data) = cx.take_pending_fragment(&id) { + chunks.push_back(StreamChunk::Async { + chunks: data.in_order, + should_block: data.should_block, + }); } } - View::Text(node) => chunks.push(StreamChunk::Sync(node.content)), + View::Text(node) => { + chunks.push_back(StreamChunk::Sync(node.content)) + } View::Component(node) => { cfg_if! { if #[cfg(debug_assertions)] { let name = crate::ssr::to_kebab_case(&node.name); - chunks.push(StreamChunk::Sync(format!(r#""#, HydrationCtx::to_string(&node.id, false)).into())); + chunks.push_back(StreamChunk::Sync(format!(r#""#, HydrationCtx::to_string(&node.id, false)).into())); for child in node.children { child.into_stream_chunks_helper(cx, chunks); } - chunks.push(StreamChunk::Sync(format!(r#""#, HydrationCtx::to_string(&node.id, true)).into())); + chunks.push_back(StreamChunk::Sync(format!(r#""#, HydrationCtx::to_string(&node.id, true)).into())); } else { for child in node.children { child.into_stream_chunks_helper(cx, chunks); } - chunks.push(StreamChunk::Sync(format!(r#""#, HydrationCtx::to_string(&node.id, true)).into())) + chunks.push_back(StreamChunk::Sync(format!(r#""#, HydrationCtx::to_string(&node.id, true)).into())) } } } View::Element(el) => { #[cfg(debug_assertions)] if let Some(id) = &el.view_marker { - chunks.push(StreamChunk::Sync( + chunks.push_back(StreamChunk::Sync( format!("").into(), )); } @@ -196,7 +263,7 @@ impl View { for chunk in el_chunks { match chunk { StringOrView::String(string) => { - chunks.push(StreamChunk::Sync(string)) + chunks.push_back(StreamChunk::Sync(string)) } StringOrView::View(view) => { view().into_stream_chunks_helper(cx, chunks); @@ -232,18 +299,18 @@ impl View { .join(""); if el.is_void { - chunks.push(StreamChunk::Sync( + chunks.push_back(StreamChunk::Sync( format!("<{tag_name}{attrs}/>").into(), )); } else if let Some(inner_html) = inner_html { - chunks.push(StreamChunk::Sync( + chunks.push_back(StreamChunk::Sync( format!( "<{tag_name}{attrs}>{inner_html}" ) .into(), )); } else { - chunks.push(StreamChunk::Sync( + chunks.push_back(StreamChunk::Sync( format!("<{tag_name}{attrs}>").into(), )); @@ -255,20 +322,20 @@ impl View { } } ElementChildren::InnerHtml(inner_html) => { - chunks.push(StreamChunk::Sync(inner_html)); + chunks.push_back(StreamChunk::Sync(inner_html)); } // handled above ElementChildren::Chunks(_) => unreachable!(), } - chunks.push(StreamChunk::Sync( + chunks.push_back(StreamChunk::Sync( format!("").into(), )); } } #[cfg(debug_assertions)] if let Some(id) = &el.view_marker { - chunks.push(StreamChunk::Sync( + chunks.push_back(StreamChunk::Sync( format!("").into(), )); } @@ -280,10 +347,10 @@ impl View { u.id.clone(), "", false, - Box::new(move |chunks: &mut Vec| { + Box::new(move |chunks: &mut VecDeque| { #[cfg(debug_assertions)] { - chunks.push(StreamChunk::Sync( + chunks.push_back(StreamChunk::Sync( format!( "", HydrationCtx::to_string(&u.id, true) @@ -293,7 +360,7 @@ impl View { } #[cfg(not(debug_assertions))] - chunks.push(StreamChunk::Sync( + chunks.push_back(StreamChunk::Sync( format!( "", HydrationCtx::to_string(&u.id, true) @@ -301,7 +368,7 @@ impl View { .into(), )); }) - as Box)>, + as Box)>, ), CoreComponent::DynChild(node) => { let child = node.child.take(); @@ -309,34 +376,39 @@ impl View { node.id, "dyn-child", true, - Box::new(move |chunks: &mut Vec| { - if let Some(child) = *child { - // On debug builds, `DynChild` has two marker nodes, - // so there is no way for the text to be merged with - // surrounding text when the browser parses the HTML, - // but in release, `DynChild` only has a trailing marker, - // and the browser automatically merges the dynamic text - // into one single node, so we need to artificially make the - // browser create the dynamic text as it's own text node - if let View::Text(t) = child { - chunks.push( - if !cfg!(debug_assertions) { - StreamChunk::Sync( - format!("{}", t.content) + Box::new( + move |chunks: &mut VecDeque| { + if let Some(child) = *child { + // On debug builds, `DynChild` has two marker nodes, + // so there is no way for the text to be merged with + // surrounding text when the browser parses the HTML, + // but in release, `DynChild` only has a trailing marker, + // and the browser automatically merges the dynamic text + // into one single node, so we need to artificially make the + // browser create the dynamic text as it's own text node + if let View::Text(t) = child { + chunks.push_back( + if !cfg!(debug_assertions) { + StreamChunk::Sync( + format!( + "{}", + t.content + ) .into(), - ) - } else { - StreamChunk::Sync(t.content) - }, - ); - } else { - child.into_stream_chunks_helper( - cx, chunks, - ); + ) + } else { + StreamChunk::Sync(t.content) + }, + ); + } else { + child.into_stream_chunks_helper( + cx, chunks, + ); + } } - } - }) - as Box)>, + }, + ) + as Box)>, ) } CoreComponent::Each(node) => { @@ -345,33 +417,40 @@ impl View { node.id, "each", true, - Box::new(move |chunks: &mut Vec| { - for node in children.into_iter().flatten() { - let id = node.id; + Box::new( + move |chunks: &mut VecDeque| { + for node in children.into_iter().flatten() { + let id = node.id; - #[cfg(debug_assertions)] - { - chunks.push(StreamChunk::Sync( - format!( + #[cfg(debug_assertions)] + { + chunks.push_back( + StreamChunk::Sync( + format!( "", HydrationCtx::to_string(&id, false) ) - .into(), - )); - node.child.into_stream_chunks_helper( - cx, chunks, - ); - chunks.push(StreamChunk::Sync( - format!( + .into(), + ), + ); + node.child + .into_stream_chunks_helper( + cx, chunks, + ); + chunks.push_back( + StreamChunk::Sync( + format!( "", HydrationCtx::to_string(&id, true) ) - .into(), - )); + .into(), + ), + ); + } } - } - }) - as Box)>, + }, + ) + as Box)>, ) } }; @@ -379,13 +458,13 @@ impl View { if wrap { cfg_if! { if #[cfg(debug_assertions)] { - chunks.push(StreamChunk::Sync(format!("", HydrationCtx::to_string(&id, false)).into())); + chunks.push_back(StreamChunk::Sync(format!("", HydrationCtx::to_string(&id, false)).into())); content(chunks); - chunks.push(StreamChunk::Sync(format!("", HydrationCtx::to_string(&id, true)).into())); + chunks.push_back(StreamChunk::Sync(format!("", HydrationCtx::to_string(&id, true)).into())); } else { let _ = name; content(chunks); - chunks.push(StreamChunk::Sync(format!("", HydrationCtx::to_string(&id, true)).into())) + chunks.push_back(StreamChunk::Sync(format!("", HydrationCtx::to_string(&id, true)).into())) } } } else { diff --git a/leptos_reactive/src/hydration.rs b/leptos_reactive/src/hydration.rs index df5a23f495..8a61cf5adb 100644 --- a/leptos_reactive/src/hydration.rs +++ b/leptos_reactive/src/hydration.rs @@ -1,20 +1,26 @@ #![forbid(unsafe_code)] use crate::{runtime::PinnedFuture, suspense::StreamChunk, ResourceId}; use cfg_if::cfg_if; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; pub struct SharedContext { pub events: Vec<()>, pub pending_resources: HashSet, pub resolved_resources: HashMap, #[allow(clippy::type_complexity)] - // index String is the fragment ID: tuple is - // `( - // Future of HTML when resolved (out-of-order) - // Future of additional stream chunks when resolved (in-order) - // )` - pub pending_fragments: - HashMap, PinnedFuture>)>, + pub pending_fragments: HashMap, +} + +/// Represents its pending `` fragment. +pub struct FragmentData { + /// Future that represents how it should be render for an out-of-order stream. + pub out_of_order: PinnedFuture, + /// Future that represents how it should be render for an in-order stream. + pub in_order: PinnedFuture>, + /// Whether the stream should wait for this fragment before sending any data. + pub should_block: bool, + /// Future that will resolve when the fragment is ready. + pub is_ready: Option>, } impl std::fmt::Debug for SharedContext { diff --git a/leptos_reactive/src/resource.rs b/leptos_reactive/src/resource.rs index 6f3b21d515..d256474b06 100644 --- a/leptos_reactive/src/resource.rs +++ b/leptos_reactive/src/resource.rs @@ -106,6 +106,73 @@ pub fn create_resource_with_initial_value( fetcher: impl Fn(S) -> Fu + 'static, initial_value: Option, ) -> Resource +where + S: PartialEq + Debug + Clone + 'static, + T: Serializable + 'static, + Fu: Future + 'static, +{ + create_resource_helper( + cx, + source, + fetcher, + initial_value, + ResourceSerialization::Serializable, + ) +} + +/// Creates a “blocking” [Resource](crate::Resource). When server-side rendering is used, +/// this resource will cause any `` you read it under to block the initial +/// chunk of HTML from being sent to the client. This means that if you set things like +/// HTTP headers or `` metadata in that ``, that header material will +/// be included in the server’s original response. +/// +/// This causes a slow time to first byte (TTFB) but is very useful for loading data that +/// is essential to the first load. For example, a blog post page that needs to include +/// the title of the blog post in the page’s initial HTML `` tag for SEO reasons +/// might use a blocking resource to load blog post metadata, which will prevent the page from +/// returning until that data has loaded. +/// +/// **Note**: This is not “blocking” in the sense that it blocks the current thread. Rather, +/// it is blocking in the sense that it blocks the server from sending a response. +#[cfg_attr( + debug_assertions, + instrument( + level = "trace", + skip_all, + fields( + scope = ?cx.id, + ty = %std::any::type_name::<T>(), + signal_ty = %std::any::type_name::<S>(), + ) + ) +)] +#[track_caller] +pub fn create_blocking_resource<S, T, Fu>( + cx: Scope, + source: impl Fn() -> S + 'static, + fetcher: impl Fn(S) -> Fu + 'static, +) -> Resource<S, T> +where + S: PartialEq + Debug + Clone + 'static, + T: Serializable + 'static, + Fu: Future<Output = T> + 'static, +{ + create_resource_helper( + cx, + source, + fetcher, + None, + ResourceSerialization::Blocking, + ) +} + +fn create_resource_helper<S, T, Fu>( + cx: Scope, + source: impl Fn() -> S + 'static, + fetcher: impl Fn(S) -> Fu + 'static, + initial_value: Option<T>, + serializable: ResourceSerialization, +) -> Resource<S, T> where S: PartialEq + Debug + Clone + 'static, T: Serializable + 'static, @@ -132,7 +199,7 @@ where resolved: Rc::new(Cell::new(resolved)), scheduled: Rc::new(Cell::new(false)), suspense_contexts: Default::default(), - serializable: true, + serializable, }); let id = with_runtime(cx.runtime, |runtime| { @@ -256,7 +323,7 @@ where resolved: Rc::new(Cell::new(resolved)), scheduled: Rc::new(Cell::new(false)), suspense_contexts: Default::default(), - serializable: false, + serializable: ResourceSerialization::Local, }); let id = with_runtime(cx.runtime, |runtime| { @@ -560,7 +627,19 @@ where resolved: Rc<Cell<bool>>, scheduled: Rc<Cell<bool>>, suspense_contexts: Rc<RefCell<HashSet<SuspenseContext>>>, - serializable: bool, + serializable: ResourceSerialization, +} + +/// Whether and how the resource can be serialized. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub(crate) enum ResourceSerialization { + /// Not serializable. + Local, + /// Can be serialized. + Serializable, + /// Can be serialized, and cause the first chunk to be blocked until + /// their suspense has resolved. + Blocking, } impl<S, T> ResourceState<S, T> @@ -600,7 +679,7 @@ where let serializable = self.serializable; if let Some(suspense_cx) = &suspense_cx { - if serializable { + if serializable != ResourceSerialization::Local { suspense_cx.has_local_only.set_value(false); } } else { @@ -633,7 +712,12 @@ where // because the context has been tracked here // on the first read, resource is already loading without having incremented if !has_value { - s.increment(serializable); + s.increment( + serializable != ResourceSerialization::Local, + ); + if serializable == ResourceSerialization::Blocking { + s.should_block.set_value(true); + } } } } @@ -674,7 +758,12 @@ where let suspense_contexts = self.suspense_contexts.clone(); for suspense_context in suspense_contexts.borrow().iter() { - suspense_context.increment(self.serializable); + suspense_context.increment( + self.serializable != ResourceSerialization::Local, + ); + if self.serializable == ResourceSerialization::Blocking { + suspense_context.should_block.set_value(true); + } } // run the Future @@ -692,7 +781,9 @@ where set_loading.update(|n| *n = false); for suspense_context in suspense_contexts.borrow().iter() { - suspense_context.decrement(serializable); + suspense_context.decrement( + serializable != ResourceSerialization::Local, + ); } } }) diff --git a/leptos_reactive/src/scope.rs b/leptos_reactive/src/scope.rs index b99be87f91..b91fab9245 100644 --- a/leptos_reactive/src/scope.rs +++ b/leptos_reactive/src/scope.rs @@ -1,13 +1,17 @@ #![forbid(unsafe_code)] use crate::{ console_warn, + hydration::FragmentData, node::NodeId, runtime::{with_runtime, RuntimeId}, suspense::StreamChunk, PinnedFuture, ResourceId, StoredValueId, SuspenseContext, }; use futures::stream::FuturesUnordered; -use std::{collections::HashMap, fmt}; +use std::{ + collections::{HashMap, VecDeque}, + fmt, +}; #[doc(hidden)] #[must_use = "Scope will leak memory if the disposer function is never called"] @@ -374,7 +378,7 @@ impl Scope { context: SuspenseContext, key: &str, out_of_order_resolver: impl FnOnce() -> String + 'static, - in_order_resolver: impl FnOnce() -> Vec<StreamChunk> + 'static, + in_order_resolver: impl FnOnce() -> VecDeque<StreamChunk> + 'static, ) { use crate::create_isomorphic_effect; use futures::StreamExt; @@ -383,6 +387,7 @@ impl Scope { let mut shared_context = runtime.shared_context.borrow_mut(); let (tx1, mut rx1) = futures::channel::mpsc::unbounded(); let (tx2, mut rx2) = futures::channel::mpsc::unbounded(); + let (tx3, mut rx3) = futures::channel::mpsc::unbounded(); create_isomorphic_effect(*self, move |_| { let pending = context @@ -393,21 +398,26 @@ impl Scope { if pending == 0 { _ = tx1.unbounded_send(()); _ = tx2.unbounded_send(()); + _ = tx3.unbounded_send(()); } }); shared_context.pending_fragments.insert( key.to_string(), - ( - Box::pin(async move { + FragmentData { + out_of_order: Box::pin(async move { rx1.next().await; out_of_order_resolver() }), - Box::pin(async move { + in_order: Box::pin(async move { rx2.next().await; in_order_resolver() }), - ), + should_block: context.should_block(), + is_ready: Some(Box::pin(async move { + rx3.next().await; + })), + }, ); }) } @@ -416,10 +426,7 @@ impl Scope { /// /// The keys are hydration IDs. Values are tuples of two pinned /// `Future`s that return content for out-of-order and in-order streaming, respectively. - pub fn pending_fragments( - &self, - ) -> HashMap<String, (PinnedFuture<String>, PinnedFuture<Vec<StreamChunk>>)> - { + pub fn pending_fragments(&self) -> HashMap<String, FragmentData> { with_runtime(self.runtime, |runtime| { let mut shared_context = runtime.shared_context.borrow_mut(); std::mem::take(&mut shared_context.pending_fragments) @@ -427,14 +434,31 @@ impl Scope { .unwrap_or_default() } + /// A future that will resolve when all blocking fragments are ready. + pub fn blocking_fragments_ready(self) -> PinnedFuture<()> { + use futures::StreamExt; + + let mut ready = with_runtime(self.runtime, |runtime| { + let mut shared_context = runtime.shared_context.borrow_mut(); + let ready = FuturesUnordered::new(); + for (_, data) in shared_context.pending_fragments.iter_mut() { + if data.should_block { + if let Some(is_ready) = data.is_ready.take() { + ready.push(is_ready); + } + } + } + ready + }) + .unwrap_or_default(); + Box::pin(async move { while ready.next().await.is_some() {} }) + } + /// Takes the pending HTML for a single `<Suspense/>` node. /// /// Returns a tuple of two pinned `Future`s that return content for out-of-order /// and in-order streaming, respectively. - pub fn take_pending_fragment( - &self, - id: &str, - ) -> Option<(PinnedFuture<String>, PinnedFuture<Vec<StreamChunk>>)> { + pub fn take_pending_fragment(&self, id: &str) -> Option<FragmentData> { with_runtime(self.runtime, |runtime| { let mut shared_context = runtime.shared_context.borrow_mut(); shared_context.pending_fragments.remove(id) diff --git a/leptos_reactive/src/suspense.rs b/leptos_reactive/src/suspense.rs index 8c56c8e639..97af49ccd6 100644 --- a/leptos_reactive/src/suspense.rs +++ b/leptos_reactive/src/suspense.rs @@ -6,7 +6,7 @@ use crate::{ RwSignal, Scope, SignalUpdate, StoredValue, WriteSignal, }; use futures::Future; -use std::{borrow::Cow, pin::Pin}; +use std::{borrow::Cow, collections::VecDeque, pin::Pin}; /// Tracks [Resource](crate::Resource)s that are read under a suspense context, /// i.e., within a [`Suspense`](https://docs.rs/leptos_core/latest/leptos_core/fn.Suspense.html) component. @@ -17,13 +17,21 @@ pub struct SuspenseContext { set_pending_resources: WriteSignal<usize>, pub(crate) pending_serializable_resources: RwSignal<usize>, pub(crate) has_local_only: StoredValue<bool>, + pub(crate) should_block: StoredValue<bool>, } impl SuspenseContext { - /// Whether the suspense contains local resources at this moment, and therefore can't be + /// Whether the suspense contains local resources at this moment, + /// and therefore can't be serialized pub fn has_local_only(&self) -> bool { self.has_local_only.get_value() } + + /// Whether any blocking resources are read under this suspense context, + /// meaning the HTML stream should not begin until it has resolved. + pub fn should_block(&self) -> bool { + self.should_block.get_value() + } } impl std::hash::Hash for SuspenseContext { @@ -46,11 +54,13 @@ impl SuspenseContext { let (pending_resources, set_pending_resources) = create_signal(cx, 0); let pending_serializable_resources = create_rw_signal(cx, 0); let has_local_only = store_value(cx, true); + let should_block = store_value(cx, false); Self { pending_resources, set_pending_resources, pending_serializable_resources, has_local_only, + should_block, } } @@ -101,14 +111,19 @@ pub enum StreamChunk { /// A chunk of synchronous HTML. Sync(Cow<'static, str>), /// A future that resolves to be a list of additional chunks. - Async(Pin<Box<dyn Future<Output = Vec<StreamChunk>>>>), + Async { + /// The HTML chunks this contains. + chunks: Pin<Box<dyn Future<Output = VecDeque<StreamChunk>>>>, + /// Whether this should block the stream. + should_block: bool, + }, } impl std::fmt::Debug for StreamChunk { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { StreamChunk::Sync(data) => write!(f, "StreamChunk::Sync({data:?})"), - StreamChunk::Async(_) => write!(f, "StreamChunk::Async(_)"), + StreamChunk::Async { .. } => write!(f, "StreamChunk::Async(_)"), } } }