Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix]fix some bug of DMI mapper of modbus #96

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Config struct {
GrpcServer GRPCServer `yaml:"grpc_server"`
Common Common `yaml:"common"`
DevInit DevInit `yaml:"dev_init"`
LogLevel string `yaml:"log_level"`
}

type GRPCServer struct {
Expand All @@ -63,9 +64,13 @@ type DevInit struct {
func (c *Config) Parse() error {
var level klog.Level
var loglevel string
var configFile string

pflag.StringVar(&loglevel, "v", "1", "log level")
if err := level.Set(loglevel); err != nil {
return err
}

var configFile string
pflag.StringVar(&configFile, "config-file", defaultConfigFile, "Config file name")

cf, err := ioutil.ReadFile(configFile)
Expand All @@ -75,8 +80,10 @@ func (c *Config) Parse() error {
if err = yaml.Unmarshal(cf, c); err != nil {
return err
}
if err = level.Set(loglevel); err != nil {
return err
if len(c.LogLevel) != 0 && c.LogLevel != "0" {
if serr := level.Set(c.LogLevel); serr != nil {
return serr
}
}

switch c.DevInit.Mode {
Expand Down
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ common:
dev_init:
mode: register #register/configmap
configmap: /opt/kubeedge/deviceProfile.json
log_level: 1
1 change: 1 addition & 0 deletions mappers/modbus-dmi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ common:
edgecore_sock: /etc/kubeedge/dmi.sock
dev_init:
mode: register #register/configmap
log_level: 1
104 changes: 87 additions & 17 deletions mappers/modbus-dmi/device/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package device

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
"regexp"
"strconv"
"sync"
Expand Down Expand Up @@ -51,17 +53,65 @@ func setVisitor(visitorConfig *modbus.ModbusVisitorConfig, twin *common.Twin, cl
return
}

klog.V(2).Infof("Convert type: %s, value: %s ", twin.PVisitor.PProperty.DataType, twin.Desired.Value)
value, err := common.Convert(twin.PVisitor.PProperty.DataType, twin.Desired.Value)
if err != nil {
klog.Errorf("Convert error: %v", err)
return
}

valueInt, _ := value.(int64)
_, err = client.Set(visitorConfig.Register, visitorConfig.Offset, uint16(valueInt))
if err != nil {
klog.Errorf("Set visitor error: %v %v", err, visitorConfig)
klog.Infof("Convert type: %s, value: %s ", twin.PVisitor.PProperty.DataType, twin.Desired.Value)
value := twin.Desired.Value
switch twin.PVisitor.PProperty.DataType {
case "int":
valueInt, err := strconv.ParseInt(value, 10, 64)
if err != nil {
klog.Errorf("twin %s Convert error: %v", value, err)
return
}
_, err = client.Set(visitorConfig.Register, visitorConfig.Offset, uint16(valueInt))
if err != nil {
klog.Errorf("Set visitor error: %v %v", err, visitorConfig)
return
}
case "float":
valueFloat, err := strconv.ParseFloat(value, 32)
if err != nil {
klog.Errorf("twin %s Convert error: %v", value, err)
return
}
_, err = client.SetString(visitorConfig.Register, visitorConfig.Offset, visitorConfig.Limit, string(ConvertFloat32ToBytes(float32(valueFloat))))
if err != nil {
klog.Errorf("Set visitor error: %v %v", err, visitorConfig)
return
}
case "double":
valueDouble, err := strconv.ParseFloat(value, 64)
if err != nil {
klog.Errorf("twin %s Convert error: %v", value, err)
return
}
_, err = client.SetString(visitorConfig.Register, visitorConfig.Offset, visitorConfig.Limit, string(ConvertFloat64ToBytes(valueDouble)))
if err != nil {
klog.Errorf("Set visitor error: %v %v", err, visitorConfig)
return
}
case "boolean":
valueBool, err := strconv.ParseBool(value)
if err != nil {
klog.Errorf("twin %s Convert error: %v", value, err)
return
}
var valueSet uint16 = 0x0000
if valueBool {
valueSet = 0xFF00
}
_, err = client.Set(visitorConfig.Register, visitorConfig.Offset, valueSet)
if err != nil {
klog.Errorf("Set visitor error: %v %v", err, visitorConfig)
return
}
case "string":
_, err := client.SetString(visitorConfig.Register, visitorConfig.Offset, visitorConfig.Limit, value)
if err != nil {
klog.Errorf("Set visitor error: %v %v", err, visitorConfig)
return
}
default:
klog.Errorf("wrong DataType of twin %s: %s", value, twin.PVisitor.PProperty.DataType)
return
}
}
Expand Down Expand Up @@ -121,17 +171,21 @@ func initTwin(ctx context.Context, dev *modbus.ModbusDev) {
}
setVisitor(&visitorConfig, &dev.Instance.Twins[i], dev.ModbusClient)

twinData := TwinData{Client: dev.ModbusClient,
twinData := TwinData{
Client: dev.ModbusClient,
Name: dev.Instance.Twins[i].PropertyName,
Type: dev.Instance.Twins[i].Desired.Metadatas.Type,
VisitorConfig: &visitorConfig,
Topic: fmt.Sprintf(common.TopicTwinUpdate, dev.Instance.ID),
DeviceName: dev.Instance.Name}
collectCycle := time.Duration(dev.Instance.Twins[i].PVisitor.CollectCycle)
DeviceID: dev.Instance.ID,
DeviceName: dev.Instance.Name,
}
collectCycle := time.Duration(dev.Instance.Twins[i].PVisitor.CollectCycle) * time.Second
// If the collect cycle is not set, set it to 1 second.
if collectCycle == 0 {
collectCycle = 1 * time.Second
}
klog.V(2).InfoS("Start to collect", "twin", twinData.Name, "cycle", collectCycle)
ticker := time.NewTicker(collectCycle)
go func() {
for {
Expand Down Expand Up @@ -171,9 +225,10 @@ func (d *DevPanel) start(ctx context.Context, dev *modbus.ModbusDev) {
dev.ModbusClient = client

go initTwin(ctx, dev)
klog.Infof("All twins has been set, %+v", dev.Instance)

<-ctx.Done()
d.wg.Done()
klog.InfoS("sync wait group done", "deviceID", dev.Instance.ID, "device name", dev.Instance.Name)
}

// DevInit initialize the device data.
Expand Down Expand Up @@ -215,13 +270,15 @@ func NewDevPanel() *DevPanel {
// DevStart start all devices.
func (d *DevPanel) DevStart() {
for id, dev := range d.devices {
klog.V(4).Info("Dev: ", id, dev)
klog.Info("Dev: ", id, dev)
ctx, cancel := context.WithCancel(context.Background())
d.deviceMuxs[id] = cancel
d.wg.Add(1)
go d.start(ctx, dev)
}
klog.Infoln("Wait all sync wait group")
d.wg.Wait()
klog.Infoln("All sync wait group done")
}

func (d *DevPanel) UpdateDevTwins(deviceID string, twins []common.Twin) error {
Expand Down Expand Up @@ -301,7 +358,8 @@ func getTwinData(deviceID string, twin common.Twin, client *modbus.ModbusClient)
VisitorConfig: &visitorConfig,
Topic: fmt.Sprintf(common.TopicTwinUpdate, deviceID),
}
return td.GetPayload()
payload, _, err := td.GetPayload()
return payload, err
}

func (d *DevPanel) GetDevice(deviceID string) (interface{}, error) {
Expand Down Expand Up @@ -342,3 +400,15 @@ func (d *DevPanel) UpdateModel(model *common.DeviceModel) {
func (d *DevPanel) RemoveModel(modelName string) {
delete(d.models, modelName)
}

func ConvertFloat64ToBytes(f float64) []byte {
res := make([]byte, 8)
binary.BigEndian.PutUint64(res, math.Float64bits(f))
return res
}

func ConvertFloat32ToBytes(f float32) []byte {
res := make([]byte, 4)
binary.BigEndian.PutUint32(res, math.Float32bits(f))
return res
}
75 changes: 63 additions & 12 deletions mappers/modbus-dmi/device/twindata.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/klog/v2"

dmiapi "github.com/kubeedge/kubeedge/pkg/apis/dmi/v1alpha1"

"github.com/kubeedge/mappers-go/pkg/common"
"github.com/kubeedge/mappers-go/pkg/driver/modbus"
"github.com/kubeedge/mappers-go/pkg/util/grpcclient"
Expand All @@ -36,12 +37,14 @@ import (

// TwinData is the timer structure for getting twin/data.
type TwinData struct {
DeviceID string
DeviceName string
Client *modbus.ModbusClient
Name string
Type string
VisitorConfig *modbus.ModbusVisitorConfig
Results []byte
LastValue string
Topic string
}

Expand Down Expand Up @@ -111,53 +114,101 @@ func TransferData(isRegisterSwap bool, isSwap bool,
}
bits := binary.BigEndian.Uint32(value)
data := float64(math.Float32frombits(bits)) * scale
sData := strconv.FormatFloat(data, 'f', 6, 64)
sData := strconv.FormatFloat(data, 'f', 2, 64)
return sData, nil
case "boolean":
return strconv.FormatBool(value[0] == 1), nil
return strconv.FormatBool(value[0] == 0xFF), nil
case "string":
data := string(value)
for i, b := range value {
if !isUpper(b) && !isLowercase(b) && !isNumber(b) && !isSpecial(b) {
value[i] = ' '
}
}
data := strings.ReplaceAll(string(value), " ", "")
return data, nil
default:
return "", errors.New("Data type is not support")
return "", errors.New("data type is not support")
}
}

func (td *TwinData) GetPayload() ([]byte, error) {
func isUpper(b byte) bool {
return 'A' <= b && b <= 'Z'
}

func isLowercase(b byte) bool {
return 'a' <= b && b <= 'z'
}

func isNumber(b byte) bool {
return '0' <= b && b <= '9'
}

func isSpecial(b byte) bool {
whiteList := map[byte]byte{
'/': '/',
'-': '-',
'_': '_',
'.': '.',
'%': '%',
'+': '+',
',': ',',
'=': '=',
'@': '@',
'#': '#',
':': ':',
'^': '^',
'~': '~',
'?': '?',
'&': '&',
'!': '!',
'*': '*',
}
_, ok := whiteList[b]
return ok
}

func (td *TwinData) GetPayload() ([]byte, bool, error) {
var err error

td.Results, err = td.Client.Get(td.VisitorConfig.Register, td.VisitorConfig.Offset, uint16(td.VisitorConfig.Limit))
if err != nil {
return nil, fmt.Errorf("get register failed: %v", err)
return nil, false, fmt.Errorf("get register failed: %v", err)
}
// transfer data according to the dpl configuration
sData, err := TransferData(td.VisitorConfig.IsRegisterSwap,
td.VisitorConfig.IsSwap, td.Type, td.VisitorConfig.Scale, td.Results)
if err != nil {
return nil, fmt.Errorf("transfer Data failed: %v", err)
return nil, false, fmt.Errorf("transfer Data failed: %v", err)
}

// do not report if the twin data is not changed to prevent triggering traffic limiting
changed := sData != td.LastValue
td.LastValue = sData
// construct payload
var payload []byte
if strings.Contains(td.Topic, "$hw") {
if payload, err = common.CreateMessageTwinUpdate(td.Name, td.Type, sData); err != nil {
return nil, fmt.Errorf("create message twin update failed: %v", err)
return nil, false, fmt.Errorf("create message twin update failed: %v", err)
}
} else {
if payload, err = common.CreateMessageData(td.Name, td.Type, sData); err != nil {
return nil, fmt.Errorf("create message data failed: %v", err)
return nil, false, fmt.Errorf("create message data failed: %v", err)
}
}
klog.V(2).Infof("Get the %s value as %s", td.Name, sData)
return payload, nil
return payload, changed, nil
}

// Run timer function.
func (td *TwinData) Run() {
payload, err := td.GetPayload()
payload, changed, err := td.GetPayload()
if err != nil {
klog.Errorf("twindata %s get payload failed, err: %s", td.Name, err)
return
}
if !changed {
return
}

var msg common.DeviceTwinUpdate
if err = json.Unmarshal(payload, &msg); err != nil {
Expand All @@ -168,7 +219,7 @@ func (td *TwinData) Run() {
twins := parse.ConvMsgTwinToGrpc(msg.Twin)

var rdsr = &dmiapi.ReportDeviceStatusRequest{
DeviceName: td.DeviceName,
DeviceName: td.DeviceID,
ReportedDevice: &dmiapi.DeviceStatus{
Twins: twins,
State: "OK",
Expand Down
Loading