From 4f67a9bc95c49dab66a27023b95eff1e75a272c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bal=C3=A1zs=20Grill?= Date: Sun, 24 Mar 2024 08:14:58 +0100 Subject: [PATCH] observable connection state --- base.go | 4 ++-- paho/pubsub.go | 17 ++++++++++++++++- pubsub.go | 1 + 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/base.go b/base.go index 001fb14..745f2bd 100644 --- a/base.go +++ b/base.go @@ -46,7 +46,7 @@ type stateField[Type any] struct { } func (state *stateField[Type]) send(command string) { - state.actuator.send(fmt.Sprintf("{\"%s\":%s}", state.key, command)) + state.actuator.send(fmt.Sprintf("{\"%s\":\"%s\"}", state.key, command)) } func (d *stateField[_]) Close() error { @@ -91,7 +91,7 @@ func (field *onOffStateField) Process(state map[string]interface{}) { field.values <- true } if strings.EqualFold("OFF", str) { - field.values <- true + field.values <- false } } } diff --git a/paho/pubsub.go b/paho/pubsub.go index fcb065b..e2190de 100644 --- a/paho/pubsub.go +++ b/paho/pubsub.go @@ -1,6 +1,8 @@ package paho import ( + "log" + "github.com/balazsgrill/hass" mqtt "github.com/eclipse/paho.mqtt.golang" ) @@ -8,6 +10,7 @@ import ( type PahoPubSub struct { mqtt.Client subscriptions map[string][]func(topic string, payload []byte) + connstate chan bool } var _ hass.IPubSubConnecterRuntime = &PahoPubSub{} @@ -15,8 +18,15 @@ var _ hass.IPubSubConnecterRuntime = &PahoPubSub{} func New(options *mqtt.ClientOptions) hass.IPubSubConnecterRuntime { result := &PahoPubSub{ subscriptions: make(map[string][]func(topic string, payload []byte)), + connstate: make(chan bool), } - options = options.SetOnConnectHandler(result.onConnect) + options = options.SetOnConnectHandler(result.onConnect).SetConnectionLostHandler(func(c mqtt.Client, err error) { + if err != nil { + log.Println(err) + } + result.connstate <- false + }).SetAutoReconnect(true) + result.Client = mqtt.NewClient(options) return result } @@ -27,6 +37,11 @@ func (ps *PahoPubSub) onConnect(client mqtt.Client) { subscribe(ps.Client, topic, callback) } } + ps.connstate <- true +} + +func (ps *PahoPubSub) ConnectionState() chan bool { + return ps.connstate } func (ps *PahoPubSub) Send(topic string, payload []byte) error { diff --git a/pubsub.go b/pubsub.go index 95de49b..0966852 100644 --- a/pubsub.go +++ b/pubsub.go @@ -8,6 +8,7 @@ type IPubSubRuntime interface { type IConnecter interface { Connect() error Disconnect() + ConnectionState() chan bool } type IPubSubConnecterRuntime interface {