-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support async #49
Support async #49
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files
|
150cda3
to
d4fe2e0
Compare
8a0a693
to
a12e38a
Compare
src/async/consumer/mod.rs
Outdated
.callbacks | ||
.get(job.kind()) | ||
.ok_or(Failed::BadJobType(job.kind().to_string()))?; | ||
let exe_result = tokio::spawn(handler(job)).await.expect("joined ok"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You shouldn't need to spawn
this just to immediately .await
it.
Thanks for taking this on! I actually think we should go one step further and fully get rid of the sync version. I don't think there's all that much value in keeping both at the same time, and it adds significant maintenance complexity burden to the crate. |
My pleasure! I structured it this way so that to easily put it behind an |
Yeah, I see the temptation, but honestly, I think it'll be both easier, and easier to review, to just swap the whole thing over in one go 👍 |
|
||
[dev-dependencies] | ||
mockstream = "0.0.3" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the repo looks abandoned which is strange since those mocks are so useful
85f4886
to
9fdcd85
Compare
This reverts commit 7ab8ab3.
Thank you for working on this, @rustworthy! I'm currently struggling to use this library efficiently to process tasks for an async Rocket application and this improvement will be a big help. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems so 😄
Reviewable status: 44 of 59 files reviewed, 14 unresolved discussions (waiting on @jonhoo)
Cargo.toml
line 46 at r10 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Yay for
cargo hack
!I'm totally on board with supporting both. That's the same thing we ended up doing in the
imap
crate over in this huge PR. The design we landed on there may end up being a useful reference.
I've looked through the crate. We are taking a little bit different approach here. Instead if having the option as part of the ClientBuilder
, we are making a TlsStream
construct (with two different back-ends) available to the user, which they then can use to connect_with
. As for the TlsError
, we are following here the approach similar to IMAP crate.
Also I am not apposed to extending a ClientBuilder
in a way that enables TLS, but this looks like a subject of dedicated PR. This change will not be breaking.
P.S.: I see you are using sync methods in rust-imap 😉
src/error.rs
line 174 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
nit: typo: rustls
done.
src/lib.rs
line 93 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Should this also have
#[doc(cfg(..))]
?
You probably mean #[cfg_attr(docsrs, doc(cfg(...)))]
and if so - we already have this in here so that the docs look like:
src/proto/single/cmd.rs
line 56 at r11 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Yep, I wonder if we can do the same thing for
JobId
here as we did forBatchId
? But happy to do that in a follow-up PR if you'd prefer.
Yeah, I'd prefer to do this with a chaser.
src/proto/single/id.rs
line 11 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
In general, internal
Clone
calls are frowned upon in Rust. I'd probably takeimpl Into<String>
here instead ofAsRef<str> + Clone + Display
.
Done.
src/proto/single/id.rs
line 30 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
I don't think you need this — isn't it generally implied that every type is
AsRef<Self>
?
The compile is not happy then:
src/proto/single/mod.rs
line 264 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
nit: let's move this
use
to the top of the file
Done.
src/proto/single/mod.rs
line 294 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
nit: you could
impl PartialEq<str> for JobId
so that they can be directly compared against strings.
Done.
Indeed, this now looks much nicer:
assert!(&job.jid != "");
src/tls.rs
line 94 at r11 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Ah, so,
connect
doesn't require a&'static str
, it just requires a value that is'static
(i.e., is owned). AndString
is. So you can construct aDomainName<'static>
from aString
usingDomainName::try_from(the_string)
.
Updated!
src/worker/builder.rs
line 13 at r11 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
I could go either way. If you think it works out nicer in the overall docs to stick with
E
, then let's do that 👍
I really like it the way it is - E
like a Neptune's trident 😄
src/worker/builder.rs
line 84 at r11 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
I still think it's pretty awkward to have to declare the type for the closure's arguments — people aren't very used to having to do that with closures, and I worry that they'll instead just get stuck and not know what to do. I suppose we could document it, but providing a method that just takes care of it for them (
register_fn
) feels like the nicest still in my opinion. If you feel strongly though, I'm okay with moving to justregister
(+ docs).
I just wanted to make sure that was not only about the change being breaking, but also the question of ergonomics. Keeping both register
and register_fn
.
src/worker/health.rs
line 12 at r11 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Yep, that looks right! Does removing the
AsyncBufRead
bound onClient
allow you to remove it here as well (and keep justAsyncWrite + Unpin
)?
Already, or mean a different place?
src/worker/health.rs
line 24 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
We should make this a permanlink so that it continues pointing to the right line in that file even if it changes in the future. Just press the 'y' key on your keyboard while you're on the page and GitHub should change the URL to a permalink one with the commit hash in it.
Also, I think in Markdown you're supposed to put links instead
<>
for them to render reliably as actual links.
Done.
src/worker/mod.rs
line 117 at r11 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Ah, I assume this is because
connect
consumesself
rather than taking&mut self
? Should we change the builder methods to all returnSelf
instead of&mut Self
then to allow this kind of chaining?
Beautiful, isn't it?
src/worker/runner.rs
line 92 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Not sure what you mean by "userland" here? It's usually used to refer to something that isn't happening in the kernel, but that doesn't apply here I don't think?
I mentioned it a few lines later like:
/// The `repr(transparent)` macro is to guarantee that this single-field struct
/// and the wrapped handler have the same layout and so it is safe to operate on
/// the in-memory representations of _the_ handler (submitted to us
/// from the userland) and its enclosed (by us) self.
But I see what you mean, the word is "reserved". I'll use the from the user code
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh boy, we're so close!
Reviewed 17 of 17 files at r13, all commit messages.
Reviewable status: all files reviewed, 8 unresolved discussions (waiting on @rustworthy)
Cargo.toml
line 46 at r10 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
I've looked through the crate. We are taking a little bit different approach here. Instead if having the option as part of the
ClientBuilder
, we are making aTlsStream
construct (with two different back-ends) available to the user, which they then can use toconnect_with
. As for theTlsError
, we are following here the approach similar to IMAP crate.Also I am not apposed to extending a
ClientBuilder
in a way that enables TLS, but this looks like a subject of dedicated PR. This change will not be breaking.P.S.: I see you are using sync methods in rust-imap 😉
I wonder whether instead of having an enum
we can use dynamic dispatch here instead. That would make it easier to extend with other backends later should we wish, and also allows other things than just TLS to be used. Will likely also be faster over-all. But I don't feel too strongly about this one :)
Cargo.toml
line 67 at r13 (raw file):
# TryFrom<String> for ServerName<'static> has been implemented: # https://github.com/rustls/pki-types/compare/rustls:3793627...rustls:1303efa# rustls-pki-types = { version = "1.0.1", optional = true }
I don't think you need this since we already specify 1.0.1
in the dependency listing above.
src/proto/single/id.rs
line 30 at r12 (raw file):
Oh, interesting, I guess this is due to a shortcoming of the Rust type system as documented here: https://doc.rust-lang.org/std/convert/trait.AsRef.html#reflexivity
src/proto/single/mod.rs
line 294 at r12 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
Done.
Indeed, this now looks much nicer:
assert!(&job.jid != "");
this will also produce nicer errors:
Code snippet:
assert_ne!(&job.jid, "");
src/worker/health.rs
line 12 at r11 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
Already, or mean a different place?
I meant here — this still has S: AsyncBufRead + AsyncWrite + Reconnect + Send + Unpin + 'static
src/worker/mod.rs
line 117 at r11 (raw file):
Nice!
src/worker/runner.rs
line 92 at r12 (raw file):
I guess what I mean is that the most important aspect (i.e., first line) here for this type is, in my opinion, something like:
A closure that implements [
JobRunner
].
So I'd probably make that be the first line.
src/worker/runner.rs
line 39 at r13 (raw file):
/// }; /// /// let mut w = WorkerBuilder::default()
By the way, a common way to construct builders is to have this:
impl Worker {
pub fn builder() -> WorkerBuilder { WorkerBuilder::default() }
}
That way this can become let mut w = Worker::builder().register(...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I already feel like I am being captured by this closure 😆
Reviewable status: 49 of 59 files reviewed, 7 unresolved discussions (waiting on @jonhoo)
Cargo.toml
line 46 at r10 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
I wonder whether instead of having an
enum
we can use dynamic dispatch here instead. That would make it easier to extend with other backends later should we wish, and also allows other things than just TLS to be used. Will likely also be faster over-all. But I don't feel too strongly about this one :)
If we are talking about error enum, I agree that the name can be more generic and updated it to be Stream
with a doc comment that this is the place where errors from underlying non-standard streams will live, whether they are TLS or any other protocol that may be introduced in the future. And going for enum rather than a generic Box<dyn impl Into<TslError>
simply means being consistent with the rest of the code base.
As for the speed, I hope we are always talking about marginal diff and we will need to measure it anyhow to be able to claim performance increase.
After a while. Now that I've played a bit the IMAP crate, I feel like you might be talking about something else, like Client<Box<dyn Connection>>
and Session<Box<dyn Connection>>
. If you feel like something is sub-optimal in the current implementation and is worth investing more time, let's improve it, but I suggest doing so in a dedicated PR, if needed.
Cargo.toml
line 67 at r13 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
I don't think you need this since we already specify
1.0.1
in the dependency listing above.
I added this as a dev dep (we use pki-types in e2e tests) already after this pin had been added, and cargo knew to add the pinned version 1.0.1.
Indeed, removing the pin now will make the min versions tests pass, but this looks for like a happy chance? Say, if we do not longer use rustls-pki-types
as a dev dep, the tests will not pass.
src/proto/single/id.rs
line 30 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Oh, interesting, I guess this is due to a shortcoming of the Rust type system as documented here: https://doc.rust-lang.org/std/convert/trait.AsRef.html#reflexivity
Does not look like a big deal except for this part:
Note, however, that not all types from std contain such an implementation,
and those cannot be added by external code due to orphan rules
src/proto/single/mod.rs
line 294 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
this will also produce nicer errors:
... which we will never witness I hope 😉
updated!
src/worker/health.rs
line 12 at r11 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
I meant here — this still has
S: AsyncBufRead + AsyncWrite + Reconnect + Send + Unpin + 'static
AsyncBufRead is needed to be able to self.c.heartbeat. But Reconnect not needed indeed. The minimal bound will be:
where
S: AsyncBufRead + AsyncWrite + Send + Unpin,
E: StdError,
But I removed excessive AsyncBufRead
from ReadToken
:
pub struct ReadToken<'a, S>(pub(crate) &'a mut Client<S>)
where
S: AsyncWrite + Unpin + Send;
src/worker/runner.rs
line 92 at r12 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
I guess what I mean is that the most important aspect (i.e., first line) here for this type is, in my opinion, something like:
A closure that implements [
JobRunner
].So I'd probably make that be the first line.
Done
src/worker/runner.rs
line 39 at r13 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
By the way, a common way to construct builders is to have this:
impl Worker { pub fn builder() -> WorkerBuilder { WorkerBuilder::default() } }That way this can become
let mut w = Worker::builder().register(...)
We are already using this approach with Job
and Batch
In Worker
s case things are not that pretty though. Have a look.
impl<S: AsyncWrite + Send + Unpin, E> Worker<S, E> {
/// Creates an ergonomic constructor for a new [`Worker`].
///
/// Also equivalent to [`WorkerBuilder::default`].
pub fn builder() -> WorkerBuilder<E> {
WorkerBuilder::default()
}
}
In the "worst" case scenario, we will need to declare our intentions like so:
let mut w = Worker::<tokio::net::TcpStream, std::io::Error>::builder()
If we are registering a handler with a concrete error type (which we normally are) then this can of course be simplified to:
let mut w = Worker::<tokio::net::TcpStream, _>::builder()
But I am still not sure I find this ergonomic, but wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost nothing left!
Reviewed 10 of 10 files at r14, all commit messages.
Reviewable status: all files reviewed, 8 unresolved discussions (waiting on @rustworthy)
Cargo.toml
line 46 at r10 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
If we are talking about error enum, I agree that the name can be more generic and updated it to be
Stream
with a doc comment that this is the place where errors from underlying non-standard streams will live, whether they are TLS or any other protocol that may be introduced in the future. And going for enum rather than a genericBox<dyn impl Into<TslError>
simply means being consistent with the rest of the code base.As for the speed, I hope we are always talking about marginal diff and we will need to measure it anyhow to be able to claim performance increase.
After a while. Now that I've played a bit the IMAP crate, I feel like you might be talking about something else, like
Client<Box<dyn Connection>>
andSession<Box<dyn Connection>>
. If you feel like something is sub-optimal in the current implementation and is worth investing more time, let's improve it, but I suggest doing so in a dedicated PR, if needed.
I was actually thinking of going even further by removing the generic parameter from Client
entirely and just have it hold a Box<dyn Connection>
internally. But agree, we can do that in a follow-up!
Cargo.toml
line 67 at r13 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
I added this as a dev dep (we use pki-types in e2e tests) already after this pin had been added, and cargo knew to add the pinned version
1.0.1.
Indeed, removing the pin now will make the min versions tests pass, but this looks for like a happy chance? Say, if we do not longer use
rustls-pki-types
as a dev dep, the tests will not pass.
Sure, that's fine, then let's leave it in 👍
src/error.rs
line 45 at r14 (raw file):
Serialization(#[source] serde_json::Error), /// Indicates an error in the underlying non-standard stream, e.g. TSL stream.
nit: TLS, not TSL ;)
src/error.rs
line 172 at r14 (raw file):
#[cfg_attr(docsrs, doc(cfg(feature = "native_tls")))] #[error("underlying tls stream")] NativeTLS(#[source] tokio_native_tls::native_tls::Error),
I think clippy will complain about this name, since it's pretty firmly in the camp of requiring NativeTls
src/proto/client/ent.rs
line 66 at r14 (raw file):
return Ok(Some(result)); } match bid_read_res.unwrap_err() {
Wouldn't this be better as a match bid_read_res
rather than an if let Ok
followed by a unwrap_err
?
src/proto/single/cmd.rs
line 56 at r11 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
Yeah, I'd prefer to do this with a chaser.
👍 Are you keeping a list? :p
src/proto/single/ent/mod.rs
line 130 at r9 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
Those are actually changes in
chrono
rather thantime
. I thoughtTimeDelta
is a construct fromtime
crate, but it is not. This is a new name forDuration
, which makes perfect sense since it supports negative values, delta - is a better naming.Those
try_<unit>
methods have been added in September 2023 and released in [email protected]
Sounds like we should add another thing to the -Zminimal-versions
dep section in Cargo.toml
to help with that maybe?
src/worker/health.rs
line 12 at r11 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
AsyncBufRead is needed to be able to self.c.heartbeat. But Reconnect not needed indeed. The minimal bound will be:
where S: AsyncBufRead + AsyncWrite + Send + Unpin, E: StdError,But I removed excessive
AsyncBufRead
fromReadToken
:pub struct ReadToken<'a, S>(pub(crate) &'a mut Client<S>) where S: AsyncWrite + Unpin + Send;
Excellent! I do wonder whether we could avoid the need for AsyncBufRead
for heartbeat
, but that's more of a curiosity than a necessity
src/worker/runner.rs
line 39 at r13 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
We are already using this approach with
Job
andBatch
In
Worker
s case things are not that pretty though. Have a look.impl<S: AsyncWrite + Send + Unpin, E> Worker<S, E> { /// Creates an ergonomic constructor for a new [`Worker`]. /// /// Also equivalent to [`WorkerBuilder::default`]. pub fn builder() -> WorkerBuilder<E> { WorkerBuilder::default() } }In the "worst" case scenario, we will need to declare our intentions like so:
let mut w = Worker::<tokio::net::TcpStream, std::io::Error>::builder()If we are registering a handler with a concrete error type (which we normally are) then this can of course be simplified to:
let mut w = Worker::<tokio::net::TcpStream, _>::builder()But I am still not sure I find this ergonomic, but wdyt?
Ah, yes, that's pretty awkward. I think the way I've seen this worked around in the past is to add the builder
method to something like
impl Worker<tokio::net::TcpStream, ()> {
pub fn builder<E>() -> WorkerBuilder<E> {
}
}
because there's no requirement that the impl
type actually matches what the builder ultimately produces. If you find it too awkward though, I'm okay leaving it as-is.
src/worker/runner.rs
line 96 at r14 (raw file):
} /// A closure that implements [JobRunner].
nit:
[`JobRunner`]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 49 of 59 files reviewed, 7 unresolved discussions (waiting on @jonhoo)
Cargo.toml
line 46 at r10 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
I was actually thinking of going even further by removing the generic parameter from
Client
entirely and just have it hold aBox<dyn Connection>
internally. But agree, we can do that in a follow-up!
Added to the list
src/error.rs
line 45 at r14 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
nit: TLS, not TSL ;)
this is how I encrypted the protocol name itself 😆
src/error.rs
line 172 at r14 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
I think clippy will complain about this name, since it's pretty firmly in the camp of requiring
NativeTls
Looks like it is not complaining, but I still updated those names
src/proto/client/ent.rs
line 66 at r14 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Wouldn't this be better as a
match bid_read_res
rather than anif let Ok
followed by aunwrap_err
?
It was to avoid indenting twice, but I am actually ok with both options. Updated
src/proto/single/cmd.rs
line 56 at r11 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
👍 Are you keeping a list? :p
used to keep in memory but now flushed it 😄
src/proto/single/ent/mod.rs
line 130 at r9 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Sounds like we should add another thing to the
-Zminimal-versions
dep section inCargo.toml
to help with that maybe?
This is already solved, and yes - by pinning chrono
at 0.4.32 in Cargo.toml
. But while we were working on this PR things have changed. Please read further.
I initially put it here JFYI, because this is a really interesting case where the job with updated deps will fail because a method is declared deprecated there (or even removed) and we are encouraged to use a new one, while the job with minimal versions will fail because this new method in not there just yet.
So the chrono::Duration::seconds(n)
was announced deprecated with this commit in this 0.4.35 release because it is panicking inside, while the one which we are encouraged to use and which is not panicking rather returning us a result to handle chrono::Duration::try_seconds(n)
was only released with chrono v.0.4.32.
It tuned out though, that in 0.4.36 release in late March this deprecation was reverted. And so now we are ok again, using those panicking methods. Ok - because we are only using those to construct TimeDelta (a.k.a Duration) for testing purposes. And we can removing the chrono pinning at 0.4.32 for now. 😅
src/worker/health.rs
line 12 at r11 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Excellent! I do wonder whether we could avoid the need for
AsyncBufRead
forheartbeat
, but that's more of a curiosity than a necessity
I am not sure we can. In Worker::heartbeat
we are doing single::read_json
which internally calls the RESP read
which in its turn wants to read_line
from the stream. read_line
is implemented in AsyncBufReadExt
which requires the thing to impl AsyncBufRead
. This is how we end up with it being a bound on Worker::listen_for_heartbeat
. Luckily this is not part of the crate's public API.
src/worker/runner.rs
line 39 at r13 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Ah, yes, that's pretty awkward. I think the way I've seen this worked around in the past is to add the
builder
method to something likeimpl Worker<tokio::net::TcpStream, ()> { pub fn builder<E>() -> WorkerBuilder<E> { } }because there's no requirement that the
impl
type actually matches what the builder ultimately produces. If you find it too awkward though, I'm okay leaving it as-is.
I think this unit type is beautiful. Updated.
In case the error type cannot be inferred, we will specify like so (just like we used to do with WorkerBuilder::default()
in such cases):
let w = Worker::builder::<io::Error>()
.register_fn("never_called", |_| async move { unreachable!() })
.connect(None)
.await
.unwrap();
While the happy scenario is when compiler can infer the type
let mut worker = Worker::builder()
.labels(vec!["rust".into(), local.into()])
.workers(1)
.wid(WorkerId::random())
.register_fn("order", move |job| async move {
...
Ok::<(), io::Error>(())
})
.register_fn("image", |_| async move { unreachable!() })
.connect(None)
.await
.unwrap();
I updated the docs and also made a couple of tests use Worker::builder
instead of WorkerBuilder::default
src/worker/runner.rs
line 96 at r14 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
nit:
[`JobRunner`]
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only one nit left, and it's non-blocking, so approving 🎉
Thank you so much for sticking with this. We can do the nit in a follow-up, and so I'll merge this once CI is happy :D
Reviewed 7 of 7 files at r15, 3 of 3 files at r16, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @rustworthy)
src/proto/client/ent.rs
line 66 at r14 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
It was to avoid indenting twice, but I am actually ok with both options. Updated
Ah, but you don't have to. You can do:
match single::read_bid(&mut self.0.stream).await {
Ok(bid) => Ok(Some(bid)),
Err(Error::Protocol(error::Protocol::Internal { msg })) => {
if msg.starts_with("No such batch") {
return Ok(None);
}
Err(error::Protocol::Internal { msg }.into())
},
Err(another) => Err(another),
}
it's a nit though, so won't block merging on this 👍
src/proto/single/cmd.rs
line 56 at r11 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
used to keep in memory but now flushed it 😄
Hah, nice, thank you!
src/proto/single/ent/mod.rs
line 130 at r9 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
This is already solved, and yes - by pinning
chrono
at 0.4.32 inCargo.toml
. But while we were working on this PR things have changed. Please read further.I initially put it here JFYI, because this is a really interesting case where the job with updated deps will fail because a method is declared deprecated there (or even removed) and we are encouraged to use a new one, while the job with minimal versions will fail because this new method in not there just yet.
So the
chrono::Duration::seconds(n)
was announced deprecated with this commit in this 0.4.35 release because it is panicking inside, while the one which we are encouraged to use and which is not panicking rather returning us a result to handlechrono::Duration::try_seconds(n)
was only released with chrono v.0.4.32.It tuned out though, that in 0.4.36 release in late March this deprecation was reverted. And so now we are ok again, using those panicking methods. Ok - because we are only using those to construct TimeDelta (a.k.a Duration) for testing purposes. And we can removing the chrono pinning at 0.4.32 for now. 😅
Oh wow, well, glad it came full circle and we can leave it as it was! Thanks for keeping tabs on it :)
src/worker/health.rs
line 12 at r11 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
I am not sure we can. In
Worker::heartbeat
we are doingsingle::read_json
which internally calls the RESPread
which in its turn wants toread_line
from the stream.read_line
is implemented inAsyncBufReadExt
which requires the thing to implAsyncBufRead
. This is how we end up with it being a bound onWorker::listen_for_heartbeat
. Luckily this is not part of the crate's public API.
Yep, seems reasonable, thanks for exploring!
src/worker/runner.rs
line 39 at r13 (raw file):
Previously, rustworthy (Pavieł Michalkievič) wrote…
I think this unit type is beautiful. Updated.
In case the error type cannot be inferred, we will specify like so (just like we used to do with
WorkerBuilder::default()
in such cases):let w = Worker::builder::<io::Error>() .register_fn("never_called", |_| async move { unreachable!() }) .connect(None) .await .unwrap();While the happy scenario is when compiler can infer the type
let mut worker = Worker::builder() .labels(vec!["rust".into(), local.into()]) .workers(1) .wid(WorkerId::random()) .register_fn("order", move |job| async move { ... Ok::<(), io::Error>(()) }) .register_fn("image", |_| async move { unreachable!() }) .connect(None) .await .unwrap();I updated the docs and also made a couple of tests use
Worker::builder
instead ofWorkerBuilder::default
Excellent, looks great!
And it's in! 🎉 |
@rustworthy Which of the PRs you have open do you think I should review next? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonhoo I will need to do some extra work on the two open PRs in this repo, so I've just turned them into drafts.
But I got one open in the IMAP - just an e2e example for outlook oauth2.0
Reviewable status: all files reviewed, 1 unresolved discussion
src/proto/client/ent.rs
line 66 at r14 (raw file):
Previously, jonhoo (Jon Gjengset) wrote…
Ah, but you don't have to. You can do:
match single::read_bid(&mut self.0.stream).await { Ok(bid) => Ok(Some(bid)), Err(Error::Protocol(error::Protocol::Internal { msg })) => { if msg.starts_with("No such batch") { return Ok(None); } Err(error::Protocol::Internal { msg }.into()) }, Err(another) => Err(another), }it's a nit though, so won't block merging on this 👍
Adding to the list.
|
As per issue
Concerns:
serde_json::to_writer
when inasync fn issue
of theAsyncFaktoryCommand
, so serializing into a vector and only then async writing to the buffered stream. I've read through this discussion, looks like there is simply no much interest in this feature.Performance:
In release mode,
loadtest
from main andloadtest
from current branch are showing similar results (10-11k jobs per second on Intel Core i5-10210U, os: Garuda Linux with 6.7.0-zen3-1-zen) [I should probably get a new machine], with async version being 5-10% less performant, which should be attributed to the async runtime overhead. There is no chance the async capabilities will compensate for this overhead in the loadtest, since the job handler in this binary will not do any IO intensive tasks, where async bindings will shine (sidenote: the business logic of a job handler has never been of any particular interest in the crate's load tests, it is rather the general throughput which is being measured inloadtest
).TODO:
tokio_rustls
)args
public)This change is