Skip to content

Commit

Permalink
feat/Policer enhancements (#2600)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Oct 9, 2023
2 parents 154eb64 + 40762d3 commit 1e568b6
Show file tree
Hide file tree
Showing 15 changed files with 217 additions and 39 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ Changelog for NeoFS Node

## [Unreleased]

### Added
- Policer's setting to the SN's application configuration (#2600)

### Fixed
- `neofs-cli netmap netinfo` documentation (#2555)
- `GETRANGEHASH` to a node without an object produced `GETRANGE` or `GET` requests (#2541, #2598)
Expand All @@ -13,9 +16,11 @@ Changelog for NeoFS Node
### Changed
- FSTree storage now uses more efficient and safe temporary files under Linux (#2566)
- BoltDB open timeout increased from 100ms to 1s (#2499)
- Internal container cache size from 10 to 1000 (#2600)

### Removed
- deprecated `no-precheck` flag of `neofs-cli container set-eacl` (#2496)
- Recently-handled objects Policer's cache (#2600)

### Updated
- Update minimal supported Go version up to v1.19 (#2485)
Expand Down
33 changes: 33 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics"
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object"
policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer"
replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/storage"
"github.com/nspcc-dev/neofs-node/misc"
Expand All @@ -56,6 +57,7 @@ import (
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone"
tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source"
"github.com/nspcc-dev/neofs-node/pkg/services/policer"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
Expand Down Expand Up @@ -100,6 +102,13 @@ type applicationConfiguration struct {
shardPoolSize uint32
shards []storage.ShardCfg
}

PolicerCfg struct {
maxCapacity uint32
headTimeout time.Duration
replicationCooldown time.Duration
objectBatchSize uint32
}
}

// readConfig fills applicationConfiguration with raw configuration values
Expand All @@ -126,6 +135,13 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {

a.LoggerCfg.level = loggerconfig.Level(c)

// Policer

a.PolicerCfg.maxCapacity = policerconfig.MaxWorkers(c)
a.PolicerCfg.headTimeout = policerconfig.HeadTimeout(c)
a.PolicerCfg.replicationCooldown = policerconfig.ReplicationCooldown(c)
a.PolicerCfg.objectBatchSize = policerconfig.ObjectBatchSize(c)

// Storage Engine

a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
Expand Down Expand Up @@ -304,6 +320,8 @@ type shared struct {

respSvc *response.Service

policer *policer.Policer

replicator *replicator.Replicator

treeService *tree.Service
Expand Down Expand Up @@ -727,6 +745,17 @@ func (c *cfg) shardOpts() []shardOptsWithID {
return shards
}

func (c *cfg) policerOpts() []policer.Option {
pCfg := c.applicationConfiguration.PolicerCfg

return []policer.Option{
policer.WithMaxCapacity(pCfg.maxCapacity),
policer.WithHeadTimeout(pCfg.headTimeout),
policer.WithReplicationCooldown(pCfg.replicationCooldown),
policer.WithObjectBatchSize(pCfg.objectBatchSize),
}
}

func (c *cfg) LocalAddress() network.AddressGroup {
return c.localAddr
}
Expand Down Expand Up @@ -905,6 +934,10 @@ func (c *cfg) configWatcher(ctx context.Context) {
continue
}

// Policer

c.shared.policer.Reload(c.policerOpts()...)

// Storage Engine

var rcfg engine.ReConfiguration
Expand Down
47 changes: 47 additions & 0 deletions cmd/neofs-node/config/policer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ const (

// HeadTimeoutDefault is a default object.Head request timeout in policer.
HeadTimeoutDefault = 5 * time.Second

// ReplicationCooldownDefault is a default cooldown time b/w replication tasks
// submitting.
ReplicationCooldownDefault = time.Duration(0)
// ObjectBatchSizeDefault is a default replication's objects batch size.
ObjectBatchSizeDefault = 10
// MaxWorkersDefault is a default replication's worker pool's maximum size.
MaxWorkersDefault = 20
)

// HeadTimeout returns the value of "head_timeout" config parameter
Expand All @@ -25,3 +33,42 @@ func HeadTimeout(c *config.Config) time.Duration {

return HeadTimeoutDefault
}

// ReplicationCooldown returns the value of "replication_cooldown" config parameter
// from "policer" section.
//
// Returns ReplicationCooldownDefault if a value is not a positive duration.
func ReplicationCooldown(c *config.Config) time.Duration {
v := config.DurationSafe(c.Sub(subsection), "replication_cooldown")
if v > 0 {
return v
}

return ReplicationCooldownDefault
}

// ObjectBatchSize returns the value of "object_batch_size" config parameter
// from "policer" section.
//
// Returns ObjectBatchSizeDefault if a value is not a positive number.
func ObjectBatchSize(c *config.Config) uint32 {
v := config.Uint32Safe(c.Sub(subsection), "object_batch_size")
if v > 0 {
return v
}

return ObjectBatchSizeDefault
}

// MaxWorkers returns the value of "max_workers" config parameter
// from "policer" section.
//
// Returns MaxWorkersDefault if a value is not a positive number.
func MaxWorkers(c *config.Config) uint32 {
v := config.Uint32Safe(c.Sub(subsection), "max_workers")
if v > 0 {
return v
}

return MaxWorkersDefault
}
6 changes: 6 additions & 0 deletions cmd/neofs-node/config/policer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ func TestPolicerSection(t *testing.T) {
empty := configtest.EmptyConfig()

require.Equal(t, policerconfig.HeadTimeoutDefault, policerconfig.HeadTimeout(empty))
require.Equal(t, policerconfig.ReplicationCooldownDefault, policerconfig.ReplicationCooldown(empty))
require.Equal(t, uint32(policerconfig.ObjectBatchSizeDefault), policerconfig.ObjectBatchSize(empty))
require.Equal(t, uint32(policerconfig.MaxWorkersDefault), policerconfig.MaxWorkers(empty))
})

const path = "../../../../config/example/node"

var fileConfigTest = func(c *config.Config) {
require.Equal(t, 15*time.Second, policerconfig.HeadTimeout(c))
require.Equal(t, 101*time.Millisecond, policerconfig.ReplicationCooldown(c))
require.Equal(t, uint32(11), policerconfig.ObjectBatchSize(c))
require.Equal(t, uint32(21), policerconfig.MaxWorkers(c))
}

configtest.ForEachFileType(path, fileConfigTest)
Expand Down
13 changes: 6 additions & 7 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/nspcc-dev/neofs-api-go/v2/object"
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer"
replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator"
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
containercore "github.com/nspcc-dev/neofs-node/pkg/core/container"
Expand Down Expand Up @@ -190,7 +189,7 @@ func initObjectService(c *cfg) {
),
)

pol := policer.New(
c.policer = policer.New(
policer.WithLogger(c.log),
policer.WithLocalStorage(ls),
policer.WithContainerSource(c.cfgObject.cnrSource),
Expand All @@ -201,9 +200,7 @@ func initObjectService(c *cfg) {
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
),
policer.WithNetmapKeys(c),
policer.WithHeadTimeout(
policerconfig.HeadTimeout(c.appCfg),
),
policer.WithHeadTimeout(c.applicationConfiguration.PolicerCfg.headTimeout),
policer.WithReplicator(c.replicator),
policer.WithRedundantCopyCallback(func(addr oid.Address) {
var inhumePrm engine.InhumePrm
Expand All @@ -216,15 +213,17 @@ func initObjectService(c *cfg) {
)
}
}),
policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize),
policer.WithMaxCapacity(c.applicationConfiguration.PolicerCfg.maxCapacity),
policer.WithPool(c.cfgObject.pool.replication),
policer.WithNodeLoader(c),
policer.WithNetwork(c),
policer.WithReplicationCooldown(c.applicationConfiguration.PolicerCfg.replicationCooldown),
policer.WithObjectBatchSize(c.applicationConfiguration.PolicerCfg.objectBatchSize),
)

traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)

c.workers = append(c.workers, pol)
c.workers = append(c.workers, c.policer)

var os putsvc.ObjectStorage = engineWithoutNotifications{
engine: ls,
Expand Down
5 changes: 5 additions & 0 deletions config/example/node.env
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ NEOFS_APICLIENT_ALLOW_EXTERNAL=true

# Policer section
NEOFS_POLICER_HEAD_TIMEOUT=15s
NEOFS_POLICER_CACHE_SIZE=1000001
NEOFS_POLICER_CACHE_TIME=31s
NEOFS_POLICER_REPLICATION_COOLDOWN=101ms
NEOFS_POLICER_OBJECT_BATCH_SIZE=11
NEOFS_POLICER_MAX_WORKERS=21

# Replicator section
NEOFS_REPLICATOR_PUT_TIMEOUT=15s
Expand Down
7 changes: 6 additions & 1 deletion config/example/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@
"allow_external": true
},
"policer": {
"head_timeout": "15s"
"head_timeout": "15s",
"cache_size": "1000001",
"cache_time": "31s",
"replication_cooldown": "101ms",
"object_batch_size": "11",
"max_workers": "21"
},
"replicator": {
"pool_size": 10,
Expand Down
5 changes: 5 additions & 0 deletions config/example/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ apiclient:

policer:
head_timeout: 15s # timeout for the Policer HEAD remote operation
cache_size: 1000001 # recently-handled objects cache size
cache_time: 31s # recently-handled objects cache expiration time
replication_cooldown: 101ms # cooldown time b/w replication tasks submitting
object_batch_size: 11 # replication's objects batch size
max_workers: 21 # replication's worker pool's maximum size

replicator:
put_timeout: 15s # timeout for the Replicator PUT remote operation (defaults to 1m)
Expand Down
13 changes: 13 additions & 0 deletions docs/sighup.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@

Logger level can be reloaded with a SIGHUP.

## Policer

Available for reconfiguration fields:

```yml
head_timeout:
cache_size:
cache_time:
replication_cooldown:
object_batch_size:
max_workers:
```
## Storage engine
Shards can be added, removed or reloaded with SIGHUP.
Expand Down
12 changes: 9 additions & 3 deletions docs/storage-node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,17 @@ Configuration for the Policer service. It ensures that object is stored accordin
```yaml
policer:
head_timeout: 15s
replication_cooldown: 100ms
object_batch_size: 10
max_workers: 20
```

| Parameter | Type | Default value | Description |
|----------------|------------|---------------|----------------------------------------------|
| `head_timeout` | `duration` | `5s` | Timeout for performing the `HEAD` operation. |
| Parameter | Type | Default value | Description |
|------------------------|------------|---------------|-----------------------------------------------------|
| `head_timeout` | `duration` | `5s` | Timeout for performing the `HEAD` operation. |
| `replication_cooldown` | `duration` | `0s` | Cooldown time between replication tasks submitting. |
| `object_batch_size` | `int` | `10` | Replication's objects batch size. |
| `max_workers` | `int` | `20` | Replication's worker pool's maximum size. |

# `replicator` section

Expand Down
2 changes: 1 addition & 1 deletion pkg/services/object_manager/placement/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type netMapSrc struct {
}

// defaultContainerCacheSize is the default size for the container cache.
const defaultContainerCacheSize = 10
const defaultContainerCacheSize = 1000

func NewNetworkMapBuilder(nm *netmapSDK.NetMap) Builder {
cache, _ := simplelru.NewLRU[string, [][]netmapSDK.NodeInfo](defaultContainerCacheSize, nil) // no error
Expand Down
6 changes: 5 additions & 1 deletion pkg/services/policer/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ type processPlacementContext struct {
func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) {
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(ctx.object.Address)

p.cfg.RLock()
headTimeout := p.headTimeout
p.cfg.RUnlock()

// Number of copies that are stored on maintenance nodes.
var uncheckedCopies int

Expand Down Expand Up @@ -262,7 +266,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
continue
}

callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
callCtx, cancel := context.WithTimeout(ctx, headTimeout)

_, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i]))

Expand Down
Loading

0 comments on commit 1e568b6

Please sign in to comment.