Skip to content

Commit

Permalink
Mqtt websocket first iteration (iotaledger#561)
Browse files Browse the repository at this point in the history
* Mqtt websocket first iteration

* fix

* small changes

* bindings added for broker options changes

* bindings fix

* convert non hex index to hex

* remove comment

* add changes file

* review fixes.

* fix

* Fix for python binding

* python binding fix

* fixed python mqqt test

Co-authored-by: Thoralf-M <[email protected]>
  • Loading branch information
melatron and Thoralf-M authored May 17, 2021
1 parent e085181 commit 98dad97
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 7 deletions.
7 changes: 7 additions & 0 deletions .changes/mqtt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"nodejs-binding": minor
---

MQTT uses websocket as default
Indexation topic with non hex content will be converted to hex automatically

2 changes: 2 additions & 0 deletions bindings/nodejs/lib/types/models.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ export declare interface BrokerOptions {
automaticDisconnect?: boolean
// timeout in milliseconds
timeout?: number
use_ws?: boolean
port?: number
maxReconnectionAttempts?: number
}

Expand Down
2 changes: 2 additions & 0 deletions bindings/python/native/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ impl Client {
let rust_broker_options = RustBrokerOptions::new()
.automatic_disconnect(broker_options.automatic_disconnect)
.timeout(Duration::from_secs(broker_options.timeout))
.use_ws(broker_options.use_ws)
.port(broker_options.port)
.max_reconnection_attempts(broker_options.max_reconnection_attempts);
client = client.with_mqtt_broker_options(rust_broker_options);
}
Expand Down
4 changes: 4 additions & 0 deletions bindings/python/native/src/client/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ pub struct BrokerOptions {
pub automatic_disconnect: bool,
/// broker timeout in secs
pub timeout: u64,
/// to use ws instead of tcp
pub use_ws: bool,
/// port
pub port: u16,
/// max number of attempts to reconnect.
pub max_reconnection_attempts: usize,
}
Expand Down
4 changes: 4 additions & 0 deletions bindings/python/native/tests/test_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
broker_options = {
'automatic_disconnect': True,
'timeout': 30,
'use_ws': True,
'port': 443,
'max_reconnection_attempts': 5,
}

Expand All @@ -32,6 +34,8 @@
broker_options = {
'automatic_disconnect': True,
'timeout': 5,
'use_ws': True,
'port': 443,
'max_reconnection_attempts': 5,
}

Expand Down
2 changes: 1 addition & 1 deletion docs/libraries/nodejs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ IOTA node(s) provides [Message Queuing Telemetry Transport](https://en.wikipedia
* milestones/confirmed
* messages
* messages/referenced
* messages/indexation/{index} (index needs to be formatted as hex-string, i.e. string: HORNET Spammer ==> hex-string: 484f524e4554205370616d6d6572)
* messages/indexation/{index}
* messages/{messageId}/metadata
* transactions/{transactionId}/included-message
* outputs/{outputId}
Expand Down
2 changes: 1 addition & 1 deletion iota-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ reqwest = { version = "0.11", features = ["json", "rustls-tls", "blocking"], def
futures = { version = "0.3", optional = true }

# MQTT
rumqttc = { version = "0.5", optional = true }
rumqttc = { version = "0.5", features = ["websocket"], optional = true}
# also used for storage
once_cell = { version = "1.7", optional = true }

Expand Down
27 changes: 27 additions & 0 deletions iota-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ pub struct BrokerOptions {
pub(crate) automatic_disconnect: bool,
#[serde(default = "default_broker_timeout")]
pub(crate) timeout: Duration,
#[serde(default = "default_broker_use_ws", rename = "defaultBrokerUseWs")]
pub(crate) use_ws: bool,
#[serde(default = "default_broker_port", rename = "defaultBrokerPort")]
pub(crate) port: u16,
#[serde(rename = "maxReconnectionAttempts", default)]
pub(crate) max_reconnection_attempts: usize,
}
Expand All @@ -128,6 +132,15 @@ fn default_broker_automatic_disconnect() -> bool {
fn default_broker_timeout() -> Duration {
Duration::from_secs(30)
}
#[cfg(feature = "mqtt")]
fn default_broker_use_ws() -> bool {
true
}

#[cfg(feature = "mqtt")]
fn default_broker_port() -> u16 {
1883
}

#[cfg(feature = "mqtt")]
fn default_max_reconnection_attempts() -> usize {
Expand All @@ -140,6 +153,8 @@ impl Default for BrokerOptions {
Self {
automatic_disconnect: default_broker_automatic_disconnect(),
timeout: default_broker_timeout(),
use_ws: default_broker_use_ws(),
port: default_broker_port(),
max_reconnection_attempts: default_max_reconnection_attempts(),
}
}
Expand All @@ -164,6 +179,18 @@ impl BrokerOptions {
self
}

/// Sets the use_ws used for the MQTT operations.
pub fn use_ws(mut self, use_ws: bool) -> Self {
self.use_ws = use_ws;
self
}

/// Sets the port used for the MQTT operations.
pub fn port(mut self, port: u16) -> Self {
self.port = port;
self
}

/// Sets the maximum number of reconnection attempts.
pub fn max_reconnection_attempts(mut self, max_reconnection_attempts: usize) -> Self {
self.max_reconnection_attempts = max_reconnection_attempts;
Expand Down
38 changes: 33 additions & 5 deletions iota-client/src/node/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use log::warn;
use regex::Regex;
use rumqttc::{
AsyncClient as MqttClient, Event, EventLoop, Incoming, MqttOptions, QoS, Request, Subscribe, SubscribeFilter,
Transport,
};
use tokio::sync::{watch::Sender, RwLock};

Expand All @@ -31,6 +32,19 @@ impl TryFrom<&str> for Topic {
impl Topic {
/// Creates a new topic and checks if it's valid.
pub fn new<S: Into<String>>(name: S) -> Result<Self> {
let mut name: String = name.into();
// Convert non hex index to hex
let indexation_beginning = "messages/indexation/";
if name.len() > indexation_beginning.len()
&& &name[0..indexation_beginning.len()] == indexation_beginning
&& hex::decode(&name[indexation_beginning.len()..name.len()]).is_err()
{
name = format!(
"messages/indexation/{}",
hex::encode(&name[indexation_beginning.len()..name.len()])
);
}

let valid_topics = lazy_static!(
["milestones/latest", "milestones/confirmed", "messages", "messages/referenced"].to_vec() => Vec<&str>
);
Expand All @@ -47,7 +61,6 @@ impl Topic {
].to_vec() => Vec<Regex>
);

let name = name.into();
if valid_topics.iter().any(|valid| valid == &name) || regexes.iter().any(|re| re.is_match(&name)) {
let topic = Self(name);
Ok(topic)
Expand Down Expand Up @@ -83,9 +96,24 @@ async fn get_mqtt_client(client: &mut Client) -> Result<&mut MqttClient> {
};
for node in nodes.iter() {
let host = node.url.host_str().expect("Can't get host from URL");
let mut mqttoptions = MqttOptions::new(host, host, 1883);
mqttoptions.set_connection_timeout(client.broker_options.timeout.as_secs());
let (_, mut connection) = MqttClient::new(mqttoptions.clone(), 10);
let id = "iota.rs";
let port = client.broker_options.port;
let mut uri = format!(
"{}://{}:{}/mqtt",
if node.url.scheme() == "https" { "wss" } else { "ws" },
host,
node.url.port_or_known_default().unwrap_or(port)
);

if !client.broker_options.use_ws {
uri = host.to_string();
};
let mut mqtt_options = MqttOptions::new(id, uri, port);
if client.broker_options.use_ws {
mqtt_options.set_transport(Transport::ws());
}
mqtt_options.set_connection_timeout(client.broker_options.timeout.as_secs());
let (_, mut connection) = MqttClient::new(mqtt_options.clone(), 10);
// poll the event loop until we find a ConnAck event,
// which means that the mqtt client is ready to be used on this host
// if the event loop returns an error, we check the next node
Expand All @@ -99,7 +127,7 @@ async fn get_mqtt_client(client: &mut Client) -> Result<&mut MqttClient> {

// if we found a valid mqtt connection, loop it on a separate thread
if got_ack {
let (mqtt_client, connection) = MqttClient::new(mqttoptions, 10);
let (mqtt_client, connection) = MqttClient::new(mqtt_options, 10);
client.mqtt_client.replace(mqtt_client);
poll_mqtt(
client.mqtt_topic_handlers.clone(),
Expand Down

0 comments on commit 98dad97

Please sign in to comment.