Skip to content

Commit

Permalink
feat: add a StreamingJson encoding (#2623)
Browse files Browse the repository at this point in the history
  • Loading branch information
ealmloff authored Jun 28, 2024
1 parent 44cd327 commit 551f9b0
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 3 deletions.
156 changes: 154 additions & 2 deletions server_fn/src/codec/json.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use super::{Encoding, FromReq, FromRes};
use super::{Encoding, FromReq, FromRes, Streaming};
use crate::{
error::ServerFnError,
error::{NoCustomError, ServerFnError},
request::{ClientReq, Req},
response::{ClientRes, Res},
IntoReq, IntoRes,
};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use http::Method;
use serde::{de::DeserializeOwned, Serialize};
use std::pin::Pin;
/// Pass arguments and receive responses as JSON in the body of a `POST` request.
pub struct Json;

Expand Down Expand Up @@ -66,3 +69,152 @@ where
.map_err(|e| ServerFnError::Deserialization(e.to_string()))
}
}

/// An encoding that represents a stream of JSON data.
///
/// A server function that uses this as its output encoding should return [`StreamingJson`]
///
/// ## Browser Support for Streaming Input
///
/// Browser fetch requests do not currently support full request duplexing, which
/// means that that they do begin handling responses until the full request has been sent.
/// This means that if you use a streaming input encoding, the input stream needs to
/// end before the output will begin.
///
/// Streaming requests are only allowed over HTTP2 or HTTP3.
pub struct StreamingJson;

impl Encoding for StreamingJson {
// Each chunk is encoded as a JSON object, but the overall stream is not valid JSON so this uses the default stream content type
const CONTENT_TYPE: &'static str = Streaming::CONTENT_TYPE;
const METHOD: Method = Streaming::METHOD;
}

/// A stream of typed data encoded as JSON.
///
/// A server function can return this type if its output encoding is [`StreamingJson`].
///
/// ## Browser Support for Streaming Input
///
/// Browser fetch requests do not currently support full request duplexing, which
/// means that that they do begin handling responses until the full request has been sent.
/// This means that if you use a streaming input encoding, the input stream needs to
/// end before the output will begin.
///
/// Streaming requests are only allowed over HTTP2 or HTTP3.
pub struct JsonStream<T, CustErr = NoCustomError>(
Pin<Box<dyn Stream<Item = Result<T, ServerFnError<CustErr>>> + Send>>,
);

impl<T, CustErr> std::fmt::Debug for JsonStream<T, CustErr> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("JsonStream").finish()
}
}

impl<T> JsonStream<T> {
/// Creates a new `ByteStream` from the given stream.
pub fn new(
value: impl Stream<Item = Result<T, ServerFnError>> + Send + 'static,
) -> Self {
Self(Box::pin(value.map(|value| value.map(Into::into))))
}
}

impl<T, CustErr> JsonStream<T, CustErr> {
/// Consumes the wrapper, returning a stream of text.
pub fn into_inner(
self,
) -> impl Stream<Item = Result<T, ServerFnError<CustErr>>> + Send {
self.0
}
}

impl<S, T: 'static, CustErr: 'static> From<S> for JsonStream<T, CustErr>
where
S: Stream<Item = T> + Send + 'static,
{
fn from(value: S) -> Self {
Self(Box::pin(value.map(Ok)))
}
}

impl<CustErr, S, T, Request> IntoReq<StreamingJson, Request, CustErr> for S
where
Request: ClientReq<CustErr>,
S: Stream<Item = T> + Send + 'static,
T: Serialize + 'static,
{
fn into_req(
self,
path: &str,
accepts: &str,
) -> Result<Request, ServerFnError<CustErr>> {
let data: JsonStream<T> = self.into();
Request::try_new_streaming(
path,
accepts,
Streaming::CONTENT_TYPE,
data.0.map(|chunk| {
serde_json::to_vec(&chunk)
.unwrap_or_else(|_| Vec::new())
.into()
}),
)
}
}

impl<CustErr, T, S, Request> FromReq<StreamingJson, Request, CustErr> for S
where
Request: Req<CustErr> + Send + 'static,
// The additional `Stream<Item = T>` bound is never used, but it is required to avoid an error where `T` is unconstrained
S: Stream<Item = T> + From<JsonStream<T>> + Send + 'static,
T: DeserializeOwned + 'static,
{
async fn from_req(req: Request) -> Result<Self, ServerFnError<CustErr>> {
let data = req.try_into_stream()?;
let s = JsonStream::new(data.map(|chunk| {
chunk.and_then(|bytes| {
serde_json::from_slice(bytes.as_ref())
.map_err(|e| ServerFnError::Deserialization(e.to_string()))
})
}));
Ok(s.into())
}
}

impl<CustErr, T, Response> IntoRes<StreamingJson, Response, CustErr>
for JsonStream<T, CustErr>
where
Response: Res<CustErr>,
CustErr: 'static,
T: Serialize + 'static,
{
async fn into_res(self) -> Result<Response, ServerFnError<CustErr>> {
Response::try_from_stream(
Streaming::CONTENT_TYPE,
self.into_inner().map(|value| {
serde_json::to_vec(&value?)
.map(Bytes::from)
.map_err(|e| ServerFnError::Serialization(e.to_string()))
}),
)
}
}

impl<CustErr, T, Response> FromRes<StreamingJson, Response, CustErr>
for JsonStream<T>
where
Response: ClientRes<CustErr> + Send,
T: DeserializeOwned,
{
async fn from_res(res: Response) -> Result<Self, ServerFnError<CustErr>> {
let stream = res.try_into_stream()?;
Ok(JsonStream::new(stream.map(|chunk| {
chunk.and_then(|bytes| {
serde_json::from_slice(bytes.as_ref())
.map_err(|e| ServerFnError::Deserialization(e.to_string()))
})
})))
}
}
2 changes: 1 addition & 1 deletion server_fn/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub trait IntoRes<Encoding, Response, CustErr> {
/// data from a response. They are often quite short, usually consisting
/// of just two steps:
/// 1. Extracting a [`String`], [`Bytes`](bytes::Bytes), or a [`Stream`](futures::Stream)
/// from the response body.
/// from the response body.
/// 2. Deserializing the data type from that value.
///
/// For example, here’s the implementation for [`Json`].
Expand Down

0 comments on commit 551f9b0

Please sign in to comment.