Skip to content

Commit

Permalink
[db] implement CommitToDB() for BoltDBVersioned
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinxie committed Dec 15, 2024
1 parent 6ac9264 commit 5f4539f
Show file tree
Hide file tree
Showing 2 changed files with 510 additions and 61 deletions.
238 changes: 238 additions & 0 deletions db/db_versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (
"context"
"fmt"
"math"
"syscall"

"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"

"github.com/iotexproject/iotex-core/v2/db/batch"
"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
"github.com/iotexproject/iotex-core/v2/pkg/log"
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

Expand Down Expand Up @@ -196,6 +199,241 @@ func (b *BoltDBVersioned) Version(ns string, key []byte) (uint64, error) {
return last, err
}

// CommitToDB write a batch to DB, where the batch can contain keys for
// both versioned and non-versioned namespace
func (b *BoltDBVersioned) CommitToDB(version uint64, vns map[string]bool, kvsb batch.KVStoreBatch) error {
vnsize, ve, nve, err := dedup(vns, kvsb)
if err != nil {
return errors.Wrapf(err, "BoltDBVersioned failed to write batch")
}
return b.commitToDB(version, vnsize, ve, nve)
}

func (b *BoltDBVersioned) commitToDB(version uint64, vnsize map[string]int, ve, nve []*batch.WriteInfo) error {
var (
err error
nonDBErr bool
)
for c := uint8(0); c < b.db.config.NumRetries; c++ {
buckets := make(map[string]*bolt.Bucket)
if err = b.db.db.Update(func(tx *bolt.Tx) error {
// create/check metadata of all namespaces
for ns, size := range vnsize {
bucket, ok := buckets[ns]
if !ok {
bucket, err = tx.CreateBucketIfNotExists([]byte(ns))
if err != nil {
return errors.Wrapf(err, "failed to create bucket %s", ns)
}
buckets[ns] = bucket
}
var vn *versionedNamespace
if val := bucket.Get(_minKey); val == nil {
// namespace not created yet
vn = &versionedNamespace{
keyLen: uint32(size),
}
ve = append(ve, batch.NewWriteInfo(
batch.Put, ns, _minKey, vn.serialize(),
fmt.Sprintf("failed to create metadata for namespace %s", ns),
))
} else {
if vn, err = deserializeVersionedNamespace(val); err != nil {
nonDBErr = true
return errors.Wrapf(err, "failed to get metadata of bucket %s", ns)
}
if vn.keyLen != uint32(size) {
nonDBErr = true
return errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", vn.keyLen, size)
}
}
}
// keep order of the writes same as the original batch
for i := len(ve) - 1; i >= 0; i-- {
var (
write = ve[i]
ns = write.Namespace()
key = write.Key()
val = write.Value()
)
// get bucket
bucket, ok := buckets[ns]
if !ok {
bucket, err = tx.CreateBucketIfNotExists([]byte(ns))
if err != nil {
return errors.Wrapf(err, "failed to create bucket %s", ns)
}
buckets[ns] = bucket
}
// check key's last version
var (
last uint64
notexist, isDelete bool
actualKey = keyForWrite(key, version)
)
c := bucket.Cursor()
k, _ := c.Seek(actualKey)
if k == nil || bytes.Compare(k, actualKey) == 1 {
k, _ = c.Prev()
if k == nil || bytes.Compare(k, keyForDelete(key, 0)) <= 0 {
// cursor is at the beginning/end of the bucket or smaller than minimum key
notexist = true
}
}
if !notexist {
isDelete, last = parseKey(k)
}
switch write.WriteType() {
case batch.Put:
if bytes.Equal(key, _minKey) {
// create namespace
if err = bucket.Put(key, val); err != nil {
return errors.Wrap(err, write.Error())
}
} else {
// wrong-size key should be caught in dedup(), but check anyway
if vnsize[ns] != len(key) {
panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key)))
}
if isDelete && version <= last {
// not allowed to perform write on an earlier version
nonDBErr = true
return ErrInvalid
}
if err = bucket.Put(keyForWrite(key, version), val); err != nil {
return errors.Wrap(err, write.Error())
}
}
case batch.Delete:
if notexist {
continue
}
// wrong-size key should be caught in dedup(), but check anyway
if vnsize[ns] != len(key) {
panic(fmt.Sprintf("BoltDBVersioned.commitToDB(), expect vnsize[%s] = %d, got %d", ns, vnsize[ns], len(key)))
}
if version < last {
// not allowed to perform delete on an earlier version
nonDBErr = true
return ErrInvalid
}
if err = bucket.Put(keyForDelete(key, version), nil); err != nil {
return errors.Wrap(err, write.Error())
}
if err = bucket.Delete(keyForWrite(key, version)); err != nil {
return errors.Wrap(err, write.Error())
}
}
}
// write non-versioned keys
for i := len(nve) - 1; i >= 0; i-- {
var (
write = nve[i]
ns = write.Namespace()
)
switch write.WriteType() {
case batch.Put:
// get bucket
bucket, ok := buckets[ns]
if !ok {
bucket, err = tx.CreateBucketIfNotExists([]byte(ns))
if err != nil {
return errors.Wrapf(err, "failed to create bucket %s", ns)
}
buckets[ns] = bucket
}
if err = bucket.Put(write.Key(), write.Value()); err != nil {
return errors.Wrap(err, write.Error())
}
case batch.Delete:
bucket := tx.Bucket([]byte(ns))
if bucket == nil {
continue
}
if err = bucket.Delete(write.Key()); err != nil {
return errors.Wrap(err, write.Error())
}
}
}
return nil
}); err == nil || nonDBErr {
break
}
}
if nonDBErr {
return err
}
if err != nil {
if errors.Is(err, syscall.ENOSPC) {
log.L().Fatal("BoltDBVersioned failed to write batch", zap.Error(err))
}
return errors.Wrap(ErrIO, err.Error())
}
return nil
}

// dedup does 3 things:
// 1. deduplicate entries in the batch, only keep the last write for each key
// 2. splits entries into 2 slices according to the input namespace map
// 3. return a map of input namespace's keyLength
func dedup(vns map[string]bool, kvsb batch.KVStoreBatch) (map[string]int, []*batch.WriteInfo, []*batch.WriteInfo, error) {
kvsb.Lock()
defer kvsb.Unlock()

type doubleKey struct {
ns string
key string
}

var (
entryKeySet = make(map[doubleKey]bool)
nsKeyLen = make(map[string]int)
nsInMap = make([]*batch.WriteInfo, 0)
other = make([]*batch.WriteInfo, 0)
pickAll = len(vns) == 0
)
for i := kvsb.Size() - 1; i >= 0; i-- {
write, e := kvsb.Entry(i)
if e != nil {
return nil, nil, nil, e
}
// only handle Put and Delete
var (
writeType = write.WriteType()
ns = write.Namespace()
key = write.Key()
)
if writeType != batch.Put && writeType != batch.Delete {
continue
}
k := doubleKey{ns: ns, key: string(key)}
if entryKeySet[k] {
continue
}
if writeType == batch.Put {
// for a later DELETE, we want to capture the earlier PUT
// otherwise, the DELETE might return not-exist
entryKeySet[k] = true
}
if pickAll || vns[k.ns] {
nsInMap = append(nsInMap, write)
} else {
other = append(other, write)
}
// check key size
if pickAll || vns[k.ns] {
if n, ok := nsKeyLen[k.ns]; !ok {
nsKeyLen[k.ns] = len(write.Key())
} else {
if n != len(write.Key()) {
return nil, nil, nil, errors.Wrapf(ErrInvalid, "invalid key length, expecting %d, got %d", n, len(write.Key()))
}
}
}
}
return nsKeyLen, nsInMap, other, nil
}

func isNotExist(err error) bool {
return err == ErrNotExist || err == ErrBucketNotExist
}
Expand Down
Loading

0 comments on commit 5f4539f

Please sign in to comment.