diff --git a/CHANGELOG.md b/CHANGELOG.md index 9acf369a30..6d48950dd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ Changelog for NeoFS Node ## [Unreleased] +### Added +- Policer's setting to the SN's application configuration (#2600) + ### Fixed - `neofs-cli netmap netinfo` documentation (#2555) - `GETRANGEHASH` to a node without an object produced `GETRANGE` or `GET` requests (#2541, #2598) @@ -13,9 +16,11 @@ Changelog for NeoFS Node ### Changed - FSTree storage now uses more efficient and safe temporary files under Linux (#2566) - BoltDB open timeout increased from 100ms to 1s (#2499) +- Internal container cache size from 10 to 1000 (#2600) ### Removed - deprecated `no-precheck` flag of `neofs-cli container set-eacl` (#2496) +- Recently-handled objects Policer's cache (#2600) ### Updated - Update minimal supported Go version up to v1.19 (#2485) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index f7fb75ed54..b0d3c9b6ec 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -30,6 +30,7 @@ import ( metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics" nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node" objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object" + policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer" replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/storage" "github.com/nspcc-dev/neofs-node/misc" @@ -56,6 +57,7 @@ import ( getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone" tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source" + "github.com/nspcc-dev/neofs-node/pkg/services/policer" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" trustcontroller 
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" @@ -100,6 +102,13 @@ type applicationConfiguration struct { shardPoolSize uint32 shards []storage.ShardCfg } + + PolicerCfg struct { + maxCapacity uint32 + headTimeout time.Duration + replicationCooldown time.Duration + objectBatchSize uint32 + } } // readConfig fills applicationConfiguration with raw configuration values @@ -126,6 +135,13 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { a.LoggerCfg.level = loggerconfig.Level(c) + // Policer + + a.PolicerCfg.maxCapacity = policerconfig.MaxWorkers(c) + a.PolicerCfg.headTimeout = policerconfig.HeadTimeout(c) + a.PolicerCfg.replicationCooldown = policerconfig.ReplicationCooldown(c) + a.PolicerCfg.objectBatchSize = policerconfig.ObjectBatchSize(c) + // Storage Engine a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) @@ -304,6 +320,8 @@ type shared struct { respSvc *response.Service + policer *policer.Policer + replicator *replicator.Replicator treeService *tree.Service @@ -727,6 +745,17 @@ func (c *cfg) shardOpts() []shardOptsWithID { return shards } +func (c *cfg) policerOpts() []policer.Option { + pCfg := c.applicationConfiguration.PolicerCfg + + return []policer.Option{ + policer.WithMaxCapacity(pCfg.maxCapacity), + policer.WithHeadTimeout(pCfg.headTimeout), + policer.WithReplicationCooldown(pCfg.replicationCooldown), + policer.WithObjectBatchSize(pCfg.objectBatchSize), + } +} + func (c *cfg) LocalAddress() network.AddressGroup { return c.localAddr } @@ -905,6 +934,10 @@ func (c *cfg) configWatcher(ctx context.Context) { continue } + // Policer + + c.shared.policer.Reload(c.policerOpts()...) 
+ // Storage Engine var rcfg engine.ReConfiguration diff --git a/cmd/neofs-node/config/policer/config.go b/cmd/neofs-node/config/policer/config.go index 5e61175554..e223b3458f 100644 --- a/cmd/neofs-node/config/policer/config.go +++ b/cmd/neofs-node/config/policer/config.go @@ -11,6 +11,14 @@ const ( // HeadTimeoutDefault is a default object.Head request timeout in policer. HeadTimeoutDefault = 5 * time.Second + + // ReplicationCooldownDefault is a default cooldown time b/w replication tasks + // submitting. + ReplicationCooldownDefault = time.Duration(0) + // ObjectBatchSizeDefault is a default replication's objects batch size. + ObjectBatchSizeDefault = 10 + // MaxWorkersDefault is a default replication's worker pool's maximum size. + MaxWorkersDefault = 20 ) // HeadTimeout returns the value of "head_timeout" config parameter @@ -25,3 +33,42 @@ func HeadTimeout(c *config.Config) time.Duration { return HeadTimeoutDefault } + +// ReplicationCooldown returns the value of "replication_cooldown" config parameter +// from "policer" section. +// +// Returns ReplicationCooldownDefault if a value is not a positive duration. +func ReplicationCooldown(c *config.Config) time.Duration { + v := config.DurationSafe(c.Sub(subsection), "replication_cooldown") + if v > 0 { + return v + } + + return ReplicationCooldownDefault +} + +// ObjectBatchSize returns the value of "object_batch_size" config parameter +// from "policer" section. +// +// Returns ObjectBatchSizeDefault if a value is not a positive number. +func ObjectBatchSize(c *config.Config) uint32 { + v := config.Uint32Safe(c.Sub(subsection), "object_batch_size") + if v > 0 { + return v + } + + return ObjectBatchSizeDefault +} + +// MaxWorkers returns the value of "max_workers" config parameter +// from "policer" section. +// +// Returns MaxWorkersDefault if a value is not a positive number. 
+func MaxWorkers(c *config.Config) uint32 { + v := config.Uint32Safe(c.Sub(subsection), "max_workers") + if v > 0 { + return v + } + + return MaxWorkersDefault +} diff --git a/cmd/neofs-node/config/policer/config_test.go b/cmd/neofs-node/config/policer/config_test.go index 03a2f3a86c..95debfa7a0 100644 --- a/cmd/neofs-node/config/policer/config_test.go +++ b/cmd/neofs-node/config/policer/config_test.go @@ -15,12 +15,18 @@ func TestPolicerSection(t *testing.T) { empty := configtest.EmptyConfig() require.Equal(t, policerconfig.HeadTimeoutDefault, policerconfig.HeadTimeout(empty)) + require.Equal(t, policerconfig.ReplicationCooldownDefault, policerconfig.ReplicationCooldown(empty)) + require.Equal(t, uint32(policerconfig.ObjectBatchSizeDefault), policerconfig.ObjectBatchSize(empty)) + require.Equal(t, uint32(policerconfig.MaxWorkersDefault), policerconfig.MaxWorkers(empty)) }) const path = "../../../../config/example/node" var fileConfigTest = func(c *config.Config) { require.Equal(t, 15*time.Second, policerconfig.HeadTimeout(c)) + require.Equal(t, 101*time.Millisecond, policerconfig.ReplicationCooldown(c)) + require.Equal(t, uint32(11), policerconfig.ObjectBatchSize(c)) + require.Equal(t, uint32(21), policerconfig.MaxWorkers(c)) } configtest.ForEachFileType(path, fileConfigTest) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index c558049d02..b966e9ce6d 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -8,7 +8,6 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" - policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer" replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" containercore "github.com/nspcc-dev/neofs-node/pkg/core/container" @@ -190,7 +189,7 @@ func initObjectService(c *cfg) { ), ) - pol := policer.New( + c.policer = policer.New( 
policer.WithLogger(c.log), policer.WithLocalStorage(ls), policer.WithContainerSource(c.cfgObject.cnrSource), @@ -201,9 +200,7 @@ func initObjectService(c *cfg) { headsvc.NewRemoteHeader(keyStorage, clientConstructor), ), policer.WithNetmapKeys(c), - policer.WithHeadTimeout( - policerconfig.HeadTimeout(c.appCfg), - ), + policer.WithHeadTimeout(c.applicationConfiguration.PolicerCfg.headTimeout), policer.WithReplicator(c.replicator), policer.WithRedundantCopyCallback(func(addr oid.Address) { var inhumePrm engine.InhumePrm @@ -216,15 +213,17 @@ func initObjectService(c *cfg) { ) } }), - policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize), + policer.WithMaxCapacity(c.applicationConfiguration.PolicerCfg.maxCapacity), policer.WithPool(c.cfgObject.pool.replication), policer.WithNodeLoader(c), policer.WithNetwork(c), + policer.WithReplicationCooldown(c.applicationConfiguration.PolicerCfg.replicationCooldown), + policer.WithObjectBatchSize(c.applicationConfiguration.PolicerCfg.objectBatchSize), ) traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c) - c.workers = append(c.workers, pol) + c.workers = append(c.workers, c.policer) var os putsvc.ObjectStorage = engineWithoutNotifications{ engine: ls, diff --git a/config/example/node.env b/config/example/node.env index 5449b6567a..f0f651394f 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -74,6 +74,11 @@ NEOFS_APICLIENT_ALLOW_EXTERNAL=true # Policer section NEOFS_POLICER_HEAD_TIMEOUT=15s +NEOFS_POLICER_CACHE_SIZE=1000001 +NEOFS_POLICER_CACHE_TIME=31s +NEOFS_POLICER_REPLICATION_COOLDOWN=101ms +NEOFS_POLICER_OBJECT_BATCH_SIZE=11 +NEOFS_POLICER_MAX_WORKERS=21 # Replicator section NEOFS_REPLICATOR_PUT_TIMEOUT=15s diff --git a/config/example/node.json b/config/example/node.json index fa11730f57..cbb685a95f 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -108,7 +108,12 @@ "allow_external": true }, "policer": { - "head_timeout": "15s" + "head_timeout": 
"15s", + "cache_size": "1000001", + "cache_time": "31s", + "replication_cooldown": "101ms", + "object_batch_size": "11", + "max_workers": "21" }, "replicator": { "pool_size": 10, diff --git a/config/example/node.yaml b/config/example/node.yaml index fde52a969f..18eea8fe84 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -94,6 +94,11 @@ apiclient: policer: head_timeout: 15s # timeout for the Policer HEAD remote operation + cache_size: 1000001 # recently-handled objects cache size + cache_time: 31s # recently-handled objects cache expiration time + replication_cooldown: 101ms # cooldown time b/w replication tasks submitting + object_batch_size: 11 # replication's objects batch size + max_workers: 21 # replication's worker pool's maximum size replicator: put_timeout: 15s # timeout for the Replicator PUT remote operation (defaults to 1m) diff --git a/docs/sighup.md b/docs/sighup.md index 4243d620c1..0c41564159 100644 --- a/docs/sighup.md +++ b/docs/sighup.md @@ -4,6 +4,19 @@ Logger level can be reloaded with a SIGHUP. +## Policer + +Available for reconfiguration fields: + +```yml + head_timeout: + cache_size: + cache_time: + replication_cooldown: + object_batch_size: + max_workers: +``` + ## Storage engine Shards can be added, removed or reloaded with SIGHUP. diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 6eccf54569..c76704a6d9 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -378,11 +378,17 @@ Configuration for the Policer service. It ensures that object is stored accordin ```yaml policer: head_timeout: 15s + replication_cooldown: 100ms + object_batch_size: 10 + max_workers: 20 ``` -| Parameter | Type | Default value | Description | -|----------------|------------|---------------|----------------------------------------------| -| `head_timeout` | `duration` | `5s` | Timeout for performing the `HEAD` operation. 
| +| Parameter | Type | Default value | Description | +|------------------------|------------|---------------|-----------------------------------------------------| +| `head_timeout` | `duration` | `5s` | Timeout for performing the `HEAD` operation. | +| `replication_cooldown` | `duration` | `0s` | Cooldown time between replication tasks submitting. | +| `object_batch_size` | `int` | `10` | Replication's objects batch size. | +| `max_workers` | `int` | `20` | Replication's worker pool's maximum size. | # `replicator` section diff --git a/pkg/services/object_manager/placement/netmap.go b/pkg/services/object_manager/placement/netmap.go index 1e0e79aff2..c5013ddd0b 100644 --- a/pkg/services/object_manager/placement/netmap.go +++ b/pkg/services/object_manager/placement/netmap.go @@ -29,7 +29,7 @@ type netMapSrc struct { } // defaultContainerCacheSize is the default size for the container cache. -const defaultContainerCacheSize = 10 +const defaultContainerCacheSize = 1000 func NewNetworkMapBuilder(nm *netmapSDK.NetMap) Builder { cache, _ := simplelru.NewLRU[string, [][]netmapSDK.NodeInfo](defaultContainerCacheSize, nil) // no error diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 3e88b25366..3b333016b4 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -203,6 +203,10 @@ type processPlacementContext struct { func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) { prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(ctx.object.Address) + p.cfg.RLock() + headTimeout := p.headTimeout + p.cfg.RUnlock() + // Number of copies that are stored on maintenance nodes. 
var uncheckedCopies int @@ -262,7 +266,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node continue } - callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) + callCtx, cancel := context.WithTimeout(ctx, headTimeout) _, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i])) diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index a1d9a55d60..46577cbd96 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -4,7 +4,6 @@ import ( "sync" "time" - lru "github.com/hashicorp/golang-lru/v2" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" @@ -52,8 +51,6 @@ func (oiw *objectsInWork) add(addr oid.Address) { type Policer struct { *cfg - cache *lru.Cache[oid.Address, time.Time] - objsInWork *objectsInWork } @@ -73,7 +70,12 @@ type Network interface { } type cfg struct { + sync.RWMutex + // available for runtime reconfiguration headTimeout time.Duration + repCooldown time.Duration + batchSize uint32 + maxCapacity uint32 log *zap.Logger @@ -95,11 +97,7 @@ type cfg struct { loader nodeLoader - maxCapacity int - - batchSize, cacheSize uint32 - - rebalanceFreq, evictDuration time.Duration + rebalanceFreq time.Duration network Network } @@ -108,9 +106,8 @@ func defaultCfg() *cfg { return &cfg{ log: zap.L(), batchSize: 10, - cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB rebalanceFreq: 1 * time.Second, - evictDuration: 30 * time.Second, + repCooldown: 0, } } @@ -124,14 +121,8 @@ func New(opts ...Option) *Policer { c.log = c.log.With(zap.String("component", "Object Policer")) - cache, err := lru.New[oid.Address, time.Time](int(c.cacheSize)) - if err != nil { - panic(err) - } - return &Policer{ - cfg: c, - cache: cache, + cfg: c, objsInWork: &objectsInWork{ objs: make(map[oid.Address]struct{}, c.maxCapacity), }, @@ -205,7 +196,7 @@ func 
WithRedundantCopyCallback(cb RedundantCopyCallback) Option { // WithMaxCapacity returns option to set max capacity // that can be set to the pool. -func WithMaxCapacity(capacity int) Option { +func WithMaxCapacity(capacity uint32) Option { return func(c *cfg) { c.maxCapacity = capacity } @@ -232,3 +223,20 @@ func WithNetwork(n Network) Option { c.network = n } } + +// WithReplicationCooldown returns option to set replication +// cooldown: the [Policer] will not submit more than one task +// per a provided time duration. +func WithReplicationCooldown(d time.Duration) Option { + return func(c *cfg) { + c.repCooldown = d + } +} + +// WithObjectBatchSize returns option to set maximum objects +// read from the Storage at once. +func WithObjectBatchSize(s uint32) Option { + return func(c *cfg) { + c.batchSize = s + } +} diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index d29c6fc551..801de5fc0d 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -20,12 +20,24 @@ func (p *Policer) Run(ctx context.Context) { } func (p *Policer) shardPolicyWorker(ctx context.Context) { + p.cfg.RLock() + repCooldown := p.repCooldown + batchSize := p.batchSize + p.cfg.RUnlock() + var ( addrs []objectcore.AddressWithType cursor *engine.Cursor err error ) + t := time.NewTimer(repCooldown) + defer func() { + if !t.Stop() { + <-t.C + } + }() + for { select { case <-ctx.Done(): @@ -33,7 +45,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { default: } - addrs, cursor, err = p.jobQueue.Select(cursor, p.batchSize) + addrs, cursor, err = p.jobQueue.Select(cursor, batchSize) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { time.Sleep(time.Second) // finished whole cycle, sleep a bit @@ -55,16 +67,10 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { } err = p.taskPool.Submit(func() { - lastTime, ok := p.cache.Get(addr.Address) - if ok && time.Since(lastTime) < p.evictDuration { - return - } - 
p.objsInWork.add(addr.Address) p.processObject(ctx, addr) - p.cache.Add(addr.Address, time.Now()) p.objsInWork.remove(addr.Address) }) if err != nil { @@ -72,10 +78,23 @@ } } } + + select { + case <-ctx.Done(): + return + case <-t.C: + p.cfg.RLock() + t.Reset(p.repCooldown) + p.cfg.RUnlock() + } } } func (p *Policer) poolCapacityWorker(ctx context.Context) { + p.cfg.RLock() + maxCapacity := p.maxCapacity + p.cfg.RUnlock() + ticker := time.NewTicker(p.rebalanceFreq) for { select { @@ -84,7 +103,7 @@ return case <-ticker.C: neofsSysLoad := p.loader.ObjectServiceLoad() - newCapacity := int((1.0 - neofsSysLoad) * float64(p.maxCapacity)) + newCapacity := int((1.0 - neofsSysLoad) * float64(maxCapacity)) if newCapacity == 0 { newCapacity++ } diff --git a/pkg/services/policer/reload.go b/pkg/services/policer/reload.go new file mode 100644 index 0000000000..b0e36940d6 --- /dev/null +++ b/pkg/services/policer/reload.go @@ -0,0 +1,21 @@ +package policer + +// Reload allows runtime reconfiguration for the following parameters: +// - [WithHeadTimeout]; +// - [WithReplicationCooldown]; +// - [WithMaxCapacity]; +// - [WithObjectBatchSize]. +func (p *Policer) Reload(opts ...Option) { + cfg := new(cfg) + for _, o := range opts { + o(cfg) + } + + p.cfg.Lock() + defer p.cfg.Unlock() + + p.headTimeout = cfg.headTimeout + p.repCooldown = cfg.repCooldown + p.maxCapacity = cfg.maxCapacity + p.batchSize = cfg.batchSize +}