Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-tokio executor & timeouts #297

Open
vhdirk opened this issue Mar 19, 2020 · 12 comments
Open

Non-tokio executor & timeouts #297

vhdirk opened this issue Mar 19, 2020 · 12 comments

Comments

@vhdirk
Copy link

vhdirk commented Mar 19, 2020

When using an executor that is different from tokio (in my case glib), launching a request from the client results in the following message at runtime:

050, marker: <Phantomcow> }), cache: ThreadCache { is_unread: true, has_attachment: false, tags: [] } }))
 INFO  inox_gtk::components::thread_view                 > load_thread: Thread { thread: Some(Thread { ptr: 0x55a29e2a0050, marker: <Phantomcow> }), cache: ThreadCache { is_unread: true, has_attachment: false, tags: [] } }
thread 'main' panicked at 'there is no timer running, must be called from the context of Tokio runtime', /home/dirk/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.13/src/time/driver/handle.rs:24:9
stack backtrace:

It seems tokio timeouts are not compatible with other runtimes. It'd be cool if there was a runtime-agnostic solution. Peraps https://github.com/async-rs/futures-timer ?

@vorot93
Copy link
Contributor

vorot93 commented Mar 21, 2020

It is possible to use your own runtime, provided that it is within the context of Tokio.

@ghost
Copy link

ghost commented Aug 18, 2020

@vorot93 The problem with the Tokio context is twofold:

  1. It is either really difficult or impossible to set up the Tokio context inside your custom executor.
  2. There are no practical examples, documentation, or guides on how to do that.

Speaking as a previous Tokio developer and somebody who has been building the async Rust ecosystem for the past several years, I have no idea how to make a "hello world" example of futures::executor::ThreadPool with the Tokio context set up inside it. It's most likely not possible at all!

I read @carllerche's example you linked to, but how do you substitute // Run the runtime somewhere (background thread?) with ThreadPool?

Whenever the concern of Tokio's incompatibility with other libraries is brought up, the argument is that it's possible to set up the Tokio context inside other runtimes. But until the Tokio documentation shows the most basic "hello world" of how to do that, it is simply not possible to use Tokio with the rest of the async ecosystem.

@vhdirk Possible runtime-agnostic solutions are futures-timer and async-io (my own crate).

@LucioFranco
Copy link

Hey, so I spent a few minutes this morning throwing together an example of using tokio with the futures executor ThreadPool. This can be made generic to any executor as long as you wrap the future as shown in the example.

https://gist.github.com/LucioFranco/ec8c6617b7193fd062d1ddc32b5bb991

Most of the complexity here is making it easy to spawn the future with a global runtime. Otherwise, the main thing here is just wrapping the poll call in a handle.enter(|| ...) like carl showed.

Hopefully, this is helpful! We could probably add this as an example to the tokio repo.

@ghost
Copy link

ghost commented Aug 18, 2020

@LucioFranco Thank you, this is the first runnable example that I've encountered! Still, I must admit that the code is quite intimidating for a 'hello world'. If I may make a few suggestions on how to improve the situation:

  • There should also be an example showing how to set up the context inside block_on() - the entry point into async code.
  • Exposing TokioIo in tokio-utils or some crate like that would be helpful. Or at the very least, including its implementation in the docs for tokio::runtime::Runtime. If we're supposed to wrap Future::poll() for individual futures into Handle::enter(), why is there only Handle::enter() and not something like Handle::wrap_future()?
  • It is not always obvious where to get the runtime Handle from. Should I start my own runtime or do Handle::try_current()? Illustrating that in an example would be nice. Calling try_current() is especially useful when starting ThreadPool from an #[tokio::main].
  • Rust users struggle with compatibility between tokio::io::{AsyncRead, AsyncWrite} and futures::io::{AsyncRead, AsyncWrite}. There is no mention of tokio_util::compat anywhere in https://docs.rs/tokio (you just have to hear about it from someone) and there are zero doc-examples showing how to use it.

If you're looking for the most bang-for-buck, you could just make the following two examples:

  1. A #[tokio::main] that starts a ThreadPool inheriting the tokio runtime. Then, inside the ThreadPool, spawn a task that calls tokio::time::delay_for().

  2. A regular fn main() that creates a tokio runtime and runs block_on() that does futures::io::copy(tokio::io::stdin(), tokio::io::stdout()). This is a classic "cat" example.

@LucioFranco
Copy link

@stjepang this is some great feedback! I will make sure we incorporate a lot of these ideas into tokio 0.3. Will follow up with a proper issue in tokio. Thanks!

@tikue
Copy link
Collaborator

tikue commented Aug 18, 2020

This is something tarpc doesn't handle very gracefully today: tokio is an optional feature, and yet tarpc uses tokio timers regardless of whether the tokio feature is enabled. Is it possible to set up just the tokio timer without setting up a full tokio runtime?

@LucioFranco
Copy link

@tikue you need something to drive the timer, in tokio we use epoll_timeout/sleep to achieve this. So you need to run the driver anyways. This is the case for every runtime. futures-timer just spawns a background thread and uses a similar implementation.

@stjepang I've opened a bunch of issues in tokio that should cover some of this. Thanks again for the feedback!

@tikue
Copy link
Collaborator

tikue commented Aug 18, 2020

@LucioFranco yeah totally, I know there's no getting around that :) but is there a way to set up just the timer's required runtime rather than a full tokio runtime? Sorry if this is a dumb question; I've just kinda ignored the runtime details since #[tokio::main] was introduced :)

@LucioFranco
Copy link

@tikue ah yes! If you look at that example I use enable_all you can instead just enable the timer via https://docs.rs/tokio/0.2.22/tokio/runtime/struct.Builder.html#method.enable_time

@tikue
Copy link
Collaborator

tikue commented Aug 18, 2020

Nice, thanks!

@stevefan1999-personal
Copy link
Contributor

stevefan1999-personal commented May 21, 2022

I've experimented with replacing something like Tokio MPSC into futures and flume, but so far it fails the test by exceeding the deadline. Yet to have the idea why

Anyway, if you are interested checkout https://github.com/stevefan1999-personal/tarpc/tree/patch-no-tokio

@SteveLauC
Copy link

Is it possible to add an abstraction layer for the runtime? Openraft does it in this way:

pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + OptionalSync + 'static {
    /// The error type of [`Self::JoinHandle`].
    type JoinError: Debug + Display + OptionalSend;

    /// The return type of [`Self::spawn`].
    type JoinHandle<T: OptionalSend + 'static>: Future<Output = Result<T, Self::JoinError>>
        + OptionalSend
        + OptionalSync
        + Unpin;

    /// The type that enables the user to sleep in an asynchronous runtime.
    type Sleep: Future<Output = ()> + OptionalSend + OptionalSync;

    /// A measurement of a monotonically non-decreasing clock.
    type Instant: Instant;

    /// The timeout error type.
    type TimeoutError: Debug + Display + OptionalSend;

    /// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user
    /// to await the outcome of a [`Future`].
    type Timeout<R, T: Future<Output = R> + OptionalSend>: Future<Output = Result<R, Self::TimeoutError>> + OptionalSend;

    /// Type of a thread-local random number generator.
    type ThreadLocalRng: rand::Rng;

    /// Spawn a new task.
    fn spawn<T>(future: T) -> Self::JoinHandle<T::Output>
    where
        T: Future + OptionalSend + 'static,
        T::Output: OptionalSend + 'static;

    /// Wait until `duration` has elapsed.
    fn sleep(duration: Duration) -> Self::Sleep;

    /// Wait until `deadline` is reached.
    fn sleep_until(deadline: Self::Instant) -> Self::Sleep;

    /// Require a [`Future`] to complete before the specified duration has elapsed.
    fn timeout<R, F: Future<Output = R> + OptionalSend>(duration: Duration, future: F) -> Self::Timeout<R, F>;

    /// Require a [`Future`] to complete before the specified instant in time.
    fn timeout_at<R, F: Future<Output = R> + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout<R, F>;

    /// Check if the [`Self::JoinError`] is `panic`.
    fn is_panic(join_error: &Self::JoinError) -> bool;

    /// Get the random number generator to use for generating random numbers.
    ///
    /// # Note
    ///
    /// This is a per-thread instance, which cannot be shared across threads or
    /// sent to another thread.
    fn thread_rng() -> Self::ThreadLocalRng;

    type Mpsc: Mpsc;

    type MpscUnbounded: MpscUnbounded;

    type Watch: Watch;

    type Oneshot: Oneshot;

    type Mutex<T: OptionalSend + 'static>: Mutex<T>;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

6 participants