
RFC(rumqttc): publish / subscribe / unsubscribe methods return a promise that resolves into pkid when packet is handled by Eventloop #805

Open
de-sh opened this issue Feb 22, 2024 · 29 comments


@de-sh
Contributor

de-sh commented Feb 22, 2024

This is the RFC for API changes in publish / subscribe / unsubscribe in rumqttc to resolve #349. Feel free to comment and provide feedback, thanks!

Example in the case of Publish

// async
let promise = async_client.publish(..).await?;
let pkid = promise.await;

// blocking
let promise = client.publish(..)?;
let pkid = promise.blocking_recv();

Within rumqttc this will be implemented similarly to how oneshot channels work, i.e. we will respond on the oneshot channel with the pkid value. Also, maybe we need not go that far when we know that the publish is at QoS 0, or when the pkid is non-zero by default?
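For illustration, the proposed mechanism could be sketched roughly like this. This is a std-only stand-in: `sync_channel(1)` models the oneshot channel, and `PkidPromise`, `Request`, `publish`, and `handle` are hypothetical names, not the actual rumqttc API:

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

// Stand-in for the promise token returned by publish.
struct PkidPromise(Receiver<u16>);

impl PkidPromise {
    // Analogue of oneshot's blocking_recv for the sync client.
    fn blocking_recv(self) -> Option<u16> {
        self.0.recv().ok()
    }
}

// The request carries the sender half so the event loop can resolve it.
struct Request {
    payload: Vec<u8>,
    pkid_tx: SyncSender<u16>,
}

// Client side: enqueue the request and hand back the promise.
fn publish(payload: Vec<u8>, queue: &mut Vec<Request>) -> PkidPromise {
    let (pkid_tx, pkid_rx) = sync_channel(1);
    queue.push(Request { payload, pkid_tx });
    PkidPromise(pkid_rx)
}

// Event-loop side: assign the next pkid and resolve the promise.
fn handle(req: Request, next_pkid: u16) {
    println!("handling {} byte publish as pkid {}", req.payload.len(), next_pkid);
    // Ignore the error if the user dropped the promise.
    let _ = req.pkid_tx.send(next_pkid);
}

fn main() {
    let mut queue = Vec::new();
    let promise = publish(b"hello".to_vec(), &mut queue);
    let req = queue.remove(0);
    handle(req, 1);
    assert_eq!(promise.blocking_recv(), Some(1));
}
```

Note that dropping the promise only makes the `send` on the event-loop side return an error, which can be ignored, which is why this is not a breaking change.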

Please note that this is not a breaking change as users can just ignore/drop the promise token.

@de-sh de-sh changed the title RFC: rumqttc publish methods should return Pkid promise that resolves when packet is handled RFC: rumqttc publish / subscribe / unsubscribe methods should return a promise that resolves into the pkid when packet is handled by Eventloop Feb 22, 2024
@de-sh de-sh changed the title RFC: rumqttc publish / subscribe / unsubscribe methods should return a promise that resolves into the pkid when packet is handled by Eventloop RFC: rumqttc publish / subscribe / unsubscribe methods return a promise that resolves into pkid when packet is handled by Eventloop Feb 22, 2024
@de-sh de-sh changed the title RFC: rumqttc publish / subscribe / unsubscribe methods return a promise that resolves into pkid when packet is handled by Eventloop RFC(rumqttc): publish / subscribe / unsubscribe methods return a promise that resolves into pkid when packet is handled by Eventloop Feb 22, 2024
@swanandx
Member

so something like, publish will return Pkid where:

struct Pkid {
  id_rx: Receiver<u32>,
}

impl Future for Pkid {
  type Output = u32;
  fn poll(..) -> .. {
    // if we recv something on id_rx, return the id
    // the state will send on that channel
  }
}

right?

So I am curious: how are we going to send the Sender to the state so we can use it for sending the pkid?

when pkid is non-zero by default maybe?

when is it non-zero by default?

as users can just ignore/drop the promise token.

That would mean the receiver is dropped as well, which would cause the Sender's send to fail!

@de-sh
Contributor Author

de-sh commented Feb 22, 2024

impl Future for Pkid {
 type Output = u32;
 fn poll(..) -> .. {
   // if we recv something on id_rx, return the id
   // the state will send on that channel
 }
}

We can just use a oneshot channel as is, no need to reimplement it

So I am curious: how are we going to send the Sender to the state so we can use it for sending the pkid?

Include it in the request?

when is it non-zero by default?

When the publish is edited before being sent, or, in the case of uplink, when it is read back from file. The user can choose to do that, and the Eventloop will then skip the pkid-setting stage in outgoing_*

that would mean the receiver is dropped as well. this would cause the Sender to fail!

Not in the case of oneshot; it might error out, but we need not care about this either way from within the Eventloop

@swanandx
Member

Include it in the request?

Gotcha. So unlike main...pkid-poc, by using the channels we are avoiding direct use of Arc<Mutex<_>>!

Not in the case of oneshot, it might error out,

that is what i meant by fail 😅

but we need not care about this either ways from within Eventloop

okie.

@dlips

dlips commented Feb 22, 2024

Hi, I am one of the possible users of this functionality from #349 :) Thanks for bringing this up again

Could you elaborate on the usage pattern for the returned promises in a multi-message scenario?

For a single message, it makes sense to wait for the return of the pkid and then continue. In the case that I have a high channel capacity and a limited inflight queue, the task using the client would block/wait at high load peaks, where the event loop would need some time to process the message. Sure, I could spawn a task that listens to the one-shot channel and, after receiving the pkid, passes it on to an mpsc that gathers all the pkids.

Also, the processing of the promise is a bit time-critical, because if I do it too late, the one-shot channel would still hold a pkid value whose PubAck was already processed by the eventloop.

Sorry that I don't have any constructive feedback, just dropping some thoughts from the user perspective

@de-sh
Contributor Author

de-sh commented Feb 22, 2024

Could you elaborate on the usage pattern for the returned promises in a multi-message scenario?

  1. We could do a lot of things here when considering async code, but not much when it comes to sync code.
  2. One could use JoinSet::join_next() to stream pkids
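The multi-message pattern sketched above can be modeled with std threads and an mpsc channel; with tokio one would instead push the promises into a `JoinSet` and stream results with `join_set.join_next().await`. This is an illustrative analogue, not the rumqttc API:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (done_tx, done_rx) = mpsc::channel();

    // Stand-ins for three publish promises, each resolving to a pkid.
    for pkid in [1u16, 2, 3] {
        let done_tx = done_tx.clone();
        thread::spawn(move || {
            // In the proposed API this would be `promise.blocking_recv()`
            // (or awaiting the promise in a spawned task).
            done_tx.send(pkid).unwrap();
        });
    }
    drop(done_tx);

    // Drain pkids in completion order, like `join_next()` does.
    let mut pkids: Vec<u16> = done_rx.iter().collect();
    pkids.sort();
    assert_eq!(pkids, vec![1, 2, 3]);
}
```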

@dlips

dlips commented Feb 22, 2024

joinset.join_next() is exactly what I was looking for. Missed it when looking through the tokio docs. That would solve my problem. Thank you :)

@de-sh
Contributor Author

de-sh commented Feb 22, 2024

Should this feature be feature-gated to ensure we don't inadvertently make life worse for other users? I am not entirely sure how much memory/CPU usage would increase; maybe it's a good idea to figure that out with some profiling @swanandx

@cartertinney

cartertinney commented Mar 27, 2024

@de-sh Do we have a timeline for this inclusion? I need to be able to get pkid in order to wait for puback, in order to use this library.

@de-sh
Contributor Author

de-sh commented Mar 28, 2024

@de-sh Do we have a timeline for this inclusion? I need to be able to get pkid in order to wait for puback, in order to use this library.

We need a code review on the PR mentioned above to continue with this

@de-sh
Contributor Author

de-sh commented Mar 28, 2024

Adding more thoughts on this:

Maybe it is a good idea to have this same mechanism return the error responses relating to the publish packet so that the client side can keep track of the errors as well?

enum Error {
  Mqtt(mqttbytes::Error),
  PacketIdZero,
  EmptySubscription,
  OutgoingPacketTooLarge { .. },
}

type RequestResult = Result<Pkid, Error>;

@cartertinney

cartertinney commented Mar 28, 2024

@de-sh

Another question regarding the proposal - is there a guarantee that the pkid will be returned to the user before the associated PubAck can arrive? I've used other MQTT libraries before where by the time the id is returned, the puback may have already arrived, which makes matching the ids difficult, since you have to handle the case where you may receive a puback for an id that you are not yet tracking in your application.

Although, admittedly, even if we can guarantee that the pkid will be returned prior to the actual publish packet being sent, there is still a race condition for the asynchronous application (can the application track the id returned by the PkidPromise before the puback gets received?)

One solution I've seen before that sidesteps this is to have publish return a future for the eventual ack, but that probably clashes with the polling receive pattern on the event loop in rumqttc. The intended design appears to be that since the user is controlling the event loop, they're responsible for distributing information received on it, which does make sense, but unfortunately leaves puback matching a rather unpleasant experience.

@de-sh
Contributor Author

de-sh commented Mar 31, 2024

(can the application track the id returned by the PkidPromise before the puback gets received)

While there might be a theoretical possibility of the puback being received faster than the pkid is resolved on the client side, it is practically impossible (afaik). Thus it should be fine to consider this circumstance an extreme case?

I need to be able to get pkid in order to wait for puback, in order to use this library.

Maybe what we should do is set up a future that resolves when the EventLoop receives the associated puback (QoS 1) or pubcomp (QoS 2) for a sent publish? Can we define the use case in more detail?

@cartertinney

cartertinney commented Apr 1, 2024

Maybe what we should do is set up a future that resolves when the EventLoop receives the associated puback (QoS 1) or pubcomp (QoS 2) for a sent publish? Can we define the use case in more detail?

The issue I'm trying to solve here is an application with distributed functionality: the main event loop and connection are handled/managed in a different place than where the actual MQTT operations take place. For a simplified but illustrative example, consider that publish logic is happening in a different thread/task. The publishing thread/task naturally wants to wait upon a PUBACK, but the PUBACK is actually received by the event loop in the main thread, thus there needs to be some kind of concurrency solution to communicate this information.

This is further compounded if there are multiple publishers in multiple threads. The central event-loop-managing logic has no idea which publisher to deliver PUBACK notifications to, because the PKID of any given outgoing publish is only known to the publishing threads: the event loop can see it happen with an outgoing event, but it doesn't know which copy of the Client the publish came from. Thus, when the PUBACK comes in with a matching PKID, the main thread does not know which copy of the Client to send the PUBACK notification to. The only way around this that I've been able to think of is that the publishing thread/task needs to communicate to the central thread which PKID it sent, so the central thread can send back the corresponding PUBACK notification accordingly. My concern is that this process may, in some cases, allow a PUBACK to arrive before all the relevant information to handle it has been communicated and tracked.
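The registration scheme described above could be sketched like this; `Router`, `register`, and `on_puback` are hypothetical names for illustration, and std mpsc channels stand in for whatever notification primitive the library would use:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

// Lives alongside the central event loop; maps pkids to waiting publishers.
struct Router {
    waiters: HashMap<u16, Sender<()>>,
}

impl Router {
    // Publisher side: register interest in a pkid before the ack can arrive.
    fn register(&mut self, pkid: u16, tx: Sender<()>) {
        self.waiters.insert(pkid, tx);
    }

    // Event-loop side: on PUBACK, notify whoever registered that pkid.
    fn on_puback(&mut self, pkid: u16) {
        if let Some(tx) = self.waiters.remove(&pkid) {
            let _ = tx.send(());
        }
        // else: the ack arrived before registration -- the race discussed above.
    }
}

fn main() {
    let mut router = Router { waiters: HashMap::new() };
    let (tx, rx) = channel();
    router.register(5, tx);
    router.on_puback(5);
    assert!(rx.recv().is_ok());
}
```

The race is visible in the `else` branch: if `on_puback` runs before `register`, the notification is silently dropped, which is exactly why a future returned directly by `publish` is attractive.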

In other libraries, I've seen this issue resolved with a publish returning a Future that can be awaited upon the eventual PUBACK, but I can appreciate that that might be somewhat against your design goals here.

@dlips

dlips commented Apr 2, 2024

Maybe what we should do is set up a future that resolves when the EventLoop receives the associated puback (QoS 1) or pubcomp (QoS 2) for a sent publish? Can we define the use case in more detail?

My motivation to get the PkIds was also to track which messages I sent got a PubAck from the MQTT Broker. So having the Future resolve after the PubAck was received instead of returning the PkId would simplify my use case because I do not have to manually write the code that correlates the returned PkIds with the PubAck events from the EventLoop.

@cartertinney

cartertinney commented Apr 2, 2024

For what it's worth, the current implementation on the pkid branch seems to stop the event loop from reporting PubAck right now. I don't receive PubAck at all on this branch, but I do receive it when using the main release. So yeah, I can get the pkid now, but I still can't match it with a PubAck.

After experimenting with the pkid branch in general, honestly, I'm starting to feel that .publish() really does need to return a PubAck future - trying to build the mechanism to get and deliver it on top just doesn't scale well to a complex application.

@de-sh
Contributor Author

de-sh commented Apr 9, 2024

Made some changes to the design, but it's basically the same concept. Please review the POC presented in the following commit (with example); comments on this are welcome!

b6a45b7

@xiaocq2001
Contributor

xiaocq2001 commented Apr 17, 2024

Made some changes to the design, but basically the same concept, please review the POC presented in the following commit(with example), comments on this are welcome!

b6a45b7

I put some comments in the POC, please check. I like the implementation of getting notified on acknowledgements instead of having to match pkids manually.

Should you need any assistance, don't hesitate to reach out. I'm eagerly anticipating the integration of this feature into the main branch.

@xiaocq2001
Contributor

I made some changes based on your POC and did some validation locally. I have shared the changes in PR #851; feel free to use them to improve the feature, or to discuss.

@xiaocq2001
Contributor

Given the successful POC and code reviews, I believe the ACK await feature is great. What are the next steps to ensure a smooth integration?

@de-sh
Contributor Author

de-sh commented May 8, 2024

We are working on making time for the review(running low on bandwidth), post that we should be good to go. Would have been wonderful if there were more inputs along those lines from the community!

@xiaocq2001
Contributor

xiaocq2001 commented May 23, 2024

According to the recent changes in #869, the notice struct needs an update.

FixedBitSet helps to reduce memory usage compared to a Vec, but it is not suitable for ack waiting, since an ack notification needs a packet-linked internal struct rather than a single bit. For ack waiting, the current HashMap implementation seems to be the better fit (or, to keep the order of packets, a LinkedHashMap).

Is there any suggestion on how to balance these needs?

@xiaocq2001
Contributor

xiaocq2001 commented May 23, 2024

Another thing: while waiting for ACKs, it is possible that the connection is re-established with a broker CONNACK that has session present = 0. In such a case, the pending waits should be terminated with an error (a state error indicating the session was closed by the broker).

@joshuachp

joshuachp commented Jun 12, 2024

What do you think about making the NoticeTx public? This would make it possible to construct a NoticeFuture for testing and mocking via mockall. This includes the success and error methods as well.

@FSMaxB
Contributor

FSMaxB commented Jun 20, 2024

I've rebased the acked branch onto main in #883.

Currently this conflicts with the FixedBitSet changes mentioned by @xiaocq2001, which means I had to revert some of them.

My idea for fixing this so that only people who use this feature pay a performance penalty is as follows:

Instead of always returning a NoticeFuture, allow anyone to create a NoticeTx and NoticeFuture pair, and change the API so that instead of returning a NoticeFuture the methods take a NoticeTx as a parameter. Then, in the Mqtt state, keep the FixedBitSet plus, additionally, a HashMap<u16, NoticeTx>, but only populate that second one if a notification for the given message was actually requested (by passing a NoticeTx). This means that you don't have to pay any penalty when not using the feature (except for the occasional lookup in an empty HashMap, which should be fast).
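The opt-in design could look roughly like this. This is a sketch under assumptions: std mpsc stands in for the real oneshot channel, and `MqttState`, `outgoing_publish`, and `handle_puback` are simplified stand-ins for the actual rumqttc state machine:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Stand-ins for the proposed pair; the real ones would wrap a oneshot channel.
type NoticeTx = Sender<()>;
type NoticeFuture = Receiver<()>;

fn notice_pair() -> (NoticeTx, NoticeFuture) {
    channel()
}

#[derive(Default)]
struct MqttState {
    // The FixedBitSet inflight tracking stays as-is (omitted here);
    // this map is populated only when a notification was requested.
    ack_waiters: HashMap<u16, NoticeTx>,
}

impl MqttState {
    fn outgoing_publish(&mut self, pkid: u16, notice: Option<NoticeTx>) {
        if let Some(tx) = notice {
            self.ack_waiters.insert(pkid, tx);
        }
    }

    fn handle_puback(&mut self, pkid: u16) {
        // Cheap lookup in an empty map when the feature is unused.
        if let Some(tx) = self.ack_waiters.remove(&pkid) {
            let _ = tx.send(());
        }
    }
}

fn main() {
    let mut state = MqttState::default();
    let (tx, future) = notice_pair();
    state.outgoing_publish(7, Some(tx));
    state.outgoing_publish(8, None); // no-penalty path
    state.handle_puback(7);
    assert!(future.recv().is_ok());
}
```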

@joshuachp

I think another good addition would be a non-consuming try_wait(&mut self) method on the NoticeFuture. This would make it possible to avoid blocking while trying to wait for the acknowledgement of multiple packets.

For example, you are storing the NoticeFutures in a Vec, and you want to find the first acknowledge packet.

This would be similar to the try_recv(&mut self) of the inner channel.

One implementation could be:

diff --git a/rumqttc/src/notice.rs b/rumqttc/src/notice.rs
index 4818335..2142e81 100644
--- a/rumqttc/src/notice.rs
+++ b/rumqttc/src/notice.rs
@@ -53,6 +53,21 @@ pub fn wait(self) -> NoticeResult {
     pub async fn wait_async(self) -> NoticeResult {
         self.0.await?
     }
+
+    /// Attempts to check if the broker acknowledged the packet, without blocking the current thread
+    /// or consuming the notice.
+    ///
+    /// It will return [`None`] if the packet wasn't acknowledged.
+    ///
+    /// Multiple calls to this function can fail with [`NoticeError::Recv`] if the notice was
+    /// already waited and the packet was already acknowledged and [`Some`] value was returned.
+    pub fn try_wait(&mut self) -> Option<NoticeResult> {
+        match self.0.try_recv() {
+            Ok(res) => Some(res),
+            Err(oneshot::error::TryRecvError::Closed) => Some(Err(NoticeError::Recv)),
+            Err(oneshot::error::TryRecvError::Empty) => None,
+        }
+    }
 }
 
 #[derive(Debug)]

Here, it returns an Option instead of an error for the oneshot::error::TryRecvError::Empty, just to not add another variant to the error.
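A possible usage of the proposed try_wait is scanning a Vec of pending notices for the first acknowledged packet. In this sketch, std mpsc's try_recv stands in for the oneshot channel, so the `NoticeFuture` here is a simplified stand-in rather than the type from the diff above:

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

// Simplified stand-in for the proposed NoticeFuture.
struct NoticeFuture(Receiver<()>);

impl NoticeFuture {
    // Non-consuming, non-blocking check, mirroring the proposed try_wait.
    fn try_wait(&mut self) -> Option<Result<(), TryRecvError>> {
        match self.0.try_recv() {
            Ok(()) => Some(Ok(())),
            Err(TryRecvError::Disconnected) => Some(Err(TryRecvError::Disconnected)),
            Err(TryRecvError::Empty) => None,
        }
    }
}

fn main() {
    let mut pending = Vec::new();
    let mut senders = Vec::new();
    for _ in 0..3 {
        let (tx, rx) = channel();
        senders.push(tx);
        pending.push(NoticeFuture(rx));
    }
    // Only the second packet gets acknowledged.
    senders[1].send(()).unwrap();

    // Scan without blocking; unacknowledged notices simply return None.
    let first_acked = pending
        .iter_mut()
        .position(|n| matches!(n.try_wait(), Some(Ok(()))));
    assert_eq!(first_acked, Some(1));
}
```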

@de-sh
Contributor Author

de-sh commented Jul 10, 2024

I think another good addition would be a non-consuming try_wait(&mut self) method on the NoticeFuture. This would make it possible to avoid blocking while trying to wait for the acknowledgement of multiple packets.

That's a great suggestion, thanks for the input!

@BjoernLange

Having this feature would be really helpful for my use-case. How is the progress? Is there already a version that I can try out? Thank you!

@KillingJacky

KillingJacky commented Sep 25, 2024

I've rebased the acked branch onto main in #883.

Currently this conflicts with the FixedBitSet changes mentioned by @xiaocq2001, which means I had to revert some of them.

My idea on how to fix this such that only people who use this feature have to pay a performance penalty is like this:

Instead of always returning a NoticeFuture, allow anyone to create a NoticeTx and NoticeFuture pair, and change the API so that instead of returning a NoticeFuture the methods take a NoticeTx as a parameter. Then, in the Mqtt state, keep the FixedBitSet plus, additionally, a HashMap<u16, NoticeTx>, but only populate that second one if a notification for the given message was actually requested (by passing a NoticeTx). This means that you don't have to pay any penalty when not using the feature (except for the occasional lookup in an empty HashMap, which should be fast).

I'm one of the people who don't care about the slight performance loss :) And I don't think using a HashMap would hurt performance or resource consumption badly, because generally we wouldn't configure the queue depth of the eventloop to be too big (currently I'm using 100 for cap in my project), at least until the feature of saving current state to disk / a cache system is implemented. Instead, if I had the packet-ack mechanism, I would manage the outgoing message queue in a more reliable way outside the MQTT client's scope, so that outgoing messages won't get lost if there's an unexpected shutdown of the rumqtt client.

Just putting in a vote for moving this forward: first make this feature usable, then we will have a long time window to tune the performance in parallel with implementing features like https://github.com/bytebeamio/rumqtt/blob/main/rumqttc/design.md#disk-aware-request-stream.

@FSMaxB
Contributor

FSMaxB commented Sep 26, 2024

Note that I won't have any time to continue working on this, since at work it was reverted as a failed experiment: there were other issues with the unreleased version of rumqttc which I didn't have enough time to look into.

I'm on your side regarding the performance loss, but the FixedBitSet changes were merged in the first place, so there obviously are people who do care about it, which is why I was considering a solution that allows keeping them. But just going back to hash maps would certainly make implementing this much easier.
