Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jan 14, 2025
1 parent b9c6492 commit 9624b00
Show file tree
Hide file tree
Showing 7 changed files with 493 additions and 2 deletions.
9 changes: 9 additions & 0 deletions pkg/core/object/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package object

import oid "github.com/nspcc-dev/neofs-sdk-go/object/id"

// TODO: docs
// TODO: reuse type from SDK if possible
type SearchResultItem struct {
ID oid.ID
}
235 changes: 235 additions & 0 deletions pkg/local_object_storage/metabase/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package meta

import (
"bytes"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"math/big"

objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
"go.etcd.io/bbolt"
)

const (
metaPrefixA = byte(iota)
metaPrefixBI // integer attributes
metaPrefixBS // all other attributes
metaPrefixC
)

var (
maxUint256 = new(big.Int).SetBytes([]byte{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255})
maxUint256Neg = new(big.Int).Neg(maxUint256)
)

// TODO: fill on Init
// TODO: system attributes
func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner user.ID, typ object.Type, creationEpoch uint64,
payloadLen uint64, pldHash, pldHmmHash, splitID []byte, parentID, firstID oid.ID, attrs []object.Attribute) error {
mb, err := tx.CreateBucketIfNotExists(metaBucketKey(cnr))
if err != nil {
return fmt.Errorf("create meta bucket for container: %w", err)
}
idk := [1 + oid.Size]byte{metaPrefixA}
copy(idk[1:], id[:])
if err := mb.Put(idk[:], nil); err != nil {
return fmt.Errorf("put object ID to container's meta bucket: %w", err)
}

var k []byte
// TODO: move to global funcs
makeKeyB := func(prefix byte, attr string, valLen int) (int, int) {
ln := 1 + oid.Size + len(attr) + valLen + len(utf8Delimiter)*2 // TODO: constantize some stuff
if len(k) < ln {
k = make([]byte, ln)
}
k[0] = prefix
off := 1 + copy(k[1:], attr)
off += copy(k[off:], utf8Delimiter)
valOff := off
off += valLen
off += copy(k[off:], utf8Delimiter)
copy(k[off:], id[:])
return ln, valOff
}
makeKeyC := func(attr string, valLen int) (int, int) {
ln := 1 + oid.Size + len(attr) + valLen + len(utf8Delimiter) // TODO: constantize some stuff
if len(k) < ln {
k = make([]byte, ln)
}
k[0] = metaPrefixC
off := 1 + copy(k[1:], id[:])
off += copy(k[off:], attr)
off += copy(k[off:], utf8Delimiter)
return ln, off
}
// TODO: make generic to pass []byte directly
putPlain := func(attr, val string) error {
kLn, vOff := makeKeyB(metaPrefixBS, attr, len(val))
copy(k[vOff:], val)
if err := mb.Put(k[:kLn], nil); err != nil {
return fmt.Errorf("put object attribute %q to container's meta bucket: %w", attr, err)
}
kLn, vOff = makeKeyC(attr, len(val))
copy(k[vOff:], val)
if err := mb.Put(k[:kLn], nil); err != nil {
return fmt.Errorf("put object attribute %q to container's meta bucket: %w", attr, err) // TODO: distinguishable context
}
return nil
}
putInt := func(attr string, n *big.Int) error {
// TODO: check uint256 overflow
kLn, vOff := makeKeyB(metaPrefixBI, attr, 1+32) // sign + 256-bit. Sign makes some sensible order
if n.Sign() >= 0 {
k[vOff] = 1
} else {
k[vOff] = 0
}
vOff++
n.FillBytes(k[vOff : vOff+32])
if err := mb.Put(k[:kLn], nil); err != nil {
return fmt.Errorf("put integer object attribute %q to container's meta bucket: %w", attr, err)
}
kLn, vOff = makeKeyC(attr, 1+32)
if n.Sign() >= 0 {
k[vOff] = 1
} else {
k[vOff] = 0
}
vOff++
n.FillBytes(k[vOff : vOff+32])
if err := mb.Put(k[:kLn], nil); err != nil {
return fmt.Errorf("put integer object attribute %q to container's meta bucket: %w", attr, err) // TODO: distinguishable context
}
return nil
}

if err := putPlain(object.FilterVersion, ver.String()); err != nil {
return err
}
if err := putPlain(object.FilterOwnerID, owner.String()); err != nil {
return err
}
if err := putPlain(object.FilterType, typ.String()); err != nil {
return err
}
if err := putInt(object.FilterCreationEpoch, new(big.Int).SetUint64(creationEpoch)); err != nil {
return err
}
if err := putInt(object.FilterPayloadSize, new(big.Int).SetUint64(payloadLen)); err != nil {
return err
}
if err := putPlain(object.FilterPayloadChecksum, hex.EncodeToString(pldHash)); err != nil {
return err
}
if err := putPlain(object.FilterPayloadHomomorphicHash, hex.EncodeToString(pldHmmHash)); err != nil {
return err
}
if err := putPlain(object.FilterSplitID, string(splitID)); err != nil {
return err
}
if err := putPlain(object.FilterFirstSplitObject, firstID.String()); err != nil {
return err
}
if err := putPlain(object.FilterParentID, parentID.String()); err != nil {
return err
}

for i := range attrs {
ak, av := attrs[i].Key(), attrs[i].Value()
if n, isNum := parseInt(av); isNum && n.Cmp(maxUint256Neg) >= 0 && n.Cmp(maxUint256) <= 0 {
if err := putInt(ak, n); err != nil {
return err
}
continue
}
if err := putPlain(ak, av); err != nil {
return err
}
}

return nil
}

// TODO: docs
func (db *DB) Search(cnr cid.ID, fs object.SearchFilters, attrs []string, cursor string, count uint32) ([]objectcore.SearchResultItem, string, error) {
if cnr.IsZero() {
return nil, "", cid.ErrZero
}
if count == 0 {
return nil, "", errors.New("zero count")
}

if len(fs) == 0 {
if len(attrs) > 0 {
return nil, "", errors.New("attributes are set without filters")
}
return db.searchUnfiltered(cnr, cursor, count)
}

return nil, "", errors.New("filters are not supported yet") // TODO
}

func (db *DB) searchUnfiltered(cnr cid.ID, cursor string, count uint32) ([]objectcore.SearchResultItem, string, error) {
var cursorKey []byte
var err error
if cursor != "" {
cursorKey = make([]byte, 1+base64.StdEncoding.DecodedLen(len(cursor)))
n, err := base64.StdEncoding.Decode(cursorKey[1:], []byte(cursor))
if err != nil {
return nil, "", fmt.Errorf("decode cursor from base64: %w", err)
}
cursorKey[0] = metaPrefixA
cursorKey = cursorKey[:1+n]
}

res := make([]objectcore.SearchResultItem, count)
var n uint32
err = db.boltDB.View(func(tx *bbolt.Tx) error {
mb := tx.Bucket(metaBucketKey(cnr))
if mb == nil {
return nil
}

mbc := mb.Cursor()
k, _ := mbc.Seek(cursorKey)
if cursor != "" && bytes.Equal(k, cursorKey) { // cursor is the last response element, so go next
k, _ = mbc.Next()
}
for ; k != nil; k, _ = mbc.Next() {
if k[0] != metaPrefixA { // TODO: can k be []byte{}?
continue
}
if n == count { // there are still elements
cursor = base64.StdEncoding.EncodeToString(res[n-1].ID[:])
return nil
}
k = k[1:]
if len(k) != oid.Size {
return fmt.Errorf("unexpected object key len %d, expected 33", len(k))
}
copy(res[n].ID[:], k)
n++
}
cursor = ""
return nil
})
if err != nil {
return nil, "", fmt.Errorf("view BoltDB: %w", err)
}
return res[:n], cursor, nil
}

func metaBucketKey(cnr cid.ID) []byte {
k := [1 + cid.Size]byte{metadataPrefix}
copy(k[1:], cnr[:])
return k[:]
}
Loading

0 comments on commit 9624b00

Please sign in to comment.