Skip to content

Commit

Permalink
feat: added metric service
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jul 2, 2024
1 parent ffcb44f commit 7eb8c08
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 370 deletions.
98 changes: 39 additions & 59 deletions core/grpc/server/metrics_server_v2.go
Original file line number Diff line number Diff line change
@@ -1,86 +1,66 @@
package server

import (
"errors"
"context"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/core/models/models"
"github.com/crawlab-team/crawlab/core/models/service"
"github.com/crawlab-team/crawlab/grpc"
"io"
"go.mongodb.org/mongo-driver/bson"
"sync"
"time"
)

type MetricsServerV2 struct {
grpc.UnimplementedMetricsServiceV2Server
mu *sync.Mutex
streams map[string]*grpc.MetricsServiceV2_ConnectServer
channels map[string]chan []*grpc.Metric
}

func (svr MetricsServerV2) Connect(stream grpc.MetricsServiceV2_ConnectServer) (err error) {
// receive first message
req, err := stream.Recv()
func (svr MetricsServerV2) Send(_ context.Context, req *grpc.MetricsServiceV2SendRequest) (res *grpc.Response, err error) {
log.Info("[MetricsServerV2] received metric from node: " + req.NodeKey)
n, err := service.NewModelServiceV2[models.NodeV2]().GetOne(bson.M{"key": req.NodeKey}, nil)
if err != nil {
log.Errorf("[MetricsServerV2] receive error: %v", err)
return err
log.Errorf("[MetricsServerV2] error getting node: %v", err)
return HandleError(err)
}

// save stream and channel
svr.mu.Lock()
svr.streams[req.NodeKey] = &stream
svr.channels[req.NodeKey] = make(chan []*grpc.Metric)
svr.mu.Unlock()

log.Info("[MetricsServerV2] connected: " + req.NodeKey)

for {
// receive metrics
req, err = stream.Recv()
if errors.Is(err, io.EOF) {
log.Errorf("[MetricsServerV2] receive EOF: %v", err)
return
}

// send metrics to channel
svr.channels[req.NodeKey] <- req.Metrics

// keep this scope alive because once this scope exits - the stream is closed
select {
case <-stream.Context().Done():
log.Info("[MetricsServerV2] disconnected: " + req.NodeKey)
delete(svr.streams, req.NodeKey)
delete(svr.channels, req.NodeKey)
return nil
}
metric := models.MetricV2{
Type: req.Type,
NodeId: n.Id,
CpuUsagePercent: req.CpuUsagePercent,
TotalMemory: req.TotalMemory,
AvailableMemory: req.AvailableMemory,
UsedMemory: req.UsedMemory,
UsedMemoryPercent: req.UsedMemoryPercent,
TotalDisk: req.TotalDisk,
AvailableDisk: req.AvailableDisk,
UsedDisk: req.UsedDisk,
UsedDiskPercent: req.UsedDiskPercent,
DiskReadBytesRate: req.DiskReadBytesRate,
DiskWriteBytesRate: req.DiskWriteBytesRate,
NetworkBytesSentRate: req.NetworkBytesSentRate,
NetworkBytesRecvRate: req.NetworkBytesRecvRate,
}
metric.CreatedAt = time.Unix(req.Timestamp, 0)
_, err = service.NewModelServiceV2[models.MetricV2]().InsertOne(metric)
if err != nil {
log.Errorf("[MetricsServerV2] error inserting metric: %v", err)
return HandleError(err)
}
return HandleSuccess()
}

func (svr MetricsServerV2) GetStream(nodeKey string) (stream *grpc.MetricsServiceV2_ConnectServer, ok bool) {
svr.mu.Lock()
defer svr.mu.Unlock()
stream, ok = svr.streams[nodeKey]
return stream, ok
}

func (svr MetricsServerV2) GetChannel(nodeKey string) (ch chan []*grpc.Metric, ok bool) {
svr.mu.Lock()
defer svr.mu.Unlock()
ch, ok = svr.channels[nodeKey]
return ch, ok
}

func NewMetricsServerV2() *MetricsServerV2 {
return &MetricsServerV2{
mu: new(sync.Mutex),
streams: make(map[string]*grpc.MetricsServiceV2_ConnectServer),
channels: make(map[string]chan []*grpc.Metric),
}
func newMetricsServerV2() *MetricsServerV2 {
return &MetricsServerV2{}
}

var metricsServerV2 *MetricsServerV2
var metricsServerV2Once = &sync.Once{}

func GetMetricsServerV2() *MetricsServerV2 {
if metricsServerV2 != nil {
return metricsServerV2
}
metricsServerV2 = NewMetricsServerV2()
metricsServerV2Once.Do(func() {
metricsServerV2 = newMetricsServerV2()
})
return metricsServerV2
}
1 change: 1 addition & 0 deletions core/grpc/server/model_base_service_v2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
*new(models.DependencyTaskV2),
*new(models.EnvironmentV2),
*new(models.GitV2),
*new(models.MetricV2),
*new(models.NodeV2),
*new(models.NotificationSettingV2),
*new(models.PermissionV2),
Expand Down
20 changes: 20 additions & 0 deletions core/models/common/index_service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,24 @@ func CreateIndexesV2() {
Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24),
},
})

// metrics
mongo.GetMongoCol(service.GetCollectionNameByInstance(models.MetricV2{})).MustCreateIndexes([]mongo2.IndexModel{
{
Keys: bson.D{
{"created_ts", -1},
},
Options: (&options.IndexOptions{}).SetExpireAfterSeconds(60 * 60 * 24 * 30),
},
{
Keys: bson.D{
{"node_id", 1},
},
},
{
Keys: bson.D{
{"type", 1},
},
},
})
}
23 changes: 23 additions & 0 deletions core/models/models/metric_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package models

import "go.mongodb.org/mongo-driver/bson/primitive"

type MetricV2 struct {
any `collection:"metrics"`
BaseModelV2[MetricV2] `bson:",inline"`
Type string `json:"type" bson:"type"`
NodeId primitive.ObjectID `json:"node_id" bson:"node_id"`
CpuUsagePercent float32 `json:"cpu_usage_percent" bson:"cpu_usage_percent"`
TotalMemory uint64 `json:"total_memory" bson:"total_memory"`
AvailableMemory uint64 `json:"available_memory" bson:"available_memory"`
UsedMemory uint64 `json:"used_memory" bson:"used_memory"`
UsedMemoryPercent float32 `json:"used_memory_percent" bson:"used_memory_percent"`
TotalDisk uint64 `json:"total_disk" bson:"total_disk"`
AvailableDisk uint64 `json:"available_disk" bson:"available_disk"`
UsedDisk uint64 `json:"used_disk" bson:"used_disk"`
UsedDiskPercent float32 `json:"used_disk_percent" bson:"used_disk_percent"`
DiskReadBytesRate float32 `json:"disk_read_bytes_rate" bson:"disk_read_bytes_rate"`
DiskWriteBytesRate float32 `json:"disk_write_bytes_rate" bson:"disk_write_bytes_rate"`
NetworkBytesSentRate float32 `json:"network_bytes_sent_rate" bson:"network_bytes_sent_rate"`
NetworkBytesRecvRate float32 `json:"network_bytes_recv_rate" bson:"network_bytes_recv_rate"`
}
Loading

0 comments on commit 7eb8c08

Please sign in to comment.