Skip to content

Commit

Permalink
Merge pull request #8029 from filecoin-project/feat/splistore-cold-ob…
Browse files Browse the repository at this point in the history
…ject-reification-redux

splitstore cold object reification redux
  • Loading branch information
magik6k authored Feb 14, 2022
2 parents 3b5b55d + 6bcade5 commit 7efed66
Show file tree
Hide file tree
Showing 5 changed files with 381 additions and 2 deletions.
21 changes: 21 additions & 0 deletions blockstore/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package blockstore

import (
"context"
)

// hotViewKey is the unexported type of the context key that carries the hot view option;
// keeping it private means no other package can set or collide with the key.
type hotViewKey struct{}

// hotView is the context key under which the hot view option is stored.
var hotView hotViewKey

// WithHotView returns a context derived from ctx that carries the hot view option. The option
// is a hint to the blockstore (e.g. the splitstore) that the accessed object, along with its
// ipld references, should be kept hot.
func WithHotView(ctx context.Context) context.Context {
	marked := context.WithValue(ctx, hotView, struct{}{})
	return marked
}

// IsHotView reports whether the hot view option has been set on ctx.
func IsHotView(ctx context.Context) bool {
	return ctx.Value(hotView) != nil
}
37 changes: 35 additions & 2 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ type SplitStore struct {
txnSyncCond sync.Cond
txnSync bool

// background cold object reification
reifyWorkers sync.WaitGroup
reifyMx sync.Mutex
reifyCond sync.Cond
reifyPend map[cid.Cid]struct{}
reifyInProgress map[cid.Cid]struct{}

// registered protectors
protectors []func(func(cid.Cid) error) error
}
Expand Down Expand Up @@ -202,6 +209,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
ss.txnSyncCond.L = &ss.txnSyncMx
ss.ctx, ss.cancel = context.WithCancel(context.Background())

ss.reifyCond.L = &ss.reifyMx
ss.reifyPend = make(map[cid.Cid]struct{})
ss.reifyInProgress = make(map[cid.Cid]struct{})

if enableDebugLog {
ss.debug, err = openDebugLog(path)
if err != nil {
Expand Down Expand Up @@ -264,7 +275,13 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
return true, nil
}

return s.cold.Has(ctx, cid)
has, err = s.cold.Has(ctx, cid)
if has && bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}

return has, err

}

func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
Expand Down Expand Up @@ -308,8 +325,11 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

blk, err = s.cold.Get(ctx, cid)
if err == nil {
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}

stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return blk, err

Expand Down Expand Up @@ -359,6 +379,10 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {

size, err = s.cold.GetSize(ctx, cid)
if err == nil {
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}

stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return size, err
Expand Down Expand Up @@ -536,6 +560,10 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro

err = s.cold.View(ctx, cid, cb)
if err == nil {
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}

stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return err
Expand Down Expand Up @@ -645,6 +673,9 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error
}
}

// spawn the reifier
go s.reifyOrchestrator()

// watch the chain
chain.SubscribeHeadChanges(s.HeadChange)

Expand Down Expand Up @@ -676,6 +707,8 @@ func (s *SplitStore) Close() error {
}
}

s.reifyCond.Broadcast()
s.reifyWorkers.Wait()
s.cancel()
return multierr.Combine(s.markSetEnv.Close(), s.debug.Close())
}
Expand Down
193 changes: 193 additions & 0 deletions blockstore/splitstore/splitstore_reify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package splitstore

import (
"runtime"
"sync/atomic"

"golang.org/x/xerrors"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
)

// reifyColdObject schedules c for background reification by adding it to the pending set and
// waking anything waiting on reifyCond. It does nothing when the splitstore is not warm, when c
// is a unitary object, or when c is already recorded as being reified.
func (s *SplitStore) reifyColdObject(c cid.Cid) {
	if !s.isWarm() || isUnitaryObject(c) {
		return
	}

	s.reifyMx.Lock()
	defer s.reifyMx.Unlock()

	// a worker has already claimed this object; don't queue it again
	if _, busy := s.reifyInProgress[c]; busy {
		return
	}

	s.reifyPend[c] = struct{}{}
	s.reifyCond.Broadcast()
}

// reifyOrchestrator runs the background reification loop. It starts a pool of reifyWorker
// goroutines (a quarter of the CPU count, but at least two) and then repeatedly waits for
// pending objects, takes the whole pending set and feeds it to the workers. It returns once the
// closing flag is set or the splitstore context is done; returning closes the work channel.
func (s *SplitStore) reifyOrchestrator() {
	nworkers := runtime.NumCPU() / 4
	if nworkers < 2 {
		nworkers = 2
	}

	jobs := make(chan cid.Cid, nworkers)
	defer close(jobs)

	for n := nworkers; n > 0; n-- {
		s.reifyWorkers.Add(1)
		go s.reifyWorker(jobs)
	}

	for {
		s.reifyMx.Lock()

		// sleep until there is something to do or we are shutting down
		for atomic.LoadInt32(&s.closing) == 0 && len(s.reifyPend) == 0 {
			s.reifyCond.Wait()
		}

		closing := atomic.LoadInt32(&s.closing) != 0

		// take ownership of the current pending set and leave a fresh one for new requests
		var batch map[cid.Cid]struct{}
		if !closing {
			batch = s.reifyPend
			s.reifyPend = make(map[cid.Cid]struct{})
		}

		s.reifyMx.Unlock()

		if closing {
			return
		}

		for c := range batch {
			select {
			case <-s.ctx.Done():
				return
			case jobs <- c:
			}
		}
	}
}

// reifyWorker reifies every object received on workch until the channel is closed, then signals
// completion on the reifyWorkers wait group.
func (s *SplitStore) reifyWorker(workch chan cid.Cid) {
	defer s.reifyWorkers.Done()

	for {
		next, open := <-workch
		if !open {
			return
		}
		s.doReify(next)
	}
}

// doReify reifies the object rooted at c: it walks the object graph starting at c, copies the
// objects selected by the walk from the coldstore into the hotstore, and then records them
// (plus the objects found already hot outside a marking transaction) in the transactional mark
// set or the transaction reference tracker.
//
// Every non-unitary object visited is first claimed in reifyInProgress so that concurrent
// workers skip it; all claims are released when doReify returns. The txnLk read lock is held
// for the whole operation. Failures are logged and abort the reification; nothing is returned.
func (s *SplitStore) doReify(c cid.Cid) {
	var toreify, totrack, claimed []cid.Cid

	// Release every claim taken during the walk, whatever the outcome. Claims are recorded
	// separately from the work lists so that an object whose hotstore check fails is released
	// too; otherwise it would stay in reifyInProgress and never be considered again.
	defer func() {
		s.reifyMx.Lock()
		defer s.reifyMx.Unlock()

		for _, c := range claimed {
			delete(s.reifyInProgress, c)
		}
	}()

	s.txnLk.RLock()
	defer s.txnLk.RUnlock()

	err := s.walkObject(c, newTmpVisitor(),
		func(c cid.Cid) error {
			if isUnitaryObject(c) {
				return errStopWalk
			}

			// claim the object, unless another reification already holds it
			s.reifyMx.Lock()
			_, inProgress := s.reifyInProgress[c]
			if !inProgress {
				s.reifyInProgress[c] = struct{}{}
			}
			s.reifyMx.Unlock()

			if inProgress {
				return errStopWalk
			}
			claimed = append(claimed, c)

			has, err := s.hot.Has(s.ctx, c)
			if err != nil {
				return xerrors.Errorf("error checking hotstore: %w", err)
			}

			if has {
				if s.txnMarkSet != nil {
					hasMark, err := s.txnMarkSet.Has(c)
					if err != nil {
						log.Warnf("error checking markset: %s", err)
					} else if hasMark {
						// already hot and already marked: nothing to do below this object
						return errStopWalk
					}
					// hot but not known to be marked: fall through and keep walking
				} else {
					// already hot and no mark set: only track the reference
					totrack = append(totrack, c)
					return errStopWalk
				}
			}

			toreify = append(toreify, c)
			return nil
		})

	if err != nil {
		log.Warnf("error walking cold object for reification (cid: %s): %s", c, err)
		return
	}

	log.Debugf("reifying %d objects rooted at %s", len(toreify), c)

	// this should not get too big, maybe some 100s of objects.
	batch := make([]blocks.Block, 0, len(toreify))
	for _, c := range toreify {
		blk, err := s.cold.Get(s.ctx, c)
		if err != nil {
			log.Warnf("error retrieving cold object for reification (cid: %s): %s", c, err)
			continue
		}

		// bail out promptly if the splitstore is closing
		if err := s.checkClosing(); err != nil {
			return
		}

		batch = append(batch, blk)
	}

	if len(batch) > 0 {
		err = s.hot.PutMany(s.ctx, batch)
		if err != nil {
			log.Warnf("error reifying cold object (cid: %s): %s", c, err)
			return
		}
	}

	if s.txnMarkSet != nil {
		if len(toreify) > 0 {
			if err := s.txnMarkSet.MarkMany(toreify); err != nil {
				log.Warnf("error marking reified objects: %s", err)
			}
		}
		if len(totrack) > 0 {
			if err := s.txnMarkSet.MarkMany(totrack); err != nil {
				log.Warnf("error marking tracked objects: %s", err)
			}
		}
	} else {
		// if txnActive is false these are noops
		if len(toreify) > 0 {
			s.trackTxnRefMany(toreify)
		}
		if len(totrack) > 0 {
			s.trackTxnRefMany(totrack)
		}
	}
}
Loading

0 comments on commit 7efed66

Please sign in to comment.