RFC(rumqttc): publish / subscribe / unsubscribe methods return a promise that resolves into the pkid when the packet is handled by the Eventloop #805
so something like:

```rust
struct Pkid {
    id_rx: Receiver<u32>,
}

impl Future for Pkid {
    type Output = u32;

    fn poll(..) -> .. {
        // if we recv something on id_rx, return the id
        // we will send on that channel for state.
    }
}
```

right? so I am curious how are we going to send the
when is it non-zero by default?
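The sketch above can be made concrete. Below is a minimal, self-contained version using `std::sync::mpsc` as a stand-in for the channel (rumqttc would more likely use an async oneshot), plus a tiny polling executor so it runs without external crates. All names are illustrative, not rumqttc's actual API; the output is `Option<u16>` to model the sender being dropped, since pkids are u16 in MQTT.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Hypothetical promise type: resolves once the eventloop assigns a pkid.
struct Pkid {
    id_rx: Receiver<u16>,
}

impl Future for Pkid {
    type Output = Option<u16>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.id_rx.try_recv() {
            Ok(id) => Poll::Ready(Some(id)),
            Err(TryRecvError::Empty) => {
                // A real implementation would register the waker with the
                // sender side; here we just ask to be polled again.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            // Sender dropped: the request never reached the eventloop.
            Err(TryRecvError::Disconnected) => Poll::Ready(None),
        }
    }
}

// Minimal executor so the sketch is self-contained.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::yield_now();
    }
}

fn main() {
    let (tx, id_rx) = channel();
    // Pretend this is the eventloop assigning pkid 1 to the publish.
    thread::spawn(move || tx.send(1u16).unwrap());
    let pkid = block_on(Pkid { id_rx });
    println!("{:?}", pkid); // Some(1)
}
```

Dropping the `Sender` half answers the question above: the future resolves to `None` instead of hanging forever.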
that would mean the receiver is dropped as well. this would cause the
We can just use oneshot as is, no need to reimplement
Include it in the request?
When the publish is edited before being sent, or in the case of uplink when it is read back from file, the user can choose to do that.
Not in the case of oneshot, it might error out, but we need not care about this either way from within
gotcha. so unlike main...pkid-poc, by using the channels we are avoiding direct use of; that is what i meant by fail 😅
okie.
Hi, I am one of the possible users of this functionality from #349 :) Thanks for bringing this up again!

Could you elaborate on the usage pattern for the returned promises in a multi-message scenario? For a single message, it makes sense to wait for the return of the pkid and then continue. In case I have a high channel capacity and a limited inflight queue, the task using the client would block/wait at high load peaks, where the event loop would need some time to process the message. Sure, I could spawn a task that listens to the one-shot channel and, after receiving the pkid, passes it on to an mpsc that gathers all the pkids.

Also, the processing of the promise is a bit time critical, because if I do it too late, the one-shot channel would still hold a pkid value whose PubAck was already processed by the eventloop.

Sorry that I don't have any constructive feedback, just dropping some thoughts from the user perspective.
Should this feature be feature-gated to ensure we don't inadvertently make life worse for other users? I am not entirely sure how memory/CPU usage would increase; maybe a good idea to figure out with some profiling @swanandx
@de-sh Do we have a timeline for this inclusion? I need to be able to get the pkid in order to wait for the puback, in order to use this library.
We need a code review on the PR mentioned above to continue with this.
Adding more thoughts on this:

```rust
enum Error {
    Mqtt(mqttbytes::Error),
    PacketIdZero,
    EmptySubscription,
    OutgoingPacketTooLarge { .. },
}

type RequestResult = Result<Pkid, Error>;
```
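As a sketch of how such an error type could surface at the call site, here is a self-contained toy version. `mqttbytes::Error` is stubbed with a `String`, `subscribe` is a hypothetical stand-in for the client method, and `std::sync::mpsc` stands in for a oneshot channel; none of this is rumqttc's actual code.

```rust
use std::sync::mpsc::{channel, Receiver};

// Sketch of the proposed error type; mqttbytes::Error is stubbed out
// with a String so this compiles standalone.
#[derive(Debug)]
enum Error {
    Mqtt(String),
    PacketIdZero,
    EmptySubscription,
    OutgoingPacketTooLarge { pkt_size: usize, max: usize },
}

// The promise half the client hands back; resolves to the pkid.
struct Pkid {
    id_rx: Receiver<u16>,
}

type RequestResult = Result<Pkid, Error>;

// Hypothetical client method: validates the request up front and
// returns the promise on success, an Error otherwise.
fn subscribe(topics: &[&str]) -> RequestResult {
    if topics.is_empty() {
        return Err(Error::EmptySubscription);
    }
    let (tx, id_rx) = channel();
    // The eventloop would send the assigned pkid later; simulate it here.
    tx.send(1u16).unwrap();
    Ok(Pkid { id_rx })
}

fn main() {
    // Validation errors surface immediately, before the promise exists.
    assert!(matches!(subscribe(&[]), Err(Error::EmptySubscription)));

    // On success, the promise eventually yields the pkid.
    let pkid = subscribe(&["hello/world"]).unwrap();
    assert_eq!(pkid.id_rx.recv().unwrap(), 1);
    println!("ok");
}
```

The point of the split is that request validation fails synchronously, while pkid assignment stays asynchronous behind the promise.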
Another question regarding the proposal: is there a guarantee that the pkid will be returned to the user before the associated PubAck can arrive? I've used other MQTT libraries where, by the time the id is returned, the puback may have already arrived, which makes matching the ids difficult, since you have to handle the case where you receive a puback for an id that you are not yet tracking in your application. Although, admittedly, even if we can guarantee that the pkid will be returned prior to the actual publish packet being sent, there is still a race condition for the asynchronous application (can the application track the id returned by the PkidPromise before the puback gets received?).

One solution I've seen before that sidesteps this is to have publish return a future for the eventual ack, but that probably clashes with the polling receive pattern on the event loop in rumqttc. The intended design appears to be that since the user is controlling the event loop, they're responsible for distributing information received on it, which does make sense, but unfortunately does leave puback matching a rather unpleasant experience.
While there might be a theoretical possibility of the puback being received faster than the pkid being resolved on the client side, it is practically impossible (afaik). Thus it should be fine to consider this circumstance an extreme case?
Maybe what we should do is set up a future that resolves on
The issue I'm trying to solve here is an application with distributed functionality: the main event loop and connection is handled/managed in a different place than where the actual MQTT operations take place. For a simplified but illustrative example, consider that publish logic is happening in a different thread/task. The publishing thread/task naturally wants to wait upon a PUBACK, but the PUBACK is actually received by the event loop in the main thread, thus there needs to be some kind of concurrency solution to communicate this information.

This is further confounded if there are multiple publishers in multiple threads. The central event-loop-managing logic has no idea which publisher to deliver PUBACK notifications to, because the PKID of any given outgoing publish is only known to the publishing threads: the event loop can see it happen with an outgoing event, but it doesn't know which copy of the Client the publish came from. Thus, when the PUBACK comes in with a matching PKID, the main thread does not know which copy of the Client to send the PUBACK notification to.

The only way around this that I've been able to think of is that the publishing thread/task needs to communicate to the central thread which PKID it sent, so the central thread can send back the corresponding PUBACK notification accordingly. My concern is that this process may in some cases allow a PUBACK to arrive before all the relevant information needed to handle it has been communicated and tracked.

In other libraries, I've seen this issue resolved with publish returning a Future that can be awaited upon the eventual PUBACK, but I can appreciate that that might be somewhat against your design goals here.
My motivation to get the pkids was also to track which messages I sent got a PubAck from the MQTT broker. So having the Future resolve after the PubAck was received, instead of returning the pkid, would simplify my use case, because I would not have to manually write the code that correlates the returned pkids with the PubAck events from the EventLoop.
For what it's worth, the current implementation on the After experimenting with the pkid branch in general, honestly, I'm starting to feel that
Made some changes to the design, but it's basically the same concept. Please review the POC presented in the following commit (with example); comments on this are welcome!
I put some comments in the POC, please check. I like the implementation of getting notified on acknowledgements instead of matching additional pkids. Should you need any assistance, don't hesitate to reach out. I'm eagerly anticipating the integration of this feature into the main branch.
I made some changes based on your POC and did some validation locally. I share the changes in a PR here: #851, feel free to use them to improve the feature or to discuss.
Given the successful POC and code reviews, I believe the ACK await feature is great. What are the next steps to ensure a smooth integration?
We are working on making time for the review (running low on bandwidth); post that, we should be good to go. Would have been wonderful if there were more inputs along those lines from the community!
According to the recent changes in #869, the notice struct needs an update.
Is there any suggestion on balancing the needs?
Another thing: while waiting for ACKs, it's possible the connection is re-established with the broker's CONNACK having session present=0. In such a case, the pending waiters should be terminated with some error (a state error that the session was closed by the broker).
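One way the eventloop could terminate pending waiters on such a reconnect, sketched with illustrative names and std channels (not rumqttc's actual types, and `NoticeResult` is reduced to a `String` error):

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

type NoticeResult = Result<(), String>;

// Map of pkid -> sender half of each pending ack promise.
struct PendingAcks {
    waiting: HashMap<u16, Sender<NoticeResult>>,
}

impl PendingAcks {
    // Called when the broker acked pkid normally.
    fn resolve(&mut self, pkid: u16) {
        if let Some(tx) = self.waiting.remove(&pkid) {
            let _ = tx.send(Ok(()));
        }
    }

    // Called on reconnect when the broker reports session present=0:
    // the old session state is gone, so no pending ack will ever arrive.
    fn fail_all_session_closed(&mut self) {
        for (_pkid, tx) in self.waiting.drain() {
            let _ = tx.send(Err("session closed by broker".to_string()));
        }
    }
}

fn main() {
    let (tx1, rx1) = channel();
    let (tx2, rx2) = channel();
    let mut pending = PendingAcks {
        waiting: HashMap::from([(1, tx1), (2, tx2)]),
    };

    // Normal path: pkid 1 gets acked before the disconnect.
    pending.resolve(1);
    assert_eq!(rx1.recv().unwrap(), Ok(()));

    // Reconnect with session present=0: pkid 2's waiter gets an error.
    pending.fail_all_session_closed();
    assert!(rx2.recv().unwrap().is_err());
    println!("done");
}
```

Draining the map (rather than leaving stale entries) is what prevents waiters from hanging forever after the broker discards the session.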
What do you think about making the
I've rebased the Currently this conflicts with the My idea on how to fix this, such that only people who use this feature have to pay a performance penalty, is like this: instead of always returning a
I think another good addition would be a non-consuming For example, you are storing the This would be similar to the One implementation could be:

```diff
diff --git a/rumqttc/src/notice.rs b/rumqttc/src/notice.rs
index 4818335..2142e81 100644
--- a/rumqttc/src/notice.rs
+++ b/rumqttc/src/notice.rs
@@ -53,6 +53,21 @@ pub fn wait(self) -> NoticeResult {
     pub async fn wait_async(self) -> NoticeResult {
         self.0.await?
     }
+
+    /// Attempts to check if the broker acknowledged the packet, without blocking the current thread
+    /// or consuming the notice.
+    ///
+    /// It will return [`None`] if the packet wasn't acknowledged.
+    ///
+    /// Multiple calls to this function can fail with [`NoticeError::Recv`] if the notice was
+    /// already waited on, the packet was already acknowledged, and a [`Some`] value was returned.
+    pub fn try_wait(&mut self) -> Option<NoticeResult> {
+        match self.0.try_recv() {
+            Ok(res) => Some(res),
+            Err(oneshot::error::TryRecvError::Closed) => Some(Err(NoticeError::Recv)),
+            Err(oneshot::error::TryRecvError::Empty) => None,
+        }
+    }
 }

 #[derive(Debug)]
```

Here, it returns an
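To illustrate the intended semantics of `try_wait`, here is a standalone sketch modeling the same behavior with `std::sync::mpsc` instead of tokio's oneshot. `NoticeError` is reduced to a `String`, and the names mirror the diff above, but nothing here is rumqttc's actual code.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

type NoticeResult = Result<(), String>;

// Promise handed back by publish/subscribe; resolved by the eventloop
// once the broker acknowledges the packet.
struct NoticeFuture(Receiver<NoticeResult>);

impl NoticeFuture {
    /// Non-blocking, non-consuming check: Some(result) once the ack
    /// arrived, None while still pending.
    fn try_wait(&mut self) -> Option<NoticeResult> {
        match self.0.try_recv() {
            Ok(res) => Some(res),
            // Sender gone (or value already taken): report a recv error.
            Err(TryRecvError::Disconnected) => Some(Err("recv".to_string())),
            Err(TryRecvError::Empty) => None,
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    let mut notice = NoticeFuture(rx);

    // No ack yet: try_wait neither blocks nor consumes the notice,
    // so it can be called again later.
    assert!(notice.try_wait().is_none());

    // The eventloop delivers the ack...
    tx.send(Ok(())).unwrap();
    assert_eq!(notice.try_wait(), Some(Ok(())));
    println!("acknowledged");
}
```

The `&mut self` receiver is the key design point: the notice can sit in a collection and be polled periodically, unlike the consuming `wait`/`wait_async`.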
That's a great suggestion, thanks for the input!
Having this feature would be really helpful for my use-case. How is the progress? Is there already a version that I can try out? Thank you!
I'm one of the people that don't care about the slight performance loss :) And I don't think using a HashMap would hurt performance or resource consumption badly, because generally we wouldn't configure the Q depth of the eventloop too big (currently I'm using 100 for

Just putting a vote on moving this forward: firstly make this feature usable, then we have a long time window to tune the performance, in parallel with features like https://github.com/bytebeamio/rumqtt/blob/main/rumqttc/design.md#disk-aware-request-stream being implemented.
Note that I won't have any time to continue working on this, since at work it was reverted as a failed experiment: there were other issues with the unreleased version of rumqttc which I didn't have enough time to look into. I'm on your side regarding the performance loss, but the
This is the RFC for API changes in publish / subscribe / unsubscribe in rumqttc to resolve #349. Feel free to comment and provide feedback, thanks!

Example in the case of Publish
Within rumqttc this will be implemented similarly to how oneshot channels work, i.e. here we will respond on the oneshot channel with the pkid value. Also, we need not go so far when we know that the publish is QoS 0, or maybe when the pkid is non-zero by default?
Please note that this is not a breaking change, as users can just ignore/drop the promise token.
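A rough end-to-end sketch of that wiring, using `std::sync::mpsc` as a stand-in for a oneshot channel: the client attaches a sender to each request, the eventloop resolves it with the pkid it assigned, and ignoring or dropping the receiver opts out. All names here are illustrative, not rumqttc's actual API.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical request carrying the promise's sender half.
struct PublishRequest {
    topic: String,
    payload: Vec<u8>,
    pkid_tx: Sender<u16>, // stand-in for a oneshot sender
}

fn main() {
    let (req_tx, req_rx) = channel::<PublishRequest>();

    // Eventloop task: assigns pkids and resolves each promise.
    // (QoS 0 publishes would skip the send: their pkid stays 0.)
    let eventloop = thread::spawn(move || {
        let mut next_pkid = 0u16;
        for req in req_rx {
            next_pkid += 1;
            // Ignore the error if the user dropped their receiver.
            let _ = req.pkid_tx.send(next_pkid);
        }
    });

    // Client side: "publish" returns the receiving half as the promise.
    let (pkid_tx, pkid_rx) = channel();
    req_tx
        .send(PublishRequest {
            topic: "hello/world".to_string(),
            payload: b"hi".to_vec(),
            pkid_tx,
        })
        .unwrap();

    // Awaiting the promise; dropping pkid_rx instead would be the
    // non-breaking opt-out mentioned above.
    let pkid = pkid_rx.recv().unwrap();
    assert_eq!(pkid, 1);

    drop(req_tx);
    eventloop.join().unwrap();
}
```

Because the eventloop ignores send errors, a dropped receiver costs nothing, which is what keeps this change non-breaking for existing users.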