
Allow for a streaming value with #server. #1284

Closed
MinaMatta98 opened this issue Jul 5, 2023 · 9 comments

Comments

@MinaMatta98

Is your feature request related to a problem? Please describe.
No

Describe the solution you'd like
Currently, the #[server] macro does not allow returning a stream type, because the return type is required to implement Serialize.

Take the following server_fn as an example:

#[server(GetImage, "/api", "Url")]
pub async fn get_image(
    cx: Scope,
    path: String,
) -> Result<
    Option<
        // #[server] requires Serialize + Deserialize on the return type,
        // which no Stream type can satisfy — that is the blocker.
        impl futures_util::Stream<Item = Result<Vec<u8>, std::io::Error>>
            + Unpin
            + Serialize
            + Deserialize<'static>,
    >,
    ServerFnError,
> {
    use tokio::io::AsyncReadExt;

    let mut path = path;
    path.remove(0); // strip the leading '/'
    let path = std::env::current_dir().unwrap().join(path);

    if let Ok(mut file) = tokio::fs::File::open(path).await {
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        // Box::pin yields an owned, Unpin stream that can be returned;
        // tokio::pin! pins to the stack and cannot leave the function.
        let stream = Box::pin(futures_util::stream::once(async move { Ok(buffer) }));

        Ok(Some(stream))
    } else {
        Ok(None)
    }
}

A solution would be to stream the return value to the client.

Let's say that I had a rather large Vec<_> to return; a streaming return value would be very useful in that case.

@gbj gbj added the enhancement New feature or request label Jul 5, 2023
@gbj
Collaborator

gbj commented Jul 7, 2023

I'm open to implementing streaming server functions, but I've heard several different proposals from people imagining that they mean several different things, which makes me cautious. For example, some people assume that a server function that returns a Stream should be a websocket connection or server-sent events. In this case, it's clearly a streaming HTTP response with a clear beginning and end.

This kind of use case can currently be handled by creating an API route in your server framework of choice that returns a streaming response. It could then be consumed from the client with reqwest, using something like Response::bytes_stream().
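As a rough sketch of that workaround (assuming axum 0.7, tokio-util's ReaderStream, and reqwest with its stream feature; the route path and file name are just placeholders):

// Server side (axum): a hand-written route that streams a file's bytes.
use axum::{body::Body, response::Response, routing::get, Router};
use tokio_util::io::ReaderStream;

async fn stream_image() -> Response {
    let file = tokio::fs::File::open("assets/image.png").await.unwrap();
    // ReaderStream turns an AsyncRead into a Stream of io::Result<Bytes>
    let stream = ReaderStream::new(file);
    Response::new(Body::from_stream(stream))
}

fn router() -> Router {
    Router::new().route("/api/image", get(stream_image))
}

// Client side: consume the body incrementally with Response::bytes_stream().
use futures_util::StreamExt;

async fn fetch_image() -> Result<Vec<u8>, reqwest::Error> {
    let resp = reqwest::get("http://localhost:3000/api/image").await?;
    let mut stream = resp.bytes_stream();
    let mut buf = Vec::new();
    while let Some(chunk) = stream.next().await {
        buf.extend_from_slice(&chunk?);
    }
    Ok(buf)
}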

Integrating Stream into something like <Suspense/>, HTTP streaming, etc. is a more daunting task.

@MinaMatta98
Author

Hello,

Forgive the ambiguity. The request was for a stream to be consumed within a <Suspense/> tag.

You don't even have to automate the consumption. You could implement a server fn that returns an impl Stream and let the user consume the stream manually within <Suspense/>.

@gbj
Collaborator

gbj commented Jul 8, 2023

> The request was for a stream to be consumed within a <Suspense/> tag.
>
> You don't even have to automate the consumption. You could implement a server fn that returns an impl Stream and let the user consume the stream manually within <Suspense/>.

Could you give an example of how that would look and how it would behave?

The way I'd typically consume a stream is

use futures_util::StreamExt;

let mut stream = some_stream;

while let Some(data) = stream.next().await {
    // do something with data
}

How does that translate into <Suspense/>? I guess the specific questions here are:

  1. At which points should the fallback be shown? (Before the first chunk? Until the final chunk?)
  2. At which point should the rendered HTML of the children be sent to the browser from the server during SSR? (Once the first chunk has loaded? Not until all chunks have loaded?)

Feel free also to create an imaginary <Stream/> component with the perfect API for how you want to consume the stream... It may be conceptually better, and almost certainly easier to implement, if it's a distinct concept/API from Suspense anyway!

@MinaMatta98
Author

Hello,

I was initially thinking of streaming an image, such that chunks of a Vec are returned. Think of images that render partially or, even better, videos that are streamed.

I don't actually think that <Suspense/> should automatically resolve the stream; the consumption should be left to the user.

For example, would it be possible to implement render_stream_chunked and render_stream_whole functions that would control the flow within the <Suspense/> tag? This could also be handled via a signal, with the while let Some... loop (see the sketch below).

This would mean that the user would control both points one and two that you mentioned before.
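Roughly, the signal-driven version could look like this (a sketch only: get_image_stream and base64_data_uri are hypothetical, and it assumes a server fn could return an impl Stream, which is exactly what this issue asks for):

use futures_util::StreamExt;
use leptos::*;

#[component]
fn StreamedImage(cx: Scope) -> impl IntoView {
    let (bytes, set_bytes) = create_signal(cx, Vec::<u8>::new());

    spawn_local(async move {
        // hypothetical server fn returning an impl Stream<Item = Vec<u8>>
        let mut stream = get_image_stream().await.unwrap();
        while let Some(chunk) = stream.next().await {
            // push each chunk into the signal; the view re-renders as it grows
            set_bytes.update(|b| b.extend_from_slice(&chunk));
        }
    });

    view! { cx,
        // base64_data_uri is a hypothetical helper that builds a data: URI
        <img src=move || base64_data_uri(&bytes.get())/>
    }
}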

If you wanted to implement it via a tag, maybe:

<Stream fallback=|| () completed=bool action=(server_fn -> impl Stream) map=some_signal>
    {move ||
        view! { cx,
            <img src=base_64_enc_uri(some_signal)/>
        }
    }
</Stream>

Could it be trialed initially by allowing #[server] functions to return streams and letting users handle them? I ran into problems trying to return an impl Stream from a server fn, due to the serialisation requirement.

@duttaoindril

Another use case for streams to consider, which I would really love to have handled, is LLM generation: in that case, I want the fallback only before the first message, and then to show the accumulating result, the way a ChatGPT stream looks.

@Jinxit
Contributor

Jinxit commented Aug 26, 2023

My use case for streaming server functions would be search results, which I would like to present as they come in (one by one) instead of having to wait for the full list. This could of course be done, as you say, using websockets or SSE; but when everything else can be done with server functions, it feels strange to step outside of that world and the utility functionality I've built specifically for server functions.

@Jinxit
Contributor

Jinxit commented Aug 26, 2023

Another use case is some sort of progressive enhancement where the server can give a "rough estimate" nearly instantly, then progressively update to a better value over the next few seconds.

@gbj gbj added feature request and removed enhancement New feature or request labels Sep 7, 2023
@MinaMatta98
Author

Hello,

If this is any help: futures::channel channels already implement Stream, so one could be used inside the #[server] function, as in the following actix-web example:

use actix_web::web::Bytes;
use actix_web::{post, web, HttpResponse, Responder};
use futures_util::StreamExt;

#[post("/reportstream")]
pub async fn report(payload: web::Payload) -> impl Responder {

    let (mut tx, rx) = futures::channel::mpsc::unbounded::<
        std::result::Result<actix_web::web::Bytes, std::io::Error>,
    >();

    ...

    tokio::task::spawn(async move {
        // `result` is a stream bound by some call made earlier
        while let Some(message) = result.next().await {
            let message = match message {
                Ok(message) => {
                    if let Some(message) = message.choices[0].delta.content.clone() {
                        message.clone()
                    } else {
                        String::new()
                    }
                }
                Err(e) => e.to_string(),
            };
            let _ = tx.start_send(Ok(Bytes::from(message)));
        }
    });

    HttpResponse::Ok().streaming(rx)
}

The channels already implement Stream. You would have to make the macro a wrapper around the channel, although I do not know the specifics of your current implementation.
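As a minimal standalone illustration of that point (nothing leptos-specific, just futures and tokio), the receiver half of an unbounded channel can be consumed directly as a Stream:

use futures::channel::mpsc;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::unbounded::<String>();

    tokio::spawn(async move {
        for i in 0..3 {
            let _ = tx.start_send(format!("chunk {i}"));
        }
        // tx is dropped here, which ends the stream
    });

    // rx implements Stream, so it can be polled chunk by chunk
    while let Some(msg) = rx.next().await {
        println!("{msg}");
    }
}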

@gbj gbj mentioned this issue Jan 4, 2024
@gbj
Collaborator

gbj commented Jan 20, 2024

Streaming responses from server functions were added in #2158.
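For reference, a sketch of roughly how the resulting API is used (treat this as an approximation; the exact types and signatures are in #2158 and the server_fn::codec docs):

use futures::channel::mpsc;
use server_fn::codec::{StreamingText, TextStream};

#[server(output = StreamingText)]
pub async fn streamed_text() -> Result<TextStream, ServerFnError> {
    let (mut tx, rx) = mpsc::unbounded::<Result<String, ServerFnError>>();

    tokio::spawn(async move {
        for i in 0..5 {
            // each item sent becomes one chunk of the streaming HTTP response
            let _ = tx.start_send(Ok(format!("chunk {i}")));
        }
    });

    Ok(TextStream::new(rx))
}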
