Skip to content

Latest commit

 

History

History
77 lines (57 loc) · 2.88 KB

SelectingFutures.md

File metadata and controls

77 lines (57 loc) · 2.88 KB

Selecting Futures

There's one more type of spawning option to consider. It's a complicated one, so it got its own section. It's called select!. It's a macro that lets you wait for the first of several futures to complete---and then automatically cancels the other futures.

Implementing Timeouts

The code for this is in 03_async/select_timeout.

Let's start with a simple example. We'll spawn two futures. One will sleep for 1 second, and the other will sleep for 2 seconds. We'll use select! to wait for the first one to complete. Then we'll print a message and exit.

use std::time::Duration;
use tokio::time::sleep;

async fn do_work() {
    // Pretend to do some work that takes longer than expected
    sleep(Duration::from_secs(2)).await;
}

async fn timeout(seconds: f32) {
    // Wait for the specified number of seconds
    sleep(Duration::from_secs_f32(seconds)).await;
}

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = do_work() => println!("do_work() completed first"),
        _ = timeout(1.0) => println!("timeout() completed first"),
    }
}

The syntax is based on a match statement, but with an additional step. The format is: (value) = (function) => (what to do if it finishes first)

You can change the timeout to determine which will finish first. If you set it to 3 seconds, then do_work() will finish first. If you set it to 0.5 seconds, then timeout() will finish first.

Note that the other future is cancelled---but if you've done work that has side-effects (say saving to a file) the work that has already been performed will not be undone.

Receiving from Multiple Channels

The code for this is in 03_async/select_channels.

An easy way for an async function to be subscribed to multiple channels is to obtain the receivers, and then select! whichever one has data. Here's an example:

use tokio::sync::{mpsc, broadcast};

async fn receiver(mut rx: mpsc::Receiver<u32>, mut broadcast_rx: broadcast::Receiver<u32>) {
    loop {
        tokio::select! {
            Some(n) = rx.recv() => println!("Received message {n} on the mpsc channel"),
            Ok(n) = broadcast_rx.recv() => println!("Received message {n} on the broadcast channel"),
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(1);
    let (broadcast_tx, broadcast_rx) = broadcast::channel::<u32>(1);

    tokio::spawn(receiver(rx, broadcast_rx));

   for count in 0 .. 10 {
        if count % 2 == 0 {
            tx.send(count).await.unwrap();
        } else {
            broadcast_tx.send(count).unwrap();
        }
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}

Note that if you have a continuous stream in the MPSC channel, the broadcast channel may take a while to fire! This pattern is good for sending "quit" messages and other control data---but only if it doesn't have to be instantaneous.