Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cancelling & restarting TextStream using use_future() leaks connections & breaks down if you do it really fast #3300

Open
johnny-smitherson opened this issue Dec 6, 2024 · 2 comments · May be fixed by #3560
Assignees
Labels
bug Something isn't working
Milestone

Comments

@johnny-smitherson
Copy link

johnny-smitherson commented Dec 6, 2024

Problem

Cancelling and restarting a use_future task leaks connections and breaks down if it happens faster than about 1 time/second. This means users double-clicking a button might leak connections, and users spamming the button will hang the functionality.

We start from the fullstack text stream example. We will change it as follows:.

On the server we've got a TextStream server function. We make a small change here: we check the tx.send call for error (if it's cancelled) and print a warning message.

On the front-end we've consumer that opens the stream and reads from it with use_future.

The button is changed so it now cancels the old future and restarts a new one.

#[allow(non_snake_case)]

use dioxus::prelude::*;
use dioxus_logger::tracing::{error, info, warn};

fn main() {
    dioxus_logger::init(dioxus_logger::tracing::Level::INFO).expect("failed to init logger");
    info!("dioxus launch...");
    dioxus::launch(|| {
        rsx! {
            Router::<Route> {}
        }
    });
}

#[derive(Routable, Clone, Debug, PartialEq)]
#[rustfmt::skip]
enum Route {
    #[route("/")]
    Hello{},
}

#[component]
fn Hello() -> Element {
    let mut response = use_signal(String::new);
    use futures_util::StreamExt;
    let mut fut = use_future(move || async move {
        response.write().clear();
        if let Ok(stream) = test_stream().await {
            response.write().push_str("Stream started\n");
            let mut stream = stream.into_inner();
            while let Some(Ok(text)) = stream.next().await {
                response.write().push_str(&text);
            }
        }
    });
    rsx! {
        button {
            onclick: move |_|  {
                fut.cancel();
                fut.restart();
            },
            "Cancel & Restart stream"
        }
        pre {
            "{response}"
        }
    }
}
use server_fn::codec::StreamingText;
use server_fn::codec::TextStream;
#[server(output = StreamingText)]
pub async fn test_stream() -> Result<TextStream, ServerFnError> {
    // let (tx, rx) = async_channel::bounded(1);
    let (mut tx, rx) = futures::channel::mpsc::unbounded();
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
            if let Err(_) = tx.unbounded_send(Ok("Hello, world!\n".to_string())) {
                warn!(" test_stream() : stream close");
                tx.close_channel();
                return;
            }
        }
    });

    Ok(TextStream::new(rx))
}

Steps To Reproduce

Steps to reproduce the behavior:

  • fullstack text stream example
  • on front-end, read text stream inside use_future, and make the button cancel and restart the use_future
  • click slowly; observe cancellations in the browser tools network tab
  • spam click the button (a few clicks per second should do)
  • connections are leaked
  • if you click fast enough, whole page stops working

Expected behavior

all except last one connections closed, regardless of how much the button is being spammed, or how many hot-reloads are being triggered in a short time

Screenshots

We click the button slowly (one click per second max), cancellation works:

Screenshot 2024-12-06 214104 Screenshot 2024-12-06 214133

If we spam the button with the mouse, it doesn't cancel the requests anymore:

Screenshot 2024-12-06 214406 Screenshot 2024-12-06 214425

Instead, they are closed 5 min later when I close the browser tab:

Screenshot 2024-12-06 214743

Environment:

  • Dioxus version: 0.6.0-rc.0
  • Rust version: 1.82
  • OS info: windows 11 firefox 133
  • App platform: web + fullstac

Edit - Workaround (sort of)

Use broadcast channels to send cancellation message into all the older futures. Use another async channel to get a confirmation back that it indeed got the first message. Only then, restart the future. Not doing both of these results in leaks/breakage.

Spam clicking the button now gets hung up on the recv() call which unblocks only when the old task is dead. Is this blocked async closure leaked forever? Still, better than leaking whole streams.

Anyway, this solution breaks down when you increase the server stream message interval to 5 seconds: the connection is closed in the browser only after the server closes it, and it's closed on the server only after failing to write -- so we end up leaking connections until there is more traffic on the old connections that we wanted cancelled.

#[component]
fn Hello() -> Element {
    let mut response = use_signal(String::new);
    use futures_util::StreamExt;
    let cancel = use_signal(|| tokio::sync::broadcast::channel::<()>(1));
    let cancel_ack = use_signal(|| async_channel::bounded::<()>(1));
    let mut fut = use_future(move || {
        async move {
            info!("future started.");
            response.write().clear();
            if let Ok(stream) = test_stream().await {
                response.write().push_str("Stream started\n");
                let mut stream = stream.into_inner();
                let mut cancel_rx = cancel.peek().0.subscribe();
                let cancel_ack_tx = cancel_ack.peek().0.clone();
                let cancel_future = cancel_rx.recv();
                futures::pin_mut!(cancel_future);
                loop {
                    cancel_future = match futures::future::select(stream.next(), cancel_future).await {
                        futures::future::Either::Left((stream_value, _next_fut)) => {
                            if let Some(Ok(text)) = stream_value {
                                response.write().push_str(&text);
                                _next_fut
                            } else {
                                info!("stream finished.");
                                return;
                            }
                        }
                        futures::future::Either::Right((_stop_value, _)) => {
                            if let Err(e) = cancel_ack_tx.send(()).await {
                                error!("failed to send cancel ack message: {:?}", e);
                            }
                            info!("cancel channel got message: cancelling stream.");
                            // async_std::task::sleep(std::time::Duration::from_millis(1000)).await;
                            return;
                        }
                    }
                }
            }
    }});
    rsx! {
        button {
            onclick: move |_| async move {
                info!("button clicked.");
                if let Err(e) = cancel.peek().0.clone().send(()) {
                    warn!("cancel failed!!! {:?}", e);
                }
                info!("cancel sent, waiting for resp...");
                if let Err(e) = cancel_ack.peek().1.recv().await {
                    warn!("cancel_ack read failed: {:?}!", e);
                }
                info!("cancel ack received.");
                fut.cancel();
                fut.restart();
                info!("future restarted.");
            },
            "Cancel & Restart stream"
        }
        pre {
            "{response}"
        }
    }
}

Edit2 - another problem

Even with the crutch above, it's still very easy to leak connections.

All you have to do is have hot-reloading on and spam "save" in a source file:

Screenshot 2024-12-06 232849 Screenshot 2024-12-06 232741

Edit 3 - conflicting docstrings

So I'm running let mut fut = use_future(...); fut.cancel()

The cancel method has docstring /// Forcefully cancel a future.

It then calls Task::cancel which has docstring /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you -- doesn't sound like it does what the previous comment says?

By "abort handle" I eventually found out futures::stream::Abortable which I tried but it didn't abort the connections.

I also tried:

  • tokio::task::spawn, won't compile because signals aren't Send
  • tokio::task::spawn_local, after adding rt feature, running it panics with "spawn_local called from outside of a task::LocalSet or LocalRuntime". Couldn't figure out what to do from there
  • async_std::task::spawn_local - the future can't access dioxus context. Also, cancel doesn't kill all the closures/connections either.
  • wasm_bindgen_futures::spawn_local -- doesn't even have a cancel/abort handle method.
@jkelleyrtp jkelleyrtp added the bug Something isn't working label Jan 7, 2025
@jkelleyrtp jkelleyrtp added this to the 0.6.2 milestone Jan 9, 2025
@ealmloff ealmloff self-assigned this Jan 13, 2025
@ealmloff ealmloff linked a pull request Jan 13, 2025 that will close this issue
@ealmloff
Copy link
Member

ealmloff commented Jan 13, 2025

The stream does cancel properly on desktop + fullstack. I think the gloo implementation of server_fn returns a future that is not cancel safe somewhere. Bumping the version of the server function crate to 0.7 seems to fix this issue. You can test the fix with:

dioxus = { git = "https://github.com/ealmloff/dioxus", branch = "update-server-fn", features = ["fullstack", "router"] }

@johnny-smitherson
Copy link
Author

yes it works for me, thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants