Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

splistore cold object reification redux #8029

Merged
merged 12 commits into from
Feb 14, 2022
21 changes: 21 additions & 0 deletions blockstore/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package blockstore

import (
"context"
)

type hotViewKey struct{}

var hotView = hotViewKey{}

// WithHotView constructs a new context with an option that provides a hint to the blockstore
// (e.g. the splitstore) that the object (and its ipld references) should be kept hot.
func WithHotView(ctx context.Context) context.Context {
return context.WithValue(ctx, hotView, struct{}{})
}

// GetHotView returns true if the hot view option is set in the context
func GetHotView(ctx context.Context) bool {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
v := ctx.Value(hotView)
return v != nil
}
35 changes: 33 additions & 2 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ type SplitStore struct {
txnSyncCond sync.Cond
txnSync bool

// background cold object reification
reifyMx sync.Mutex
reifyCond sync.Cond
reifyPend map[cid.Cid]struct{}
reifyInProgress map[cid.Cid]struct{}

// registered protectors
protectors []func(func(cid.Cid) error) error
}
Expand Down Expand Up @@ -202,6 +208,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
ss.txnSyncCond.L = &ss.txnSyncMx
ss.ctx, ss.cancel = context.WithCancel(context.Background())

ss.reifyCond.L = &ss.reifyMx
ss.reifyPend = make(map[cid.Cid]struct{})
ss.reifyInProgress = make(map[cid.Cid]struct{})

if enableDebugLog {
ss.debug, err = openDebugLog(path)
if err != nil {
Expand Down Expand Up @@ -264,7 +274,13 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
return true, nil
}

return s.cold.Has(ctx, cid)
has, err = s.cold.Has(ctx, cid)
if has && bstore.GetHotView(ctx) {
s.reifyColdObject(cid)
}

return has, err

}

func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
Expand Down Expand Up @@ -308,8 +324,11 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

blk, err = s.cold.Get(ctx, cid)
if err == nil {
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
if bstore.GetHotView(ctx) {
s.reifyColdObject(cid)
}

stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return blk, err

Expand Down Expand Up @@ -359,6 +378,10 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {

size, err = s.cold.GetSize(ctx, cid)
if err == nil {
if bstore.GetHotView(ctx) {
s.reifyColdObject(cid)
}

stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return size, err
Expand Down Expand Up @@ -536,6 +559,10 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro

err = s.cold.View(ctx, cid, cb)
if err == nil {
if bstore.GetHotView(ctx) {
s.reifyColdObject(cid)
}

stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return err
Expand Down Expand Up @@ -645,6 +672,9 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error
}
}

// spawn the reifier
go s.reifyOrchestrator()

// watch the chain
chain.SubscribeHeadChanges(s.HeadChange)

Expand Down Expand Up @@ -676,6 +706,7 @@ func (s *SplitStore) Close() error {
}
}

s.reifyCond.Broadcast()
vyzo marked this conversation as resolved.
Show resolved Hide resolved
s.cancel()
return multierr.Combine(s.markSetEnv.Close(), s.debug.Close())
}
Expand Down
185 changes: 185 additions & 0 deletions blockstore/splitstore/splitstore_reify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package splitstore

import (
"runtime"
"sync/atomic"

"golang.org/x/xerrors"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
)

func (s *SplitStore) reifyColdObject(c cid.Cid) {
if isUnitaryObject(c) {
return
}

s.reifyMx.Lock()
defer s.reifyMx.Unlock()

_, ok := s.reifyInProgress[c]
if ok {
return
}

s.reifyPend[c] = struct{}{}
s.reifyCond.Broadcast()
}

func (s *SplitStore) reifyOrchestrator() {
workers := runtime.NumCPU() / 4
if workers < 2 {
workers = 2
}

workch := make(chan cid.Cid, workers)
defer close(workch)

for i := 0; i < workers; i++ {
go s.reifyWorker(workch)
}

for {
s.reifyMx.Lock()
for len(s.reifyPend) == 0 && atomic.LoadInt32(&s.closing) == 0 {
s.reifyCond.Wait()
}

if atomic.LoadInt32(&s.closing) != 0 {
s.reifyMx.Unlock()
return
}

reifyPend := s.reifyPend
s.reifyPend = make(map[cid.Cid]struct{})
s.reifyMx.Unlock()

for c := range reifyPend {
select {
case workch <- c:
case <-s.ctx.Done():
return
}
}
}
}

func (s *SplitStore) reifyWorker(workch chan cid.Cid) {
for c := range workch {
s.doReify(c)
}
}

func (s *SplitStore) doReify(c cid.Cid) {
var toreify, totrack, toforget []cid.Cid

defer func() {
s.reifyMx.Lock()
defer s.reifyMx.Unlock()

for _, c := range toreify {
delete(s.reifyInProgress, c)
}
for _, c := range totrack {
delete(s.reifyInProgress, c)
}
for _, c := range toforget {
delete(s.reifyInProgress, c)
}
}()

s.txnLk.RLock()
defer s.txnLk.RUnlock()

err := s.walkObject(c, newTmpVisitor(),
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}

s.reifyMx.Lock()
_, inProgress := s.reifyInProgress[c]
if !inProgress {
s.reifyInProgress[c] = struct{}{}
}
s.reifyMx.Unlock()

if inProgress {
return errStopWalk
}

has, err := s.hot.Has(s.ctx, c)
if err != nil {
return xerrors.Errorf("error checking hotstore: %w", err)
}

if has {
if s.txnMarkSet != nil {
hasMark, err := s.txnMarkSet.Has(c)
if err != nil {
log.Warnf("error checking markset: %s", err)
} else if hasMark {
toforget = append(toforget, c)
magik6k marked this conversation as resolved.
Show resolved Hide resolved
return errStopWalk
}
} else {
totrack = append(totrack, c)
return errStopWalk
}
}

toreify = append(toreify, c)
return nil
})

if err != nil {
log.Warnf("error walking cold object for reification (cid: %s): %s", c, err)
return
}

log.Debugf("reifying %d objects rooted at %s", len(toreify), c)

batch := make([]blocks.Block, 0, len(toreify))
vyzo marked this conversation as resolved.
Show resolved Hide resolved
for _, c := range toreify {
blk, err := s.cold.Get(s.ctx, c)
if err != nil {
log.Warnf("error retrieving cold object for reification (cid: %s): %s", c, err)
continue
}

if err := s.checkClosing(); err != nil {
return
}

batch = append(batch, blk)
}

if len(batch) > 0 {
err = s.hot.PutMany(s.ctx, batch)
if err != nil {
log.Warnf("error reifying cold object (cid: %s): %s", c, err)
return
}
}

if s.txnMarkSet != nil {
if len(toreify) > 0 {
if err := s.txnMarkSet.MarkMany(toreify); err != nil {
log.Warnf("error marking reified objects: %s", err)
}
}
if len(totrack) > 0 {
if err := s.txnMarkSet.MarkMany(totrack); err != nil {
log.Warnf("error marking tracked objects: %s", err)
}
}
} else {
if len(toreify) > 0 {
s.trackTxnRefMany(toreify)
vyzo marked this conversation as resolved.
Show resolved Hide resolved
}
if len(totrack) > 0 {
s.trackTxnRefMany(totrack)
}
}
}
Loading