Skip to content

Commit

Permalink
update mapper
Browse files Browse the repository at this point in the history
Signed-off-by: wbc6080 <[email protected]>
  • Loading branch information
wbc6080 committed Jan 30, 2024
1 parent 225ff37 commit 7e650f8
Show file tree
Hide file tree
Showing 44 changed files with 210 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"github.com/kubeedge/virtualdevice/device"
"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/virtualdevice/pkg/config"
"github.com/kubeedge/virtualdevice/pkg/grpcclient"
"github.com/kubeedge/virtualdevice/pkg/grpcserver"
"github.com/kubeedge/virtualdevice/pkg/httpserver"
"github.com/kubeedge/virtualdevice/pkg/util/grpcclient"
"github.com/kubeedge/virtualdevice/pkg/util/parse"
)

Expand Down Expand Up @@ -50,7 +50,7 @@ func main() {
// if dev init mode is register, mapper's dev will init when registry to edgecore
if c.DevInit.Mode != common.DevInitModeRegister {
klog.Infoln("======dev init mode is not register, will register to edgecore")
if _, _, err = grpcclient.RegisterMapper(&c, false); err != nil {
if _, _, err = grpcclient.RegisterMapper(false); err != nil {
klog.Fatal(err)
}
klog.Infoln("registerMapper finished")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (d *DataBaseConfig) CloseSession(client influxdb2.Client) {
}

func (d *DataBaseConfig) AddData(data *common.DataModel, client influxdb2.Client) error {
// write device data to influx database
writeAPI := client.WriteAPIBlocking(d.Influxdb2ClientConfig.Org, d.Influxdb2ClientConfig.Bucket)
p := influxdb2.NewPoint(d.Influxdb2DataConfig.Measurement,
d.Influxdb2DataConfig.Tag,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package redis

import (
"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/virtualdevice/pkg/global"
)

type DataBaseConfig struct {
}

func NewDataBaseClient() (global.DataBaseClient, error) {
return &DataBaseConfig{}, nil
}

func (d *DataBaseConfig) InitDbClient() error {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) CloseSession() {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) AddData(data *common.DataModel) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) GetDataByDeviceName(deviceName string) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) GetPropertyDataByDeviceName(deviceName string, propertyData string) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) GetDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) DeleteDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ func NewDataPanel(config json.RawMessage) (global.DataPanel, error) {
}

func (pm *PushMethod) InitPushMethod() error {
fmt.Println("Init Http")
klog.V(1).Info("Init HTTP")
return nil
}

func (pm *PushMethod) Push(data *common.DataModel) {
klog.V(2).Info("Publish device data by HTTP")

targetUrl := pm.HTTP.HostName + ":" + strconv.Itoa(pm.HTTP.Port) + pm.HTTP.RequestPath
klog.V(1).Infof("targetUrl = %s", targetUrl)
payload := data.PropertyName + "=" + data.Value
formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05")
currentTime := "&time" + "=" + formatTimeStr
payload += currentTime

klog.V(3).Infof("Publish %v to %s", payload, targetUrl)

resp, err := http.Post(targetUrl,
"application/x-www-form-urlencoded",
strings.NewReader(payload))
Expand All @@ -61,7 +64,10 @@ func (pm *PushMethod) Push(data *common.DataModel) {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// handle error
klog.Errorf("Publish device data by HTTP failed, err = %v", err)
return
}
klog.V(1).Info(string(body))
klog.V(1).Info("############### Message published. ###############")
klog.V(3).Infof("HTTP reviced %s", string(body))

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewDataPanel(config json.RawMessage) (global.DataPanel, error) {
}

func (pm *PushMethod) InitPushMethod() error {
fmt.Println("Init Mqtt")
klog.V(1).Info("Init MQTT")
return nil
}

Expand All @@ -59,5 +59,5 @@ func (pm *PushMethod) Push(data *common.DataModel) {
token.Wait()

client.Disconnect(250)
klog.V(1).Info("############### Message published. ###############")
klog.V(2).Info("############### Message published. ###############")
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func dataHandler(ctx context.Context, dev *driver.CustomizedDev) {
var visitorConfig driver.VisitorConfig

err := json.Unmarshal(twin.Property.Visitors, &visitorConfig)
visitorConfig.VisitorConfigData.DataType = strings.ToLower(visitorConfig.VisitorConfigData.DataType)
if err != nil {
klog.Errorf("Unmarshal VisitorConfig error: %v", err)
continue
Expand Down Expand Up @@ -253,7 +254,13 @@ func setVisitor(visitorConfig *driver.VisitorConfig, twin *common.Twin, dev *dri
klog.V(3).Infof("%s twin readonly property: %s", dev.Instance.Name, twin.PropertyName)
return nil
}
err := dev.CustomizedClient.SetDeviceData(twin, visitorConfig)
klog.V(2).Infof("Convert type: %s, value: %s ", twin.Property.PProperty.DataType, twin.ObservedDesired.Value)
value, err := common.Convert(twin.Property.PProperty.DataType, twin.ObservedDesired.Value)
if err != nil {
klog.Errorf("Failed to convert value as %s : %v", twin.Property.PProperty.DataType, err)
return err
}
err = dev.CustomizedClient.SetDeviceData(value, visitorConfig)
if err != nil {
return fmt.Errorf("%s set device data error: %v", twin.PropertyName, err)
}
Expand All @@ -270,7 +277,7 @@ func (d *DevPanel) DevInit(cfg *config.Config) error {
// return err
// }
case common.DevInitModeRegister:
if err := parse.ParseByUsingRegister(cfg, devs, d.models, d.protocols); err != nil {
if err := parse.ParseByUsingRegister(devs, d.models, d.protocols); err != nil {
return err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/kubeedge/virtualdevice/driver"
"github.com/kubeedge/virtualdevice/pkg/common"
dmiapi "github.com/kubeedge/virtualdevice/pkg/dmi-api"
"github.com/kubeedge/virtualdevice/pkg/util/grpcclient"
"github.com/kubeedge/virtualdevice/pkg/grpcclient"
"github.com/kubeedge/virtualdevice/pkg/util/parse"
)

Expand All @@ -31,6 +31,7 @@ type TwinData struct {

func (td *TwinData) GetPayLoad() ([]byte, error) {
var err error
td.VisitorConfig.VisitorConfigData.DataType = strings.ToLower(td.VisitorConfig.VisitorConfigData.DataType)
td.Results, err = td.Client.GetDeviceData(td.VisitorConfig)
if err != nil {
return nil, fmt.Errorf("get device data failed: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package driver

import (
"fmt"
"github.com/kubeedge/virtualdevice/pkg/common"
"k8s.io/klog/v2"
"math/rand"
"sync"

"k8s.io/klog/v2"
)

func NewClient(protocol ProtocolConfig) (*CustomizedClient, error) {
Expand Down Expand Up @@ -40,16 +40,12 @@ func (c *CustomizedClient) GetDeviceData(visitor *VisitorConfig) (interface{}, e
}
}

func (c *CustomizedClient) SetDeviceData(twin *common.Twin, visitor *VisitorConfig) error {
func (c *CustomizedClient) SetDeviceData(data interface{}, visitor *VisitorConfig) error {
// TODO: set device's data
// you can use c.ProtocolConfig and visitor
value, err := common.Convert(twin.Property.PProperty.DataType, twin.Property.PProperty.Maximum)
if err != nil {
klog.Errorf("Failed to convert value as %s : %v", twin.Property.PProperty.DataType, err)
return err
}

if visitor.DataType == "int" {
c.intMaxValue = int(value.(int64))
c.intMaxValue = int(data.(int64))
} else {
return fmt.Errorf("unrecognized data type: %s", visitor.DataType)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/kubeedge/virtualdevice
go 1.17

require (

github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/golang/protobuf v1.5.2
github.com/gorilla/mux v1.8.0
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package common

import "encoding/json"

// DeviceProfile is structure to store in configMap.
// DeviceProfile is structure to store in configMap. It will be removed later
type DeviceProfile struct {
DeviceInstances []DeviceInstance `json:"deviceInstances,omitempty"`
DeviceModels []DeviceModel `json:"deviceModels,omitempty"`
Expand Down Expand Up @@ -49,10 +49,9 @@ type ModelProperty struct {
DataType string `json:"dataType,omitempty"`
Description string `json:"description,omitempty"`
AccessMode string `json:"accessMode,omitempty"`
//DefaultValue interface{} `json:"defaultValue,omitempty"`
Minimum string `json:"minimum,omitempty"` //todo todo why the type is int64
Maximum string `json:"maximum,omitempty"`
Unit string `json:"unit,omitempty"`
Minimum string `json:"minimum,omitempty"`
Maximum string `json:"maximum,omitempty"`
Unit string `json:"unit,omitempty"`
}

// Protocol is structure to store protocol in deviceProfile.json in configmap.
Expand All @@ -74,14 +73,12 @@ type DeviceProperty struct {
ModelName string `json:"modelName,omitempty"`
Protocol string `json:"protocol,omitempty"`
Visitors json.RawMessage `json:"visitorConfig"`

// whether be reported to the cloud
ReportToCloud bool `json:"reportToCloud,omitempty"`
CollectCycle int64 `json:"collectCycle"`
ReportCycle int64 `json:"reportCycle,omitempty"`
PushMethod PushMethodConfig `json:"pushMethod,omitempty"`

PProperty ModelProperty
PProperty ModelProperty
}

// PushMethodConfig is structure to store push config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
"google.golang.org/grpc"

"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/virtualdevice/pkg/config"
dmiapi "github.com/kubeedge/virtualdevice/pkg/dmi-api"
)

// RegisterMapper if withData is true, edgecore will send device and model list.
func RegisterMapper(cfg *config.Config, withData bool) ([]*dmiapi.Device, []*dmiapi.DeviceModel, error) {
func RegisterMapper(withData bool) ([]*dmiapi.Device, []*dmiapi.DeviceModel, error) {
// connect grpc server
conn, err := grpc.Dial(cfg.Common.EdgeCoreSock,
grpc.WithInsecure(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ import (
)

func (s *Server) RegisterDevice(ctx context.Context, request *dmiapi.RegisterDeviceRequest) (*dmiapi.RegisterDeviceResponse, error) {
//klog.V(2).Info("RegisterDevice")
klog.V(3).Info("RegisterDevice")
device := request.GetDevice()
//klog.V(1).Infof("In RegisterDevice, device = %v", device)

if device == nil {
return nil, errors.New("device is nil")
}
Expand Down Expand Up @@ -67,7 +65,7 @@ func (s *Server) RemoveDevice(ctx context.Context, request *dmiapi.RemoveDeviceR
}

func (s *Server) UpdateDevice(ctx context.Context, request *dmiapi.UpdateDeviceRequest) (*dmiapi.UpdateDeviceResponse, error) {
klog.V(2).Info("UpdateDevice")
klog.V(3).Info("UpdateDevice")
device := request.GetDevice()
if device == nil {
return nil, errors.New("device is nil")
Expand All @@ -82,7 +80,7 @@ func (s *Server) UpdateDevice(ctx context.Context, request *dmiapi.UpdateDeviceR
return nil, fmt.Errorf("parse device %s protocol failed, err: %s", device.Name, err)
}

klog.Infof("model: %+v", model)
klog.V(3).Infof("model: %+v", model)
deviceInstance, err := parse.ParseDeviceFromGrpc(device, &model)
if err != nil {
return nil, fmt.Errorf("parse device %s instance failed, err: %s", device.Name, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *Server) Start() error {
grpcServer := grpc.NewServer()
dmiapi.RegisterDeviceMapperServiceServer(grpcServer, s)
reflection.Register(grpcServer)
klog.Info("start grpc server")
klog.V(2).Info("start grpc server")
return grpcServer.Serve(s.lis)
}

Expand All @@ -63,7 +63,7 @@ func (s *Server) Stop() {
}

func initSock(sockPath string) error {
klog.Infof("init uds socket: %s", sockPath)
klog.V(2).Infof("init uds socket: %s", sockPath)
_, err := os.Stat(sockPath)
if err == nil {
err = os.Remove(sockPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,23 @@ func (rs *RestServer) StartServer() {
}
if rs.CaCertFilePath == "" && (rs.KeyFilePath == "" || rs.CertFilePath == "") {
// insecure
klog.Info("Insecure communication, skipping server verification")
klog.V(3).Info("Insecure communication, skipping server verification")
err := rs.server.ListenAndServe()
if err != nil {
klog.Errorf("insecure http server error: %v", err)
return
}
} else if rs.CaCertFilePath == "" && rs.KeyFilePath != "" && rs.CertFilePath != "" {
// tls
klog.Info("tls communication, https server start")
klog.V(3).Info("tls communication, https server start")
err := rs.server.ListenAndServeTLS(rs.CertFilePath, rs.KeyFilePath)
if err != nil {
klog.Errorf("tls http server error: %v", err)
return
}
} else if rs.CaCertFilePath != "" && rs.KeyFilePath != "" && rs.CertFilePath != "" {
// mtls
klog.Info("mtls communication, please provide client-key and client-cert to access service")
klog.V(3).Info("mtls communication, please provide client-key and client-cert to access service")
// Configure the server to trust TLS client cert issued by your CA.
certPool := x509.NewCertPool()
if caCertPEM, err := ioutil.ReadFile(rs.CaCertFilePath); err != nil {
Expand Down
Loading

0 comments on commit 7e650f8

Please sign in to comment.