-
Notifications
You must be signed in to change notification settings - Fork 27
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #93 from Azure-Samples/ok/rust-getting-started
rust getting started
- Loading branch information
Showing
9 changed files
with
323 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
[workspace] | ||
|
||
members = [ | ||
"scenarios/getting_started/rust/getting_started", | ||
"mqttclients/rust/conn", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
[package] | ||
name = "conn" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
dotenv = "0.15.0" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
// Define the connection_settings module | ||
pub mod connection_settings { | ||
// Import necessary modules | ||
use std::collections::HashMap; | ||
use std::env; | ||
use std::error::Error; | ||
|
||
// Define a struct to represent the connection settings | ||
#[derive(Debug)] | ||
pub struct ConnectionSettings { | ||
pub mqtt_host_name: String, | ||
pub mqtt_tcp_port: u16, | ||
pub mqtt_use_tls: bool, | ||
pub mqtt_clean_session: bool, | ||
pub mqtt_keep_alive_in_seconds: u16, | ||
pub mqtt_client_id: String, | ||
pub mqtt_username: Option<String>, | ||
pub mqtt_password: Option<String>, | ||
pub mqtt_ca_file: Option<String>, | ||
pub mqtt_cert_file: Option<String>, | ||
pub mqtt_key_file: Option<String>, | ||
pub mqtt_key_file_password: Option<String>, | ||
} | ||
|
||
// Implement a method to convert strings to integers | ||
fn convert_to_int(value: &str, name: &str) -> Result<u16, Box<dyn Error>> { | ||
value.parse::<u16>().map_err(|_| format!("{} must be an integer", name).into()) | ||
} | ||
|
||
// Implement a method to convert strings to booleans | ||
fn convert_to_bool(value: &str, name: &str) -> Result<bool, Box<dyn Error>> { | ||
match value.to_lowercase().as_str() { | ||
"true" => Ok(true), | ||
"false" => Ok(false), | ||
_ => Err(format!("{} must be true or false", name).into()), | ||
} | ||
} | ||
|
||
// Define other helper functions or associated items here | ||
pub fn get_connection_settings(env_filename: Option<&str>) -> Result<ConnectionSettings, Box<dyn Error>> { | ||
// Load environment variables from .env file if provided | ||
if let Some(filename) = env_filename { | ||
dotenv::from_path(filename)?; | ||
} | ||
|
||
// Read environment variables into a HashMap | ||
let mut env_values: HashMap<String, String> = env::vars().collect(); | ||
|
||
// Define default values | ||
let default_values: HashMap<&str, &str> = [ | ||
("MQTT_TCP_PORT", "8883"), | ||
("MQTT_USE_TLS", "true"), | ||
("MQTT_CLEAN_SESSION", "true"), | ||
("MQTT_KEEP_ALIVE_IN_SECONDS", "30"), | ||
] | ||
.iter() | ||
.cloned() | ||
.collect(); | ||
|
||
// Merge default values with environment variables | ||
for (key, value) in default_values.iter() { | ||
env_values.entry(key.to_string()).or_insert(value.to_string()); | ||
} | ||
|
||
// Extract connection settings from environment variables | ||
let mqtt_host_name = env_values.remove("MQTT_HOST_NAME").ok_or("MQTT_HOST_NAME must be set")?; | ||
let mqtt_tcp_port = convert_to_int(env_values.remove("MQTT_TCP_PORT").unwrap().as_str(), "MQTT_TCP_PORT")?; | ||
let mqtt_use_tls = convert_to_bool(env_values.remove("MQTT_USE_TLS").unwrap().as_str(), "MQTT_USE_TLS")?; | ||
let mqtt_clean_session = convert_to_bool(env_values.remove("MQTT_CLEAN_SESSION").unwrap().as_str(), "MQTT_CLEAN_SESSION")?; | ||
let mqtt_keep_alive_in_seconds = convert_to_int(env_values.remove("MQTT_KEEP_ALIVE_IN_SECONDS").unwrap().as_str(), "MQTT_KEEP_ALIVE_IN_SECONDS")?; | ||
let mqtt_client_id = env_values.remove("MQTT_CLIENT_ID").unwrap_or_else(|| "".to_string()); | ||
let mqtt_username = env_values.remove("MQTT_USERNAME"); | ||
let mqtt_password = env_values.remove("MQTT_PASSWORD"); | ||
let mqtt_ca_file = env_values.remove("MQTT_CA_FILE"); | ||
let mqtt_cert_file = env_values.remove("MQTT_CERT_FILE"); | ||
let mqtt_key_file = env_values.remove("MQTT_KEY_FILE"); | ||
let mqtt_key_file_password = env_values.remove("MQTT_KEY_FILE_PASSWORD"); | ||
|
||
// Construct ConnectionSettings struct | ||
let settings = ConnectionSettings { | ||
mqtt_host_name, | ||
mqtt_tcp_port, | ||
mqtt_use_tls, | ||
mqtt_clean_session, | ||
mqtt_keep_alive_in_seconds, | ||
mqtt_client_id, | ||
mqtt_username, | ||
mqtt_password, | ||
mqtt_ca_file, | ||
mqtt_cert_file, | ||
mqtt_key_file, | ||
mqtt_key_file_password, | ||
}; | ||
|
||
Ok(settings) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
[package] | ||
name = "getting_started" | ||
version = "0.1.0" | ||
edition = "2021" | ||
|
||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
||
[dependencies] | ||
futures = "0.3" | ||
# openssl = "0.10.36" | ||
paho-mqtt = { version = "0.12.3", features = ["bundled"] } | ||
dotenv = "0.15.0" | ||
env_logger = "0.11.2" | ||
ctrlc = "3.3.0" | ||
conn = { path = "../../../../mqttclients/rust/conn" } |
161 changes: 161 additions & 0 deletions
161
scenarios/getting_started/rust/getting_started/src/main.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
use conn::connection_settings; | ||
use dotenv::dotenv; | ||
use paho_mqtt as mqtt; | ||
use paho_mqtt::SslOptionsBuilder; | ||
use paho_mqtt::SslVersion; | ||
use paho_mqtt::MQTT_VERSION_5; | ||
use std::time::Duration; | ||
|
||
fn main() { | ||
// Initialize the logger from the environment | ||
env_logger::init(); | ||
|
||
dotenv().ok(); | ||
let conn_settings = connection_settings::get_connection_settings(Some(".env")).unwrap(); | ||
let mqtt_clean_session = conn_settings.mqtt_clean_session; | ||
if !mqtt_clean_session { | ||
println!("This sample does not support connecting with existing sessions"); | ||
std::process::exit(1); | ||
} | ||
let port = conn_settings.mqtt_tcp_port; | ||
let hostname = conn_settings.mqtt_host_name; | ||
let keepalive = conn_settings.mqtt_keep_alive_in_seconds; | ||
let client_id = conn_settings.mqtt_client_id; | ||
let mut protocol = "tcp"; | ||
|
||
// Set up connection builder | ||
let mut conn_opts_builder = mqtt::ConnectOptionsBuilder::with_mqtt_version(MQTT_VERSION_5); | ||
conn_opts_builder | ||
.keep_alive_interval(Duration::from_secs(keepalive.into())) | ||
.clean_start(mqtt_clean_session); | ||
|
||
if let Some(username) = &conn_settings.mqtt_username { | ||
conn_opts_builder.user_name(username); | ||
} | ||
|
||
if let Some(password) = &conn_settings.mqtt_password { | ||
conn_opts_builder.password(password); | ||
} | ||
|
||
if conn_settings.mqtt_use_tls { | ||
protocol = "mqtts"; | ||
|
||
// Create an SSL options builder | ||
let mut ssl_opts_builder = SslOptionsBuilder::new(); | ||
ssl_opts_builder.ssl_version(SslVersion::Tls_1_2); | ||
|
||
// Trust store (CA file) | ||
if let Some(ca_file_path) = conn_settings.mqtt_ca_file { | ||
// Handle the Result returned by trust_store | ||
match ssl_opts_builder.trust_store(ca_file_path) { | ||
Ok(_) => { | ||
// Trust store loaded successfully, continue | ||
} | ||
Err(err) => { | ||
eprintln!("Failed to load trust store: {:?}", err); | ||
return; | ||
} | ||
} | ||
} | ||
|
||
// Certificate file | ||
if let Some(cert_file_path) = conn_settings.mqtt_cert_file { | ||
// ssl_opts_builder = ssl_opts_builder.key_store(cert_file_path); | ||
match ssl_opts_builder.key_store(cert_file_path) { | ||
Ok(_) => { | ||
// Key store loaded successfully, continue | ||
} | ||
Err(err) => { | ||
eprintln!("Failed to load key store: {:?}", err); | ||
return; | ||
} | ||
} | ||
} | ||
|
||
if let Some(key_file_path) = conn_settings.mqtt_key_file { | ||
match ssl_opts_builder.private_key(key_file_path) { | ||
Ok(_) => { | ||
// Key store loaded successfully, continue | ||
} | ||
Err(err) => { | ||
eprintln!("Failed to load private key: {:?}", err); | ||
return; | ||
} | ||
} | ||
} | ||
|
||
if let Some(key_password) = conn_settings.mqtt_key_file_password { | ||
ssl_opts_builder.private_key_password(key_password); | ||
} else { | ||
// Handle the case where key_password is None, if needed. | ||
} | ||
let ssl_opts = ssl_opts_builder.finalize(); | ||
conn_opts_builder.ssl_options(ssl_opts); | ||
} | ||
|
||
let host = format!("{}://{}:{}", protocol, hostname, port); | ||
println!("Host is {}", host); | ||
|
||
let create_options = mqtt::CreateOptionsBuilder::new() | ||
.server_uri(host) | ||
.client_id(client_id) | ||
.finalize(); | ||
|
||
// Set up MQTT client | ||
let client = mqtt::Client::new(create_options).unwrap_or_else(|err| { | ||
eprintln!("Error creating the client: {}", err); | ||
std::process::exit(1); | ||
}); | ||
|
||
// Initialize the consumer before connecting | ||
let rx = client.start_consuming(); | ||
|
||
let conn_opts = conn_opts_builder.finalize(); | ||
|
||
// Connect to the broker | ||
if let Err(err) = client.connect(conn_opts) { | ||
eprintln!("Failed to connect to the broker: {}", err); | ||
std::process::exit(1); | ||
} | ||
println!("Connected to the broker"); | ||
|
||
// Subscribe to a topic | ||
// sample/# | ||
if let Err(err) = client.subscribe("sample/#", mqtt::QOS_0) { | ||
eprintln!("Failed to subscribe to topics: {}", err); | ||
std::process::exit(1); | ||
} | ||
println!("Subscribed to topics"); | ||
|
||
// Publish a message | ||
let msg = mqtt::Message::new("sample/topic1", "hello world!", mqtt::QOS_0); | ||
if let Err(err) = client.publish(msg) { | ||
eprintln!("Failed to publish message: {}", err); | ||
std::process::exit(1); | ||
} | ||
println!("Published message"); | ||
|
||
// ^C handler will stop the consumer, breaking us out of the loop, below | ||
let ctrlc_cli = client.clone(); | ||
ctrlc::set_handler(move || { | ||
ctrlc_cli.stop_consuming(); | ||
}) | ||
.expect("Error setting Ctrl-C handler"); | ||
|
||
// Just loop on incoming messages. | ||
println!("\nWaiting for messages.."); | ||
for msg in rx.iter() { | ||
if let Some(msg) = msg { | ||
println!("Message received is {}", msg); | ||
} else if client.is_connected() { | ||
break; | ||
} | ||
} | ||
|
||
// Disconnect from the broker | ||
if let Err(err) = client.disconnect(None) { | ||
eprintln!("Failed to disconnect from the broker: {}", err); | ||
std::process::exit(1); | ||
} | ||
println!("Disconnected from the broker"); | ||
} |