Skip to content

Commit

Permalink
Add support for MTLS, and other MQTT options
Browse files Browse the repository at this point in the history
  • Loading branch information
pablovicentecybus committed Apr 11, 2024
1 parent 7469abc commit cfc2b39
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 36 deletions.
21 changes: 13 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ module github.com/pablitovicente/mqtt-to-nats

go 1.19

require github.com/eclipse/paho.mqtt.golang v1.4.2
require (
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/nats-io/nats.go v1.33.1
github.com/pablitovicente/mqtt-load-generator v1.0.6
)

require (
github.com/gorilla/websocket v1.4.2 // indirect
github.com/nats-io/nats.go v1.20.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pablitovicente/mqtt-load-generator v1.0.1 // indirect
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.18.0 // indirect
)
39 changes: 20 additions & 19 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/nats-io/nats.go v1.20.0 h1:T8JJnQfVSdh1CzGiwAOv5hEobYCBho/0EupGznYw0oM=
github.com/nats-io/nats.go v1.20.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pablitovicente/mqtt-load-generator v1.0.0 h1:KEYYZV6zNrciPvhZ5nHUh9Usq0/slXnVoULRmjonyME=
github.com/pablitovicente/mqtt-load-generator v1.0.0/go.mod h1:GRiQWQg2cNwnMxFk/hxRPYntrHC+BTZgrQRbKt/ifHo=
github.com/pablitovicente/mqtt-load-generator v1.0.1 h1:LpZFLQEhnVrkluucIJrT6Z9NUVFU6DLzYHouXuCUpi8=
github.com/pablitovicente/mqtt-load-generator v1.0.1/go.mod h1:GRiQWQg2cNwnMxFk/hxRPYntrHC+BTZgrQRbKt/ifHo=
github.com/pablitovicente/mqtt-load-generator v1.0.6 h1:lLhG/lwMzV6dRO4wakUVsSC0vwrw+LRZbBw6pVFrCQQ=
github.com/pablitovicente/mqtt-load-generator v1.0.6/go.mod h1:SVKDRcaOFb7oR0R9gkiZ4JdF3cxJFGkLdWVuVkUbMtc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 h1:qWPm9rbaAMKs8Bq/9LRpbMqxWRVUAQwMI9fVrssnTfw=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
58 changes: 49 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ func main() {
host := flag.String("h", "localhost", "MQTT host")
port := flag.Int("p", 1883, "MQTT port")
qos := flag.Int("q", 1, "MQTT QoS used by all clients")
cert := flag.String("cert", "", "Path to TLS certificate file")
ca := flag.String("ca", "", "Path to TLS CA file")
key := flag.String("key", "", "Path to TLS key file")
insecure := flag.Bool("insecure", false, "Set to true to allow self signed certificates")
mqtts := flag.Bool("mqtts", false, "Set to true to use MQTTS")
cleanSession := flag.Bool("cleanSession", true, "Set to true for clean MQTT sessions or false to keep session")
clientID := flag.String("clientID", "mqtt-to-nats-bridge", "Custom MQTT clientID")
keepAliveTimeout := flag.Int64("keepAliveTimeout", 5000, "Set the amount of time (in seconds) that the client should wait before sending a PING request to the broker")
natsURL := flag.String("N", "nats://localhost:4222", "NATS Stream server url for example nats://localhost:4222")
natsStreamName := flag.String("SN", "collector", "NATS Stream name used to store MQTT forwarded messages")
natsStreamReplicas := flag.Int("R", 1, "Number of NATS Stream replicas")
Expand All @@ -42,19 +50,30 @@ func main() {

// General Client Config
mqttClientConfig := MQTTClient.Config{
TargetTopic: targetTopic,
Username: username,
Password: password,
Host: host,
Port: port,
QoS: qos,
TargetTopic: targetTopic,
Username: username,
Password: password,
Host: host,
Port: port,
QoS: qos,
CleanSession: cleanSession,
ClientID: clientID,
KeepAliveTimeout: keepAliveTimeout,
Insecure: insecure,
MQTTS: mqtts,
}

if TLSOptionsSet() {
mqttClientConfig.TLSConfigured = true
mqttClientConfig.CA = ca
mqttClientConfig.Cert = cert
mqttClientConfig.Key = key
}

rand.Seed(time.Now().UnixNano())
updates := make(chan int)

mqttClient := MQTTClient.Client{
ID: rand.Intn(100000),
Config: mqttClientConfig,
Updates: updates,
}
Expand All @@ -64,7 +83,7 @@ func main() {
nc, _ := nats.Connect(*natsURL)
js, _ := nc.JetStream(nats.PublishAsyncMaxPending(*maxInflightMessages))

streamInfo, err := js.AddStream(&nats.StreamConfig{
_, err := js.AddStream(&nats.StreamConfig{
Name: *natsStreamName,
Subjects: []string{*natsStreamName},
Replicas: *natsStreamReplicas,
Expand All @@ -74,7 +93,6 @@ func main() {
if err != nil {
fmt.Println("Error creating NATS Stream:", err)
}
fmt.Println("Stream info", streamInfo)

mqttClient.Connection.Subscribe(*targetTopic, byte(*qos), func(c mqtt.Client, m mqtt.Message) {
_, err := js.PublishAsync(*natsStreamName, m.Payload())
Expand All @@ -85,3 +103,25 @@ func main() {

select {}
}

func TLSOptionsSet() bool {
foundCert := false
foundCA := false
foundKey := false

flag.Visit(func(f *flag.Flag) {
if f.Name == "cert" {
foundCert = true
}

if f.Name == "ca" {
foundCA = true
}

if f.Name == "key" {
foundKey = true
}
})

return foundCA && foundCert && foundKey
}

0 comments on commit cfc2b39

Please sign in to comment.