Skip to content

Commit

Permalink
control: set the highest priority for control service. (#2645)
Browse files Browse the repository at this point in the history
Closes: #2585.
  • Loading branch information
roman-khimov authored Nov 20, 2023
2 parents 3fcafec + 5815476 commit 67c5b02
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Changelog for NeoFS Node
- IR uses internal LOCODE DB from Go package (#2610)
- Contract group for the committee is no longer registered/used in the Sidechain auto-deployment (#2642)
- The priority of metrics service is increased (#2428)
- The priority of running control service is maximized (#2585)

### Removed
- deprecated `no-precheck` flag of `neofs-cli container set-eacl` (#2496)
Expand Down
7 changes: 4 additions & 3 deletions cmd/neofs-node/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ func initControlService(c *cfg) {
})

control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc)

c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
c.wg.Add(1)
go func() {
runAndLog(c, "control", false, func(c *cfg) {
fatalOnErr(c.cfgControlService.server.Serve(lis))
c.wg.Done()
})
}))
}()
}

func (c *cfg) NetmapStatus() control.NetmapStatus {
Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) {
}

func initApp(c *cfg) {
initAndLog(c, "control", initControlService)
initLocalStorage(c)

c.ctx, c.ctxCancel = signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -139,7 +140,6 @@ func initApp(c *cfg) {
initAndLog(c, "notification", initNotifications)
initAndLog(c, "object", initObjectService)
initAndLog(c, "tree", initTreeService)
initAndLog(c, "control", initControlService)

initAndLog(c, "morph notifications", listenMorphNotifications)

Expand Down
98 changes: 47 additions & 51 deletions pkg/innerring/innerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,53 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
}
}

controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))

for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}

authKeys = append(authKeys, key)
}

lis, err := net.Listen("tcp", controlSvcEndpoint)
if err != nil {
return nil, err
}
var p controlsrv.Prm

p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)

controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)

grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)

go func() {
errChan <- grpcControlSrv.Serve(lis)
}()

server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}

if cfg.GetString("prometheus.address") != "" {
m := metrics.NewInnerRingMetrics(misc.Version)
server.metrics = &m
}

// create morph listener
server.morphListener, err = createListener(server.morphClient, morphChain)
if err != nil {
Expand Down Expand Up @@ -985,57 +1032,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-

server.addBlockTimer(emissionTimer)

controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))

for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}

authKeys = append(authKeys, key)
}

var p controlsrv.Prm

p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)

controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)

grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)

server.runners = append(server.runners, func(ch chan<- error) error {
lis, err := net.Listen("tcp", controlSvcEndpoint)
if err != nil {
return err
}

go func() {
ch <- grpcControlSrv.Serve(lis)
}()
return nil
})

server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}

if cfg.GetString("prometheus.address") != "" {
m := metrics.NewInnerRingMetrics(misc.Version)
server.metrics = &m
}

return server, nil
}

Expand Down

0 comments on commit 67c5b02

Please sign in to comment.