-
Notifications
You must be signed in to change notification settings - Fork 256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFC: rumqttc publish subscribe API design #774
Comments
We could make use of something like a let publish = RequestBuilder::publish()
.topic("topic")
.qos(QoS::AtleastOnce)
.payload(b"1231231312")
.retain()
.properties(_) // v5 only
.build();
client.send(publish).await;
|
I'm assuming it returns
reasoning was:
|
It's not a good idea to go forward with using |
Technically , QoS and retain etc. aren't Request params, we use Request as a way to communicate with Eventloop, it's not in standards right? As per MQTT Standards, term Topic Filters is used in subscriptions.
And Application Message for Publish,
When you mention request, we are referring to
Agree, but what will be the return type from that builder, which we pass down to Returning the packet itself might be confusing for users as it will have placeholder pkid, actually pkid will be allocated only after/during call to |
what about something like this ( based on above suggestions @de-sh ): let sub: SubscribeRequest = RequestBuilder::subscribe("topic", QoS::AtLeastOnce)
// v5 only
.preserve_retain()
.nolocal()
.retain_forward_rule(RetainForwardRule::Never)
.build();
// build multiple SubscribeRequest if you want to subscribe_many
client.subscribe(sub)?;
let pub: PublishRequest = RequestBuilder::publish("topic", QoS::AtLeastOnce)
.payload("")
.retain()
.build();
client.publish(pub)?;
for i in 1..=10 {
// no need to clone, you can construct new request here!
let mut p = pub.clone();
p.payload(vec![1; i]);
client.publish(p)?;
} btw, can there be better type than ^ that is one of the reason I find // can use just Filter::new(topic, qos) if don't need to set v5 options!
let sub: Filter = Filter::builder("topic", QoS::AtLeastOnce) // FilterBuilder
// v5 only
.preserve_retain()
.nolocal()
.retain_forward_rule(RetainForwardRule::Never)
.build();
client.subscribe(sub)?;
let pub: Message = Message::builder("topic", QoS::AtLeastOnce) // MessageBuilder
.payload("")
.retain()
.build();
client.publish(pub)?;
for i in 1..=10 {
// no need to clone, you can construct new request here!
let mut p = pub.clone();
p.payload(vec![1; i]);
client.publish(p)?;
} |
Why not the following: let sub: Subscribe = Subscribe::builder("topic", QoS::AtLeastOnce) // SubscriptionBuilder
// v5 only
.preserve_retain()
.nolocal()
.retain_forward_rule(RetainForwardRule::Never)
.build();
client.subscribe(sub)?; |
because:
|
So if I am not wrong, a |
in protocol level as well? We might wanna be more inline with the terms as per standards right? |
note: after implementing, and trying out from users pov, I think builders are overkill and are introducing unnecessary extra types. what can we do instead: // Message { .. } constructor can be used if u want all fields
let mut message = Message::new("topic", QoS::AtMostOnce);
// set the fields if required
message.payload = "payload";
client.publish(message);
// same with Filter!
let mut filter = Filter::new("filter", QoS::AtMostOnce);
filter.preserve_retain = true;
client.subscribe(filter) client can subscribe to a filter and client can publish a message Filters were already exposed to the users, and were used by them, so it won't be a change. Only new type is to summarize changes:
|
I get your argument, yet the protocol description distinguishes between topic filters and subscription. The subscription is a transfer contract between the client and programmer in how it will forward the filters to the broker, the filter on the other hand is a contract between the broker and the client that broker will forward all messages(with mentioned QoS, etc.) to the client on said topic filter. Similarly publish is a contract between client and programmer that the message will be delivered to the broker with said QoS while the message itself is the payload contained within and not the whole package it gets transported in. |
ah gotcha! Subscription == Filter + Options! Will try to replace what about the |
as you mentioned here:
and as in standards:
technically, let subscription = Subscription::new("topic/filter", qos);
client.subscribe(subscription); |
Paho also uses the same terminologies, they send a |
We can continue to call it Publish and give it a publish builder, looks like eclipse/paho is using the naive understanding of the terms but naming is always the toughest part of writing software, so let's tread carefully on this one, will ensure the codebase is readable.
Yes, that's what I meant by subscription being a contract between client and broker to forward message, the subscription is created on the basis of a request that contains the filter along with a set of other params, i.e. QoS, etc. So going by the wording, a subscription request is the combined product of a set [filter, QoS and other parameters], the subscription requests are packaged together along with a set of properties(not necessary) to create the MQTT subscription packet that is transfered over the network. |
I have made the changes ( I already had them, so just pushed haha ) as per the originally proposed naming on
Definitely agree on trading this carefully on this so our codebase is readable and our APIs are user friendly! TBH, to me Filter/Message feels comfortable to use / maintain while PublishBuilder / Subscription introduces much more additional overhead both to users and readability, but that might just be due to some biases 😅 let's also have some more people share their opinions / views on this? wdyt? |
Hello, Thanks for your work @swanandx , I'll give some though on the changes. PublishI like the new I'm doing the same on our wrapper around pub async fn publish(&self, msg: Message) -> Result<(), Error> {
// Current code
self.inner.publih(msg.topic(), msg.qos().into(), false, msg.payload()).await?
// New code
self.inner.publish(msg.into()).await? // We just need to add a `From<Message> for rumqtt::Message`
} I also like the idea to have a // Not sure if the `build()` step can fail or not ? Maybe for topic validation as you said.
let message = Message::new("topic", QoS::AtMostOnce).payload("").retain().build()?; The state machine for the The only mandatory fields are The stateDiagram-v2
[*] --> New
New --> Payload
New --> Build
New --> Retain
Payload --> Build
Payload --> Retain
Retain --> Build
Build --> [*]
Thus, all the possibilities are: let message = Message::new("topic", QoS::AtMostOnce).build()?;
let message = Message::new("topic", QoS::AtMostOnce).retain().build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").retain().build()?; For stateDiagram-v2
[*] --> New
New --> Payload
New --> Build
New --> Retain
New --> Property
Payload --> Build
Payload --> Retain
Payload --> Property
Retain --> Build
Retain --> Property
Property --> Property
Property --> Build
Build --> [*]
Thus, it will leads to: // Previous cases +
let message = Message::new("topic", QoS::AtMostOnce).property(...).build()?;
let message = Message::new("topic", QoS::AtMostOnce).retain().property(...).build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").property().build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").retain().property().build()?;
// And you can chain properties
let message = Message::new("topic", QoS::AtMostOnce)
.payload("")
.retain()
.property(...)
.property(...)
.build()?; SubscribeFor the
The Builder patternI think it's better to use |
I've an idea, it might be weird, but I think it's worth a proposition. What if the Let me show you what I mean. PublishWe want to first create an immutable 1. Sending the
|
Can you make a draft PR of the branch I'll gladly review the code even if it's a WIP. |
Thank you so much for the feedback @benjamin-nw ! 🚀 💯 here are my thoughts on it:
Instead of a builder, where you would need to call let message = Message::new(..).payload("...");
// here payload fn will look something like
impl Message {
fn payload(mut self, payload: _) -> Self {
self.payload = payload;
self
}
} reason:
note:
if currently, instead of putting properties in message, we pass them to different functions: client.publish(message);
client.publish_with_properties(message, properties); // v5 only do you think adding them to Initially I didn't add them due to thinking like we are publishing the message with properties, the message itself won't have the properties right? and we needed to pass
we can work around it without need for sending payload as parameter, like shown in examples: async fn requests(client: AsyncClient) {
let filter = Filter::new("hello/world", QoS::AtMostOnce);
client.subscribe(filter).await.unwrap();
let mut message = Message::new("hello/world", QoS::ExactlyOnce);
for i in 1..=10 {
message.payload = vec![1; i];
client.publish(message.clone()).await.unwrap();
time::sleep(Duration::from_secs(1)).await;
}
}
Topic seems like just an wrapper around client, so more like an additional abstraction, which we can consider to be out of scope for this RFC right?
I will do it once we have finalized the basic design from the RFC 💯 for now, have a look here: main...rumqttc-pub-sub-api-revamp CI running on PRs whenever I push a commit kinda annoys me haha ( jk 😜 ) Thanks for the review! :) |
Hi, happy new year 🎉 !
Yeah I think it's a good trade-off between having a new type for a Builder and having a good user experience for handling
Agreed, or we can
I think it's a bad idea to have We'll only need
For this one, I'm not sure, we don't use properties. But keeping another function like
Yes, we can keep them separated. It make more senses to have the properties outside the
For this, I'm not sure. Is it expensive to If we are using We need to check which one is cheaper in term of memory usage, and which one is simpler to use between: async fn requests(client: AsyncClient) {
let filter = Filter::new("hello/world", QoS::AtMostOnce);
client.subscribe(filter).await.unwrap();
let mut message = Message::new("hello/world", QoS::ExactlyOnce);
for i in 1..=10 {
message.payload = vec![1; i];
client.publish(message.clone()).await.unwrap();
time::sleep(Duration::from_secs(1)).await;
}
} or async fn requests(client: AsyncClient) {
let filter = Filter::new("hello/world", QoS::AtMostOnce);
client.subscribe(filter).await.unwrap();
let mut message = Message::new("hello/world", QoS::ExactlyOnce);
for i in 1..=10 {
let payload = vec![1; i];
client.publish(&message, payload).await.unwrap();
time::sleep(Duration::from_secs(1)).await;
}
}
I agree and I don't like the way paho implements this wrapper either. So this is out of scope 👍🏻
Got it 👍🏻 will look at the branch for now. |
Hey, Happy New Year! 🥳 🎆 Thanks for the detailed reply!
If we pass the topic as reference to avoid cloning, it will be actually of type Payload being the part of note: passing payload as parameter, would allow fns like
Even though I was prioritizing simplicity here ( and to tackle performance later ), I definitely agree with you, we should verify the perf impact! I'm positive that it will be negligible / none for most of the users :) |
Yup, we can do this then.
Oh, didn't know that, good for me 👍🏻
Technically yes because the
You mean as a public API ?
Full 👍🏻 on this. Let's make a simple yet useful API and tackle performance issues (if any) later. |
Yup, it is currently in API https://github.com/bytebeamio/rumqtt/blob/main/rumqttc%2Fsrc%2Fclient.rs#L132 but I removed it in this RFC due to |
This is RFC for API changes in
publish
/subscribe
in rumqttc. Feel free to comment and provide feedback, thanks!Publish
Message
Message
for publishingnote: topic will be verified in call to
publish()
. Shall we perform the topic verification logic while constructingMessage
?Subscribe
Filter
to subscribenote: where shall we verify the filter?
Notes
publish
andsubscribe
would returnpkid
( but up-to possibility of implementation )try_publish
,try_subscribe
,subscribe_many
etc. will be updated accordinglyThe text was updated successfully, but these errors were encountered: