Skip to content

Commit

Permalink
fix streaming requests and clarify in docs
Browse files Browse the repository at this point in the history
  • Loading branch information
gbj committed Jan 19, 2024
1 parent 04747fc commit 8f2fbb0
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 63 deletions.
45 changes: 0 additions & 45 deletions examples/server_fns_axum/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ pub fn HomePage() -> impl IntoView {
<RkyvExample/>
<FileUpload/>
<FileWatcher/>
<StreamingValues/>
}
}

Expand Down Expand Up @@ -507,47 +506,3 @@ pub fn CustomErrorTypes() -> impl IntoView {
</p>
}
}

#[component]
pub fn StreamingValues() -> impl IntoView {
use futures::StreamExt;

/// You can create server functions that accept streaming values by using the encoding
/// `Streaming` (with type `ByteStream`) or encoding `StreamingText` (with type `TextStream`)
#[server(input = StreamingText, output = StreamingText)]
pub async fn streaming(input: TextStream) -> Result<TextStream, ServerFnError> {
println!("inside streaming() fn");
Ok(TextStream::from(input.into_inner().map(|text| format!("{}!!!", text.unwrap_or_else(|e| e.to_string())))))
}

let mut count = 0;
let (tx, rx) = futures::channel::mpsc::unbounded();
let (result, set_result) = create_signal("Click me...".to_string());


if cfg!(feature = "hydrate") {
spawn_local(async move {
logging::log!("calling streaming server fn");
match streaming(TextStream::new(rx)).await {
Ok(res) => {
logging::log!("after calling streaming()");
let mut stream = res.into_inner();
while let Some(chunk) = stream.next().await {
set_result(chunk.unwrap_or_else(|e| e.to_string()));
}
}, Err(e) => logging::log!("{e}") }
})
}

view! {
<h3>Streaming arguments and responses</h3>
<button
on:click=move |_| {
count += 1;
tx.unbounded_send(Ok(count.to_string())).expect("couldn't send into channel");
}
>
{result}
</button>
}
}
34 changes: 30 additions & 4 deletions server_fn/src/codec/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ use std::{fmt::Debug, pin::Pin};
/// An encoding that represents a stream of bytes.
///
/// A server function that uses this as its output encoding should return [`ByteStream`].
///
/// ## Browser Support for Streaming Input
///
/// Browser fetch requests do not currently support full request duplexing, which
/// means that that they do begin handling responses until the full request has been sent.
/// This means that if you use a streaming input encoding, the input stream needs to
/// end before the output will begin.
///
/// Streaming requests are only allowed over HTTP2 or HTTP3.
pub struct Streaming;

impl Encoding for Streaming {
Expand Down Expand Up @@ -49,6 +58,15 @@ where
/// A stream of bytes.
///
/// A server function can return this type if its output encoding is [`Streaming`].
///
/// ## Browser Support for Streaming Input
///
/// Browser fetch requests do not currently support full request duplexing, which
/// means that that they do begin handling responses until the full request has been sent.
/// This means that if you use a streaming input encoding, the input stream needs to
/// end before the output will begin.
///
/// Streaming requests are only allowed over HTTP2 or HTTP3.
pub struct ByteStream<CustErr = NoCustomError>(
Pin<Box<dyn Stream<Item = Result<Bytes, ServerFnError<CustErr>>> + Send>>,
);
Expand Down Expand Up @@ -115,10 +133,14 @@ where
///
/// A server function that uses this as its output encoding should return [`TextStream`].
///
/// **Note**: Browser fetch requests do not currently support full request duplexing, which
/// ## Browser Support for Streaming Input
///
/// Browser fetch requests do not currently support full request duplexing, which
/// means that that they do begin handling responses until the full request has been sent.
/// This means that if you use streaming text as an input encoding, the input stream needs to
/// This means that if you use a streaming input encoding, the input stream needs to
/// end before the output will begin.
///
/// Streaming requests are only allowed over HTTP2 or HTTP3.
pub struct StreamingText;

impl Encoding for StreamingText {
Expand All @@ -130,10 +152,14 @@ impl Encoding for StreamingText {
///
/// A server function can return this type if its output encoding is [`StreamingText`].
///
/// **Note**: Browser fetch requests do not currently support full request duplexing, which
/// ## Browser Support for Streaming Input
///
/// Browser fetch requests do not currently support full request duplexing, which
/// means that that they do begin handling responses until the full request has been sent.
/// This means that if you use streaming text as an input encoding, the input stream needs to
/// This means that if you use a streaming input encoding, the input stream needs to
/// end before the output will begin.
///
/// Streaming requests are only allowed over HTTP2 or HTTP3.
pub struct TextStream<CustErr = NoCustomError>(
Pin<Box<dyn Stream<Item = Result<String, ServerFnError<CustErr>>> + Send>>,
);
Expand Down
47 changes: 33 additions & 14 deletions server_fn/src/request/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use crate::{client::get_server_url, error::ServerFnError};
use bytes::Bytes;
use futures::{Stream, StreamExt};
pub use gloo_net::http::Request;
use js_sys::Uint8Array;
use js_sys::{Reflect, Uint8Array};
use send_wrapper::SendWrapper;
use wasm_bindgen::JsValue;
use wasm_streams::ReadableStream;
use web_sys::{FormData, UrlSearchParams};
use web_sys::{FormData, Headers, RequestInit, UrlSearchParams};

/// A `fetch` request made in the browser.
#[derive(Debug)]
Expand Down Expand Up @@ -143,17 +143,36 @@ impl<CustErr> ClientReq<CustErr> for BrowserRequest {
content_type: &str,
body: impl Stream<Item = Bytes> + 'static,
) -> Result<Self, ServerFnError<CustErr>> {
let stream = ReadableStream::from_stream(body.map(|bytes| {
let data = Uint8Array::from(bytes.as_ref());
let data = JsValue::from(data);
Ok(data) as Result<JsValue, JsValue>
}));
Ok(Self(SendWrapper::new(
Request::post(path)
.header("Content-Type", content_type)
.header("Accept", accepts)
.body(stream.into_raw())
.map_err(|e| ServerFnError::Request(e.to_string()))?,
)))
let req = streaming_request(path, accepts, content_type, body)
.map_err(|e| ServerFnError::Request(format!("{e:?}")))?;
Ok(Self(SendWrapper::new(req)))
}
}

fn streaming_request(
path: &str,
accepts: &str,
content_type: &str,
body: impl Stream<Item = Bytes> + 'static,
) -> Result<Request, JsValue> {
let stream = ReadableStream::from_stream(body.map(|bytes| {
let data = Uint8Array::from(bytes.as_ref());
let data = JsValue::from(data);
Ok(data) as Result<JsValue, JsValue>
}))
.into_raw();
let headers = Headers::new()?;
headers.append("Content-Type", content_type)?;
headers.append("Accept", accepts)?;
let mut init = RequestInit::new();
init.headers(&headers).method("POST").body(Some(&stream));

// Chrome requires setting `duplex: "half"` on streaming requests
Reflect::set(
&init,
&JsValue::from_str("duplex"),
&JsValue::from_str("half"),
)?;
let req = web_sys::Request::new_with_str_and_init(path, &init)?;
Ok(Request::from(req))
}

0 comments on commit 8f2fbb0

Please sign in to comment.