From 5815476722af204a615cf4f48d9262cd65bbc333 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Mon, 20 Nov 2023 17:43:00 +0400 Subject: [PATCH] control: set the highest priority for control service Control ports should be initialised and start served first of all. The priority of running control service is maximized for both SN and IR. Closes: #2585. Signed-off-by: Ekaterina Pavlova --- CHANGELOG.md | 1 + cmd/neofs-node/control.go | 7 +-- cmd/neofs-node/main.go | 2 +- pkg/innerring/innerring.go | 98 ++++++++++++++++++-------------------- 4 files changed, 53 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f01252c2d5..99feebc32f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Changelog for NeoFS Node - IR uses internal LOCODE DB from Go package (#2610) - Contract group for the committee is no longer registered/used in the Sidechain auto-deployment (#2642) - The priority of metrics service is increased (#2428) +- The priority of running control service is maximized (#2585) ### Removed - deprecated `no-precheck` flag of `neofs-cli container set-eacl` (#2496) diff --git a/cmd/neofs-node/control.go b/cmd/neofs-node/control.go index 02ece79da7..e29dc0566d 100644 --- a/cmd/neofs-node/control.go +++ b/cmd/neofs-node/control.go @@ -63,12 +63,13 @@ func initControlService(c *cfg) { }) control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc) - - c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { + c.wg.Add(1) + go func() { runAndLog(c, "control", false, func(c *cfg) { fatalOnErr(c.cfgControlService.server.Serve(lis)) + c.wg.Done() }) - })) + }() } func (c *cfg) NetmapStatus() control.NetmapStatus { diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index b6e890518b..8a7fa836bc 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -121,6 +121,7 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) { } func initApp(c *cfg) { + initAndLog(c, "control", initControlService) initLocalStorage(c) c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) @@ -139,7 +140,6 @@ func initApp(c *cfg) { initAndLog(c, "notification", initNotifications) initAndLog(c, "object", initObjectService) initAndLog(c, "tree", initTreeService) - initAndLog(c, "control", initControlService) initAndLog(c, "morph notifications", listenMorphNotifications) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 0f2d5c8ca3..67f4782d68 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -519,6 +519,53 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- } } + controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") + if controlSvcEndpoint != "" { + authKeysStr := cfg.GetStringSlice("control.authorized_keys") + authKeys := make([][]byte, 0, len(authKeysStr)) + + for i := range authKeysStr { + key, err := hex.DecodeString(authKeysStr[i]) + if err != nil { + return nil, fmt.Errorf("could not parse Control authorized key %s: %w", + authKeysStr[i], + err, + ) + } + + authKeys = append(authKeys, key) + } + + lis, err := net.Listen("tcp", controlSvcEndpoint) + if err != nil { + return nil, err + } + var p controlsrv.Prm + + p.SetPrivateKey(*server.key) + p.SetHealthChecker(server) + + controlSvc := controlsrv.New(p, + controlsrv.WithAllowedKeys(authKeys), + ) + + grpcControlSrv := grpc.NewServer() + control.RegisterControlServiceServer(grpcControlSrv, controlSvc) + + go func() { + errChan <- grpcControlSrv.Serve(lis) + }() + + server.registerNoErrCloser(grpcControlSrv.GracefulStop) + } else { + log.Info("no Control server endpoint specified, service is disabled") + } + + if cfg.GetString("prometheus.address") != "" { + m := metrics.NewInnerRingMetrics(misc.Version) + server.metrics = &m + } + // create morph listener server.morphListener, err = createListener(server.morphClient, morphChain) if err != nil { @@ -985,57 +1032,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- server.addBlockTimer(emissionTimer) - controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") - if controlSvcEndpoint != "" { - authKeysStr := cfg.GetStringSlice("control.authorized_keys") - authKeys := make([][]byte, 0, len(authKeysStr)) - - for i := range authKeysStr { - key, err := hex.DecodeString(authKeysStr[i]) - if err != nil { - return nil, fmt.Errorf("could not parse Control authorized key %s: %w", - authKeysStr[i], - err, - ) - } - - authKeys = append(authKeys, key) - } - - var p controlsrv.Prm - - p.SetPrivateKey(*server.key) - p.SetHealthChecker(server) - - controlSvc := controlsrv.New(p, - controlsrv.WithAllowedKeys(authKeys), - ) - - grpcControlSrv := grpc.NewServer() - control.RegisterControlServiceServer(grpcControlSrv, controlSvc) - - server.runners = append(server.runners, func(ch chan<- error) error { - lis, err := net.Listen("tcp", controlSvcEndpoint) - if err != nil { - return err - } - - go func() { - ch <- grpcControlSrv.Serve(lis) - }() - return nil - }) - - server.registerNoErrCloser(grpcControlSrv.GracefulStop) - } else { - log.Info("no Control server endpoint specified, service is disabled") - } - - if cfg.GetString("prometheus.address") != "" { - m := metrics.NewInnerRingMetrics(misc.Version) - server.metrics = &m - } - return server, nil }