Skip to content

Commit

Permalink
feat: added metrics service v2
Browse files Browse the repository at this point in the history
  • Loading branch information
tikazyq committed Jun 26, 2024
1 parent 265b098 commit 7bdce1a
Show file tree
Hide file tree
Showing 36 changed files with 302 additions and 437 deletions.
43 changes: 16 additions & 27 deletions core/grpc/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"io"
"sync"
"time"
)

Expand All @@ -41,10 +42,7 @@ type GrpcClientV2 struct {
TaskClient grpc2.TaskServiceClient
ModelBaseServiceV2Client grpc2.ModelBaseServiceV2Client
DependenciesClient grpc2.DependenciesServiceV2Client
}

func (c *GrpcClientV2) Init() (err error) {
return nil
MetricsClient grpc2.MetricsServiceV2Client
}

func (c *GrpcClientV2) Start() (err error) {
Expand Down Expand Up @@ -96,11 +94,7 @@ func (c *GrpcClientV2) Register() {
c.ModelBaseServiceV2Client = grpc2.NewModelBaseServiceV2Client(c.conn)
c.TaskClient = grpc2.NewTaskServiceClient(c.conn)
c.DependenciesClient = grpc2.NewDependenciesServiceV2Client(c.conn)

// log
log.Infof("[GrpcClient] grpc client registered client services")
log.Debugf("[GrpcClient] NodeClient: %v", c.NodeClient)
log.Debugf("[GrpcClient] ModelBaseServiceV2Client: %v", c.ModelBaseServiceV2Client)
c.MetricsClient = grpc2.NewMetricsServiceV2Client(c.conn)
}

func (c *GrpcClientV2) Context() (ctx context.Context, cancel context.CancelFunc) {
Expand Down Expand Up @@ -248,7 +242,7 @@ func (c *GrpcClientV2) handleStreamMessage() {
}
}

func NewGrpcClientV2() (c *GrpcClientV2, err error) {
func newGrpcClientV2() (c *GrpcClientV2) {
client := &GrpcClientV2{
address: entity.NewAddress(&entity.AddressOptions{
Host: constants.DefaultGrpcClientRemoteHost,
Expand All @@ -260,28 +254,23 @@ func NewGrpcClientV2() (c *GrpcClientV2, err error) {
client.nodeCfgSvc = nodeconfig.GetNodeConfigService()

if viper.GetString("grpc.address") != "" {
client.address, err = entity.NewAddressFromString(viper.GetString("grpc.address"))
address, err := entity.NewAddressFromString(viper.GetString("grpc.address"))
if err != nil {
return nil, trace.TraceError(err)
log.Errorf("failed to parse grpc address: %s", viper.GetString("grpc.address"))
panic(err)
}
client.address = address
}

if err := client.Init(); err != nil {
return nil, err
}

return client, nil
return client
}

var _clientV2 *GrpcClientV2
var clientV2 *GrpcClientV2
var clientV2Once sync.Once

func GetGrpcClientV2() (client *GrpcClientV2, err error) {
if _clientV2 != nil {
return _clientV2, nil
}
_clientV2, err = NewGrpcClientV2()
if err != nil {
return nil, err
}
return _clientV2, nil
func GetGrpcClientV2() *GrpcClientV2 {
clientV2Once.Do(func() {
clientV2 = newGrpcClientV2()
})
return clientV2
}
78 changes: 76 additions & 2 deletions core/grpc/server/metrics_server_v2.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,86 @@
package server

import (
"errors"
"github.com/apex/log"
"github.com/crawlab-team/crawlab/grpc"
"io"
"sync"
)

type MetricsServerV2 struct {
grpc.UnimplementedMetricsServiceV2Server
mu *sync.Mutex
streams map[string]*grpc.MetricsServiceV2_ConnectServer
mu *sync.Mutex
streams map[string]*grpc.MetricsServiceV2_ConnectServer
channels map[string]chan []*grpc.Metric
}

func (svr MetricsServerV2) Connect(stream grpc.MetricsServiceV2_ConnectServer) (err error) {
// receive first message
req, err := stream.Recv()
if err != nil {
log.Errorf("[MetricsServerV2] receive error: %v", err)
return err
}

// save stream and channel
svr.mu.Lock()
svr.streams[req.NodeKey] = &stream
svr.channels[req.NodeKey] = make(chan []*grpc.Metric)
svr.mu.Unlock()

log.Info("[MetricsServerV2] connected: " + req.NodeKey)

for {
// receive metrics
req, err = stream.Recv()
if errors.Is(err, io.EOF) {
log.Errorf("[MetricsServerV2] receive EOF: %v", err)
return
}

// send metrics to channel
svr.channels[req.NodeKey] <- req.Metrics

// keep this scope alive because once this scope exits - the stream is closed
select {
case <-stream.Context().Done():
log.Info("[MetricsServerV2] disconnected: " + req.NodeKey)
delete(svr.streams, req.NodeKey)
delete(svr.channels, req.NodeKey)
return nil
}
}
}

func (svr MetricsServerV2) GetStream(nodeKey string) (stream *grpc.MetricsServiceV2_ConnectServer, ok bool) {
svr.mu.Lock()
defer svr.mu.Unlock()
stream, ok = svr.streams[nodeKey]
return stream, ok
}

func (svr MetricsServerV2) GetChannel(nodeKey string) (ch chan []*grpc.Metric, ok bool) {
svr.mu.Lock()
defer svr.mu.Unlock()
ch, ok = svr.channels[nodeKey]
return ch, ok
}

func NewMetricsServerV2() *MetricsServerV2 {
return &MetricsServerV2{
mu: new(sync.Mutex),
streams: make(map[string]*grpc.MetricsServiceV2_ConnectServer),
channels: make(map[string]chan []*grpc.Metric),
}
}

var metricsServerV2 *MetricsServerV2

func GetMetricsServerV2() *MetricsServerV2 {
if metricsServerV2 != nil {
return metricsServerV2
}
metricsServerV2 = NewMetricsServerV2()
return metricsServerV2
}
3 changes: 3 additions & 0 deletions core/grpc/server/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type GrpcServerV2 struct {
TaskSvr *TaskServerV2
ModelBaseServiceSvr *ModelBaseServiceServerV2
DependenciesSvr *DependenciesServerV2
MetricsSvr *MetricsServerV2
}

func (svr *GrpcServerV2) GetConfigPath() (path string) {
Expand Down Expand Up @@ -119,6 +120,7 @@ func (svr *GrpcServerV2) Register() (err error) {
grpc2.RegisterModelBaseServiceV2Server(svr.svr, *svr.ModelBaseServiceSvr)
grpc2.RegisterTaskServiceServer(svr.svr, *svr.TaskSvr)
grpc2.RegisterDependenciesServiceV2Server(svr.svr, *svr.DependenciesSvr)
grpc2.RegisterMetricsServiceV2Server(svr.svr, *svr.MetricsSvr)

return nil
}
Expand Down Expand Up @@ -217,6 +219,7 @@ func NewGrpcServerV2() (svr *GrpcServerV2, err error) {
return nil, err
}
svr.DependenciesSvr = GetDependenciesServerV2()
svr.MetricsSvr = GetMetricsServerV2()

// recovery options
recoveryOpts := []grpc_recovery.Option{
Expand Down
7 changes: 2 additions & 5 deletions core/models/client/model_service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,9 @@ func NewModelServiceV2[T any]() *ModelServiceV2[T] {

var instance *ModelServiceV2[T]

c, err := client.GetGrpcClientV2()
if err != nil {
panic(err)
}
c := client.GetGrpcClientV2()
if !c.IsStarted() {
err = c.Start()
err := c.Start()
if err != nil {
panic(err)
}
Expand Down
5 changes: 1 addition & 4 deletions core/node/service/worker_service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,7 @@ func NewWorkerServiceV2() (res *WorkerServiceV2, err error) {
}

// grpc client
svc.client, err = client.NewGrpcClientV2()
if err != nil {
return nil, err
}
svc.client = client.GetGrpcClientV2()

// handler service
svc.handlerSvc, err = handler.GetTaskHandlerServiceV2()
Expand Down
5 changes: 1 addition & 4 deletions core/task/handler/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,7 @@ func NewTaskHandlerServiceV2() (svc2 *ServiceV2, err error) {
svc.cfgSvc = nodeconfig.GetNodeConfigService()

// grpc client
svc.c, err = grpcclient.NewGrpcClientV2()
if err != nil {
return nil, err
}
svc.c = grpcclient.GetGrpcClientV2()

log.Debugf("[NewTaskHandlerService] svc[cfgPath: %s]", svc.cfgSvc.GetConfigPath())

Expand Down
2 changes: 1 addition & 1 deletion grpc/dependencies_service_v2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 12 additions & 21 deletions grpc/dependencies_service_v2_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion grpc/message_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 7 additions & 12 deletions grpc/message_service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7bdce1a

Please sign in to comment.