Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: rumqttc publish subscribe API design #774

Open
swanandx opened this issue Dec 19, 2023 · 24 comments
Open

RFC: rumqttc publish subscribe API design #774

swanandx opened this issue Dec 19, 2023 · 24 comments
Assignees

Comments

@swanandx
Copy link
Member

swanandx commented Dec 19, 2023

This is RFC for API changes in publish / subscribe in rumqttc. Feel free to comment and provide feedback, thanks!

Publish

  • Introduce new struct for constructing Message
  • use Message for publishing
let message = Message::new("topic", QoS::AtMostOnce)
    .payload("")
    .retain();

client.publish(message);
client.publish_with_properties(message, properties); // v5 only

note: topic will be verified in call to publish(). Shall we perform the topic verification logic while constructing Message?

Subscribe

  • Pass Filter to subscribe
let filter = Filter::new("filter", QoS::AtMostOnce)
    // v5 only options
    .preserve_retain()
    .nolocal()
    .retain_forward_rule(RetainForwardRule::Never);

client.subscribe(filter)
client.subscribe_with_properties(filter, properties) // v5 only

note: where shall we verify the filter?

Notes

  • Ideally, both publish and subscribe would return pkid ( but up-to possibility of implementation )
  • other fns like try_publish, try_subscribe, subscribe_many etc. will be updated accordingly
@swanandx swanandx self-assigned this Dec 19, 2023
@de-sh
Copy link
Contributor

de-sh commented Dec 20, 2023

We could make use of something like a RequestBuilder which will allow us to code as follows:

let publish = RequestBuilder::publish()
                       .topic("topic")
                       .qos(QoS::AtleastOnce)
                       .payload(b"1231231312")
                       .retain()
                       .properties(_) // v5 only
                       .build();

client.send(publish).await;

Why?
Because topic, qos, etc are part of the publish/subscription request and not the message/filter.

@swanandx
Copy link
Member Author

We could make use of something like a RequestBuilder

I'm assuming it returns Request::Publish(Publish) ( or subscribe ):

  • It would be hard to modify the request after build() and thus compromise the re-usability ( usually users would just change the payload )
  • we need to check type of request in send() to properly handle the pkid logic ( or have different publish() and subscribe() )
  • how would you specify multiple subscriptions in single packet? ( Each subscription includes a topic and a QoS level. )

Because topic, qos, etc are part of the publish/subscription request and not the message/filter.

reasoning was:

  • for subscriptions we call the topic as filters and use Filter internally ( as well as externally for subscribe_many ). So it wan't much big change
  • for publication, I wanted to use Topic but as every message can be published with different qos, I just named that struct Message

@de-sh
Copy link
Contributor

de-sh commented Dec 21, 2023

It's not a good idea to go forward with using Filter and Message as they aren't the requests themselves and QoS, retain etc are request params. Something like a Publish/SubscriptionBuilder seem to be the right way to go.

@swanandx
Copy link
Member Author

swanandx commented Dec 21, 2023

Technically , QoS and retain etc. aren't Request params, we use Request as a way to communicate with Eventloop, it's not in standards right?

As per MQTT Standards, term Topic Filters is used in subscriptions.

The Payload of a SUBSCRIBE packet contains a list of Topic Filters indicating the Topics to which the Client wants to subscribe

And Application Message for Publish,

A PUBLISH packet is sent from a Client to a Server or from a Server to a Client to transport an Application Message.

When you mention request, we are referring to Request used in rumqttc right? Or did I misunderstood something 😅

Something like a Publish/SubscriptionBuilder seem to be the right way to go.

Agree, but what will be the return type from that builder, which we pass down to client.publish(_) or client.subscribe(_)? If we can figure that out, it would be really great!

Returning the packet itself might be confusing for users as it will have placeholder pkid, actually pkid will be allocated only after/during call to publish()!

@swanandx
Copy link
Member Author

swanandx commented Dec 21, 2023

what about something like this ( based on above suggestions @de-sh ):

let sub: SubscribeRequest = RequestBuilder::subscribe("topic", QoS::AtLeastOnce)
    // v5 only
    .preserve_retain()
    .nolocal()
    .retain_forward_rule(RetainForwardRule::Never)
    .build();

// build multiple SubscribeRequest if you want to subscribe_many
client.subscribe(sub)?;


let pub: PublishRequest = RequestBuilder::publish("topic", QoS::AtLeastOnce)
    .payload("")
    .retain()
    .build();

client.publish(pub)?;

for i in 1..=10 {
    // no need to clone, you can construct new request here!
    let mut p = pub.clone();
    p.payload(vec![1; i]);
    client.publish(p)?;
}

btw, can there be better type than ..Request?

^ that is one of the reason I find Filter and Message more sensible names ( also explained in above comment)! preferably, I would like to stick with them:

// can use just Filter::new(topic, qos) if don't need to set v5 options!

let sub: Filter = Filter::builder("topic", QoS::AtLeastOnce) // FilterBuilder
    // v5 only
    .preserve_retain()
    .nolocal()
    .retain_forward_rule(RetainForwardRule::Never)
    .build();

client.subscribe(sub)?;

let pub: Message = Message::builder("topic", QoS::AtLeastOnce) // MessageBuilder
    .payload("")
    .retain()
    .build();

client.publish(pub)?;

for i in 1..=10 {
    // no need to clone, you can construct new request here!
    let mut p = pub.clone();
    p.payload(vec![1; i]);
    client.publish(p)?;
}

@de-sh
Copy link
Contributor

de-sh commented Dec 22, 2023

Why not the following:

let sub: Subscribe = Subscribe::builder("topic", QoS::AtLeastOnce) // SubscriptionBuilder
    // v5 only
    .preserve_retain()
    .nolocal()
    .retain_forward_rule(RetainForwardRule::Never)
    .build();

client.subscribe(sub)?;

@swanandx
Copy link
Member Author

swanandx commented Dec 22, 2023

Why not the following:

let sub: Subscribe = Subscribe::builder("topic", QoS::AtLeastOnce) // SubscriptionBuilder
    // v5 only
    .preserve_retain()
    .nolocal()
    .retain_forward_rule(RetainForwardRule::Never)
    .build();

client.subscribe(sub)?;

because:

  • a single Subscribe packet can have multiple filter & subscription options pairs, which we can't specify in this format. ( Subscription options include QoS, preserve_retain, etc. When we do subscribe_many(filters), a single Subscribe packet if created with those filters.
  • if we return Subscribe packet, users might refer to it's pkid, thinking it is final ( though we can avoid this part with docs or something, but still a footgun )

@de-sh
Copy link
Contributor

de-sh commented Dec 22, 2023

So if I am not wrong, a Subscribe packet can contain multiple Subscriptions for the same request? In that caseSubscription could just be used to replace Filter term. Plus we really don't need two methods against client, we could just impl From<Subscription> for Vec<Subscription> and pass both singleton and list to the same method as illustrated in this code sample.

@swanandx
Copy link
Member Author

In that case Subscription could just be used to replace Filter term

in protocol level as well? We might wanna be more inline with the terms as per standards right?

@swanandx
Copy link
Member Author

swanandx commented Dec 22, 2023

note: after implementing, and trying out from users pov, I think builders are overkill and are introducing unnecessary extra types. what can we do instead:

// Message { .. } constructor can be used if u want all fields
let mut message = Message::new("topic", QoS::AtMostOnce);
// set the fields if required
message.payload = "payload";

client.publish(message);

// same with Filter!
let mut filter = Filter::new("filter", QoS::AtMostOnce);

filter.preserve_retain = true;

client.subscribe(filter)

client can subscribe to a filter and client can publish a message

Filters were already exposed to the users, and were used by them, so it won't be a change. Only new type is Message!

to summarize changes:

  • subscribe() takes Filter instead of string & qos ( similar to how it is with subscribe_many() currently )
  • publish() takes Message instead of all arguments

@de-sh
Copy link
Contributor

de-sh commented Dec 22, 2023

I get your argument, yet the protocol description distinguishes between topic filters and subscription.

The subscription is a transfer contract between the client and programmer in how it will forward the filters to the broker, the filter on the other hand is a contract between the broker and the client that broker will forward all messages(with mentioned QoS, etc.) to the client on said topic filter. Similarly publish is a contract between client and programmer that the message will be delivered to the broker with said QoS while the message itself is the payload contained within and not the whole package it gets transported in.

@swanandx
Copy link
Member Author

swanandx commented Dec 22, 2023

ah gotcha!

Subscription == Filter + Options! Will try to replace Filter with Subscription for users and see how it goes ( will be still calling it Filter internally! )
ps: wdyt about SubscribeFilter ( ref )

what about the Message though? what should we name it then? can't name it Publish as it will conflict with publish packet, nor Publication cuz sounds weird haha

@swanandx
Copy link
Member Author

as you mentioned here:

The subscription is a transfer contract between the client and programmer in how it will forward the filters to the broker

and as in standards:

The SUBSCRIBE packet is sent from the Client to the Server to create one or more Subscriptions

technically, Subscription isn't sent, rather it is created upon receiving SUBSCRIBE packet. So passing down Subscription to subscribe() doesn't sound right:

let subscription = Subscription::new("topic/filter", qos);
client.subscribe(subscription);

@swanandx
Copy link
Member Author

Paho also uses the same terminologies, they send a Message to publish! ref: https://github.com/eclipse/paho.mqtt.rust/blob/master/examples/async_publish.rs#L59-L60

@de-sh
Copy link
Contributor

de-sh commented Dec 22, 2023

what about the Message though? what should we name it then? can't name it Publish as it will conflict with publish packet, nor Publication cuz sounds weird haha

Paho also uses the same terminologies, they send a Message to publish! ref: https://github.com/eclipse/paho.mqtt.rust/blob/master/examples/async_publish.rs#L59-L60

We can continue to call it Publish and give it a publish builder, looks like eclipse/paho is using the naive understanding of the terms but naming is always the toughest part of writing software, so let's tread carefully on this one, will ensure the codebase is readable.

The SUBSCRIBE packet is sent from the Client to the Server to create one or more Subscriptions

technically, Subscription isn't sent, rather it is created upon receiving SUBSCRIBE packet. So passing down Subscription to subscribe() doesn't sound right:

Yes, that's what I meant by subscription being a contract between client and broker to forward message, the subscription is created on the basis of a request that contains the filter along with a set of other params, i.e. QoS, etc. So going by the wording, a subscription request is the combined product of a set [filter, QoS and other parameters], the subscription requests are packaged together along with a set of properties(not necessary) to create the MQTT subscription packet that is transfered over the network.

@swanandx
Copy link
Member Author

I have made the changes ( I already had them, so just pushed haha ) as per the originally proposed naming on rumqttc-pub-sub-api-revamp branch, have a look: main...rumqttc-pub-sub-api-revamp


naming is always the toughest part of writing software, so let's tread carefully on this one, will ensure the codebase is readable.

Definitely agree on trading this carefully on this so our codebase is readable and our APIs are user friendly!


TBH, to me Filter/Message feels comfortable to use / maintain while PublishBuilder / Subscription introduces much more additional overhead both to users and readability, but that might just be due to some biases 😅

let's also have some more people share their opinions / views on this? wdyt?

@benjamin-nw
Copy link

Hello,

Thanks for your work @swanandx , I'll give some though on the changes.

Publish

I like the new Message struct to construct an MQTT message.

I'm doing the same on our wrapper around rumqtt and currently I need to unfold my struct in order to call client.publish(...), thus having a rumqtt::Message will allow us to do:

pub async fn publish(&self, msg: Message) -> Result<(), Error> {
    // Current code
    self.inner.publih(msg.topic(), msg.qos().into(), false, msg.payload()).await?

    // New code
    self.inner.publish(msg.into()).await? // We just need to add a `From<Message> for rumqtt::Message`
}

I also like the idea to have a builder for Message.

// Not sure if the `build()` step can fail or not ? Maybe for topic validation as you said.
let message = Message::new("topic", QoS::AtMostOnce).payload("").retain().build()?;

The state machine for the Builder could look like that. It's very simple and allows you to build at every step.

The only mandatory fields are topic and qos.

The payload default to being empty, and the retain to false.

stateDiagram-v2
    [*] --> New

    New --> Payload
    New --> Build
    New --> Retain

    Payload --> Build
    Payload --> Retain

    Retain --> Build

    Build --> [*]
Loading

Thus, all the possibilities are:

let message = Message::new("topic", QoS::AtMostOnce).build()?;
let message = Message::new("topic", QoS::AtMostOnce).retain().build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").retain().build()?;

For v5 if I understand correctly, we could add a property call that you can repeat.

stateDiagram-v2
    [*] --> New

    New --> Payload
    New --> Build
    New --> Retain
    New --> Property

    Payload --> Build
    Payload --> Retain
    Payload --> Property

    Retain --> Build
    Retain --> Property

    Property --> Property
    Property --> Build

    Build --> [*]
Loading

Thus, it will leads to:

// Previous cases +
let message = Message::new("topic", QoS::AtMostOnce).property(...).build()?;
let message = Message::new("topic", QoS::AtMostOnce).retain().property(...).build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").property().build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").retain().property().build()?;

// And you can chain properties
let message = Message::new("topic", QoS::AtMostOnce)
                         .payload("")
                         .retain()
                         .property(...)
                         .property(...)
                         .build()?;

Subscribe

For the Subribe part, I'm not sure of the terminology either.

Filter can be seen as a SubscribeFilter or a TopicFilter.

The Builder pattern suits the Filter perfectly because you only need to build it once.

Builder pattern

I think it's better to use Message::new instead of Message::builder. The user don't want to create a Builder, it wants a New Message. The builder pattern is good because it fits well with the natural way of thinking about constructing new things in Rust.

@benjamin-nw
Copy link

benjamin-nw commented Dec 29, 2023

I've an idea, it might be weird, but I think it's worth a proposition.

What if the Message was only the immutable data (i.e: topic, qos, retain, property) and we can construct each time a new Message with the wanted payload.

Let me show you what I mean.

Publish

We want to first create an immutable Message, and send it when we receive a payload.

1. Sending the Message and the Payload as parameters.

/// Send data to mqtt when the `rec` receive data from another tasks.
async fn send_task(client: AsyncClient, topic: Topic, qos: QoS, rec: Receiver<[u8]>) -> Result<(), Error> {
    let msg = Message::new(topic).qos(qos).retain().build();

    while let Some(data) = rec.recv().await {
        client.publish(&msg, data).await?;
        // or
        client.publish(msg.clone(), data).await?;
    }
}

2. Creating a PublishMessage with the Message and Payload stored by ref inside it

/// Send data to mqtt when the `rec` receive data from another tasks.
async fn send_task(client: AsyncClient, topic: Topic, qos: QoS, rec: Receiver<[u8]>) -> Result<(), Error> {
    let msg = Message::new(topic).qos(qos).retain().build();

    while let Some(data) = rec.recv().await {
        client.publish(msg.payload(data)).await?;
    }
}

impl Message {
     fn payload(&'a self, payload: &'b [u8]) -> PublishMessage<'a, 'b> {
        PublishMessage::new(self, payload)
    }
}

Notes

It's kind of a similar way of how paho does it with its Topic.

@benjamin-nw
Copy link

Can you make a draft PR of the branch rumqttc-pub-sub-api-revamp in order to better see the changes ?

I'll gladly review the code even if it's a WIP.

@swanandx
Copy link
Member Author

Thank you so much for the feedback @benjamin-nw ! 🚀 💯

here are my thoughts on it:

I also like the idea to have a builder for Message.

Instead of a builder, where you would need to call build() at end, I was thinking more of adding a consuming pattern like

let message = Message::new(..).payload("...");

// here payload fn will look something like
impl Message {
     fn payload(mut self, payload: _) -> Self {
        self.payload = payload;
        self
    }
}

reason:

  • Builder introduces an additional type on which we will call build() to get Message
  • Doesn't provide much benefit in our use case

note:

  • we need to change new() to return Result<Self, Err> if we want to verify the topic right away.
  • if we do so, we can't update the topic like message.topic = "_" as we won't be able to verify it. We would need fn like get_topic() & set_topic() to do so
  • otherwise, we can always validate topic in call to publish 😄

we could add a property call that you can repeat.

if property(_) was there, it will take PublishProperties struct, so we won't need to repeat the call.

currently, instead of putting properties in message, we pass them to different functions:

client.publish(message);
client.publish_with_properties(message, properties); // v5 only

do you think adding them to Message is better? if yes, will do it!

Initially I didn't add them due to thinking like we are publishing the message with properties, the message itself won't have the properties right? and we needed to pass PublishProperties to Message which felt bit awkward haha. But I guess if it is more user friendly to add properties in Message itself, then sure we can do it!


Sending the Message and the Payload as parameters.

we can work around it without need for sending payload as parameter, like shown in examples:

async fn requests(client: AsyncClient) {
    let filter = Filter::new("hello/world", QoS::AtMostOnce);
    client.subscribe(filter).await.unwrap();

    let mut message = Message::new("hello/world", QoS::ExactlyOnce);
    for i in 1..=10 {
        message.payload = vec![1; i];
        client.publish(message.clone()).await.unwrap();

        time::sleep(Duration::from_secs(1)).await;
    }
}

It's kind of a similar way of how paho does it with its Topic.

Topic seems like just an wrapper around client, so more like an additional abstraction, which we can consider to be out of scope for this RFC right?


Can you make a draft PR of the branch rumqttc-pub-sub-api-revamp in order to better see the changes ?

I will do it once we have finalized the basic design from the RFC 💯 for now, have a look here: main...rumqttc-pub-sub-api-revamp

CI running on PRs whenever I push a commit kinda annoys me haha ( jk 😜 )

Thanks for the review! :)

@benjamin-nw
Copy link

Hi, happy new year 🎉 !

I was thinking more of adding a consuming pattern like

Yeah I think it's a good trade-off between having a new type for a Builder and having a good user experience for handling Messages 👍🏻

we need to change new() to return Result<Self, Err> if we want to verify the topic right away.

Agreed, or we can we can always validate topic in call to publish, it's also a solution to keep Message simple.

if we do so, we can't update the topic like message.topic = "_" as we won't be able to verify it. We would need fn like get_topic() & set_topic() to do so

I think it's a bad idea to have set_topic(). We can enforce the creation of a new Message if we want a new Topic. Thus, the only way to access a new topic, is to create a new Message, which is, very simple.

We'll only need topic() and payload(...).


do you think adding them to Message is better?

For this one, I'm not sure, we don't use properties. But keeping another function like publish_with_properties might be a bit clearer to really indicate that you are using v5.

Initially I didn't add them due to thinking like we are publishing the message with properties, the message itself won't have the properties right?

Yes, we can keep them separated. It make more senses to have the properties outside the Message. Thus we can change properties without affecting the Message.


we can work around it without need for sending payload as parameter

For this, I'm not sure.

Is it expensive to clone() the Message ? Because we'll be cloning a lot our current Message.

If we are using Bytes for the payload it's okay because we only clone an Arc, but the rest of the Message, it might cost more if the topic is big.

We need to check which one is cheaper in term of memory usage, and which one is simpler to use between:

async fn requests(client: AsyncClient) {
    let filter = Filter::new("hello/world", QoS::AtMostOnce);
    client.subscribe(filter).await.unwrap();

    let mut message = Message::new("hello/world", QoS::ExactlyOnce);
    for i in 1..=10 {
        message.payload = vec![1; i];
        client.publish(message.clone()).await.unwrap();

        time::sleep(Duration::from_secs(1)).await;
    }
}

or

async fn requests(client: AsyncClient) {
    let filter = Filter::new("hello/world", QoS::AtMostOnce);
    client.subscribe(filter).await.unwrap();

    let mut message = Message::new("hello/world", QoS::ExactlyOnce);
    for i in 1..=10 {
        let payload = vec![1; i];
        client.publish(&message, payload).await.unwrap();

        time::sleep(Duration::from_secs(1)).await;
    }
}

Topic seems like just an wrapper around client, so more like an additional abstraction, which we can consider to be out of scope for this RFC right?

I agree and I don't like the way paho implements this wrapper either. So this is out of scope 👍🏻


I will do it once we have finalized the basic design from the RFC 💯 for now

Got it 👍🏻 will look at the branch for now.

@swanandx
Copy link
Member Author

swanandx commented Jan 2, 2024

Hey, Happy New Year! 🥳 🎆

Thanks for the detailed reply!

it might cost more if the topic is big.

If we pass the topic as reference to avoid cloning, it will be actually of type Into<String> ( similar to our existing design ), and we will end up calling .into() on it to convert it to String anyways ( e.g. here ) . So I think cost will be similar in both the cases.

Payload being the part of Message seems more intuitive and might as well help in debugging? ( not sure here haha )

note: passing payload as parameter, would allow fns like publish_bytes where we can use Bytes directly!

We need to check which one is cheaper in term of memory usage, and which one is simpler to use between:

Even though I was prioritizing simplicity here ( and to tackle performance later ), I definitely agree with you, we should verify the perf impact! I'm positive that it will be negligible / none for most of the users :)

@benjamin-nw
Copy link

benjamin-nw commented Jan 2, 2024

Payload being the part of Message seems more intuitive

Yup, we can do this then.

If we pass the topic as reference to avoid cloning, it will be actually of type Into

Oh, didn't know that, good for me 👍🏻

and might as well help in debugging?

Technically yes because the payload will be in Message and not elsewhere.

note: passing payload as parameter, would allow fns like publish_bytes where we can use Bytes directly!

You mean as a public API ?

Even though I was prioritizing simplicity here ( and to tackle performance later )

Full 👍🏻 on this. Let's make a simple yet useful API and tackle performance issues (if any) later.

@swanandx
Copy link
Member Author

swanandx commented Jan 2, 2024

You mean as a public API ?

Yup, it is currently in API https://github.com/bytebeamio/rumqtt/blob/main/rumqttc%2Fsrc%2Fclient.rs#L132

but I removed it in this RFC due to Message. Maybe we can use generics & traits such that Message payload field can accept both? Will have to see how complex it gets though 👀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants