From 459406eed697e86c3de508233ae8fdb96055d5b0 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Sat, 23 Nov 2024 15:57:20 +0300 Subject: [PATCH 1/3] object/put: fix concurrent PUT data corruption If ants pool is busy and cannot take task, early `return` without `wg.Wait()` leads to `iterateNodesForObject`'s `return` and all the buffers for binary replication from now may be reused while are still in use by the other routines inside the pool. Wait for WG before any `return` is called. Closes #2978, closes #2988, closes #2975, closes #2971. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/services/object/put/distibuted_test.go | 40 ++++++++++++++++++++++ pkg/services/object/put/distributed.go | 1 + 3 files changed, 42 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d40f0669ef..83de5429bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ attribute, which is used for container domain name in NNS contracts (#2954) - Panic in event listener related to inability to switch RPC node (#2970) - Non-container nodes never check placement policy on PUT, SEARCH requests (#3014) - If shards are overloaded with PUT requests, operation is not skipped but waits for 30 seconds (#2871) +- Data corruption if PUT is done too concurrently (#2978) ### Changed - `ObjectService`'s `Put` RPC handler caches up to 10K lists of per-object sorted container nodes (#2901) diff --git a/pkg/services/object/put/distibuted_test.go b/pkg/services/object/put/distibuted_test.go index af4b462e62..ef76b669f3 100644 --- a/pkg/services/object/put/distibuted_test.go +++ b/pkg/services/object/put/distibuted_test.go @@ -7,6 +7,7 @@ import ( "strconv" "sync" "testing" + "time" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -492,4 +493,43 @@ func TestIterateNodesForObject(t *testing.T) { cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(), }) }) + t.Run("return only after worker pool finished", func(t *testing.T) { + objID := oidtest.ID() + cnrNodes := allocNodes([]uint{2, 3, 1}) + poolErr := errors.New("pool err") + iter := placementIterator{ + log: zap.NewNop(), + neoFSNet: new(testNetwork), + remotePool: &testWorkerPool{ + err: poolErr, + nFail: 2, + }, + containerNodes: testContainerNodes{ + objID: objID, + cnrNodes: cnrNodes, + primCounts: []uint{2, 3, 1}, + }, + } + blockCh := make(chan struct{}) + returnCh := make(chan struct{}) + go func() { + err := iter.iterateNodesForObject(objID, func(node nodeDesc) error { + <-blockCh + return nil + }) + require.ErrorContains(t, err, poolErr.Error()) + close(returnCh) + }() + select { + case <-returnCh: + t.Fatal("`iterateNodesForObject` is not synced with worker pools") + case <-time.After(time.Second / 2): + } + close(blockCh) + select { + case <-returnCh: + case <-time.After(10 * time.Second): + t.Fatal("unexpected test lock") + } + }) } diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 476bad12e7..5a6d21552c 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -331,6 +331,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er if e, _ := lastRespErr.Load().(error); e != nil { err = fmt.Errorf("%w (last node error: %w)", err, e) } + wg.Wait() return errIncompletePut{singleErr: err} } } From 215189a603adbfadf5b5e4385592307cdff0891d Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 25 Nov 2024 19:17:58 +0300 Subject: [PATCH 2/3] object/put: try nodes for object distribution more If an object cannot be PUT due to local overload (i-th routine for (i-1)-length worker pool), log the error and continue over other nodes, and even other placement vectors. `errNotEnoughNodes` will be also returned as the natural replication number handling in the outer `for`. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/services/object/put/distibuted_test.go | 8 +++----- pkg/services/object/put/distributed.go | 7 +------ 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83de5429bf..bf3dc70a05 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ attribute, which is used for container domain name in NNS contracts (#2954) - Log sampling is disabled by default now (#3011) - EACL is no longer considered for system role (#2972) - Deprecate peapod substorage (#3013) +- Node does not stop trying to PUT an object if there are more PUT tasks than configured (#3027) ### Removed - Support for node.key configuration (#2959) diff --git a/pkg/services/object/put/distibuted_test.go b/pkg/services/object/put/distibuted_test.go index ef76b669f3..e03c37f8c3 100644 --- a/pkg/services/object/put/distibuted_test.go +++ b/pkg/services/object/put/distibuted_test.go @@ -365,8 +365,7 @@ func TestIterateNodesForObject(t *testing.T) { cnrNodes[1][0].PublicKey(), cnrNodes[1][1].PublicKey(), }) require.EqualError(t, err, "incomplete object PUT by placement: "+ - "submit next job to save an object to the worker pool: any worker pool error "+ - "(last node error: any node error)") + "number of replicas cannot be met for list #1: 1 required, 0 nodes remaining (last node error: any node error)") }) t.Run("not enough nodes a priori", func(t *testing.T) { // nodes: [A B] [C D E] [F] @@ -496,12 +495,11 @@ func TestIterateNodesForObject(t *testing.T) { t.Run("return only after worker pool finished", func(t *testing.T) { objID := oidtest.ID() cnrNodes := allocNodes([]uint{2, 3, 1}) - poolErr := errors.New("pool err") iter := placementIterator{ log: zap.NewNop(), neoFSNet: new(testNetwork), remotePool: &testWorkerPool{ - err: poolErr, + err: errors.New("pool err"), nFail: 2, }, containerNodes: testContainerNodes{ @@ -517,7 +515,7 @@ func TestIterateNodesForObject(t *testing.T) { <-blockCh return nil }) - require.ErrorContains(t, err, poolErr.Error()) + require.EqualError(t, err, "incomplete object PUT by placement: number of replicas cannot be met for list #0: 1 required, 0 nodes remaining") close(returnCh) }() select { diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 5a6d21552c..9b1b54977e 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -326,13 +326,8 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er } }); err != nil { wg.Done() - svcutil.LogWorkerPoolError(x.log, "PUT", err) err = fmt.Errorf("submit next job to save an object to the worker pool: %w", err) - if e, _ := lastRespErr.Load().(error); e != nil { - err = fmt.Errorf("%w (last node error: %w)", err, e) - } - wg.Wait() - return errIncompletePut{singleErr: err} + svcutil.LogWorkerPoolError(x.log, "PUT", err) } } wg.Wait() From 872baaf1070e886d83881de55c43568fb2e0907d Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 25 Nov 2024 19:19:58 +0300 Subject: [PATCH 3/3] object/put: make `iterateNodesForObject` log object ID Signed-off-by: Pavel Karpy --- pkg/services/object/put/distributed.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 9b1b54977e..a124ae80d8 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -214,6 +214,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er var err error var nodeLists [][]netmap.NodeInfo var replCounts []uint + var l = x.log.With(zap.Stringer("oid", obj)) if x.localOnly { // TODO: although this particular case fits correctly into the general approach, // much less actions can be done @@ -288,7 +289,7 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er } // critical error that may ultimately block the storage service. Normally it // should not appear because entry into the network map under strict control - x.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node", + l.Error("failed to decode network endpoints of the storage node from the network map, skip the node", zap.String("public key", netmap.StringifyPublicKey(nodeLists[listInd][j])), zap.Error(nr.convertErr)) if listLen-nodesCounters[listInd].processed-1 < replRem { // -1 includes current node failure err = fmt.Errorf("%w (last node error: failed to decode network addresses: %w)", @@ -321,13 +322,13 @@ func (x placementIterator) iterateNodesForObject(obj oid.ID, f func(nodeDesc) er processedNodesMtx.Unlock() if err != nil { lastRespErr.Store(err) - svcutil.LogServiceError(x.log, "PUT", nr.desc.info.AddressGroup(), err) + svcutil.LogServiceError(l, "PUT", nr.desc.info.AddressGroup(), err) return } }); err != nil { wg.Done() err = fmt.Errorf("submit next job to save an object to the worker pool: %w", err) - svcutil.LogWorkerPoolError(x.log, "PUT", err) + svcutil.LogWorkerPoolError(l, "PUT", err) } } wg.Wait() @@ -359,7 +360,7 @@ broadcast: if nr.convertErr != nil { // critical error that may ultimately block the storage service. Normally it // should not appear because entry into the network map under strict control - x.log.Error("failed to decode network endpoints of the storage node from the network map, skip the node", + l.Error("failed to decode network endpoints of the storage node from the network map, skip the node", zap.String("public key", netmap.StringifyPublicKey(nodeLists[i][j])), zap.Error(nr.convertErr)) continue // to send as many replicas as possible } @@ -378,12 +379,12 @@ broadcast: nodeResults[pks] = nr processedNodesMtx.Unlock() if err != nil { - svcutil.LogServiceError(x.log, "PUT (extra broadcast)", nr.desc.info.AddressGroup(), err) + svcutil.LogServiceError(l, "PUT (extra broadcast)", nr.desc.info.AddressGroup(), err) return } }); err != nil { wg.Done() - svcutil.LogWorkerPoolError(x.log, "PUT (extra broadcast)", err) + svcutil.LogWorkerPoolError(l, "PUT (extra broadcast)", err) break broadcast } }