Skip to content

Commit

Permalink
Upload mapper example based on v1beta1 api
Browse files Browse the repository at this point in the history
Signed-off-by: wbc6080 <[email protected]>
  • Loading branch information
wbc6080 committed Jan 30, 2024
1 parent 36cc3be commit 871e85d
Show file tree
Hide file tree
Showing 40 changed files with 7,688 additions and 0 deletions.
20 changes: 20 additions & 0 deletions mappers/kubeedge-v1.15.0/modbus/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM golang:1.17-alpine AS builder

WORKDIR /build

ENV GO111MODULE=on \
GOPROXY=https://goproxy.cn,direct

COPY . .

RUN CGO_ENABLED=0 GOOS=linux go build -o main cmd/main.go


FROM ubuntu:16.04

RUN mkdir -p kubeedge

COPY --from=builder /build/main kubeedge/
COPY ./config.yaml kubeedge/

WORKDIR kubeedge
34 changes: 34 additions & 0 deletions mappers/kubeedge-v1.15.0/modbus/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
SHELL := /bin/bash

curr_dir := $(patsubst %/,%,$(dir $(abspath $(lastword $(MAKEFILE_LIST)))))
rest_args := $(wordlist 2, $(words $(MAKECMDGOALS)), $(MAKECMDGOALS))
$(eval $(rest_args):;@:)

help:
#
# Usage:
# make generate : generate a mapper based on a template.
# make mapper {mapper-name} <action> <parameter>: execute mapper building process.
#
# Actions:
# - mod, m : download code dependencies.
# - lint, l : verify code via go fmt and `golangci-lint`.
# - build, b : compile code.
# - package, p : package docker image.
# - clean, c : clean output binary.
#
# Parameters:
# ARM : true or undefined
# ARM64 : true or undefined
#
# Example:
# - make mapper modbus ARM64=true : execute `build` "modbus" mapper for ARM64.
# - make mapper modbus test : execute `test` "modbus" mapper.
@echo

make_rules := $(shell ls $(curr_dir)/hack/make-rules | sed 's/.sh//g')
$(make_rules):
@$(curr_dir)/hack/make-rules/$@.sh $(rest_args)

.DEFAULT_GOAL := help
.PHONY: $(make_rules) build test package
67 changes: 67 additions & 0 deletions mappers/kubeedge-v1.15.0/modbus/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"errors"
"os"

"k8s.io/klog/v2"

"github.com/kubeedge/modbus/device"
"github.com/kubeedge/modbus/pkg/common"
"github.com/kubeedge/modbus/pkg/config"
"github.com/kubeedge/modbus/pkg/grpcclient"
"github.com/kubeedge/modbus/pkg/grpcserver"
"github.com/kubeedge/modbus/pkg/httpserver"
"github.com/kubeedge/modbus/pkg/util/parse"
)

func main() {
var err error
var c config.Config

klog.InitFlags(nil)
defer klog.Flush()

if err = c.Parse(); err != nil {
klog.Fatal(err)
os.Exit(1)
}
klog.Infof("config: %+v", c)

grpcclient.Init(&c)

// start grpc server
grpcServer := grpcserver.NewServer(
grpcserver.Config{
SockPath: c.GrpcServer.SocketPath,
Protocol: common.ProtocolCustomized,
},
device.NewDevPanel(),
)

panel := device.NewDevPanel()
err = panel.DevInit(&c)
if err != nil && !errors.Is(err, parse.ErrEmptyData) {
klog.Fatal(err)
}
klog.Infoln("devInit finished")

// register to edgecore
// if dev init mode is register, mapper's dev will init when registry to edgecore
if c.DevInit.Mode != common.DevInitModeRegister {
klog.Infoln("======dev init mode is not register, will register to edgecore")
if _, _, err = grpcclient.RegisterMapper(false); err != nil {
klog.Fatal(err)
}
klog.Infoln("registerMapper finished")
}
go panel.DevStart()

httpServer := httpserver.NewRestServer(panel)
go httpServer.StartServer()

defer grpcServer.Stop()
if err = grpcServer.Start(); err != nil {
klog.Fatal(err)
}
}
12 changes: 12 additions & 0 deletions mappers/kubeedge-v1.15.0/modbus/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
grpc_server:
socket_path: /etc/kubeedge/modbus.sock
common:
name: Modbus-mapper
version: v1.13.0
api_version: v1.0.0
protocol: modbus # TODO add your protocol name
address: 127.0.0.1
edgecore_sock: /etc/kubeedge/dmi.sock
dev_init:
mode: register

76 changes: 76 additions & 0 deletions mappers/kubeedge-v1.15.0/modbus/data/dbmethod/influxdb2/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package influxdb2

import (
"context"
"encoding/json"
"os"
"time"

"k8s.io/klog/v2"

influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/kubeedge/modbus/pkg/common"
)

type DataBaseConfig struct {
Influxdb2ClientConfig *Influxdb2ClientConfig `json:"influxdb2ClientConfig,omitempty"`
Influxdb2DataConfig *Influxdb2DataConfig `json:"influxdb2DataConfig,omitempty"`
}

type Influxdb2ClientConfig struct {
Url string `json:"url,omitempty"`
Org string `json:"org,omitempty"`
Bucket string `json:"bucket,omitempty"`
}

type Influxdb2DataConfig struct {
Measurement string `json:"measurement,omitempty"`
Tag map[string]string `json:"tag,omitempty"`
FieldKey string `json:"fieldKey,omitempty"`
}

func NewDataBaseClient(clientConfig json.RawMessage, dataConfig json.RawMessage) (*DataBaseConfig, error) {
// parse influx database config data
influxdb2ClientConfig := new(Influxdb2ClientConfig)
influxdb2DataConfig := new(Influxdb2DataConfig)
err := json.Unmarshal(clientConfig, influxdb2ClientConfig)
if err != nil {
return nil, err
}
err = json.Unmarshal(dataConfig, influxdb2DataConfig)
if err != nil {
return nil, err
}
return &DataBaseConfig{
Influxdb2ClientConfig: influxdb2ClientConfig,
Influxdb2DataConfig: influxdb2DataConfig,
}, nil
}

func (d *DataBaseConfig) InitDbClient() influxdb2.Client {
var usrtoken string
usrtoken = os.Getenv("TOKEN")
client := influxdb2.NewClient(d.Influxdb2ClientConfig.Url, usrtoken)

return client
}

func (d *DataBaseConfig) CloseSession(client influxdb2.Client) {
client.Close()
}

func (d *DataBaseConfig) AddData(data *common.DataModel, client influxdb2.Client) error {
// write device data to influx database
writeAPI := client.WriteAPIBlocking(d.Influxdb2ClientConfig.Org, d.Influxdb2ClientConfig.Bucket)
p := influxdb2.NewPoint(d.Influxdb2DataConfig.Measurement,
d.Influxdb2DataConfig.Tag,
map[string]interface{}{d.Influxdb2DataConfig.FieldKey: data.Value},
time.Now())
// write point immediately
err := writeAPI.WritePoint(context.Background(), p)
if err != nil {
klog.V(4).Info("Exit AddData")
return err
}
return nil
}
73 changes: 73 additions & 0 deletions mappers/kubeedge-v1.15.0/modbus/data/publish/http/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package http

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"

"k8s.io/klog/v2"

"github.com/kubeedge/modbus/pkg/common"
"github.com/kubeedge/modbus/pkg/global"
)

type PushMethod struct {
HTTP *HTTPConfig `json:"http"`
}

type HTTPConfig struct {
HostName string `json:"hostName,omitempty"`
Port int `json:"port,omitempty"`
RequestPath string `json:"requestPath,omitempty"`
Timeout int `json:"timeout,omitempty"`
}

func NewDataPanel(config json.RawMessage) (global.DataPanel, error) {
httpConfig := new(HTTPConfig)
err := json.Unmarshal(config, httpConfig)
if err != nil {
return nil, err
}
return &PushMethod{
HTTP: httpConfig,
}, nil
}

func (pm *PushMethod) InitPushMethod() error {
klog.V(1).Info("Init HTTP")
return nil
}

func (pm *PushMethod) Push(data *common.DataModel) {
klog.V(2).Info("Publish device data by HTTP")

targetUrl := pm.HTTP.HostName + ":" + strconv.Itoa(pm.HTTP.Port) + pm.HTTP.RequestPath
payload := data.PropertyName + "=" + data.Value
formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05")
currentTime := "&time" + "=" + formatTimeStr
payload += currentTime

klog.V(3).Infof("Publish %v to %s", payload, targetUrl)

resp, err := http.Post(targetUrl,
"application/x-www-form-urlencoded",
strings.NewReader(payload))

if err != nil {
fmt.Println(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// handle error
klog.Errorf("Publish device data by HTTP failed, err = %v", err)
return
}
klog.V(1).Info("############### Message published. ###############")
klog.V(3).Infof("HTTP reviced %s", string(body))

}
63 changes: 63 additions & 0 deletions mappers/kubeedge-v1.15.0/modbus/data/publish/mqtt/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package mqtt

import (
"encoding/json"
"fmt"
"os"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"k8s.io/klog/v2"

"github.com/kubeedge/modbus/pkg/common"
"github.com/kubeedge/modbus/pkg/global"
)

type PushMethod struct {
MQTT *MQTTConfig `json:"http"`
}

type MQTTConfig struct {
Address string `json:"address,omitempty"`
Topic string `json:"topic,omitempty"`
QoS int `json:"qos,omitempty"`
Retained bool `json:"retained,omitempty"`
}

func NewDataPanel(config json.RawMessage) (global.DataPanel, error) {
mqttConfig := new(MQTTConfig)
err := json.Unmarshal(config, mqttConfig)
if err != nil {
return nil, err
}
return &PushMethod{
MQTT: mqttConfig,
}, nil
}

func (pm *PushMethod) InitPushMethod() error {
klog.V(1).Info("Init MQTT")
return nil
}

func (pm *PushMethod) Push(data *common.DataModel) {
klog.V(1).Infof("Publish %v to %s on topic: %s, Qos: %d, Retained: %v",
data.Value, pm.MQTT.Address, pm.MQTT.Topic, pm.MQTT.QoS, pm.MQTT.Retained)

opts := mqtt.NewClientOptions().AddBroker(pm.MQTT.Address)
client := mqtt.NewClient(opts)

if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05")
str_time := "time is " + formatTimeStr + " "
str_publish := str_time + pm.MQTT.Topic + ": " + data.Value

token := client.Publish(pm.MQTT.Topic, byte(pm.MQTT.QoS), pm.MQTT.Retained, str_publish)
token.Wait()

client.Disconnect(250)
klog.V(2).Info("############### Message published. ###############")
}
Loading

0 comments on commit 871e85d

Please sign in to comment.