diff --git a/CHANGELOG.md b/CHANGELOG.md index f01252c2d5..99feebc32f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Changelog for NeoFS Node - IR uses internal LOCODE DB from Go package (#2610) - Contract group for the committee is no longer registered/used in the Sidechain auto-deployment (#2642) - The priority of metrics service is increased (#2428) +- The priority of running control service is maximized (#2585) ### Removed - deprecated `no-precheck` flag of `neofs-cli container set-eacl` (#2496) diff --git a/cmd/neofs-node/control.go b/cmd/neofs-node/control.go index 02ece79da7..e29dc0566d 100644 --- a/cmd/neofs-node/control.go +++ b/cmd/neofs-node/control.go @@ -63,12 +63,13 @@ func initControlService(c *cfg) { }) control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc) - - c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { + c.wg.Add(1) + go func() { runAndLog(c, "control", false, func(c *cfg) { fatalOnErr(c.cfgControlService.server.Serve(lis)) + c.wg.Done() }) - })) + }() } func (c *cfg) NetmapStatus() control.NetmapStatus { diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index b6e890518b..8a7fa836bc 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -121,6 +121,7 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) { } func initApp(c *cfg) { + initAndLog(c, "control", initControlService) initLocalStorage(c) c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) @@ -139,7 +140,6 @@ func initApp(c *cfg) { initAndLog(c, "notification", initNotifications) initAndLog(c, "object", initObjectService) initAndLog(c, "tree", initTreeService) - initAndLog(c, "control", initControlService) initAndLog(c, "morph notifications", listenMorphNotifications) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 0f2d5c8ca3..67f4782d68 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -519,6 +519,53 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- } } + controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") + if controlSvcEndpoint != "" { + authKeysStr := cfg.GetStringSlice("control.authorized_keys") + authKeys := make([][]byte, 0, len(authKeysStr)) + + for i := range authKeysStr { + key, err := hex.DecodeString(authKeysStr[i]) + if err != nil { + return nil, fmt.Errorf("could not parse Control authorized key %s: %w", + authKeysStr[i], + err, + ) + } + + authKeys = append(authKeys, key) + } + + lis, err := net.Listen("tcp", controlSvcEndpoint) + if err != nil { + return nil, err + } + var p controlsrv.Prm + + p.SetPrivateKey(*server.key) + p.SetHealthChecker(server) + + controlSvc := controlsrv.New(p, + controlsrv.WithAllowedKeys(authKeys), + ) + + grpcControlSrv := grpc.NewServer() + control.RegisterControlServiceServer(grpcControlSrv, controlSvc) + + go func() { + errChan <- grpcControlSrv.Serve(lis) + }() + + server.registerNoErrCloser(grpcControlSrv.GracefulStop) + } else { + log.Info("no Control server endpoint specified, service is disabled") + } + + if cfg.GetString("prometheus.address") != "" { + m := metrics.NewInnerRingMetrics(misc.Version) + server.metrics = &m + } + // create morph listener server.morphListener, err = createListener(server.morphClient, morphChain) if err != nil { @@ -985,57 +1032,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- server.addBlockTimer(emissionTimer) - controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") - if controlSvcEndpoint != "" { - authKeysStr := cfg.GetStringSlice("control.authorized_keys") - authKeys := make([][]byte, 0, len(authKeysStr)) - - for i := range authKeysStr { - key, err := hex.DecodeString(authKeysStr[i]) - if err != nil { - return nil, fmt.Errorf("could not parse Control authorized key %s: %w", - authKeysStr[i], - err, - ) - } - - authKeys = append(authKeys, key) - } - - var p controlsrv.Prm - - p.SetPrivateKey(*server.key) - p.SetHealthChecker(server) - - controlSvc := controlsrv.New(p, - controlsrv.WithAllowedKeys(authKeys), - ) - - grpcControlSrv := grpc.NewServer() - control.RegisterControlServiceServer(grpcControlSrv, controlSvc) - - server.runners = append(server.runners, func(ch chan<- error) error { - lis, err := net.Listen("tcp", controlSvcEndpoint) - if err != nil { - return err - } - - go func() { - ch <- grpcControlSrv.Serve(lis) - }() - return nil - }) - - server.registerNoErrCloser(grpcControlSrv.GracefulStop) - } else { - log.Info("no Control server endpoint specified, service is disabled") - } - - if cfg.GetString("prometheus.address") != "" { - m := metrics.NewInnerRingMetrics(misc.Version) - server.metrics = &m - } - return server, nil }