Support async #49

Merged
merged 119 commits into from
May 11, 2024

Conversation

rustworthy
Collaborator

@rustworthy rustworthy commented Jan 17, 2024

As per issue

Concerns:

  • Cannot use serde_json::to_writer inside the async fn issue of AsyncFaktoryCommand, so I am serializing into a vector first and only then writing it asynchronously to the buffered stream. I've read through this discussion, and it looks like there is simply not much interest in this feature.

Performance:
In release mode, loadtest from main and loadtest from the current branch show similar results (10-11k jobs per second on an Intel Core i5-10210U, OS: Garuda Linux with 6.7.0-zen3-1-zen) [I should probably get a new machine], with the async version being 5-10% less performant, which is presumably attributable to the async runtime overhead. There is no chance the async capabilities will compensate for this overhead in the loadtest, since the job handler in this binary does not perform any IO-intensive tasks, which is where async bindings shine (sidenote: the business logic of a job handler has never been of particular interest in the crate's load tests; it is the general throughput that loadtest measures).

TODO:




codecov bot commented Jan 17, 2024

Codecov Report

Attention: Patch coverage is 58.27273%, with 459 lines in your changes missing coverage. Please review.

Project coverage is 66.2%. Comparing base (ff2a27b) to head (cf3fb4e).

Additional details and impacted files
Files Coverage Δ
src/proto/client/options.rs 100.0% <100.0%> (ø)
src/worker/builder.rs 100.0% <100.0%> (ø)
src/error.rs 42.3% <0.0%> (-1.7%) ⬇️
src/proto/single/mod.rs 94.1% <94.4%> (+0.1%) ⬆️
src/proto/batch/mod.rs 91.6% <62.5%> (+19.0%) ⬆️
src/proto/mod.rs 50.0% <50.0%> (-20.4%) ⬇️
src/worker/state.rs 93.1% <93.1%> (ø)
src/proto/single/cmd.rs 94.9% <91.0%> (-1.9%) ⬇️
src/proto/single/ent/progress.rs 0.0% <0.0%> (ø)
src/proto/single/resp.rs 87.5% <90.9%> (+2.7%) ⬆️
... and 14 more

... and 2 files with indirect coverage changes

@rustworthy rustworthy force-pushed the feat/async-support branch 2 times, most recently from 8a0a693 to a12e38a Compare January 27, 2024 08:04
.callbacks
.get(job.kind())
.ok_or(Failed::BadJobType(job.kind().to_string()))?;
let exe_result = tokio::spawn(handler(job)).await.expect("joined ok");
Owner

You shouldn't need to spawn this just to immediately .await it.
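
A minimal sketch of the point above. It assumes a hypothetical `handler` and a toy `block_on` executor written only for this example (the PR itself runs on tokio), and uses only the standard library. Awaiting the handler's future in place yields its result directly, with no extra task and no join result to unwrap.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy executor for this sketch only; a real worker would run on tokio.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        // Busy-polls until ready; acceptable here because the future never parks.
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical job handler: returns the length of the job kind.
pub async fn handler(kind: &str) -> Result<usize, String> {
    if kind.is_empty() {
        return Err("empty job kind".to_string());
    }
    Ok(kind.len())
}

// Await the handler's future in place instead of spawning a task and
// immediately awaiting its join handle.
pub async fn run_job(kind: &str) -> Result<usize, String> {
    handler(kind).await
}
```

Spawning can still be a deliberate choice, for example to contain a panic inside the handler, but that is a separate design decision from simply obtaining the handler's result.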

@jonhoo
Owner

jonhoo commented Jan 28, 2024

Thanks for taking this on! I actually think we should go one step further and fully get rid of the sync version. I don't think there's all that much value in keeping both at the same time, and it adds significant maintenance complexity burden to the crate.

@rustworthy
Collaborator Author

Thanks for taking this on! I actually think we should go one step further and fully get rid of the sync version. I don't think there's all that much value in keeping both at the same time, and it adds significant maintenance complexity burden to the crate.

My pleasure! I structured it this way so that it could easily be put behind an async feature flag, but yes, this would mean maintenance overhead, which I thought one could ease a little with the mirrored project structure in src/async.
I will then "merge" the async version with the original one, switching to async completely.

@jonhoo
Owner

jonhoo commented Feb 4, 2024

Yeah, I see the temptation, but honestly, I think it'll be both easier, and easier to review, to just swap the whole thing over in one go 👍


[dev-dependencies]
mockstream = "0.0.3"
Collaborator Author

@rustworthy rustworthy Feb 9, 2024

the repo looks abandoned which is strange since those mocks are so useful

@justindthomas

Thank you for working on this, @rustworthy! I'm currently struggling to use this library efficiently to process tasks for an async Rocket application and this improvement will be a big help.

Collaborator Author

@rustworthy rustworthy left a comment

Seems so 😄

Reviewable status: 44 of 59 files reviewed, 14 unresolved discussions (waiting on @jonhoo)


Cargo.toml line 46 at r10 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Yay for cargo hack!

I'm totally on board with supporting both. That's the same thing we ended up doing in the imap crate over in this huge PR. The design we landed on there may end up being a useful reference.

I've looked through the crate. We are taking a slightly different approach here. Instead of having the option as part of the ClientBuilder, we are making a TlsStream construct (with two different back-ends) available to the user, which they can then pass to connect_with. As for the TlsError, we are following an approach similar to the IMAP crate's.

Also, I am not opposed to extending ClientBuilder in a way that enables TLS, but this looks like a subject for a dedicated PR. This change will not be breaking.

P.S.: I see you are using sync methods in rust-imap 😉


src/error.rs line 174 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

nit: typo: rustls

done.


src/lib.rs line 93 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Should this also have #[doc(cfg(..))]?

You probably mean #[cfg_attr(docsrs, doc(cfg(...)))] and if so - we already have this in here so that the docs look like:

image.png


src/proto/single/cmd.rs line 56 at r11 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Yep, I wonder if we can do the same thing for JobId here as we did for BatchId? But happy to do that in a follow-up PR if you'd prefer.

Yeah, I'd prefer to do this with a chaser.


src/proto/single/id.rs line 11 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

In general, internal Clone calls are frowned upon in Rust. I'd probably take impl Into<String> here instead of AsRef<str> + Clone + Display.

Done.
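
A small sketch of the suggested signature, using a simplified stand-in `JobId` (the field layout and the `as_str` helper are assumptions for illustration, not the crate's actual definition). With `impl Into<String>`, an owned `String` is moved in as-is and a `&str` is converted once, so the constructor never needs an internal `clone`.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobId(String);

impl JobId {
    // Accepts anything convertible into a String; the caller decides
    // whether to hand over ownership or pass a borrowed &str.
    pub fn new(inner: impl Into<String>) -> Self {
        Self(inner.into())
    }

    // Hypothetical accessor, added only so the sketch can be inspected.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}
```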


src/proto/single/id.rs line 30 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

I don't think you need this — isn't it generally implied that every type is AsRef<Self>?

The compiler is not happy then:
image.png


src/proto/single/mod.rs line 264 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

nit: let's move this use to the top of the file

Done.


src/proto/single/mod.rs line 294 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

nit: you could impl PartialEq<str> for JobId so that they can be directly compared against strings.

Done.

Indeed, this now looks much nicer:

 assert!(&job.jid != "");
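
For reference, a sketch of what such an impl might look like on a simplified stand-in `JobId` (the struct body and `new` are assumptions, not the crate's code). Implementing `PartialEq<str>` is enough for the `&job.jid != ""` form, because the standard library forwards comparisons between references to the comparison between the referents.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobId(String);

impl JobId {
    pub fn new(inner: impl Into<String>) -> Self {
        Self(inner.into())
    }
}

// Lets a JobId be compared directly against string slices.
impl PartialEq<str> for JobId {
    fn eq(&self, other: &str) -> bool {
        self.0.as_str() == other
    }
}

// Hypothetical helper showing the comparison in ordinary code.
pub fn is_blank(id: &JobId) -> bool {
    id == ""
}
```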

src/tls.rs line 94 at r11 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Ah, so, connect doesn't require a &'static str, it just requires a value that is 'static (i.e., is owned). And String is. So you can construct a DomainName<'static> from a String using DomainName::try_from(the_string).

Updated!
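
The distinction drawn above can be sketched with the standard library alone. `takes_static` is a hypothetical stand-in for an API with a `'static` bound on its argument (it is not the rustls or tokio signature): the bound only requires that the value holds no short-lived borrows, which an owned `String` satisfies.

```rust
// Stand-in for an API that requires its argument to be `'static`.
pub fn takes_static<T: Into<String> + 'static>(value: T) -> String {
    value.into()
}

pub fn host_from_config(raw: &str) -> String {
    // `raw` is a short-lived borrow, so convert it to an owned String first;
    // the owned value satisfies the `'static` bound.
    let owned: String = raw.trim().to_string();
    takes_static(owned)
}
```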


src/worker/builder.rs line 13 at r11 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

I could go either way. If you think it works out nicer in the overall docs to stick with E, then let's do that 👍

I really like it the way it is - E like Neptune's trident 😄


src/worker/builder.rs line 84 at r11 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

I still think it's pretty awkward to have to declare the type for the closure's arguments — people aren't very used to having to do that with closures, and I worry that they'll instead just get stuck and not know what to do. I suppose we could document it, but providing a method that just takes care of it for them (register_fn) feels like the nicest still in my opinion. If you feel strongly though, I'm okay with moving to just register (+ docs).

I just wanted to make sure that was not only about the change being breaking, but also the question of ergonomics. Keeping both register and register_fn.


src/worker/health.rs line 12 at r11 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Yep, that looks right! Does removing the AsyncBufRead bound on Client allow you to remove it here as well (and keep just AsyncWrite + Unpin)?

Already done, or do you mean a different place?


src/worker/health.rs line 24 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

We should make this a permalink so that it continues pointing to the right line in that file even if it changes in the future. Just press the 'y' key on your keyboard while you're on the page and GitHub should change the URL to a permalink one with the commit hash in it.

Also, I think in Markdown you're supposed to put links inside <> for them to render reliably as actual links.

Done.


src/worker/mod.rs line 117 at r11 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Ah, I assume this is because connect consumes self rather than taking &mut self? Should we change the builder methods to all return Self instead of &mut Self then to allow this kind of chaining?

Beautiful, isn't it?

image.png
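
A simplified sketch of the chaining question in this thread, with hypothetical stand-in types (the real `connect` is async and takes more arguments). When every setter takes and returns `Self` by value, a consuming finisher can close the same chain; with `&mut Self` setters the chain would end in a `&mut WorkerBuilder`, which cannot be moved into `connect(self)`.

```rust
#[derive(Debug, Default)]
pub struct WorkerBuilder {
    labels: Vec<String>,
    workers: usize,
}

#[derive(Debug, PartialEq)]
pub struct Worker {
    pub labels: Vec<String>,
    pub workers: usize,
}

impl WorkerBuilder {
    // Each setter consumes the builder and hands it back.
    pub fn labels(mut self, labels: Vec<String>) -> Self {
        self.labels = labels;
        self
    }

    pub fn workers(mut self, n: usize) -> Self {
        self.workers = n;
        self
    }

    // Consuming finisher; synchronous here only to keep the sketch small.
    pub fn connect(self) -> Worker {
        Worker { labels: self.labels, workers: self.workers }
    }
}
```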


src/worker/runner.rs line 92 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Not sure what you mean by "userland" here? It's usually used to refer to something that isn't happening in the kernel, but that doesn't apply here I don't think?

I mentioned it a few lines later like:

/// The `repr(transparent)` macro is to guarantee that this single-field struct
/// and the wrapped handler have the same layout and so it is safe to operate on
/// the in-memory representations of _the_ handler (submitted to us
/// from the userland) and its enclosed (by us) self.

But I see what you mean, the word is "reserved". I'll use "from the user code" instead.

@rustworthy rustworthy requested a review from jonhoo April 28, 2024 12:34
Owner

@jonhoo jonhoo left a comment

Oh boy, we're so close!

Reviewed 17 of 17 files at r13, all commit messages.
Reviewable status: all files reviewed, 8 unresolved discussions (waiting on @rustworthy)


Cargo.toml line 46 at r10 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

I've looked through the crate. We are taking a slightly different approach here. Instead of having the option as part of the ClientBuilder, we are making a TlsStream construct (with two different back-ends) available to the user, which they can then pass to connect_with. As for the TlsError, we are following an approach similar to the IMAP crate's.

Also, I am not opposed to extending ClientBuilder in a way that enables TLS, but this looks like a subject for a dedicated PR. This change will not be breaking.

P.S.: I see you are using sync methods in rust-imap 😉

I wonder whether instead of having an enum we can use dynamic dispatch here. That would make it easier to extend with other backends later should we wish, and also allows other things than just TLS to be used. Will likely also be faster over-all. But I don't feel too strongly about this one :)


Cargo.toml line 67 at r13 (raw file):

# TryFrom<String> for ServerName<'static> has been implemented:
# https://github.com/rustls/pki-types/compare/rustls:3793627...rustls:1303efa#
rustls-pki-types = { version = "1.0.1", optional = true }

I don't think you need this since we already specify 1.0.1 in the dependency listing above.


src/proto/single/id.rs line 30 at r12 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

The compiler is not happy then:
image.png

Oh, interesting, I guess this is due to a shortcoming of the Rust type system as documented here: https://doc.rust-lang.org/std/convert/trait.AsRef.html#reflexivity
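
To illustrate the reflexivity gap with a stand-in type (this `JobId` and the `describe` function are hypothetical, not the crate's definitions): there is no blanket `impl AsRef<T> for T` in the standard library, so a function bounded on `AsRef<JobId>` only accepts a `JobId` once the reflexive impl is written by hand.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobId(String);

impl JobId {
    pub fn new(inner: impl Into<String>) -> Self {
        Self(inner.into())
    }
}

// Not provided automatically, so it has to be spelled out.
impl AsRef<JobId> for JobId {
    fn as_ref(&self) -> &JobId {
        self
    }
}

// Accepts both `JobId` and `&JobId` thanks to the impl above plus the
// standard library's forwarding impl for references.
pub fn describe(id: impl AsRef<JobId>) -> String {
    format!("job {}", id.as_ref().0)
}
```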


src/proto/single/mod.rs line 294 at r12 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

Done.

Indeed, this now looks much nicer:

 assert!(&job.jid != "");

this will also produce nicer errors:

Code snippet:

assert_ne!(&job.jid, "");

src/worker/health.rs line 12 at r11 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

Already done, or do you mean a different place?

I meant here — this still has S: AsyncBufRead + AsyncWrite + Reconnect + Send + Unpin + 'static


src/worker/mod.rs line 117 at r11 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

Beautiful, isn't it?

image.png

Nice!


src/worker/runner.rs line 92 at r12 (raw file):
I guess what I mean is that the most important aspect (i.e., first line) here for this type is, in my opinion, something like:

A closure that implements [JobRunner].

So I'd probably make that be the first line.


src/worker/runner.rs line 39 at r13 (raw file):

/// };
///
/// let mut w = WorkerBuilder::default()

By the way, a common way to construct builders is to have this:

impl Worker {
    pub fn builder() -> WorkerBuilder { WorkerBuilder::default() }
}

That way this can become let mut w = Worker::builder().register(...)

Collaborator Author

@rustworthy rustworthy left a comment

I already feel like I am being captured by this closure 😆

Reviewable status: 49 of 59 files reviewed, 7 unresolved discussions (waiting on @jonhoo)


Cargo.toml line 46 at r10 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

I wonder whether instead of having an enum we can use dynamic dispatch here. That would make it easier to extend with other backends later should we wish, and also allows other things than just TLS to be used. Will likely also be faster over-all. But I don't feel too strongly about this one :)

If we are talking about the error enum, I agree that the name can be more generic, and I updated it to Stream with a doc comment saying that this is where errors from underlying non-standard streams will live, whether they come from TLS or any other protocol that may be introduced in the future. And going for an enum rather than a generic Box<dyn Into<TlsError>> simply means being consistent with the rest of the code base.

As for the speed, I hope we are only ever talking about a marginal difference, and we will need to measure it anyway to be able to claim a performance increase.

After a while: now that I've played a bit with the IMAP crate, I feel like you might be talking about something else, like Client<Box<dyn Connection>> and Session<Box<dyn Connection>>. If you feel that something is sub-optimal in the current implementation and is worth investing more time in, let's improve it, but I suggest doing so in a dedicated PR, if needed.


Cargo.toml line 67 at r13 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

I don't think you need this since we already specify 1.0.1 in the dependency listing above.

I added this as a dev dep (we use pki-types in e2e tests) after this pin had already been added, and cargo knew to add the pinned version 1.0.1.

Indeed, removing the pin now will make the min versions tests pass, but this looks more like a happy accident. Say, if we no longer use rustls-pki-types as a dev dep, the tests will not pass.


src/proto/single/id.rs line 30 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Oh, interesting, I guess this is due to a shortcoming of the Rust type system as documented here: https://doc.rust-lang.org/std/convert/trait.AsRef.html#reflexivity

Does not look like a big deal except for this part:

Note, however, that not all types from std contain such an implementation, 
and those cannot be added by external code due to orphan rules

src/proto/single/mod.rs line 294 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

this will also produce nicer errors:

... which we will never witness I hope 😉

updated!


src/worker/health.rs line 12 at r11 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

I meant here — this still has S: AsyncBufRead + AsyncWrite + Reconnect + Send + Unpin + 'static

AsyncBufRead is needed to be able to self.c.heartbeat. But Reconnect not needed indeed. The minimal bound will be:

where
    S: AsyncBufRead + AsyncWrite + Send + Unpin,
    E: StdError,

But I removed excessive AsyncBufRead from ReadToken:

pub struct ReadToken<'a, S>(pub(crate) &'a mut Client<S>)
where
    S: AsyncWrite + Unpin + Send;

src/worker/runner.rs line 92 at r12 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

I guess what I mean is that the most important aspect (i.e., first line) here for this type is, in my opinion, something like:

A closure that implements [JobRunner].

So I'd probably make that be the first line.

Done


src/worker/runner.rs line 39 at r13 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

By the way, a common way to construct builders is to have this:

impl Worker {
    pub fn builder() -> WorkerBuilder { WorkerBuilder::default() }
}

That way this can become let mut w = Worker::builder().register(...)

We are already using this approach with Job and Batch

In Worker's case things are not that pretty though. Have a look.

impl<S: AsyncWrite + Send + Unpin, E> Worker<S, E> {
    /// Creates an ergonomic constructor for a new [`Worker`].
    ///
    /// Also equivalent to [`WorkerBuilder::default`].
    pub fn builder() -> WorkerBuilder<E> {
        WorkerBuilder::default()
    }
}

In the "worst" case scenario, we will need to declare our intentions like so:

let mut w = Worker::<tokio::net::TcpStream, std::io::Error>::builder()

If we are registering a handler with a concrete error type (which we normally are) then this can of course be simplified to:

let mut w = Worker::<tokio::net::TcpStream, _>::builder()

I am still not sure I find this ergonomic, though. wdyt?

@rustworthy rustworthy requested a review from jonhoo May 4, 2024 08:06
Owner

@jonhoo jonhoo left a comment

Almost nothing left!

Reviewed 10 of 10 files at r14, all commit messages.
Reviewable status: all files reviewed, 8 unresolved discussions (waiting on @rustworthy)


Cargo.toml line 46 at r10 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

If we are talking about the error enum, I agree that the name can be more generic, and I updated it to Stream with a doc comment saying that this is where errors from underlying non-standard streams will live, whether they come from TLS or any other protocol that may be introduced in the future. And going for an enum rather than a generic Box<dyn Into<TlsError>> simply means being consistent with the rest of the code base.

As for the speed, I hope we are only ever talking about a marginal difference, and we will need to measure it anyway to be able to claim a performance increase.

After a while: now that I've played a bit with the IMAP crate, I feel like you might be talking about something else, like Client<Box<dyn Connection>> and Session<Box<dyn Connection>>. If you feel that something is sub-optimal in the current implementation and is worth investing more time in, let's improve it, but I suggest doing so in a dedicated PR, if needed.

I was actually thinking of going even further by removing the generic parameter from Client entirely and just have it hold a Box<dyn Connection> internally. But agree, we can do that in a follow-up!


Cargo.toml line 67 at r13 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

I added this as a dev dep (we use pki-types in e2e tests) after this pin had already been added, and cargo knew to add the pinned version 1.0.1.

Indeed, removing the pin now will make the min versions tests pass, but this looks more like a happy accident. Say, if we no longer use rustls-pki-types as a dev dep, the tests will not pass.

Sure, that's fine, then let's leave it in 👍


src/error.rs line 45 at r14 (raw file):

    Serialization(#[source] serde_json::Error),

    /// Indicates an error in the underlying non-standard stream, e.g. TSL stream.

nit: TLS, not TSL ;)


src/error.rs line 172 at r14 (raw file):

    #[cfg_attr(docsrs, doc(cfg(feature = "native_tls")))]
    #[error("underlying tls stream")]
    NativeTLS(#[source] tokio_native_tls::native_tls::Error),

I think clippy will complain about this name, since it's pretty firmly in the camp of requiring NativeTls


src/proto/client/ent.rs line 66 at r14 (raw file):

            return Ok(Some(result));
        }
        match bid_read_res.unwrap_err() {

Wouldn't this be better as a match bid_read_res rather than an if let Ok followed by a unwrap_err?


src/proto/single/cmd.rs line 56 at r11 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

Yeah, I'd prefer to do this with a chaser.

👍 Are you keeping a list? :p


src/proto/single/ent/mod.rs line 130 at r9 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

Those are actually changes in chrono rather than time. I thought TimeDelta was a construct from the time crate, but it is not. It is a new name for Duration, which makes perfect sense: since it supports negative values, delta is a better name.

Those try_<unit> methods have been added in September 2023 and released in [email protected]

Sounds like we should add another thing to the -Zminimal-versions dep section in Cargo.toml to help with that maybe?


src/worker/health.rs line 12 at r11 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

AsyncBufRead is needed to be able to self.c.heartbeat. But Reconnect not needed indeed. The minimal bound will be:

where
    S: AsyncBufRead + AsyncWrite + Send + Unpin,
    E: StdError,

But I removed excessive AsyncBufRead from ReadToken:

pub struct ReadToken<'a, S>(pub(crate) &'a mut Client<S>)
where
    S: AsyncWrite + Unpin + Send;

Excellent! I do wonder whether we could avoid the need for AsyncBufRead for heartbeat, but that's more of a curiosity than a necessity


src/worker/runner.rs line 39 at r13 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

We are already using this approach with Job and Batch

In Worker's case things are not that pretty though. Have a look.

impl<S: AsyncWrite + Send + Unpin, E> Worker<S, E> {
    /// Creates an ergonomic constructor for a new [`Worker`].
    ///
    /// Also equivalent to [`WorkerBuilder::default`].
    pub fn builder() -> WorkerBuilder<E> {
        WorkerBuilder::default()
    }
}

In the "worst" case scenario, we will need to declare our intentions like so:

let mut w = Worker::<tokio::net::TcpStream, std::io::Error>::builder()

If we are registering a handler with a concrete error type (which we normally are) then this can of course be simplified to:

let mut w = Worker::<tokio::net::TcpStream, _>::builder()

I am still not sure I find this ergonomic, though. wdyt?

Ah, yes, that's pretty awkward. I think the way I've seen this worked around in the past is to add the builder method to something like

impl Worker<tokio::net::TcpStream, ()> {
  pub fn builder<E>() -> WorkerBuilder<E> {
    WorkerBuilder::default()
  }
}

because there's no requirement that the impl type actually matches what the builder ultimately produces. If you find it too awkward though, I'm okay leaving it as-is.
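
A self-contained sketch of that workaround, under stated assumptions: `std::net::TcpStream` stands in for `tokio::net::TcpStream`, and the struct bodies and the `worker_count` accessor are hypothetical. Because `builder` lives on a single concrete instantiation of `Worker`, callers never have to name the stream type, and the error type only needs a turbofish when it cannot be inferred.

```rust
use std::marker::PhantomData;
use std::net::TcpStream;

pub struct Worker<S, E> {
    _marker: PhantomData<(S, E)>,
}

pub struct WorkerBuilder<E> {
    workers: usize,
    _error: PhantomData<E>,
}

impl<E> Default for WorkerBuilder<E> {
    fn default() -> Self {
        WorkerBuilder { workers: 1, _error: PhantomData }
    }
}

impl<E> WorkerBuilder<E> {
    pub fn workers(mut self, n: usize) -> Self {
        self.workers = n;
        self
    }

    // Hypothetical accessor so the sketch can be inspected.
    pub fn worker_count(&self) -> usize {
        self.workers
    }
}

// The impl type does not have to match what the builder ultimately
// produces; it only gives `Worker::builder()` a place to live.
impl Worker<TcpStream, ()> {
    pub fn builder<E>() -> WorkerBuilder<E> {
        WorkerBuilder::default()
    }
}
```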


src/worker/runner.rs line 96 at r14 (raw file):

}

/// A closure that implements [JobRunner].

nit:

[`JobRunner`]

Collaborator Author

@rustworthy rustworthy left a comment

Reviewable status: 49 of 59 files reviewed, 7 unresolved discussions (waiting on @jonhoo)


Cargo.toml line 46 at r10 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

I was actually thinking of going even further by removing the generic parameter from Client entirely and just have it hold a Box<dyn Connection> internally. But agree, we can do that in a follow-up!

Added to the list


src/error.rs line 45 at r14 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

nit: TLS, not TSL ;)

this is how I encrypted the protocol name itself 😆


src/error.rs line 172 at r14 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

I think clippy will complain about this name, since it's pretty firmly in the camp of requiring NativeTls

Looks like it is not complaining, but I still updated those names


src/proto/client/ent.rs line 66 at r14 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Wouldn't this be better as a match bid_read_res rather than an if let Ok followed by a unwrap_err?

It was to avoid indenting twice, but I am actually ok with both options. Updated


src/proto/single/cmd.rs line 56 at r11 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

👍 Are you keeping a list? :p

used to keep in memory but now flushed it 😄


src/proto/single/ent/mod.rs line 130 at r9 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Sounds like we should add another thing to the -Zminimal-versions dep section in Cargo.toml to help with that maybe?

This is already solved, and yes, by pinning chrono at 0.4.32 in Cargo.toml. But while we were working on this PR things have changed. Please read on.

I initially put it here JFYI, because this is a really interesting case where the job with updated deps fails because a method is declared deprecated there (or even removed) and we are encouraged to use a new one, while the job with minimal versions fails because this new method is not there just yet.

So chrono::Duration::seconds(n) was announced deprecated with this commit in this 0.4.35 release because it can panic internally, while the one we are encouraged to use instead, chrono::Duration::try_seconds(n), which does not panic but returns a result for us to handle, was only released with chrono v0.4.32.

It turned out, though, that in the 0.4.36 release in late March this deprecation was reverted. And so now we are ok again using those panicking methods. Ok, because we are only using those to construct TimeDelta (a.k.a. Duration) for testing purposes. And we can remove the chrono pinning at 0.4.32 for now. 😅


src/worker/health.rs line 12 at r11 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Excellent! I do wonder whether we could avoid the need for AsyncBufRead for heartbeat, but that's more of a curiosity than a necessity

I am not sure we can. In Worker::heartbeat we are doing single::read_json which internally calls the RESP read which in its turn wants to read_line from the stream. read_line is implemented in AsyncBufReadExt which requires the thing to impl AsyncBufRead. This is how we end up with it being a bound on Worker::listen_for_heartbeat. Luckily this is not part of the crate's public API.
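
The same constraint exists in the synchronous standard library, which makes for a compact illustration. This is a sync analog only, with a hypothetical `read_resp_line` helper, not the crate's async code: `read_line` is a method of `BufRead`, so a function that calls it cannot relax its bound to plain `Read`.

```rust
use std::io::{self, BufRead};

// Reads one CRLF-terminated line, as a RESP-style reader would.
// The `BufRead` bound is required because `read_line` is defined there.
pub fn read_resp_line<R: BufRead>(stream: &mut R) -> io::Result<String> {
    let mut line = String::new();
    stream.read_line(&mut line)?;
    // Strip only the line terminator, leaving other whitespace intact.
    let trimmed = line.trim_end_matches(|c| c == '\r' || c == '\n');
    Ok(trimmed.to_string())
}
```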


src/worker/runner.rs line 39 at r13 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Ah, yes, that's pretty awkward. I think the way I've seen this worked around in the past is to add the builder method to something like

impl Worker<tokio::net::TcpStream, ()> {
  pub fn builder<E>() -> WorkerBuilder<E> {
    WorkerBuilder::default()
  }
}

because there's no requirement that the impl type actually matches what the builder ultimately produces. If you find it too awkward though, I'm okay leaving it as-is.

I think this unit type is beautiful. Updated.

In case the error type cannot be inferred, we will specify like so (just like we used to do with WorkerBuilder::default() in such cases):

 let w = Worker::builder::<io::Error>()
        .register_fn("never_called", |_| async move { unreachable!() })
        .connect(None)
        .await
        .unwrap();

While the happy scenario is when the compiler can infer the type:

    let mut worker = Worker::builder()
        .labels(vec!["rust".into(), local.into()])
        .workers(1)
        .wid(WorkerId::random())
        .register_fn("order", move |job| async move {
            ...
            Ok::<(), io::Error>(())
        })
        .register_fn("image", |_| async move { unreachable!() })
        .connect(None)
        .await
        .unwrap();

I updated the docs and also made a couple of tests use Worker::builder instead of WorkerBuilder::default


src/worker/runner.rs line 96 at r14 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

nit:

[`JobRunner`]

Done

@rustworthy rustworthy requested a review from jonhoo May 9, 2024 15:09
Owner

@jonhoo jonhoo left a comment

Only one nit left, and it's non-blocking, so approving 🎉
Thank you so much for sticking with this. We can do the nit in a follow-up, and so I'll merge this once CI is happy :D

Reviewed 7 of 7 files at r15, 3 of 3 files at r16, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @rustworthy)


src/proto/client/ent.rs line 66 at r14 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

It was to avoid indenting twice, but I am actually ok with both options. Updated

Ah, but you don't have to. You can do:

match single::read_bid(&mut self.0.stream).await {
    Ok(bid) => Ok(Some(bid)),
    Err(Error::Protocol(error::Protocol::Internal { msg })) => {
        if msg.starts_with("No such batch") {
            return Ok(None);
        }
        Err(error::Protocol::Internal { msg }.into())
    },
    Err(another) => Err(another),
}

it's a nit though, so won't block merging on this 👍
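
As a possible variation on the shape above (not the code from this PR), a match guard can keep the "no such batch" case in its own arm and avoid the inner `if` altogether. The `Error` enum and `batch_status` function below are hypothetical stand-ins so that the sketch compiles on its own.

```rust
#[derive(Debug, PartialEq)]
pub enum Error {
    Internal { msg: String },
    Io(String),
}

// Maps a read result to "found", "not found", or a real error.
pub fn batch_status(read: Result<String, Error>) -> Result<Option<String>, Error> {
    match read {
        Ok(bid) => Ok(Some(bid)),
        // The guard inspects `msg` by reference; if it does not match,
        // the error falls through to the last arm untouched.
        Err(Error::Internal { msg }) if msg.starts_with("No such batch") => Ok(None),
        Err(other) => Err(other),
    }
}
```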


src/proto/single/cmd.rs line 56 at r11 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

used to keep in memory but now flushed it 😄

Hah, nice, thank you!


src/proto/single/ent/mod.rs line 130 at r9 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

This is already solved, and yes, by pinning chrono at 0.4.32 in Cargo.toml. But while we were working on this PR things have changed. Please read on.

I initially put it here JFYI, because this is a really interesting case where the job with updated deps fails because a method is declared deprecated there (or even removed) and we are encouraged to use a new one, while the job with minimal versions fails because this new method is not there just yet.

So chrono::Duration::seconds(n) was announced deprecated with this commit in this 0.4.35 release because it can panic internally, while the one we are encouraged to use instead, chrono::Duration::try_seconds(n), which does not panic but returns a result for us to handle, was only released with chrono v0.4.32.

It turned out, though, that in the 0.4.36 release in late March this deprecation was reverted. And so now we are ok again using those panicking methods. Ok, because we are only using those to construct TimeDelta (a.k.a. Duration) for testing purposes. And we can remove the chrono pinning at 0.4.32 for now. 😅

Oh wow, well, glad it came full circle and we can leave it as it was! Thanks for keeping tabs on it :)


src/worker/health.rs line 12 at r11 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

I am not sure we can. In Worker::heartbeat we are doing single::read_json which internally calls the RESP read which in its turn wants to read_line from the stream. read_line is implemented in AsyncBufReadExt which requires the thing to impl AsyncBufRead. This is how we end up with it being a bound on Worker::listen_for_heartbeat. Luckily this is not part of the crate's public API.

Yep, seems reasonable, thanks for exploring!


src/worker/runner.rs line 39 at r13 (raw file):

Previously, rustworthy (Pavieł Michalkievič) wrote…

I think this unit type is beautiful. Updated.

In case the error type cannot be inferred, we will specify like so (just like we used to do with WorkerBuilder::default() in such cases):

 let w = Worker::builder::<io::Error>()
        .register_fn("never_called", |_| async move { unreachable!() })
        .connect(None)
        .await
        .unwrap();

While the happy scenario is when the compiler can infer the type:

    let mut worker = Worker::builder()
        .labels(vec!["rust".into(), local.into()])
        .workers(1)
        .wid(WorkerId::random())
        .register_fn("order", move |job| async move {
            ...
            Ok::<(), io::Error>(())
        })
        .register_fn("image", |_| async move { unreachable!() })
        .connect(None)
        .await
        .unwrap();

I updated the docs and also made a couple of tests use Worker::builder instead of WorkerBuilder::default

Excellent, looks great!

@jonhoo jonhoo merged commit eac9545 into jonhoo:main May 11, 2024
16 of 19 checks passed
@jonhoo
Owner

jonhoo commented May 11, 2024

And it's in! 🎉
Now for all the follow-up PRs 😅

@jonhoo
Owner

jonhoo commented May 12, 2024

@rustworthy Which of the PRs you have open do you think I should review next?

Collaborator Author

@rustworthy rustworthy left a comment

@jonhoo I will need to do some extra work on the two open PRs in this repo, so I've just turned them into drafts.

But I've got one open in the IMAP repo - just an e2e example for Outlook OAuth 2.0.

Reviewable status: all files reviewed, 1 unresolved discussion


src/proto/client/ent.rs line 66 at r14 (raw file):

Previously, jonhoo (Jon Gjengset) wrote…

Ah, but you don't have to. You can do:

match single::read_bid(&mut self.0.stream).await {
    Ok(bid) => Ok(Some(bid)),
    Err(Error::Protocol(error::Protocol::Internal { msg })) => {
        if msg.starts_with("No such batch") {
            return Ok(None);
        }
        Err(error::Protocol::Internal { msg }.into())
    },
    Err(another) => Err(another),
}

it's a nit though, so won't block merging on this 👍

Adding to the list.

@rustworthy
Collaborator Author

@rustworthy Which of the PRs you have open do you think I should review next?

Both #57 and #59 are now ready for review

@rustworthy rustworthy mentioned this pull request Aug 18, 2024
4 tasks
@rustworthy rustworthy deleted the feat/async-support branch October 19, 2024 14:51