-
Notifications
You must be signed in to change notification settings - Fork 1
/
unipitt.go
153 lines (135 loc) · 4.04 KB
/
unipitt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package unipitt
import (
"log"
"github.com/cenkalti/backoff"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const (
// SysFsRoot default root folder to search for digital inputs
SysFsRoot = "/sys/devices/platform/unipi_plc"
// MsgTrueValue is the MQTT true value to check for
MsgTrueValue = "ON"
)
// Unipitt defines the interface with unipi board
type Unipitt interface {
Poll(pollingInterval int) error
Close()
}
// Handler implements handles all unipi to MQTT interactions
type Handler struct {
readers []DigitalInputReader
writerMap map[string]DigitalOutputWriter
client mqtt.Client
config Configuration
}
// NewHandler prepares and sets up an entire unipitt handler
func NewHandler(broker string, clientID string, caFile string, sysFsRoot string, configFile string) (h *Handler, err error) {
h = &Handler{}
// Check if there's a mapping to be read
if configFile != "" {
log.Printf("Reading configuration file %s\n", configFile)
c, err := configFromFile(configFile)
if err != nil {
log.Printf("Error reading config file %s: %s\n", configFile, err)
} else {
h.config = c
}
}
// Digital writer setup
h.writerMap, err = FindDigitalOutputWriters(sysFsRoot)
if err != nil {
log.Printf("Error creating a map of digital output writers: %s\n", err)
}
// MQTT setup
opts := mqtt.NewClientOptions()
opts.AddBroker(broker)
opts.SetClientID(clientID)
if caFile != "" {
tlsConfig, err := NewTLSConfig(caFile)
if err != nil {
log.Printf("Error reading MQTT CA file %s: %s\n", caFile, err)
return h, err
}
opts.SetTLSConfig(tlsConfig)
}
// Callbacks for subscribe
var cb mqtt.MessageHandler = func(c mqtt.Client, msg mqtt.Message) {
log.Printf("Handling message on topic %s\n", msg.Topic())
// Find corresponding writer
if writer, ok := h.writerMap[h.config.Name(msg.Topic())]; ok {
err := writer.Update(string(msg.Payload()) == MsgTrueValue)
if err != nil {
log.Printf("Error updating digital output with name %s: %s\n", writer.Name, err)
}
} else {
log.Printf("Error matching a writer for given name %s\n", msg.Topic())
}
}
opts.OnConnect = func(c mqtt.Client) {
for name := range h.writerMap {
if token := c.Subscribe(name, 0, cb); token.Wait() && token.Error() != nil {
log.Print(err)
}
// Also subscribe any given mapped topic for the names
topic := h.config.Topic(name)
if topic != name {
if token := c.Subscribe(topic, 0, cb); token.Wait() && token.Error() != nil {
log.Print(err)
}
}
}
}
h.client = mqtt.NewClient(opts)
err = h.connect()
if err != nil {
log.Printf("Error connecting to MQTT broker: %s\n ...", err)
}
// Digital Input reader setup
h.readers, err = FindDigitalInputReaders(sysFsRoot)
if err != nil {
return
}
log.Printf("Created %d digital input reader instances from path %s\n", len(h.readers), sysFsRoot)
return
}
// Poll starts the actual polling and pushing to MQTT
func (h *Handler) Poll(done chan bool, interval int, payload string) (err error) {
events := make(chan *DigitalInputReader)
// Start polling
log.Printf("Initiate polling for %d readers\n", len(h.readers))
for k := range h.readers {
go h.readers[k].Poll(events, interval)
}
// Publish on a trigger
for {
select {
case d := <-events:
if d.Err != nil {
log.Printf("Found error %s for name %s\n", d.Err, d.Name)
} else {
// Determine topic from config
log.Printf("Trigger for name %s, using topic %s\n", d.Name, h.config.Topic(d.Name))
if token := h.client.Publish(h.config.Topic(d.Name), 0, false, payload); token.Wait() && token.Error() != nil {
go backoff.Retry(h.connect, backoff.NewExponentialBackOff())
}
}
case <-done:
log.Println("Handler done polling, coming back ...")
return
}
}
}
// reconnect tries to reconnect the MQTT client to the broker
func (h *Handler) connect() error {
log.Println("Error connecting to MQTT broker ...")
token := h.client.Connect()
token.Wait()
return token.Error()
}
// Close loose ends
func (h *Handler) Close() {
// Close the readers
for k := range h.readers {
h.readers[k].Close()
}
}