Skip to content

Commit

Permalink
WIP: Encode object into transport message once per replication round
Browse files Browse the repository at this point in the history
Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Dec 14, 2023
1 parent 8c80886 commit 3158764
Showing 1 changed file with 43 additions and 9 deletions.
52 changes: 43 additions & 9 deletions pkg/services/replicator/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package replicator

import (
"context"
"errors"
"io"

putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
)
Expand All @@ -16,6 +18,34 @@ type TaskResult interface {
SubmitSuccessfulReplication(netmap.NodeInfo)
}

// readSeekerClosedOnEOF wraps an io.ReadSeeker so that the associated
// io.Closer is released as soon as the stream has been fully read (io.EOF),
// instead of waiting for an explicit Close call. This lets the no-longer-used
// underlying resource be freed right after reading finishes.
type readSeekerClosedOnEOF struct {
	closed   bool  // c has already been closed (by Read on EOF, or by Close)
	closeErr error // result of the early close performed on EOF, surfaced by Close

	rs io.ReadSeeker // data source
	c  io.Closer     // resource to release once reading is done
}

// Read forwards to the underlying ReadSeeker. On io.EOF it eagerly closes the
// wrapped Closer (once only — repeated reads past EOF must not double-close);
// any error from that close is saved and later reported by Close instead of
// being silently dropped.
func (x *readSeekerClosedOnEOF) Read(p []byte) (int, error) {
	n, err := x.rs.Read(p)
	if errors.Is(err, io.EOF) && !x.closed {
		x.closed = true
		x.closeErr = x.c.Close()
	}
	return n, err
}

// Seek forwards to the underlying ReadSeeker. Seeking remains possible even
// after the Closer has been released on EOF (the ReadSeeker itself is not
// closed by this wrapper).
func (x *readSeekerClosedOnEOF) Seek(offset int64, whence int) (int64, error) {
	return x.rs.Seek(offset, whence)
}

// Close releases the wrapped Closer unless it was already closed on EOF; in
// that case the error (if any) from the earlier close is returned. The
// underlying Closer is closed at most once.
func (x *readSeekerClosedOnEOF) Close() error {
	if x.closed {
		return x.closeErr
	}
	x.closed = true
	return x.c.Close()
}

// HandleTask executes replication task inside invoking goroutine.
// Passes all the nodes that accepted the replication to the TaskResult.
func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult) {
Expand All @@ -38,6 +68,16 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
return
}

if len(task.nodes) > 1 {
rs := client.ShareReplicatedObject(binObjStream)
// since in this case we read the object only once, it's worth closing the stream
// immediately after reading finishes so that no-longer-used resources do not hang around
binObjStream = &readSeekerClosedOnEOF{
rs: rs,
c: binObjStream,
}
}

defer func() {
if err := binObjStream.Close(); err != nil {
p.log.Debug("failed to close replicated object's binary stream from the local storage",
Expand All @@ -55,15 +95,6 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
default:
}

if i > 0 && binObjStream != nil {
_, err = binObjStream.Seek(0, io.SeekStart)
if err != nil {
p.log.Error("failed to seek start of the replicated object's binary stream from the local storage",
zap.Stringer("object", task.addr), zap.Error(err))
return
}
}

log := p.log.With(
zap.String("node", netmap.StringifyPublicKey(task.nodes[i])),
zap.Stringer("object", task.addr),
Expand All @@ -73,6 +104,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)

if binObjStream != nil {
err = p.remoteSender.CopyObjectToNode(ctx, task.nodes[i], binObjStream)
// note that we don't need to reset binObjStream because it is always read once:
// - if len(task.nodes) == 1, we won't come here again
// - otherwise, we use client.ShareReplicatedObject (see above)
// FIXME: what if node is old?
} else {
err = p.remoteSender.PutObject(callCtx, prm.WithObject(task.obj).WithNodeInfo(task.nodes[i]))
Expand Down

0 comments on commit 3158764

Please sign in to comment.