Skip to content

Commit

Permalink
feat: enhance gRPC client functionality and improve logging
Browse files Browse the repository at this point in the history
- Added WaitForReady method to GrpcClient for blocking until the client is ready.
- Updated WorkerService to utilize WaitForReady for ensuring gRPC client readiness before starting.
- Refactored ModelService to consistently use GetGrpcClient for context management.
- Changed logging level for received metrics in MetricServiceServer from Info to Debug.
- Modified error handling in HandleError to conditionally print errors based on the environment.
- Cleaned up unused GrpcClient references in various services, improving code clarity.
  • Loading branch information
tikazyq committed Dec 20, 2024
1 parent eed451e commit 3cb74d7
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 96 deletions.
46 changes: 43 additions & 3 deletions core/grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -75,6 +76,21 @@ func (c *GrpcClient) Stop() (err error) {
return nil
}

func (c *GrpcClient) WaitForReady() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if c.IsReady() {
return
}
case <-c.stop:
log.Errorf("grpc client stopped")
}
}
}

func (c *GrpcClient) register() {
c.NodeClient = grpc2.NewNodeServiceClient(c.conn)
c.ModelBaseServiceClient = grpc2.NewModelBaseServiceClient(c.conn)
Expand All @@ -87,8 +103,8 @@ func (c *GrpcClient) Context() (ctx context.Context, cancel context.CancelFunc)
return context.WithTimeout(context.Background(), c.timeout)
}

func (c *GrpcClient) IsStarted() (res bool) {
return c.conn != nil
func (c *GrpcClient) IsReady() (res bool) {
return c.conn != nil && c.conn.GetState() == connectivity.Ready
}

func (c *GrpcClient) IsClosed() (res bool) {
Expand Down Expand Up @@ -132,12 +148,30 @@ func (c *GrpcClient) connect() (err error) {
}

// connect
log.Infof("[GrpcClient] grpc client connecting to %s", c.address)
c.conn.Connect()

// wait for connection to be ready
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ok := c.conn.WaitForStateChange(ctx, connectivity.Ready)
if !ok {
return fmt.Errorf("[GrpcClient] grpc client failed to connect to %s: timed out", c.address)
}

// success
log.Infof("[GrpcClient] grpc client connected to %s", c.address)

return nil
}
return backoff.RetryNotify(op, backoff.NewExponentialBackOff(), utils.BackoffErrorNotify("grpc client connect"))
b := backoff.NewExponentialBackOff(
backoff.WithInitialInterval(5*time.Second),
backoff.WithMaxElapsedTime(10*time.Minute),
)
n := func(err error, duration time.Duration) {
log.Errorf("[GrpcClient] grpc client failed to connect to %s: %v, retrying in %s", c.address, err, duration)
}
return backoff.RetryNotify(op, b, n)
}

func newGrpcClient() (c *GrpcClient) {
Expand All @@ -154,6 +188,12 @@ var _clientOnce sync.Once
func GetGrpcClient() *GrpcClient {
_clientOnce.Do(func() {
_client = newGrpcClient()
go func() {
err := _client.Start()
if err != nil {
log.Fatalf("[GrpcClient] failed to start: %v", err)
}
}()
})
return _client
}
2 changes: 1 addition & 1 deletion core/grpc/server/metric_service_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type MetricServiceServer struct {
}

func (svr MetricServiceServer) Send(_ context.Context, req *grpc.MetricServiceSendRequest) (res *grpc.Response, err error) {
log.Info("[MetricServiceServer] received metric from node: " + req.NodeKey)
log.Debugf("[MetricServiceServer] received metric from node: " + req.NodeKey)
n, err := service.NewModelService[models.Node]().GetOne(bson.M{"key": req.NodeKey}, nil)
if err != nil {
log.Errorf("[MetricServiceServer] error getting node: %v", err)
Expand Down
5 changes: 4 additions & 1 deletion core/grpc/server/utils_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package server

import (
"encoding/json"
"github.com/crawlab-team/crawlab/core/utils"
"github.com/crawlab-team/crawlab/grpc"
"github.com/crawlab-team/crawlab/trace"
)

func HandleError(err error) (res *grpc.Response, err2 error) {
trace.PrintError(err)
if utils.IsDev() {
trace.PrintError(err)
}
return &grpc.Response{
Code: grpc.ResponseCode_ERROR,
Error: err.Error(),
Expand Down
70 changes: 30 additions & 40 deletions core/models/client/model_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ var (

type ModelService[T any] struct {
cfg interfaces.NodeConfigService
c *client.GrpcClient
modelType string
}

func (svc *ModelService[T]) GetById(id primitive.ObjectID) (model *T, err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
res, err := svc.c.ModelBaseServiceClient.GetById(ctx, &grpc.ModelServiceGetByIdRequest{
res, err := client.GetGrpcClient().ModelBaseServiceClient.GetById(ctx, &grpc.ModelServiceGetByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
Expand All @@ -41,7 +40,7 @@ func (svc *ModelService[T]) GetById(id primitive.ObjectID) (model *T, err error)
}

func (svc *ModelService[T]) GetOne(query bson.M, options *mongo.FindOptions) (model *T, err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
Expand All @@ -51,7 +50,7 @@ func (svc *ModelService[T]) GetOne(query bson.M, options *mongo.FindOptions) (mo
if err != nil {
return nil, err
}
res, err := svc.c.ModelBaseServiceClient.GetOne(ctx, &grpc.ModelServiceGetOneRequest{
res, err := client.GetGrpcClient().ModelBaseServiceClient.GetOne(ctx, &grpc.ModelServiceGetOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Expand All @@ -64,7 +63,7 @@ func (svc *ModelService[T]) GetOne(query bson.M, options *mongo.FindOptions) (mo
}

func (svc *ModelService[T]) GetMany(query bson.M, options *mongo.FindOptions) (models []T, err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
Expand All @@ -74,7 +73,7 @@ func (svc *ModelService[T]) GetMany(query bson.M, options *mongo.FindOptions) (m
if err != nil {
return nil, err
}
res, err := svc.c.ModelBaseServiceClient.GetMany(ctx, &grpc.ModelServiceGetManyRequest{
res, err := client.GetGrpcClient().ModelBaseServiceClient.GetMany(ctx, &grpc.ModelServiceGetManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Expand All @@ -87,9 +86,9 @@ func (svc *ModelService[T]) GetMany(query bson.M, options *mongo.FindOptions) (m
}

func (svc *ModelService[T]) DeleteById(id primitive.ObjectID) (err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
_, err = svc.c.ModelBaseServiceClient.DeleteById(ctx, &grpc.ModelServiceDeleteByIdRequest{
_, err = client.GetGrpcClient().ModelBaseServiceClient.DeleteById(ctx, &grpc.ModelServiceDeleteByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
Expand All @@ -101,13 +100,13 @@ func (svc *ModelService[T]) DeleteById(id primitive.ObjectID) (err error) {
}

func (svc *ModelService[T]) DeleteOne(query bson.M) (err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceClient.DeleteOne(ctx, &grpc.ModelServiceDeleteOneRequest{
_, err = client.GetGrpcClient().ModelBaseServiceClient.DeleteOne(ctx, &grpc.ModelServiceDeleteOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Expand All @@ -119,13 +118,13 @@ func (svc *ModelService[T]) DeleteOne(query bson.M) (err error) {
}

func (svc *ModelService[T]) DeleteMany(query bson.M) (err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceClient.DeleteMany(ctx, &grpc.ModelServiceDeleteManyRequest{
_, err = client.GetGrpcClient().ModelBaseServiceClient.DeleteMany(ctx, &grpc.ModelServiceDeleteManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Expand All @@ -137,13 +136,13 @@ func (svc *ModelService[T]) DeleteMany(query bson.M) (err error) {
}

func (svc *ModelService[T]) UpdateById(id primitive.ObjectID, update bson.M) (err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
updateData, err := json.Marshal(update)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceClient.UpdateById(ctx, &grpc.ModelServiceUpdateByIdRequest{
_, err = client.GetGrpcClient().ModelBaseServiceClient.UpdateById(ctx, &grpc.ModelServiceUpdateByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
Expand All @@ -156,7 +155,7 @@ func (svc *ModelService[T]) UpdateById(id primitive.ObjectID, update bson.M) (er
}

func (svc *ModelService[T]) UpdateOne(query bson.M, update bson.M) (err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
Expand All @@ -166,7 +165,7 @@ func (svc *ModelService[T]) UpdateOne(query bson.M, update bson.M) (err error) {
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceClient.UpdateOne(ctx, &grpc.ModelServiceUpdateOneRequest{
_, err = client.GetGrpcClient().ModelBaseServiceClient.UpdateOne(ctx, &grpc.ModelServiceUpdateOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Expand All @@ -179,7 +178,7 @@ func (svc *ModelService[T]) UpdateOne(query bson.M, update bson.M) (err error) {
}

func (svc *ModelService[T]) UpdateMany(query bson.M, update bson.M) (err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
Expand All @@ -189,7 +188,7 @@ func (svc *ModelService[T]) UpdateMany(query bson.M, update bson.M) (err error)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceClient.UpdateMany(ctx, &grpc.ModelServiceUpdateManyRequest{
_, err = client.GetGrpcClient().ModelBaseServiceClient.UpdateMany(ctx, &grpc.ModelServiceUpdateManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Expand All @@ -199,13 +198,13 @@ func (svc *ModelService[T]) UpdateMany(query bson.M, update bson.M) (err error)
}

func (svc *ModelService[T]) ReplaceById(id primitive.ObjectID, model T) (err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
modelData, err := json.Marshal(model)
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceClient.ReplaceById(ctx, &grpc.ModelServiceReplaceByIdRequest{
_, err = client.GetGrpcClient().ModelBaseServiceClient.ReplaceById(ctx, &grpc.ModelServiceReplaceByIdRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Id: id.Hex(),
Expand All @@ -218,7 +217,7 @@ func (svc *ModelService[T]) ReplaceById(id primitive.ObjectID, model T) (err err
}

func (svc *ModelService[T]) ReplaceOne(query bson.M, model T) (err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
Expand All @@ -228,7 +227,7 @@ func (svc *ModelService[T]) ReplaceOne(query bson.M, model T) (err error) {
if err != nil {
return err
}
_, err = svc.c.ModelBaseServiceClient.ReplaceOne(ctx, &grpc.ModelServiceReplaceOneRequest{
_, err = client.GetGrpcClient().ModelBaseServiceClient.ReplaceOne(ctx, &grpc.ModelServiceReplaceOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Expand All @@ -241,13 +240,13 @@ func (svc *ModelService[T]) ReplaceOne(query bson.M, model T) (err error) {
}

func (svc *ModelService[T]) InsertOne(model T) (id primitive.ObjectID, err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
modelData, err := json.Marshal(model)
if err != nil {
return primitive.NilObjectID, err
}
res, err := svc.c.ModelBaseServiceClient.InsertOne(ctx, &grpc.ModelServiceInsertOneRequest{
res, err := client.GetGrpcClient().ModelBaseServiceClient.InsertOne(ctx, &grpc.ModelServiceInsertOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Model: modelData,
Expand All @@ -259,13 +258,13 @@ func (svc *ModelService[T]) InsertOne(model T) (id primitive.ObjectID, err error
}

func (svc *ModelService[T]) InsertMany(models []T) (ids []primitive.ObjectID, err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
modelsData, err := json.Marshal(models)
if err != nil {
return nil, err
}
res, err := svc.c.ModelBaseServiceClient.InsertMany(ctx, &grpc.ModelServiceInsertManyRequest{
res, err := client.GetGrpcClient().ModelBaseServiceClient.InsertMany(ctx, &grpc.ModelServiceInsertManyRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Models: modelsData,
Expand All @@ -277,7 +276,7 @@ func (svc *ModelService[T]) InsertMany(models []T) (ids []primitive.ObjectID, er
}

func (svc *ModelService[T]) UpsertOne(query bson.M, model T) (id primitive.ObjectID, err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
Expand All @@ -287,7 +286,7 @@ func (svc *ModelService[T]) UpsertOne(query bson.M, model T) (id primitive.Objec
if err != nil {
return primitive.NilObjectID, err
}
res, err := svc.c.ModelBaseServiceClient.UpsertOne(ctx, &grpc.ModelServiceUpsertOneRequest{
res, err := client.GetGrpcClient().ModelBaseServiceClient.UpsertOne(ctx, &grpc.ModelServiceUpsertOneRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Expand All @@ -301,13 +300,13 @@ func (svc *ModelService[T]) UpsertOne(query bson.M, model T) (id primitive.Objec
}

func (svc *ModelService[T]) Count(query bson.M) (total int, err error) {
ctx, cancel := svc.c.Context()
ctx, cancel := client.GetGrpcClient().Context()
defer cancel()
queryData, err := json.Marshal(query)
if err != nil {
return 0, err
}
res, err := svc.c.ModelBaseServiceClient.Count(ctx, &grpc.ModelServiceCountRequest{
res, err := client.GetGrpcClient().ModelBaseServiceClient.Count(ctx, &grpc.ModelServiceCountRequest{
NodeKey: svc.cfg.GetNodeKey(),
ModelType: svc.modelType,
Query: queryData,
Expand Down Expand Up @@ -356,18 +355,9 @@ func NewModelService[T any]() *ModelService[T] {

var instance *ModelService[T]

c := client.GetGrpcClient()
if !c.IsStarted() {
err := c.Start()
if err != nil {
panic(err)
}
}

onceMap[typeName].Do(func() {
instance = &ModelService[T]{
cfg: nodeconfig.GetNodeConfigService(),
c: c,
modelType: typeName,
}
instanceMap[typeName] = instance
Expand Down
Loading

0 comments on commit 3cb74d7

Please sign in to comment.