Skip to content

Commit

Permalink
refactor device provider
Browse files Browse the repository at this point in the history
  • Loading branch information
IngoRoessner committed Apr 19, 2024
1 parent d0df400 commit 9af4be3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 40 deletions.
49 changes: 24 additions & 25 deletions pkg/providers/devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type DeviceProvider struct {
maxAge time.Duration
mux sync.Mutex
handledProtocols map[string]bool
foundDevices int64 //needed to exit loop if no matching device is found
}

type TokenGenerator interface {
Expand All @@ -70,7 +69,7 @@ type DeviceTypeProviderInterface interface {
GetDeviceType(deviceTypeId string) (dt models.DeviceType, err error)
}

func (this *DeviceProvider) GetNextDevice() (device model.PermDevice, err error) {
func (this *DeviceProvider) GetNextDevice() (device model.PermDevice, isFirstDeviceOfBatchLoopRepeat bool, err error) {
this.mux.Lock()
defer this.mux.Unlock()
if time.Since(this.lastRequest) > this.maxAge {
Expand All @@ -80,24 +79,33 @@ func (this *DeviceProvider) GetNextDevice() (device model.PermDevice, err error)
this.batch = []model.PermDevice{}
this.nextBatchIndex = 0
}
backToBeginningCount := 0
for device.Id == "" && err == nil {
device, err = this.getNextDeviceFromBatch()
if err == nil {
this.lastDevice = device
if this.deviceMatchesProtocol(device) {
this.foundDevices = this.foundDevices + 1
} else {
if !this.deviceMatchesProtocol(device) {
device = model.PermDevice{}
}
}
if err == EmptyBatch {
err = this.loadBatch()
backToBeginning := false
this.nextBatchIndex = 0
this.batch, backToBeginning, err = this.getBatch(this.lastDevice)
if err != nil {
return device, err
return device, isFirstDeviceOfBatchLoopRepeat, err
}
if backToBeginning {
isFirstDeviceOfBatchLoopRepeat = true
this.lastDevice = model.PermDevice{}
backToBeginningCount++
if backToBeginningCount >= 2 {
return device, isFirstDeviceOfBatchLoopRepeat, ErrNoMatchingDevice
}
}
}
}
return device, err
return device, isFirstDeviceOfBatchLoopRepeat, err
}

var EmptyBatch = errors.New("empty batch")
Expand Down Expand Up @@ -145,19 +153,19 @@ func (this *DeviceProvider) GetDevice(id string) (result model.PermDevice, err e
return result, errors.New("device not found")
}

func (this *DeviceProvider) loadBatch() error {
func (this *DeviceProvider) getBatch(from model.PermDevice) (batch []model.PermDevice, backToTheBeginning bool, err error) {
if this.config.Debug {
log.Println("load batch")
}
token, err := this.tokengen.Access()
if err != nil {
return err
return nil, false, err
}
var after *permmodel.ListAfter
if this.lastDevice.Id != "" {
after = &permmodel.ListAfter{
SortFieldValue: this.lastDevice.LocalId,
Id: this.lastDevice.Id,
SortFieldValue: from.LocalId,
Id: from.Id,
}
if this.config.Debug {
log.Printf("use after %#v", *after)
Expand All @@ -173,20 +181,13 @@ func (this *DeviceProvider) loadBatch() error {
},
})
if err != nil {
return err
return nil, false, err
}
if len(list) == 0 {
this.lastDevice = model.PermDevice{}
if this.foundDevices == 0 {
this.batch = nil
this.nextBatchIndex = 0
return ErrNoMatchingDevice
}
backToTheBeginning = true
if this.config.Debug {
log.Println("load batch from beginning")
}
this.foundDevices = 0
this.lastRequest = time.Now()
list, err = client.List[[]model.PermDevice](this.permissions, token, "devices", permmodel.ListOptions{
QueryListCommons: permmodel.QueryListCommons{
Limit: this.config.PermissionsRequestDeviceBatchSize,
Expand All @@ -195,12 +196,10 @@ func (this *DeviceProvider) loadBatch() error {
},
})
if err != nil {
return err
return list, backToTheBeginning, nil
}
}
this.batch = list
this.nextBatchIndex = 0
return nil
return list, backToTheBeginning, nil
}

func DeviceTypeUsesHandledProtocol(dt models.DeviceType, handledProtocols map[string]bool) (result bool) {
Expand Down
7 changes: 5 additions & 2 deletions pkg/tests/mqttdeviceloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tests
import (
"context"
"encoding/json"
"errors"
"github.com/SENERGY-Platform/connection-check-v2/pkg/configuration"
"github.com/SENERGY-Platform/connection-check-v2/pkg/connectionlog"
"github.com/SENERGY-Platform/connection-check-v2/pkg/model"
Expand Down Expand Up @@ -256,8 +257,10 @@ func TestMqttDeviceProviderWithoutMqttDevices(t *testing.T) {
return
}

t.Log(deviceProvider.GetNextDevice())

_, _, err = deviceProvider.GetNextDevice()
if !errors.Is(err, providers.ErrNoMatchingDevice) {
t.Error(err)
}
}

func createDummyMqttDevices(config configuration.Config) error {
Expand Down
37 changes: 24 additions & 13 deletions pkg/worker/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type ConnectionLogger interface {
}

type DeviceProvider interface {
GetNextDevice() (device model.PermDevice, err error)
GetNextDevice() (device model.PermDevice, isFirstDeviceOfBatchLoopRepeat bool, err error)
GetDevice(id string) (result model.PermDevice, err error)
}

Expand All @@ -103,6 +103,7 @@ func (this *Worker) RunDeviceLoop(ctx context.Context, wg *sync.WaitGroup) error
if this.config.DeviceCheckInterval == "" || this.config.DeviceCheckInterval == "-" {
return nil
}
batchLoopStartTime := time.Now()

dur, err := time.ParseDuration(this.config.DeviceCheckInterval)
if err != nil {
Expand All @@ -116,7 +117,16 @@ func (this *Worker) RunDeviceLoop(ctx context.Context, wg *sync.WaitGroup) error
for {
select {
case <-t.C:
errHandler(this.runDeviceCheck())
var isFirstDeviceOfBatchLoopRepeat bool
isFirstDeviceOfBatchLoopRepeat, err = this.runDeviceCheck()
errHandler(err)
if isFirstDeviceOfBatchLoopRepeat {
since := time.Since(batchLoopStartTime)
if since < this.minimalRecheckWait {
time.Sleep(this.minimalRecheckWait - since)
}
batchLoopStartTime = time.Now()
}
case <-ctx.Done():
t.Stop()
return
Expand Down Expand Up @@ -163,42 +173,43 @@ func getFatalErrOnRepeatHandler(maxCount int64) func(error) {

const ConnectionStateAnnotation = "connected"

func (this *Worker) runDeviceCheck() error {
func (this *Worker) runDeviceCheck() (isFirstDeviceOfBatchLoopRepeat bool, err error) {
this.metrics.DevicesChecked.Inc()
start := time.Now()
device, err := this.deviceprovider.GetNextDevice()
var device model.PermDevice
device, isFirstDeviceOfBatchLoopRepeat, err = this.deviceprovider.GetNextDevice()
if errors.Is(err, providers.ErrNoMatchingDevice) {
log.Println("no device to check found")
return nil
return isFirstDeviceOfBatchLoopRepeat, nil
}
if err != nil {
return err
return isFirstDeviceOfBatchLoopRepeat, err
}
topics, err := this.topic(this.config, this.deviceTypeProvider, device)
if err == common.NoSubscriptionExpected {
return nil
return isFirstDeviceOfBatchLoopRepeat, nil
}
if err != nil {
return err
return isFirstDeviceOfBatchLoopRepeat, err
}
isOnline, err := this.checkTopics(device, topics)
if err != nil {
return err
return isFirstDeviceOfBatchLoopRepeat, err
}
this.metrics.DeviceCheckLatencyMs.Set(float64(time.Since(start).Milliseconds()))
annotation, ok := device.Annotations[ConnectionStateAnnotation]
if !ok {
return this.updateDeviceState(device, isOnline)
return isFirstDeviceOfBatchLoopRepeat, this.updateDeviceState(device, isOnline)
}
expected, ok := annotation.(bool)
if !ok {
log.Printf("WARNING: unexpected device state anotation in %#v", device)
return this.updateDeviceState(device, isOnline)
return isFirstDeviceOfBatchLoopRepeat, this.updateDeviceState(device, isOnline)
}
if expected != isOnline {
return this.updateDeviceState(device, isOnline)
return isFirstDeviceOfBatchLoopRepeat, this.updateDeviceState(device, isOnline)
}
return nil
return isFirstDeviceOfBatchLoopRepeat, nil
}

func (this *Worker) checkTopics(device model.PermDevice, topics []string) (onlineSubscriptionExists bool, err error) {
Expand Down

0 comments on commit 9af4be3

Please sign in to comment.