Skip to content

Commit

Permalink
feat: send max packet size of server in connack
Browse files Browse the repository at this point in the history
  • Loading branch information
swanandx committed Sep 10, 2023
1 parent 9431c10 commit e1e8954
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
15 changes: 11 additions & 4 deletions rumqttd/src/link/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl<P: Protocol> RemoteLink<P> {
tenant_id: Option<String>,
mut network: Network<P>,
connect_packet: Packet,
dynamic_filters: bool,
config: Arc<ConnectionSettings>,
) -> Result<RemoteLink<P>, Error> {
let Packet::Connect(connect, props, lastwill, lastwill_props, _) = connect_packet else {
return Err(Error::NotConnectPacket(connect_packet));
Expand Down Expand Up @@ -98,15 +98,22 @@ impl<P: Protocol> RemoteLink<P> {
.clean_session(clean_session)
.last_will(lastwill)
.last_will_properties(lastwill_props)
.dynamic_filters(dynamic_filters)
.dynamic_filters(config.dynamic_filters)
.topic_alias_max(topic_alias_max.unwrap_or(0))
.build()?;

let id = link_rx.id();
Span::current().record("connection_id", id);

if let Some(packet) = notification.into() {
network.write(packet).await?;
if let Some(mut packet) = notification.into() {
if let Packet::ConnAck(_ack, props) = &mut packet {
let mut new_props = props.clone().unwrap_or_default();
// NOTE: shall we rename max_payload_size to max_packet_size
// and make it u32?
new_props.max_packet_size = Some(config.max_payload_size as u32);
*props = Some(new_props);
network.write(packet).await?;
}
}

Ok(RemoteLink {
Expand Down
6 changes: 2 additions & 4 deletions rumqttd/src/server/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,7 @@ async fn remote<P: Protocol>(
protocol,
);

let dynamic_filters = config.dynamic_filters;

let connect_packet = match mqtt_connect(config, &mut network).await {
let connect_packet = match mqtt_connect(config.clone(), &mut network).await {
Ok(p) => p,
Err(e) => {
error!(error=?e, "Error while handling MQTT connect packet");
Expand Down Expand Up @@ -506,7 +504,7 @@ async fn remote<P: Protocol>(
tenant_id.clone(),
network,
connect_packet,
dynamic_filters,
config,
)
.await
{
Expand Down

0 comments on commit e1e8954

Please sign in to comment.