diff --git a/core/constants/dependency.go b/core/constants/dependency.go index ec40a8a4f..e6457f039 100644 --- a/core/constants/dependency.go +++ b/core/constants/dependency.go @@ -21,3 +21,7 @@ const ( DependencyFileTypeRequirementsTxt = "requirements.txt" DependencyFileTypePackageJson = "package.json" ) +const ( + DependencyActionSync = "sync" + DependencyActionSetup = "setup" +) diff --git a/core/grpc/client/client.go b/core/grpc/client/client.go index 4ff2ef04a..6df1ec08c 100644 --- a/core/grpc/client/client.go +++ b/core/grpc/client/client.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/apex/log" "github.com/cenkalti/backoff/v4" "github.com/crawlab-team/crawlab/core/grpc/middlewares" "github.com/crawlab-team/crawlab/core/interfaces" @@ -31,6 +30,7 @@ type GrpcClient struct { once sync.Once stopped bool stop chan struct{} + logger interfaces.Logger // clients NodeClient grpc2.NodeServiceClient @@ -45,7 +45,7 @@ type GrpcClient struct { reconnect chan struct{} } -func (c *GrpcClient) Start() (err error) { +func (c *GrpcClient) Start() { c.once.Do(func() { // initialize reconnect channel c.reconnect = make(chan struct{}) @@ -54,23 +54,22 @@ func (c *GrpcClient) Start() (err error) { go c.monitorState() // connect - err = c.connect() + err := c.connect() if err != nil { + c.logger.Fatalf("failed to connect: %v", err) return } // register rpc services c.register() }) - - return err } func (c *GrpcClient) Stop() (err error) { // set stopped flag c.stopped = true c.stop <- struct{}{} - log.Infof("[GrpcClient] stopped") + c.logger.Infof("stopped") // skip if connection is nil if c.conn == nil { @@ -79,10 +78,10 @@ func (c *GrpcClient) Stop() (err error) { // close connection if err := c.conn.Close(); err != nil { - log.Errorf("grpc client failed to close connection: %v", err) + c.logger.Errorf("failed to close connection: %v", err) return err } - log.Infof("grpc client disconnected from %s", c.address) + c.logger.Infof("disconnected from %s", c.address) return nil } @@ -94,11 +93,11 @@ func (c *GrpcClient) WaitForReady() { select { case <-ticker.C: if c.IsReady() { - log.Debugf("grpc client ready") + c.logger.Debugf("ready") return } case <-c.stop: - log.Errorf("grpc client stopped") + c.logger.Errorf("stopped") } } } @@ -143,7 +142,7 @@ func (c *GrpcClient) monitorState() { if previous != current { c.setState(current) - log.Infof("[GrpcClient] state changed from %s to %s", previous, current) + c.logger.Infof("state changed from %s to %s", previous, current) // Trigger reconnect if connection is lost or becomes idle from ready state if current == connectivity.TransientFailure || @@ -151,7 +150,7 @@ func (c *GrpcClient) monitorState() { (previous == connectivity.Ready && current == connectivity.Idle) { select { case c.reconnect <- struct{}{}: - log.Infof("[GrpcClient] triggering reconnection due to state change to %s", current) + c.logger.Infof("triggering reconnection due to state change to %s", current) default: } } @@ -183,9 +182,9 @@ func (c *GrpcClient) connect() (err error) { return case <-c.reconnect: if !c.stopped { - log.Infof("[GrpcClient] attempting to reconnect to %s", c.address) + c.logger.Infof("attempting to reconnect to %s", c.address) if err := c.doConnect(); err != nil { - log.Errorf("[GrpcClient] reconnection failed: %v", err) + c.logger.Errorf("reconnection failed: %v", err) } } } @@ -207,12 +206,12 @@ func (c *GrpcClient) doConnect() (err error) { // create new client connection c.conn, err = grpc.NewClient(c.address, opts...) if err != nil { - log.Errorf("[GrpcClient] grpc client failed to connect to %s: %v", c.address, err) + c.logger.Errorf("failed to connect to %s: %v", c.address, err) return err } // connect - log.Infof("[GrpcClient] grpc client connecting to %s", c.address) + c.logger.Infof("connecting to %s", c.address) c.conn.Connect() // wait for connection to be ready @@ -220,11 +219,11 @@ func (c *GrpcClient) doConnect() (err error) { defer cancel() ok := c.conn.WaitForStateChange(ctx, connectivity.Ready) if !ok { - return fmt.Errorf("[GrpcClient] grpc client failed to connect to %s: timed out", c.address) + return fmt.Errorf("failed to connect to %s: timed out", c.address) } // success - log.Infof("[GrpcClient] grpc client connected to %s", c.address) + c.logger.Infof("connected to %s", c.address) return nil } @@ -232,7 +231,7 @@ func (c *GrpcClient) doConnect() (err error) { b.InitialInterval = 5 * time.Second b.MaxElapsedTime = 10 * time.Minute n := func(err error, duration time.Duration) { - log.Errorf("[GrpcClient] grpc client failed to connect to %s: %v, retrying in %s", c.address, err, duration) + c.logger.Errorf("failed to connect to %s: %v, retrying in %s", c.address, err, duration) } return backoff.RetryNotify(op, b, n) } @@ -242,6 +241,7 @@ func newGrpcClient() (c *GrpcClient) { address: utils.GetGrpcAddress(), timeout: 10 * time.Second, stop: make(chan struct{}), + logger: utils.NewServiceLogger("GrpcClient"), state: connectivity.Idle, } } @@ -252,12 +252,7 @@ var _clientOnce sync.Once func GetGrpcClient() *GrpcClient { _clientOnce.Do(func() { _client = newGrpcClient() - go func() { - err := _client.Start() - if err != nil { - log.Fatalf("[GrpcClient] failed to start: %v", err) - } - }() + go _client.Start() }) return _client } diff --git a/core/models/client/model_service_test.go b/core/models/client/model_service_test.go index 12e0b45fd..0c7f15ff5 100644 --- a/core/models/client/model_service_test.go +++ b/core/models/client/model_service_test.go @@ -57,8 +57,7 @@ func TestModelService_GetById(t *testing.T) { m.SetId(id) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() res, err := clientSvc.GetById(m.Id) @@ -83,8 +82,7 @@ func TestModelService_GetOne(t *testing.T) { m.SetId(id) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() res, err := clientSvc.GetOne(bson.M{"name": m.Name}, nil) @@ -109,8 +107,7 @@ func TestModelService_GetMany(t *testing.T) { m.SetId(id) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() res, err := clientSvc.GetMany(bson.M{"name": m.Name}, nil) @@ -136,8 +133,7 @@ func TestModelService_DeleteById(t *testing.T) { m.SetId(id) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() err = clientSvc.DeleteById(m.Id) @@ -164,8 +160,7 @@ func TestModelService_DeleteOne(t *testing.T) { m.SetId(id) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() err = clientSvc.DeleteOne(bson.M{"name": m.Name}) @@ -192,8 +187,7 @@ func TestModelService_DeleteMany(t *testing.T) { m.SetId(id) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() err = clientSvc.DeleteMany(bson.M{"name": m.Name}) @@ -220,8 +214,7 @@ func TestModelService_UpdateById(t *testing.T) { m.SetId(id) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() err = clientSvc.UpdateById(m.Id, bson.M{"$set": bson.M{"name": "New Name"}}) @@ -248,8 +241,7 @@ func TestModelService_UpdateOne(t *testing.T) { m.SetId(id) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() err = clientSvc.UpdateOne(bson.M{"name": m.Name}, bson.M{"$set": bson.M{"name": "New Name"}}) @@ -280,8 +272,7 @@ func TestModelService_UpdateMany(t *testing.T) { require.Nil(t, err) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() err = clientSvc.UpdateMany(bson.M{"name": "Test Name"}, bson.M{"$set": bson.M{"name": "New Name"}}) @@ -308,8 +299,7 @@ func TestModelService_ReplaceById(t *testing.T) { m.SetId(id) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() m.Name = "New Name" @@ -337,8 +327,7 @@ func TestModelService_ReplaceOne(t *testing.T) { m.SetId(id) time.Sleep(100 * time.Millisecond) - err = client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() m.Name = "New Name" @@ -357,8 +346,7 @@ func TestModelService_InsertOne(t *testing.T) { go startSvr(svr) defer stopSvr(svr) - err := client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() m := models.TestModel{ @@ -379,8 +367,7 @@ func TestModelService_InsertMany(t *testing.T) { go startSvr(svr) defer stopSvr(svr) - err := client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() testModels := []models.TestModel{ @@ -413,8 +400,7 @@ func TestModelService_Count(t *testing.T) { } time.Sleep(100 * time.Millisecond) - err := client2.GetGrpcClient().Start() - require.Nil(t, err) + client2.GetGrpcClient().Start() clientSvc := client.NewModelService[models.TestModel]() count, err := clientSvc.Count(bson.M{}) diff --git a/frontend/packages/crawlab-ui b/frontend/packages/crawlab-ui index 2814ef081..1b60483ed 160000 --- a/frontend/packages/crawlab-ui +++ b/frontend/packages/crawlab-ui @@ -1 +1 @@ -Subproject commit 2814ef081354b4bd303cf074191060305dabfba4 +Subproject commit 1b60483eda8490d108d0d8e2d175ff7c74e6bc71