Skip to content

Commit

Permalink
WIP: Encode object into transport message once during PUT
Browse files Browse the repository at this point in the history
Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Dec 15, 2023
1 parent 3158764 commit b72e519
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 35 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/nspcc-dev/neo-go v0.104.0
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20231211142108-0f4d49e4804c
github.com/nspcc-dev/neofs-contract v0.19.1
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20231214115557-e600b997cd80
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20231215123200-67afdd00d58c
github.com/nspcc-dev/tzhash v1.7.1
github.com/olekukonko/tablewriter v0.0.5
github.com/panjf2000/ants/v2 v2.8.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ github.com/nspcc-dev/neofs-contract v0.19.1 h1:U1Uh+MlzfkalO0kRJ2pADZyHrmAOroC6K
github.com/nspcc-dev/neofs-contract v0.19.1/go.mod h1:ZOGouuwuHpgvYkx/LCGufGncIzEUhYEO18LL4cWEbyw=
github.com/nspcc-dev/neofs-crypto v0.4.0 h1:5LlrUAM5O0k1+sH/sktBtrgfWtq1pgpDs09fZo+KYi4=
github.com/nspcc-dev/neofs-crypto v0.4.0/go.mod h1:6XJ8kbXgOfevbI2WMruOtI+qUJXNwSGM/E9eClXxPHs=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20231214115557-e600b997cd80 h1:pTDoIhN2QEAvNFDEFhXb5hqPi1ZWyIbzk+tJgOus2r8=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20231214115557-e600b997cd80/go.mod h1:d566MJDWH80Xu+1GfyWjMMoRmZ23oVD5+hTrPK1o5Ww=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20231215123200-67afdd00d58c h1:msWkRvmCKZN8h3XgRnTO6+IMkRQwmJyaqwMLdur7Z/0=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20231215123200-67afdd00d58c/go.mod h1:d566MJDWH80Xu+1GfyWjMMoRmZ23oVD5+hTrPK1o5Ww=
github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE=
github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
github.com/nspcc-dev/tzhash v1.7.1 h1:6zmexLqdTF/ssbUAh7XJS7RxgKWaw28kdNpE/4UFdEU=
Expand Down
1 change: 1 addition & 0 deletions pkg/core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Client interface {
ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error
ObjectPutInit(ctx context.Context, header object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error)
ReplicateObject(ctx context.Context, src io.ReadSeeker, signer neofscrypto.Signer, opts client.ReplicateObjectOptions) error
PutFullObjectToNode(context.Context, object.Object, neofscrypto.Signer, client.PutFullObjectToNodeOptions) error
ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error)
ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
Expand Down
6 changes: 6 additions & 0 deletions pkg/network/cache/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ func (x *multiClient) ReplicateObject(ctx context.Context, src io.ReadSeeker, si
return errSeek
}

func (x *multiClient) PutFullObjectToNode(ctx context.Context, obj objectSDK.Object, signer neofscrypto.Signer, opts client.PutFullObjectToNodeOptions) error {
return x.iterateClients(ctx, func(c clientcore.Client) error {
return c.PutFullObjectToNode(ctx, obj, signer, opts)
})
}

func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error {
return x.iterateClients(ctx, func(c clientcore.Client) error {
return c.ContainerAnnounceUsedSpace(ctx, announcements, prm)
Expand Down
29 changes: 9 additions & 20 deletions pkg/services/object/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,38 +383,27 @@ func (x PutObjectRes) ID() oid.ID {
//
// Returns any error which prevented the operation from completing correctly in error return.
func PutObject(prm PutObjectPrm) (*PutObjectRes, error) {
var prmCli client.PrmObjectPutInit

prmCli.MarkLocal()
var opts client.PutFullObjectToNodeOptions

if prm.tokenSession != nil {
prmCli.WithinSession(*prm.tokenSession)
opts.WithinSession(*prm.tokenSession)
}

if prm.tokenBearer != nil {
prmCli.WithBearerToken(*prm.tokenBearer)
opts.WithBearerToken(*prm.tokenBearer)
}

prmCli.WithXHeaders(prm.xHeaders...)

w, err := prm.cli.ObjectPutInit(prm.ctx, *prm.obj, prm.signer, prmCli)
fmt.Println("AAAAAAAAAAAAAAAAAAAAAA")
err := prm.cli.PutFullObjectToNode(prm.ctx, *prm.obj, prm.signer, opts)
fmt.Println("BBBBBBBBBBBBBBBBBBBBBB")
if err != nil {
return nil, fmt.Errorf("init object writing on client: %w", err)
return nil, fmt.Errorf("put full object node: %w", err)
}

_, err = w.Write(prm.obj.Payload())
if err != nil {
return nil, fmt.Errorf("write object payload into stream: %w", err)
}

err = w.Close()
if err != nil {
ReportError(prm.cli, err)
return nil, fmt.Errorf("finish object stream: %w", err)
}
id, _ := prm.obj.ID()

return &PutObjectRes{
id: w.GetResult().StoredObjectID(),
id: id,
}, nil
}

Expand Down
18 changes: 11 additions & 7 deletions pkg/services/object/put/distributed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package putsvc

import (
"context"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -10,16 +11,19 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-sdk-go/client"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"go.uber.org/zap"
)

type preparedObjectTarget interface {
WriteObject(*objectSDK.Object, object.ContentMeta) error
WriteObject(context.Context, *objectSDK.Object, object.ContentMeta) error
Close() (*transformer.AccessIdentifiers, error)
}

type distributedTarget struct {
ctx context.Context

traversal traversal

remotePool, localPool util.WorkerPool
Expand Down Expand Up @@ -145,25 +149,25 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
t.traversal.extraBroadcastEnabled = true
}

return t.iteratePlacement(t.sendObject)
return t.iteratePlacement(client.NewSharedPutFullObjectContext(t.ctx), t.sendObject)
}

func (t *distributedTarget) sendObject(node nodeDesc) error {
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
if !node.local && t.relay != nil {
return t.relay(node)
}

target := t.nodeTargetInitializer(node)

if err := target.WriteObject(t.obj, t.objMeta); err != nil {
if err := target.WriteObject(ctx, t.obj, t.objMeta); err != nil {
return fmt.Errorf("could not write header: %w", err)
} else if _, err := target.Close(); err != nil {
return fmt.Errorf("could not close object stream: %w", err)
}
return nil
}

func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) {
func (t *distributedTarget) iteratePlacement(ctx context.Context, f func(context.Context, nodeDesc) error) (*transformer.AccessIdentifiers, error) {
id, _ := t.obj.ID()

traverser, err := placement.NewTraverser(
Expand Down Expand Up @@ -207,7 +211,7 @@ loop:
if err := workerPool.Submit(func() {
defer wg.Done()

err := f(nodeDesc{local: isLocal, info: addr})
err := f(ctx, nodeDesc{local: isLocal, info: addr})

// mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't
Expand Down Expand Up @@ -244,7 +248,7 @@ loop:

// perform additional container broadcast if needed
if t.traversal.submitPrimaryPlacementFinish() {
_, err = t.iteratePlacement(f)
_, err = t.iteratePlacement(ctx, f)
if err != nil {
t.log.Error("additional container broadcast failure",
zap.Error(err),
Expand Down
3 changes: 2 additions & 1 deletion pkg/services/object/put/local.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package putsvc

import (
"context"
"fmt"

objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
Expand Down Expand Up @@ -31,7 +32,7 @@ type localTarget struct {
meta objectCore.ContentMeta
}

func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMeta) error {
func (t *localTarget) WriteObject(_ context.Context, obj *object.Object, meta objectCore.ContentMeta) error {
t.obj = obj
t.meta = meta

Expand Down
6 changes: 3 additions & 3 deletions pkg/services/object/put/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type RemotePutPrm struct {
obj *object.Object
}

func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) error {
func (t *remoteTarget) WriteObject(ctx context.Context, obj *object.Object, _ objectcore.ContentMeta) error {
t.ctx = ctx
t.obj = obj

return nil
Expand Down Expand Up @@ -120,7 +121,6 @@ func (p *RemotePutPrm) WithObject(v *object.Object) *RemotePutPrm {
// PutObject sends object to remote node.
func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
t := &remoteTarget{
ctx: ctx,
keyStorage: s.keyStorage,
clientConstructor: s.clientConstructor,
}
Expand All @@ -130,7 +130,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
return fmt.Errorf("parse client node info: %w", err)
}

if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil {
if err := t.WriteObject(ctx, p.obj, objectcore.ContentMeta{}); err != nil {
return fmt.Errorf("(%T) could not send object header: %w", s, err)
} else if _, err := t.Close(); err != nil {
return fmt.Errorf("(%T) could not send object: %w", s, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/object/put/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
withBroadcast := !prm.common.LocalOnly() && (typ == object.TypeTombstone || typ == object.TypeLock)

return &distributedTarget{
ctx: p.ctx,
traversal: traversal{
opts: prm.traverseOpts,

Expand All @@ -233,7 +234,6 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
}

rt := &remoteTarget{
ctx: p.ctx,
keyStorage: p.keyStorage,
commonPrm: prm.common,
clientConstructor: p.clientConstructor,
Expand Down

0 comments on commit b72e519

Please sign in to comment.