Skip to content

Commit

Permalink
observable connection state
Browse files Browse the repository at this point in the history
  • Loading branch information
balazsgrill committed Mar 24, 2024
1 parent 3096c9f commit 4f67a9b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
4 changes: 2 additions & 2 deletions base.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type stateField[Type any] struct {
}

func (state *stateField[Type]) send(command string) {
state.actuator.send(fmt.Sprintf("{\"%s\":%s}", state.key, command))
state.actuator.send(fmt.Sprintf("{\"%s\":\"%s\"}", state.key, command))
}

func (d *stateField[_]) Close() error {
Expand Down Expand Up @@ -91,7 +91,7 @@ func (field *onOffStateField) Process(state map[string]interface{}) {
field.values <- true
}
if strings.EqualFold("OFF", str) {
field.values <- true
field.values <- false
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion paho/pubsub.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
package paho

import (
"log"

"github.com/balazsgrill/hass"
mqtt "github.com/eclipse/paho.mqtt.golang"
)

type PahoPubSub struct {
mqtt.Client
subscriptions map[string][]func(topic string, payload []byte)
connstate chan bool
}

var _ hass.IPubSubConnecterRuntime = &PahoPubSub{}

func New(options *mqtt.ClientOptions) hass.IPubSubConnecterRuntime {
result := &PahoPubSub{
subscriptions: make(map[string][]func(topic string, payload []byte)),
connstate: make(chan bool),
}
options = options.SetOnConnectHandler(result.onConnect)
options = options.SetOnConnectHandler(result.onConnect).SetConnectionLostHandler(func(c mqtt.Client, err error) {
if err != nil {
log.Println(err)
}
result.connstate <- false
}).SetAutoReconnect(true)

result.Client = mqtt.NewClient(options)
return result
}
Expand All @@ -27,6 +37,11 @@ func (ps *PahoPubSub) onConnect(client mqtt.Client) {
subscribe(ps.Client, topic, callback)
}
}
ps.connstate <- true
}

func (ps *PahoPubSub) ConnectionState() chan bool {
return ps.connstate
}

func (ps *PahoPubSub) Send(topic string, payload []byte) error {
Expand Down
1 change: 1 addition & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type IPubSubRuntime interface {
type IConnecter interface {
Connect() error
Disconnect()
ConnectionState() chan bool
}

type IPubSubConnecterRuntime interface {
Expand Down

0 comments on commit 4f67a9b

Please sign in to comment.