diff --git a/server_fn/src/codec/json.rs b/server_fn/src/codec/json.rs index db9292cbe9..d4324d61d6 100644 --- a/server_fn/src/codec/json.rs +++ b/server_fn/src/codec/json.rs @@ -1,12 +1,15 @@ -use super::{Encoding, FromReq, FromRes}; +use super::{Encoding, FromReq, FromRes, Streaming}; use crate::{ - error::ServerFnError, + error::{NoCustomError, ServerFnError}, request::{ClientReq, Req}, response::{ClientRes, Res}, IntoReq, IntoRes, }; +use bytes::Bytes; +use futures::{Stream, StreamExt}; use http::Method; use serde::{de::DeserializeOwned, Serialize}; +use std::pin::Pin; /// Pass arguments and receive responses as JSON in the body of a `POST` request. pub struct Json; @@ -66,3 +69,152 @@ where .map_err(|e| ServerFnError::Deserialization(e.to_string())) } } + +/// An encoding that represents a stream of JSON data. +/// +/// A server function that uses this as its output encoding should return [`StreamingJson`] +/// +/// ## Browser Support for Streaming Input +/// +/// Browser fetch requests do not currently support full request duplexing, which +/// means that that they do begin handling responses until the full request has been sent. +/// This means that if you use a streaming input encoding, the input stream needs to +/// end before the output will begin. +/// +/// Streaming requests are only allowed over HTTP2 or HTTP3. +pub struct StreamingJson; + +impl Encoding for StreamingJson { + // Each chunk is encoded as a JSON object, but the overall stream is not valid JSON so this uses the default stream content type + const CONTENT_TYPE: &'static str = Streaming::CONTENT_TYPE; + const METHOD: Method = Streaming::METHOD; +} + +/// A stream of typed data encoded as JSON. +/// +/// A server function can return this type if its output encoding is [`StreamingJson`]. +/// +/// ## Browser Support for Streaming Input +/// +/// Browser fetch requests do not currently support full request duplexing, which +/// means that that they do begin handling responses until the full request has been sent. +/// This means that if you use a streaming input encoding, the input stream needs to +/// end before the output will begin. +/// +/// Streaming requests are only allowed over HTTP2 or HTTP3. +pub struct JsonStream( + Pin>> + Send>>, +); + +impl std::fmt::Debug for JsonStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("JsonStream").finish() + } +} + +impl JsonStream { + /// Creates a new `ByteStream` from the given stream. + pub fn new( + value: impl Stream> + Send + 'static, + ) -> Self { + Self(Box::pin(value.map(|value| value.map(Into::into)))) + } +} + +impl JsonStream { + /// Consumes the wrapper, returning a stream of text. + pub fn into_inner( + self, + ) -> impl Stream>> + Send { + self.0 + } +} + +impl From for JsonStream +where + S: Stream + Send + 'static, +{ + fn from(value: S) -> Self { + Self(Box::pin(value.map(Ok))) + } +} + +impl IntoReq for S +where + Request: ClientReq, + S: Stream + Send + 'static, + T: Serialize + 'static, +{ + fn into_req( + self, + path: &str, + accepts: &str, + ) -> Result> { + let data: JsonStream = self.into(); + Request::try_new_streaming( + path, + accepts, + Streaming::CONTENT_TYPE, + data.0.map(|chunk| { + serde_json::to_vec(&chunk) + .unwrap_or_else(|_| Vec::new()) + .into() + }), + ) + } +} + +impl FromReq for S +where + Request: Req + Send + 'static, + // The additional `Stream` bound is never used, but it is required to avoid an error where `T` is unconstrained + S: Stream + From> + Send + 'static, + T: DeserializeOwned + 'static, +{ + async fn from_req(req: Request) -> Result> { + let data = req.try_into_stream()?; + let s = JsonStream::new(data.map(|chunk| { + chunk.and_then(|bytes| { + serde_json::from_slice(bytes.as_ref()) + .map_err(|e| ServerFnError::Deserialization(e.to_string())) + }) + })); + Ok(s.into()) + } +} + +impl IntoRes + for JsonStream +where + Response: Res, + CustErr: 'static, + T: Serialize + 'static, +{ + async fn into_res(self) -> Result> { + Response::try_from_stream( + Streaming::CONTENT_TYPE, + self.into_inner().map(|value| { + serde_json::to_vec(&value?) + .map(Bytes::from) + .map_err(|e| ServerFnError::Serialization(e.to_string())) + }), + ) + } +} + +impl FromRes + for JsonStream +where + Response: ClientRes + Send, + T: DeserializeOwned, +{ + async fn from_res(res: Response) -> Result> { + let stream = res.try_into_stream()?; + Ok(JsonStream::new(stream.map(|chunk| { + chunk.and_then(|bytes| { + serde_json::from_slice(bytes.as_ref()) + .map_err(|e| ServerFnError::Deserialization(e.to_string())) + }) + }))) + } +} diff --git a/server_fn/src/codec/mod.rs b/server_fn/src/codec/mod.rs index b9a157a451..ed34e6003e 100644 --- a/server_fn/src/codec/mod.rs +++ b/server_fn/src/codec/mod.rs @@ -170,7 +170,7 @@ pub trait IntoRes { /// data from a response. They are often quite short, usually consisting /// of just two steps: /// 1. Extracting a [`String`], [`Bytes`](bytes::Bytes), or a [`Stream`](futures::Stream) -/// from the response body. +/// from the response body. /// 2. Deserializing the data type from that value. /// /// For example, here’s the implementation for [`Json`].