From 98dad972549339d32fba6c06057a9df7582e0b51 Mon Sep 17 00:00:00 2001 From: Antoni Dikov Date: Mon, 17 May 2021 14:40:42 +0200 Subject: [PATCH] Mqtt websocket first iteration (#561) * Mqtt websocket first iteration * fix * small changes * bindings added for broker options changes * bindings fix * convert non hex index to hex * remove comment * add changes file * review fixes. * fix * Fix for python binding * python binding fix * fixed python mqqt test Co-authored-by: Thoralf-M --- .changes/mqtt.md | 7 ++++ bindings/nodejs/lib/types/models.d.ts | 2 ++ bindings/python/native/src/client/mod.rs | 2 ++ bindings/python/native/src/client/types.rs | 4 +++ bindings/python/native/tests/test_mqtt.py | 4 +++ docs/libraries/nodejs/examples.md | 2 +- iota-client/Cargo.toml | 2 +- iota-client/src/client.rs | 27 +++++++++++++++ iota-client/src/node/mqtt.rs | 38 +++++++++++++++++++--- 9 files changed, 81 insertions(+), 7 deletions(-) create mode 100644 .changes/mqtt.md diff --git a/.changes/mqtt.md b/.changes/mqtt.md new file mode 100644 index 000000000..a8b9efd33 --- /dev/null +++ b/.changes/mqtt.md @@ -0,0 +1,7 @@ +--- +"nodejs-binding": minor +--- + +MQTT uses websocket as default +Indexation topic with non hex content will be converted to hex automatically + diff --git a/bindings/nodejs/lib/types/models.d.ts b/bindings/nodejs/lib/types/models.d.ts index cc1a3341d..b4273c357 100644 --- a/bindings/nodejs/lib/types/models.d.ts +++ b/bindings/nodejs/lib/types/models.d.ts @@ -54,6 +54,8 @@ export declare interface BrokerOptions { automaticDisconnect?: boolean // timeout in milliseconds timeout?: number + use_ws?: boolean + port?: number maxReconnectionAttempts?: number } diff --git a/bindings/python/native/src/client/mod.rs b/bindings/python/native/src/client/mod.rs index b807ce644..7abc12be0 100644 --- a/bindings/python/native/src/client/mod.rs +++ b/bindings/python/native/src/client/mod.rs @@ -117,6 +117,8 @@ impl Client { let rust_broker_options = RustBrokerOptions::new() .automatic_disconnect(broker_options.automatic_disconnect) .timeout(Duration::from_secs(broker_options.timeout)) + .use_ws(broker_options.use_ws) + .port(broker_options.port) .max_reconnection_attempts(broker_options.max_reconnection_attempts); client = client.with_mqtt_broker_options(rust_broker_options); } diff --git a/bindings/python/native/src/client/types.rs b/bindings/python/native/src/client/types.rs index 03c1c10f7..1217a551f 100644 --- a/bindings/python/native/src/client/types.rs +++ b/bindings/python/native/src/client/types.rs @@ -292,6 +292,10 @@ pub struct BrokerOptions { pub automatic_disconnect: bool, /// broker timeout in secs pub timeout: u64, + /// to use ws instead of tcp + pub use_ws: bool, + /// port + pub port: u16, /// max number of attempts to reconnect. pub max_reconnection_attempts: usize, } diff --git a/bindings/python/native/tests/test_mqtt.py b/bindings/python/native/tests/test_mqtt.py index 9bc016f1c..e54a617e4 100644 --- a/bindings/python/native/tests/test_mqtt.py +++ b/bindings/python/native/tests/test_mqtt.py @@ -19,6 +19,8 @@ broker_options = { 'automatic_disconnect': True, 'timeout': 30, + 'use_ws': True, + 'port': 443, 'max_reconnection_attempts': 5, } @@ -32,6 +34,8 @@ broker_options = { 'automatic_disconnect': True, 'timeout': 5, + 'use_ws': True, + 'port': 443, 'max_reconnection_attempts': 5, } diff --git a/docs/libraries/nodejs/examples.md b/docs/libraries/nodejs/examples.md index 8b4d07a82..61282eb85 100644 --- a/docs/libraries/nodejs/examples.md +++ b/docs/libraries/nodejs/examples.md @@ -516,7 +516,7 @@ IOTA node(s) provides [Message Queuing Telemetry Transport](https://en.wikipedia * milestones/confirmed * messages * messages/referenced -* messages/indexation/{index} (index needs to be formatted as hex-string, i.e. string: HORNET Spammer ==> hex-string: 484f524e4554205370616d6d6572) +* messages/indexation/{index} * messages/{messageId}/metadata * transactions/{transactionId}/included-message * outputs/{outputId} diff --git a/iota-client/Cargo.toml b/iota-client/Cargo.toml index d39be1905..07985998a 100644 --- a/iota-client/Cargo.toml +++ b/iota-client/Cargo.toml @@ -37,7 +37,7 @@ reqwest = { version = "0.11", features = ["json", "rustls-tls", "blocking"], def futures = { version = "0.3", optional = true } # MQTT -rumqttc = { version = "0.5", optional = true } +rumqttc = { version = "0.5", features = ["websocket"], optional = true} # also used for storage once_cell = { version = "1.7", optional = true } diff --git a/iota-client/src/client.rs b/iota-client/src/client.rs index d2082f79e..81703bd10 100644 --- a/iota-client/src/client.rs +++ b/iota-client/src/client.rs @@ -115,6 +115,10 @@ pub struct BrokerOptions { pub(crate) automatic_disconnect: bool, #[serde(default = "default_broker_timeout")] pub(crate) timeout: Duration, + #[serde(default = "default_broker_use_ws", rename = "defaultBrokerUseWs")] + pub(crate) use_ws: bool, + #[serde(default = "default_broker_port", rename = "defaultBrokerPort")] + pub(crate) port: u16, #[serde(rename = "maxReconnectionAttempts", default)] pub(crate) max_reconnection_attempts: usize, } @@ -128,6 +132,15 @@ fn default_broker_automatic_disconnect() -> bool { fn default_broker_timeout() -> Duration { Duration::from_secs(30) } +#[cfg(feature = "mqtt")] +fn default_broker_use_ws() -> bool { + true +} + +#[cfg(feature = "mqtt")] +fn default_broker_port() -> u16 { + 1883 +} #[cfg(feature = "mqtt")] fn default_max_reconnection_attempts() -> usize { @@ -140,6 +153,8 @@ impl Default for BrokerOptions { Self { automatic_disconnect: default_broker_automatic_disconnect(), timeout: default_broker_timeout(), + use_ws: default_broker_use_ws(), + port: default_broker_port(), max_reconnection_attempts: default_max_reconnection_attempts(), } } @@ -164,6 +179,18 @@ impl BrokerOptions { self } + /// Sets the use_ws used for the MQTT operations. + pub fn use_ws(mut self, use_ws: bool) -> Self { + self.use_ws = use_ws; + self + } + + /// Sets the port used for the MQTT operations. + pub fn port(mut self, port: u16) -> Self { + self.port = port; + self + } + /// Sets the maximum number of reconnection attempts. pub fn max_reconnection_attempts(mut self, max_reconnection_attempts: usize) -> Self { self.max_reconnection_attempts = max_reconnection_attempts; diff --git a/iota-client/src/node/mqtt.rs b/iota-client/src/node/mqtt.rs index 543891510..725cd78f3 100644 --- a/iota-client/src/node/mqtt.rs +++ b/iota-client/src/node/mqtt.rs @@ -11,6 +11,7 @@ use log::warn; use regex::Regex; use rumqttc::{ AsyncClient as MqttClient, Event, EventLoop, Incoming, MqttOptions, QoS, Request, Subscribe, SubscribeFilter, + Transport, }; use tokio::sync::{watch::Sender, RwLock}; @@ -31,6 +32,19 @@ impl TryFrom<&str> for Topic { impl Topic { /// Creates a new topic and checks if it's valid. pub fn new>(name: S) -> Result { + let mut name: String = name.into(); + // Convert non hex index to hex + let indexation_beginning = "messages/indexation/"; + if name.len() > indexation_beginning.len() + && &name[0..indexation_beginning.len()] == indexation_beginning + && hex::decode(&name[indexation_beginning.len()..name.len()]).is_err() + { + name = format!( + "messages/indexation/{}", + hex::encode(&name[indexation_beginning.len()..name.len()]) + ); + } + let valid_topics = lazy_static!( ["milestones/latest", "milestones/confirmed", "messages", "messages/referenced"].to_vec() => Vec<&str> ); @@ -47,7 +61,6 @@ impl Topic { ].to_vec() => Vec ); - let name = name.into(); if valid_topics.iter().any(|valid| valid == &name) || regexes.iter().any(|re| re.is_match(&name)) { let topic = Self(name); Ok(topic) @@ -83,9 +96,24 @@ async fn get_mqtt_client(client: &mut Client) -> Result<&mut MqttClient> { }; for node in nodes.iter() { let host = node.url.host_str().expect("Can't get host from URL"); - let mut mqttoptions = MqttOptions::new(host, host, 1883); - mqttoptions.set_connection_timeout(client.broker_options.timeout.as_secs()); - let (_, mut connection) = MqttClient::new(mqttoptions.clone(), 10); + let id = "iota.rs"; + let port = client.broker_options.port; + let mut uri = format!( + "{}://{}:{}/mqtt", + if node.url.scheme() == "https" { "wss" } else { "ws" }, + host, + node.url.port_or_known_default().unwrap_or(port) + ); + + if !client.broker_options.use_ws { + uri = host.to_string(); + }; + let mut mqtt_options = MqttOptions::new(id, uri, port); + if client.broker_options.use_ws { + mqtt_options.set_transport(Transport::ws()); + } + mqtt_options.set_connection_timeout(client.broker_options.timeout.as_secs()); + let (_, mut connection) = MqttClient::new(mqtt_options.clone(), 10); // poll the event loop until we find a ConnAck event, // which means that the mqtt client is ready to be used on this host // if the event loop returns an error, we check the next node @@ -99,7 +127,7 @@ async fn get_mqtt_client(client: &mut Client) -> Result<&mut MqttClient> { // if we found a valid mqtt connection, loop it on a separate thread if got_ack { - let (mqtt_client, connection) = MqttClient::new(mqttoptions, 10); + let (mqtt_client, connection) = MqttClient::new(mqtt_options, 10); client.mqtt_client.replace(mqtt_client); poll_mqtt( client.mqtt_topic_handlers.clone(),