Skip to content

Commit

Permalink
cli: Collect storagegroup members with new V2 split
Browse files Browse the repository at this point in the history
It was easier to implement it from scratch than to try to understand callbacks
and recursion.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Mar 12, 2024
1 parent e976bd6 commit 40ffda7
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 28 deletions.
44 changes: 40 additions & 4 deletions cmd/neofs-cli/modules/storagegroup/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/nspcc-dev/neofs-node/cmd/neofs-cli/internal/key"
objectCli "github.com/nspcc-dev/neofs-node/cmd/neofs-cli/modules/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand Down Expand Up @@ -83,6 +84,7 @@ func putSG(cmd *cobra.Command, _ []string) {
headPrm internalclient.HeadObjectPrm
putPrm internalclient.PutObjectPrm
getCnrPrm internalclient.GetContainerPrm
getPrm internalclient.GetObjectPrm

Check warning on line 87 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L87

Added line #L87 was not covered by tests
)

cli := internalclient.GetSDKClientByFlag(ctx, cmd, commonflags.RPC)
Expand All @@ -99,12 +101,19 @@ func putSG(cmd *cobra.Command, _ []string) {
headPrm.SetClient(cli)
headPrm.SetPrivateKey(*pk)

headPrm.SetRawFlag(true)

Check warning on line 104 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L104

Added line #L104 was not covered by tests
getPrm.SetClient(cli)
getPrm.SetPrivateKey(*pk)
objectCli.Prepare(cmd, &getPrm)

Check warning on line 108 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L106-L108

Added lines #L106 - L108 were not covered by tests
sg, err := storagegroup.CollectMembers(sgHeadReceiver{
ctx: ctx,
cmd: cmd,
key: pk,
ownerID: &ownerID,
prm: headPrm,
prmHead: headPrm,
cli: cli,

Check warning on line 115 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L114-L115

Added lines #L114 - L115 were not covered by tests
getPrm: getPrm,
}, cnr, members, !resGetCnr.Container().IsHomomorphicHashingDisabled())
common.ExitOnErr(cmd, "could not collect storage group members: %w", err)

Expand Down Expand Up @@ -141,13 +150,40 @@ type sgHeadReceiver struct {
cmd *cobra.Command
key *ecdsa.PrivateKey
ownerID *user.ID
prm internalclient.HeadObjectPrm
prmHead internalclient.HeadObjectPrm
cli *client.Client
getPrm internalclient.GetObjectPrm
}

type payloadWriter struct {
payload []byte
}

Check warning on line 161 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L159-L161

Added lines #L159 - L161 were not covered by tests
func (pw *payloadWriter) Write(p []byte) (n int, err error) {
pw.payload = append(pw.payload, p...)
return len(p), nil
}

func (c sgHeadReceiver) Get(addr oid.Address) (object.Object, error) {

Check warning on line 167 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L164-L167

Added lines #L164 - L167 were not covered by tests
pw := &payloadWriter{}
c.getPrm.SetPayloadWriter(pw)
c.getPrm.SetAddress(addr)

Check warning on line 171 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L169-L171

Added lines #L169 - L171 were not covered by tests
res, err := internalclient.GetObject(c.ctx, c.getPrm)
if err != nil {
return object.Object{}, fmt.Errorf("rpc error: %w", err)
}

Check warning on line 175 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L174-L175

Added lines #L174 - L175 were not covered by tests

obj := res.Header()

Check warning on line 177 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L177

Added line #L177 was not covered by tests
obj.SetPayload(pw.payload)

return *obj, nil

Check warning on line 180 in cmd/neofs-cli/modules/storagegroup/put.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-cli/modules/storagegroup/put.go#L180

Added line #L180 was not covered by tests
}

func (c sgHeadReceiver) Head(addr oid.Address) (any, error) {
c.prm.SetAddress(addr)
c.prmHead.SetAddress(addr)

res, err := internalclient.HeadObject(c.ctx, c.prm)
res, err := internalclient.HeadObject(c.ctx, c.prmHead)

var errSplitInfo *object.SplitInfoError

Expand Down
181 changes: 159 additions & 22 deletions pkg/services/object/util/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ import (
"errors"
"fmt"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// HeadReceiver is an interface of entity that can receive
// SplitInformer is an interface of entity that can receive
// object header or the information about the object relations.
type HeadReceiver interface {
type SplitInformer interface {
// Head must return one of:
// * object header (*object.Object);
// * structured information about split-chain (*object.SplitInfo).
Head(id oid.Address) (any, error)

// Get must return object by its address.
Get(address oid.Address) (object.Object, error)
}

// SplitMemberHandler is a handler of next split-chain element.
Expand All @@ -24,7 +28,7 @@ type HeadReceiver interface {
type SplitMemberHandler func(member *object.Object, reverseDirection bool) (stop bool)

// IterateAllSplitLeaves is an iterator over all object split-tree leaves in direct order.
func IterateAllSplitLeaves(r HeadReceiver, addr oid.Address, h func(*object.Object)) error {
func IterateAllSplitLeaves(r SplitInformer, addr oid.Address, h func(*object.Object)) error {

Check warning on line 31 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L31

Added line #L31 was not covered by tests
return IterateSplitLeaves(r, addr, func(leaf *object.Object) bool {
h(leaf)
return false
Expand All @@ -34,45 +38,178 @@ func IterateAllSplitLeaves(r HeadReceiver, addr oid.Address, h func(*object.Obje
// IterateSplitLeaves is an iterator over object split-tree leaves in direct order.
//
// If member handler returns true, then the iterator aborts without error.
func IterateSplitLeaves(r HeadReceiver, addr oid.Address, h func(*object.Object) bool) error {
var (
reverse bool
leaves []*object.Object
)

if err := TraverseSplitChain(r, addr, func(member *object.Object, reverseDirection bool) (stop bool) {
reverse = reverseDirection

if reverse {
leaves = append(leaves, member)
return false
func IterateSplitLeaves(r SplitInformer, addr oid.Address, h func(*object.Object) bool) error {
info, err := r.Head(addr)
if err != nil {
return fmt.Errorf("receiving information about the object: %w", err)

Check warning on line 44 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L41-L44

Added lines #L41 - L44 were not covered by tests
}

switch res := info.(type) {
default:
panic(fmt.Sprintf("unexpected result of %T: %T", r, info))
case *object.Object:
h(res)
case *object.SplitInfo:
if res.SplitID() == nil {
return iterateV2Split(r, res, addr.Container(), h)
} else {
return iterateV1Split(r, res, addr.Container(), h)

Check warning on line 56 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L47-L56

Added lines #L47 - L56 were not covered by tests
}
}

return h(member)
}); err != nil {
return err
return nil

Check warning on line 60 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L60

Added line #L60 was not covered by tests
}

func iterateV1Split(r SplitInformer, info *object.SplitInfo, cID cid.ID, handler func(*object.Object) bool) error {
var addr oid.Address
addr.SetContainer(cID)

Check warning on line 65 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L63-L65

Added lines #L63 - L65 were not covered by tests

linkID, ok := info.Link()
if ok {
addr.SetObject(linkID)

Check warning on line 69 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L67-L69

Added lines #L67 - L69 were not covered by tests

linkObj, err := headFromInformer(r, addr)
if err != nil {
return err

Check warning on line 73 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L71-L73

Added lines #L71 - L73 were not covered by tests
}

for _, child := range linkObj.Children() {
addr.SetObject(child)

Check warning on line 77 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L76-L77

Added lines #L76 - L77 were not covered by tests

childHeader, err := headFromInformer(r, addr)
if err != nil {
return err

Check warning on line 81 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L79-L81

Added lines #L79 - L81 were not covered by tests
}

if stop := handler(childHeader); stop {
return nil

Check warning on line 85 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}
}

handler(linkObj)

Check warning on line 89 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L89

Added line #L89 was not covered by tests

return nil

Check warning on line 91 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L91

Added line #L91 was not covered by tests
}

lastID, ok := info.LastPart()
if ok {
addr.SetObject(lastID)
return iterateFromLastObject(r, addr, handler)

Check warning on line 97 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L94-L97

Added lines #L94 - L97 were not covered by tests
}

for i := len(leaves) - 1; i >= 0; i-- {
if h(leaves[i]) {
return errors.New("nor link, nor last object ID is found")

Check warning on line 100 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L100

Added line #L100 was not covered by tests
}

func iterateV2Split(r SplitInformer, info *object.SplitInfo, cID cid.ID, handler func(*object.Object) bool) error {
var addr oid.Address
addr.SetContainer(cID)

Check warning on line 105 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L103-L105

Added lines #L103 - L105 were not covered by tests

linkID, ok := info.Link()
if ok {
addr.SetObject(linkID)

Check warning on line 109 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L107-L109

Added lines #L107 - L109 were not covered by tests

linkObjRaw, err := r.Get(addr)
if err != nil {
return fmt.Errorf("receiving link object %s: %w", addr, err)

Check warning on line 113 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L111-L113

Added lines #L111 - L113 were not covered by tests
}

if stop := handler(&linkObjRaw); stop {
return nil

Check warning on line 117 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L116-L117

Added lines #L116 - L117 were not covered by tests
}

var linkObj object.Link
err = linkObjRaw.ReadLink(&linkObj)
if err != nil {
return fmt.Errorf("decoding link object: %w", err)

Check warning on line 123 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L120-L123

Added lines #L120 - L123 were not covered by tests
}

for _, child := range linkObj.Objects() {
addr.SetObject(child.ObjectID())

Check warning on line 127 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L126-L127

Added lines #L126 - L127 were not covered by tests

childObj, err := headFromInformer(r, addr)
if err != nil {
return fmt.Errorf("fetching child object: %w", err)

Check warning on line 131 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L129-L131

Added lines #L129 - L131 were not covered by tests
}

if stop := handler(childObj); stop {
return nil

Check warning on line 135 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L134-L135

Added lines #L134 - L135 were not covered by tests
}
}

return nil

Check warning on line 139 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L139

Added line #L139 was not covered by tests
}

lastID, ok := info.LastPart()
if ok {
addr.SetObject(lastID)
return iterateFromLastObject(r, addr, handler)

Check warning on line 145 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L142-L145

Added lines #L142 - L145 were not covered by tests
}

return errors.New("nor link, nor last object ID is found")

Check warning on line 148 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L148

Added line #L148 was not covered by tests
}

func iterateFromLastObject(r SplitInformer, lastAddr oid.Address, handler func(*object.Object) bool) error {
var idBuff []oid.ID
addr := lastAddr

Check warning on line 153 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L151-L153

Added lines #L151 - L153 were not covered by tests

for {
obj, err := headFromInformer(r, addr)
if err != nil {
return err

Check warning on line 158 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L155-L158

Added lines #L155 - L158 were not covered by tests
}

oID, _ := obj.ID()
idBuff = append(idBuff, oID)

Check warning on line 162 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L161-L162

Added lines #L161 - L162 were not covered by tests

prevOID, set := obj.PreviousID()
if !set {

Check warning on line 165 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L164-L165

Added lines #L164 - L165 were not covered by tests
break
}

addr.SetObject(prevOID)

Check warning on line 169 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L169

Added line #L169 was not covered by tests
}

for i := len(idBuff) - 1; i >= 0; i-- {
addr.SetObject(idBuff[i])

Check warning on line 173 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L172-L173

Added lines #L172 - L173 were not covered by tests

childObj, err := headFromInformer(r, addr)
if err != nil {
return err

Check warning on line 177 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L175-L177

Added lines #L175 - L177 were not covered by tests
}

if stop := handler(childObj); stop {
return nil

Check warning on line 181 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L180-L181

Added lines #L180 - L181 were not covered by tests
}
}

return nil
}

func headFromInformer(r SplitInformer, addr oid.Address) (*object.Object, error) {
res, err := r.Head(addr)
if err != nil {
return nil, fmt.Errorf("fetching information about %s", addr)

Check warning on line 191 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L188-L191

Added lines #L188 - L191 were not covered by tests
}

switch v := res.(type) {
case *object.Object:
return v, nil
default:
return nil, fmt.Errorf("unexpected information: %T", res)

Check warning on line 198 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L194-L198

Added lines #L194 - L198 were not covered by tests
}
}

// TraverseSplitChain is an iterator over object split-tree leaves.
//
// Traversal occurs in one of two directions, which depends on what pslit info was received:
// * in direct order for link part;
// * in reverse order for last part.
func TraverseSplitChain(r HeadReceiver, addr oid.Address, h SplitMemberHandler) error {
func TraverseSplitChain(r SplitInformer, addr oid.Address, h SplitMemberHandler) error {

Check warning on line 207 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L207

Added line #L207 was not covered by tests
_, err := traverseSplitChain(r, addr, h)
return err
}

func traverseSplitChain(r HeadReceiver, addr oid.Address, h SplitMemberHandler) (bool, error) {
func traverseSplitChain(r SplitInformer, addr oid.Address, h SplitMemberHandler) (bool, error) {

Check warning on line 212 in pkg/services/object/util/chain.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/util/chain.go#L212

Added line #L212 was not covered by tests
v, err := r.Head(addr)
if err != nil {
return false, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/services/object_manager/storagegroup/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ var (
)

// CollectMembers creates new storage group structure and fills it
// with information about members collected via HeadReceiver.
// with information about members collected via SplitInformer.
//
// Resulting storage group consists of physically stored objects only.
func CollectMembers(r objutil.HeadReceiver, cnr cid.ID, members []oid.ID, calcHomoHash bool) (*storagegroup.StorageGroup, error) {
func CollectMembers(r objutil.SplitInformer, cnr cid.ID, members []oid.ID, calcHomoHash bool) (*storagegroup.StorageGroup, error) {
var (
sumPhySize uint64
phyMembers []oid.ID
Expand Down
4 changes: 4 additions & 0 deletions pkg/services/object_manager/storagegroup/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type mockedObjects struct {
hdr *object.Object
}

func (x *mockedObjects) Get(address oid.Address) (object.Object, error) {
return *x.hdr, nil
}

func (x *mockedObjects) Head(_ oid.Address) (any, error) {
return x.hdr, nil
}
Expand Down

0 comments on commit 40ffda7

Please sign in to comment.