From cfc2b39a16d9d0f3b8e4033e0603f3a18778cd58 Mon Sep 17 00:00:00 2001 From: Pablo Vicente Date: Thu, 11 Apr 2024 09:49:03 +0200 Subject: [PATCH] Add support for MTLS, and other MQTT options --- go.mod | 21 +++++++++++++-------- go.sum | 39 +++++++++++++++++++------------------- main.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 82 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 2de421f..8332e8e 100644 --- a/go.mod +++ b/go.mod @@ -2,15 +2,20 @@ module github.com/pablitovicente/mqtt-to-nats go 1.19 -require github.com/eclipse/paho.mqtt.golang v1.4.2 +require ( + github.com/eclipse/paho.mqtt.golang v1.4.2 + github.com/nats-io/nats.go v1.33.1 + github.com/pablitovicente/mqtt-load-generator v1.0.6 +) require ( - github.com/gorilla/websocket v1.4.2 // indirect - github.com/nats-io/nats.go v1.20.0 // indirect - github.com/nats-io/nkeys v0.3.0 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/klauspost/compress v1.17.7 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect - github.com/pablitovicente/mqtt-load-generator v1.0.1 // indirect - golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect - golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.18.0 // indirect ) diff --git a/go.sum b/go.sum index 843a34f..87919b8 100644 --- a/go.sum +++ b/go.sum @@ -1,30 +1,31 @@ github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/nats-io/nats.go v1.20.0 h1:T8JJnQfVSdh1CzGiwAOv5hEobYCBho/0EupGznYw0oM= -github.com/nats-io/nats.go v1.20.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= -github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= +github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/pablitovicente/mqtt-load-generator v1.0.0 h1:KEYYZV6zNrciPvhZ5nHUh9Usq0/slXnVoULRmjonyME= -github.com/pablitovicente/mqtt-load-generator v1.0.0/go.mod h1:GRiQWQg2cNwnMxFk/hxRPYntrHC+BTZgrQRbKt/ifHo= -github.com/pablitovicente/mqtt-load-generator v1.0.1 h1:LpZFLQEhnVrkluucIJrT6Z9NUVFU6DLzYHouXuCUpi8= -github.com/pablitovicente/mqtt-load-generator v1.0.1/go.mod h1:GRiQWQg2cNwnMxFk/hxRPYntrHC+BTZgrQRbKt/ifHo= +github.com/pablitovicente/mqtt-load-generator v1.0.6 h1:lLhG/lwMzV6dRO4wakUVsSC0vwrw+LRZbBw6pVFrCQQ= +github.com/pablitovicente/mqtt-load-generator v1.0.6/go.mod h1:SVKDRcaOFb7oR0R9gkiZ4JdF3cxJFGkLdWVuVkUbMtc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= -golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/main.go b/main.go index 0618e0e..e9c0c95 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,14 @@ func main() { host := flag.String("h", "localhost", "MQTT host") port := flag.Int("p", 1883, "MQTT port") qos := flag.Int("q", 1, "MQTT QoS used by all clients") + cert := flag.String("cert", "", "Path to TLS certificate file") + ca := flag.String("ca", "", "Path to TLS CA file") + key := flag.String("key", "", "Path to TLS key file") + insecure := flag.Bool("insecure", false, "Set to true to allow self signed certificates") + mqtts := flag.Bool("mqtts", false, "Set to true to use MQTTS") + cleanSession := flag.Bool("cleanSession", true, "Set to true for clean MQTT sessions or false to keep session") + clientID := flag.String("clientID", "mqtt-to-nats-bridge", "Custom MQTT clientID") + keepAliveTimeout := flag.Int64("keepAliveTimeout", 5000, "Set the amount of time (in seconds) that the client should wait before sending a PING request to the broker") natsURL := flag.String("N", "nats://localhost:4222", "NATS Stream server url for example nats://localhost:4222") natsStreamName := flag.String("SN", "collector", "NATS Stream name used to store MQTT forwarded messages") natsStreamReplicas := flag.Int("R", 1, "Number of NATS Stream replicas") @@ -42,19 +50,30 @@ func main() { // General Client Config mqttClientConfig := MQTTClient.Config{ - TargetTopic: targetTopic, - Username: username, - Password: password, - Host: host, - Port: port, - QoS: qos, + TargetTopic: targetTopic, + Username: username, + Password: password, + Host: host, + Port: port, + QoS: qos, + CleanSession: cleanSession, + ClientID: clientID, + KeepAliveTimeout: keepAliveTimeout, + Insecure: insecure, + MQTTS: mqtts, + } + + if TLSOptionsSet() { + mqttClientConfig.TLSConfigured = true + mqttClientConfig.CA = ca + mqttClientConfig.Cert = cert + mqttClientConfig.Key = key } rand.Seed(time.Now().UnixNano()) updates := make(chan int) mqttClient := MQTTClient.Client{ - ID: rand.Intn(100000), Config: mqttClientConfig, Updates: updates, } @@ -64,7 +83,7 @@ func main() { nc, _ := nats.Connect(*natsURL) js, _ := nc.JetStream(nats.PublishAsyncMaxPending(*maxInflightMessages)) - streamInfo, err := js.AddStream(&nats.StreamConfig{ + _, err := js.AddStream(&nats.StreamConfig{ Name: *natsStreamName, Subjects: []string{*natsStreamName}, Replicas: *natsStreamReplicas, @@ -74,7 +93,6 @@ func main() { if err != nil { fmt.Println("Error creating NATS Stream:", err) } - fmt.Println("Stream info", streamInfo) mqttClient.Connection.Subscribe(*targetTopic, byte(*qos), func(c mqtt.Client, m mqtt.Message) { _, err := js.PublishAsync(*natsStreamName, m.Payload()) @@ -85,3 +103,25 @@ func main() { select {} } + +func TLSOptionsSet() bool { + foundCert := false + foundCA := false + foundKey := false + + flag.Visit(func(f *flag.Flag) { + if f.Name == "cert" { + foundCert = true + } + + if f.Name == "ca" { + foundCA = true + } + + if f.Name == "key" { + foundKey = true + } + }) + + return foundCA && foundCert && foundKey +}