From cdc6393ca4b6d30c5836913a6dd01de69cb60755 Mon Sep 17 00:00:00 2001 From: Ravi Teja Date: Wed, 11 Dec 2019 18:48:23 +0530 Subject: [PATCH] Connection stream (#13) * Make connection part of the stream and remove reconnection feature * Update versions * Add discord badge * Remove discord header --- README.md | 4 +- rg.diff | 0 rumq-cli/Cargo.toml | 2 +- rumq-client/Cargo.toml | 4 +- rumq-client/examples/gcloud.rs | 6 +- rumq-client/examples/pubsub.rs | 6 +- rumq-client/examples/smoke.rs | 6 +- rumq-client/src/eventloop.rs | 199 ++++++++++++++------------------- rumq-client/src/lib.rs | 17 +-- rumq-client/src/state.rs | 2 - rumq-core/Cargo.toml | 2 +- 11 files changed, 104 insertions(+), 144 deletions(-) create mode 100644 rg.diff diff --git a/README.md b/README.md index db443b675..868ff0cae 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -# rumq [![img](https://github.com/tekjar/rumq/workflows/CI/badge.svg)](https://github.com/tekjar/rumq/actions) +# rumq +[![img](https://github.com/tekjar/rumq/workflows/CI/badge.svg)](https://github.com/tekjar/rumq/actions) +[![img](https://img.shields.io/discord/633193308033646605?style=flat-square)](https://discord.gg/mpkSqDg) MQTT ecosystem in rust which strives to be simple, robust and performant diff --git a/rg.diff b/rg.diff new file mode 100644 index 000000000..e69de29bb diff --git a/rumq-cli/Cargo.toml b/rumq-cli/Cargo.toml index e72950985..99de5ae19 100644 --- a/rumq-cli/Cargo.toml +++ b/rumq-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rumq-cli" -version = "0.1.0-alpha.1" +version = "0.1.0-alpha.2" description = "Commandline mqtt utilities to replace mosquitto_sub and mosquitto_pub" license = "MIT" repository = "https://github.com/tekjar/rumq" diff --git a/rumq-client/Cargo.toml b/rumq-client/Cargo.toml index 4afebe88a..8d6ffe1a4 100644 --- a/rumq-client/Cargo.toml +++ b/rumq-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rumq-client" -version = "0.1.0-alpha.1" +version = "0.1.0-alpha.2" description = "An efficeint and robust mqtt client for your connected devices" license = "MIT" repository = "https://github.com/tekjar/rumq" @@ -16,7 +16,7 @@ async-stream = "0.2" futures-util = "0.3" webpki = "0.21" tokio-rustls = "0.12" -rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.1" } +rumq-core = { path = "../rumq-core", version = "0.1.0-alpha.2" } log = "0.4" pin-project = "0.4" diff --git a/rumq-client/examples/gcloud.rs b/rumq-client/examples/gcloud.rs index 05e83a459..401cc5d93 100644 --- a/rumq-client/examples/gcloud.rs +++ b/rumq-client/examples/gcloud.rs @@ -4,7 +4,7 @@ use std::ops::Add; use std::env; use std::fs; -use rumq_client::{self, MqttOptions, ReconnectOptions, Request, MqttEventLoop, eventloop}; +use rumq_client::{self, MqttOptions, Request, MqttEventLoop, eventloop}; use serde::{Serialize, Deserialize}; use jsonwebtoken::{encode, Algorithm, Header}; use futures_util::stream::StreamExt; @@ -21,7 +21,7 @@ async fn main() { let (requests_tx, requests_rx) = channel(1); let mqttoptions = gcloud(); - let mut eventloop = eventloop(mqttoptions, requests_rx).await.unwrap(); + let mut eventloop = eventloop(mqttoptions, requests_rx); thread::spawn(move || { requests(requests_tx); @@ -33,7 +33,7 @@ async fn main() { } async fn stream_it(eventloop: &mut MqttEventLoop) { - let mut stream = eventloop.stream(ReconnectOptions::Never); + let mut stream = eventloop.stream(); while let Some(item) = stream.next().await { println!("{:?}", item); diff --git a/rumq-client/examples/pubsub.rs b/rumq-client/examples/pubsub.rs index 7ac324712..53ee19bc3 100644 --- a/rumq-client/examples/pubsub.rs +++ b/rumq-client/examples/pubsub.rs @@ -4,7 +4,7 @@ use tokio::sync::mpsc::{channel, Sender}; use tokio::task; use tokio::time; -use rumq_client::{self, MqttOptions, ReconnectOptions, Request, MqttEventLoop, QoS, eventloop}; +use rumq_client::{self, MqttOptions, Request, MqttEventLoop, QoS, eventloop}; use std::time::Duration; #[tokio::main(basic_scheduler)] @@ -15,7 +15,7 @@ async fn main() { let (requests_tx, requests_rx) = channel(10); let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); mqttoptions.set_keep_alive(5).set_throttle(Duration::from_secs(1)); - let mut eventloop = eventloop(mqttoptions, requests_rx).await.unwrap(); + let mut eventloop = eventloop(mqttoptions, requests_rx); thread::spawn(move || { requests(requests_tx); @@ -29,7 +29,7 @@ async fn main() { async fn stream_it(eventloop: &mut MqttEventLoop) { - let mut stream = eventloop.stream(ReconnectOptions::Always(Duration::from_secs(5))); + let mut stream = eventloop.stream(); while let Some(item) = stream.next().await { println!("{:?}", item); diff --git a/rumq-client/examples/smoke.rs b/rumq-client/examples/smoke.rs index 1fbfd9af6..c797aae87 100644 --- a/rumq-client/examples/smoke.rs +++ b/rumq-client/examples/smoke.rs @@ -4,7 +4,7 @@ use tokio::sync::mpsc::{channel, Sender}; use tokio::task; use tokio::time; -use rumq_client::{self, MqttOptions, ReconnectOptions, Request, MqttEventLoop, eventloop}; +use rumq_client::{self, MqttOptions, Request, MqttEventLoop, eventloop}; use std::time::Duration; #[tokio::main(basic_scheduler)] @@ -15,7 +15,7 @@ async fn main() { let (requests_tx, requests_rx) = channel(10); let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); mqttoptions.set_keep_alive(5).set_throttle(Duration::from_secs(1)); - let mut eventloop = eventloop(mqttoptions, requests_rx).await.unwrap(); + let mut eventloop = eventloop(mqttoptions, requests_rx); thread::spawn(move || { requests(requests_tx); @@ -29,7 +29,7 @@ async fn main() { async fn stream_it(eventloop: &mut MqttEventLoop) { - let mut stream = eventloop.stream(ReconnectOptions::Always(Duration::from_secs(5))); + let mut stream = eventloop.stream(); while let Some(item) = stream.next().await { println!("{:?}", item); diff --git a/rumq-client/src/eventloop.rs b/rumq-client/src/eventloop.rs index 9e9bab104..431b49f1a 100644 --- a/rumq-client/src/eventloop.rs +++ b/rumq-client/src/eventloop.rs @@ -9,7 +9,7 @@ use tokio::sync::mpsc::{channel, Sender, Receiver}; use async_stream::stream; use pin_project::pin_project; use crate::state::{StateError, MqttState}; -use crate::{MqttOptions, ReconnectOptions}; +use crate::MqttOptions; use std::io; use std::time::Duration; @@ -17,11 +17,10 @@ use std::time::Duration; #[pin_project] pub struct MqttEventLoop { pub state: MqttState, - options: MqttOptions, + pub options: MqttOptions, + pub requests: Box, queue_limit_tx: Sender<()>, queue_limit_rx: Receiver<()>, - requests: Box, - network: Box, throttle_flag: bool, throttle: Instant } @@ -29,11 +28,8 @@ pub struct MqttEventLoop { #[derive(From, Debug)] pub enum EventLoopError { Io(io::Error), - NoRequestStream, - NoTcpStream, NoRequest, MqttState(StateError), - StreamClosed, Timeout(Elapsed), Rumq(rumq_core::Error), Network(network::Error) @@ -42,37 +38,39 @@ pub enum EventLoopError { /// Connects to the server and returs an object which encompasses state of the connection. /// Use this to create an `stream` and poll it with tokio /// The choice of separating `MqttEventLoop` and `stream` methods is to get access to the -/// internal state after the work with the stream is done. This is useful in scenarios like -/// shutdown where the current state should be persisted and passed back to the `stream` as -/// a `Stream` -/// First connection is done here instead of the stream to initialize network directly without -/// Option and to have good error code for failured. If using an encapsulated struct with -/// connection parameters (with state inside `MqttEventLoop`) and handling indermediate critical -/// errors (like intermediate authorization failure after initial success) -pub async fn eventloop(options: MqttOptions, requests: impl Requests + 'static) -> Result { +/// internal state and mqtt options after the work with the stream is done. This is useful in +/// scenarios like shutdown where the current state should be persisted and passed back to the +/// `stream` as a `Stream` +/// For a similar reason, requests are also initialized as part of this method to reuse same +/// request stream while retrying after the previous `Stream` from `stream()` method +/// ```ignore +/// let mut eventloop = eventloop(options, requests); +/// loop { +/// let mut stream = eventloop.stream(reconnection_options); +/// while let Some(notification) = stream.next().await() {} +/// } +/// ``` +/// When mqtt `stream` ends due to critical errors (like auth failure), user has a choice to +/// access and update `options`, `state` and `requests`. +/// For example, state and requests can be used to save state to disk before shutdown. +/// Options can be used to update gcp iotcore password +/// TODO: Remove `mqttoptions` from `state` to make sure that there is not chance of dirty +pub fn eventloop(options: MqttOptions, requests: impl Requests + 'static) -> MqttEventLoop { let (queue_limit_tx, queue_limit_rx) = channel(1); - let mut state = MqttState::new(options.clone()); - - // make tcp and mqtt connections - let mut network = network_connect(&options).await?; - mqtt_connect(&options, &mut network, &mut state).await?; - - // make network and user requests generic for better unit testing capabilities - let network = Box::new(network); + let state = MqttState::new(options.clone()); let requests = Box::new(requests); - + let eventloop = MqttEventLoop { state, options, queue_limit_rx, queue_limit_tx, - network, requests, throttle_flag: false, throttle: Instant::now() }; - Ok(eventloop) + eventloop } async fn network_connect(options: &MqttOptions) -> Result, EventLoopError> { @@ -114,93 +112,69 @@ async fn mqtt_connect(options: &MqttOptions, mut network: impl Network, state: & impl MqttEventLoop { - /// This stream handle mqtt state and reconnections. It's critical to define when to retry - /// and when to return error to the user. For example, intermediate mqtt auth failure should - /// be classified as hard error and retry shouldn't be performed. Same with TLS auth failures - /// Other soft error like server/intermediate node going down intemediately or should be - /// either retried infinitely or number of times asked by the user. - /// Any hard errors are yielded as last `Notification::Error` element of the stream before - /// closing the stream. These can be converted to finer versions when necessary - /// There are no methods to poll the stream for the user. This is done to prevent extra copies - /// of user notifcations (if a channel is used) + /// This stream handle mqtt state and reconnections. + /// Instead of baking reconnections inside this stream and handling compllicated and + /// opinionated uses cases like distingushing between critical and non critical errors, + /// a different approach of separating state and connection is taken. This stream just + /// borrows state from `MqttEventLoop` and when the stream ends due to errors, users can + /// choose to use same `MqttEventLoop` to spawn a new stream and hence continuing from the + /// previous state + /// All the stream errors are represented using Notification::Error() as the last element + /// of the stream. Users can depend on this result to do error handling. For example, if + /// gcp iot core disconnects because of jwt expiry, users can update `MqttOptions` in + /// `MqttEventLoop` and create a new stream. + /// With these techniques, the codebase becomes much simpler and robustness is just a matter + /// of creating a loop like below + /// ```ignore + /// let mut eventloop = eventloop(options, requests); + /// loop { + /// let mut stream = eventloop.stream(); + /// while let Some(notification) = stream.next().await() {} + /// } + /// ``` + /// NOTE: There are no methods to poll the stream internally for the user. This is done to prevent + /// extra copies of user notifcations (if a channel is used) /// NOTE: For cases where the steam should be inturrepted, user's should wrap the stream into /// an interruptible stream. check `stream-cancel` for example. Usecases like shutdown can - /// depend on this to force stop the stream and use `MqttState` in `MqttEventLoop` to persist - /// it to disk + /// depend on this to force stop the stream and use `MqttState` in `MqttEventLoop` to save + /// intermediate state to disk /// NOTE: Similary stream can be paused in the stream loop by user to pause the stream - /// TODO: Differentiate TLS auth errors and server being down /// TODO: User requests which are bounded streams (ends after producing 'n' elements) or channels /// which are closed before acks aren't received, the current implementation ends the mqtt stream /// immediately. Probably the stream should end when all the state buffers are acked with a timeout - pub fn stream(&mut self, reconnect: ReconnectOptions) -> impl Stream + '_ { + pub fn stream(&mut self) -> impl Stream + '_ { let o = stream! { - 'main: loop { - // loop which polls mqtt requests and network after a connection is established - 'stream: loop { - let (notification, reply) = match self.read_network_and_requests().await { - Ok(o) => o, - Err(EventLoopError::NoRequest) => { - let error = format!("RequestStreamClosed"); - yield Notification::Error(error); - break 'main - } - Err(e) => { - let error = format!("{:?}", e); - yield Notification::Error(error); - break 'stream - } - }; - - // write the reply back to the network - if let Some(p) = reply { - match self.network.mqtt_write(&p).await { - Ok(_) => (), - Err(e) => { - let error = format!("{:?}", e); - yield Notification::Error(error); - break 'stream - } - } - } - - // yield the notification to the user - if let Some(n) = notification { - yield n + // make tcp and mqtt connections + let mut network = network_connect(&self.options).await.unwrap(); + mqtt_connect(&self.options, &mut network, &mut self.state).await.unwrap(); + let mut network = Box::new(network); + + loop { + let (notification, reply) = match self.read_network_and_requests(&mut network).await { + Ok(o) => o, + Err(EventLoopError::NoRequest) => { + let error = format!("RequestStreamClosed"); + yield Notification::Error(error); + break } - } - - // connection retry - 'reconnection: loop { - let mut network = match network_connect(&self.options).await { - Ok(network) => network, - Err(e) => { - error!("Network connection error = {:?}", e); - match reconnect { - ReconnectOptions::Never => { - let err = format!("Network connection error = {:?}", e); - yield Notification::Error(err); - break 'main - } - ReconnectOptions::Always(sleep) => { - time::delay_for(sleep).await; - continue 'reconnection - } - _ => unimplemented!() - } - } - }; - - if let Err(e) = mqtt_connect(&self.options, &mut network, &mut self.state).await { + Err(e) => { let error = format!("{:?}", e); yield Notification::Error(error); - break 'main + break } + }; - // make network and user requests generic for better unit testing capabilities - let network = Box::new(network); - self.network = network; - continue 'main + // write the reply back to the network + if let Some(p) = reply { + if let Err(e) = network.mqtt_write(&p).await { + let error = format!("{:?}", e); + yield Notification::Error(error); + break + } } + + // yield the notification to the user + if let Some(n) = notification { yield n } } }; @@ -210,8 +184,7 @@ impl MqttEventLoop { /// Reads user requests stream and network stream concurrently and returns notification /// which should be sent to the user and packet which should be written to network - async fn read_network_and_requests(&mut self) -> Result<(Option, Option), EventLoopError> { - let network = &mut self.network; + async fn read_network_and_requests(&mut self, mut network: impl Network) -> Result<(Option, Option), EventLoopError> { let requests = &mut self.requests; let throttle = &mut self.throttle; let delay = self.options.throttle; @@ -343,17 +316,14 @@ mod test { use std::pin::Pin; use std::task::{Poll, Context}; use std::future::Future; - use std::sync::Arc; use std::thread; use super::MqttEventLoop; use crate::state::MqttState; - use crate::{Request, MqttOptions, ReconnectOptions}; + use crate::{Request, MqttOptions}; - fn eventloop(requests: Receiver) -> (MqttEventLoop, Sender>, Receiver>) { + fn eventloop(requests: Receiver) -> MqttEventLoop { let options = MqttOptions::new("dummy", "test.mosquitto.org", 1883); let state = MqttState::new(options.clone()); - let (network, server_tx, server_rx) = IO::new(); - let network = Box::new(network); let requests = Box::new(requests); let (queue_limit_tx, queue_limit_rx) = channel(1); @@ -363,13 +333,12 @@ mod test { options, queue_limit_rx, queue_limit_tx, - network, requests, throttle_flag: false, throttle: time::Instant::now() }; - (eventloop, server_tx, server_rx) + eventloop } @@ -393,9 +362,11 @@ mod test { #[tokio::test] async fn connection_should_timeout_on_time() { let (requests_tx, requests_rx) = channel(5); - let (mut eventloop, tx, rx) = eventloop(requests_rx); + let mut eventloop = eventloop(requests_rx); + let (network, server_tx, server_rx) = IO::new(); + let mut network = Box::new(network); let start = Instant::now(); - let o = super::mqtt_connect(&eventloop.options, &mut eventloop.network, &mut eventloop.state).await; + let o = super::mqtt_connect(&eventloop.options, &mut network, &mut eventloop.state).await; let elapsed = start.elapsed(); match o { @@ -414,13 +385,14 @@ mod test { async fn throttled_requests_works_with_correct_delays_between_requests() { let (requests_tx, requests_rx) = channel(5); - let (mut eventloop, tx, mut rx) = eventloop(requests_rx); - let mut eventloop = eventloop.stream(ReconnectOptions::Never); + let mut eventloop = eventloop(requests_rx); + let mut eventloop = eventloop.stream(); thread::spawn(move || { requests(requests_tx); }); + /* for _ in 0..10 { dbg!(); let _notification = eventloop.next().await; @@ -428,6 +400,7 @@ mod test { let packet = rx.next().await.unwrap(); println!("Packet = {:?}", packet); } + */ } #[test] diff --git a/rumq-client/src/lib.rs b/rumq-client/src/lib.rs index c67b0b994..486380e73 100644 --- a/rumq-client/src/lib.rs +++ b/rumq-client/src/lib.rs @@ -1,4 +1,4 @@ -#![recursion_limit = "300"] +#![recursion_limit = "512"] #[macro_use] extern crate log; @@ -51,19 +51,6 @@ pub enum Command { Resume, } -/// Control how the connection is re-established if it is lost. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub enum ReconnectOptions { - /// Don't automatically reconnect - Never, - /// Always reconnect automatically. - /// Before a reconnection attempt, sleep for the specified amount of time - Always(Duration), - /// Always reconnect automatically. - /// Before a reconnection attempt, sleep for the specified amount of time - Count(u16, Duration), -} - /// Client authentication option for mqtt connect packet #[derive(Clone, Debug)] pub enum SecurityOptions { @@ -277,7 +264,7 @@ impl MqttOptions { #[cfg(test)] mod test { - use super::{MqttOptions, ReconnectOptions}; + use super::MqttOptions; #[test] #[should_panic] diff --git a/rumq-client/src/state.rs b/rumq-client/src/state.rs index cff9a49a0..0320b7784 100644 --- a/rumq-client/src/state.rs +++ b/rumq-client/src/state.rs @@ -333,8 +333,6 @@ impl MqttState { #[cfg(test)] mod test { - use std::sync::Arc; - use super::{MqttConnectionStatus, MqttState, Packet, StateError}; use crate::{MqttOptions, Notification}; use rumq_core::*; diff --git a/rumq-core/Cargo.toml b/rumq-core/Cargo.toml index 628d1e297..a5c9c8ed7 100644 --- a/rumq-core/Cargo.toml +++ b/rumq-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rumq-core" -version = "0.1.0-alpha.1" +version = "0.1.0-alpha.2" description = "Serializes and deserializes mqtt byte stream" license = "MIT" repository = "https://github.com/tekjar/rumq"