Skip to content

Commit

Permalink
More testing support
Browse files Browse the repository at this point in the history
  • Loading branch information
ZenGround0 committed Aug 3, 2022
1 parent 4dff48a commit 638f2f6
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 65 deletions.
8 changes: 8 additions & 0 deletions blockstore/idstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ func (b *idstore) Put(ctx context.Context, blk blocks.Block) error {
return b.bs.Put(ctx, blk)
}

func (b *idstore) ForEachKey(f func(cid.Cid) error) error {
iterBstore, ok := b.bs.(BlockstoreIterator)
if !ok {
return xerrors.Errorf("underlying blockstore (type %T) doesn't support fast iteration", b.bs)
}
return iterBstore.ForEachKey(f)
}

func isTestCid(c cid.Cid) bool {
testCid, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu")
if err != nil {
Expand Down
25 changes: 4 additions & 21 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package splitstore
import (
"context"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
cidtype "github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -45,6 +43,9 @@ var (
// compactionIndexKey stores the compaction index (serial number)
compactionIndexKey = dstore.NewKey("/splitstore/compactionIndex")

// stores the prune index (serial number)
pruneIndexKey = dstore.NewKey("/splitstore/pruneIndex")

log = logging.Logger("splitstore")

errClosing = errors.New("splitstore is closing")
Expand Down Expand Up @@ -155,6 +156,7 @@ type SplitStore struct {
markSetSize int64

compactionIndex int64
pruneIndex int64

ctx context.Context
cancel func()
Expand Down Expand Up @@ -316,12 +318,6 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
}

func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid {
fmt.Printf("[SS] Get cid: %s\n", cid)
}
if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid {
defer fmt.Printf("[SS] finish get: %s\n", cid)
}
if isIdentiyCid(cid) {
data, err := decodeIdentityCid(cid)
if err != nil {
Expand All @@ -336,10 +332,6 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

// critical section
if s.txnMarkSet != nil {
if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid {
fmt.Printf("[SS] Get in critical section\n")
}

has, err := s.txnMarkSet.Has(cid)
if err != nil {
return nil, err
Expand All @@ -362,10 +354,6 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

switch {
case err == nil:
if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid {

fmt.Printf("[SS] hot had it\n")
}
s.trackTxnRef(cid)
return blk, nil

Expand All @@ -376,11 +364,6 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

blk, err = s.cold.Get(ctx, cid)
if err == nil {
if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid {

fmt.Printf("[SS] cold had it\n")
}

s.trackTxnRef(cid)
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
Expand Down
1 change: 1 addition & 0 deletions blockstore/splitstore/splitstore_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (s *SplitStore) Info() map[string]interface{} {
info["base epoch"] = s.baseEpoch
info["warmup epoch"] = s.warmupEpoch
info["compactions"] = s.compactionIndex
info["prunes"] = s.pruneIndex
info["compacting"] = s.compacting == 1

sizer, ok := s.hot.(bstore.BlockstoreSize)
Expand Down
42 changes: 0 additions & 42 deletions blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,6 @@ const (
batchSize = 16384
)

func isTestCid(c cid.Cid) bool {
testCid, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu")
if err != nil {
panic(err)
}

return c.Hash().HexString() == testCid.Hash().HexString()
}

func (s *SplitStore) fmtDebug(extra string) {
hHas, cHas := s.lookupTestCid()
fmt.Printf("[COMPACT]: %s, has test cid (hot %t) (cold %t))", extra, hHas, cHas)
}

func (s *SplitStore) lookupTestCid() (bool, bool) {
c, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu")
if err != nil {
Expand Down Expand Up @@ -149,9 +135,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
}

if epoch-s.baseEpoch > CompactionThreshold {
fmt.Printf("YYYYYYYYY\n Begin compaction \nYYYYYYY\n")
// it's time to compact -- prepare the transaction and go!
s.fmtDebug("right before compaction")
s.beginTxnProtect()
s.compactType = hot
go func() {
Expand Down Expand Up @@ -571,8 +555,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return err
}

s.fmtDebug("before chain walk")

// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
// and messages until the boundary epoch.
log.Info("marking reachable objects")
Expand All @@ -581,9 +563,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
count := new(int64)
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
func(c cid.Cid) error {
if isTestCid(c) {
fmt.Printf("\n\n[COMPACT] found the cid %s during chain traversal!\n\n", c.String())
}
if isUnitaryObject(c) {
return errStopWalk
}
Expand All @@ -605,8 +584,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error marking: %w", err)
}

s.fmtDebug("after chain walk")

s.markSetSize = *count + *count>>2 // overestimate a bit

log.Infow("marking done", "took", time.Since(startMark), "marked", *count)
Expand Down Expand Up @@ -637,11 +614,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {

// some stats for logging
var hotCnt, coldCnt int
fmt.Printf("\n\n[COMPACT]: traversing hot store of type %T\n\n", s.hot)
err = s.hot.ForEachKey(func(c cid.Cid) error {
if isTestCid(c) {
fmt.Printf("\n\n[COMPACT]: found %s(%s) in hot store during compact\n\n", c, c.String())
}
// was it marked?
mark, err := markSet.Has(c)
if err != nil {
Expand All @@ -661,8 +634,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {

return nil
})

s.fmtDebug("after traversing hot store")
if err != nil {
return xerrors.Errorf("error collecting cold objects: %w", err)
}
Expand Down Expand Up @@ -714,7 +685,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error resetting coldset: %w", err)
}
}
s.fmtDebug("after moving to cold store")

// 4. Purge cold objects with checkpointing for recovery.
// This is the critical section of compaction, whereby any cold object not in the markSet is
Expand All @@ -738,8 +708,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return err
}

s.fmtDebug("inside critical section")

checkpoint, err := NewCheckpoint(s.checkpointPath())
if err != nil {
return xerrors.Errorf("error creating checkpoint: %w", err)
Expand All @@ -755,7 +723,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))

s.fmtDebug("after purge")
s.endCriticalSection()

if err := checkpoint.Close(); err != nil {
Expand All @@ -773,9 +740,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {

// we are done; do some housekeeping
s.endTxnProtect()
s.fmtDebug("critical section done")
s.gcHotstore()
s.fmtDebug("hot store gc")

err = s.setBaseEpoch(boundaryEpoch)
if err != nil {
Expand Down Expand Up @@ -1205,10 +1170,6 @@ func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
if err := s.checkClosing(); err != nil {
return err
}
if isTestCid(c) {
fmt.Printf("[COMPACT]: foudn test cid while moving cold blocks %s\n\n", c)
}

blk, err := s.hot.Get(s.ctx, c)
if err != nil {
if ipld.IsNotFound(err) {
Expand Down Expand Up @@ -1276,9 +1237,6 @@ func (s *SplitStore) purge(coldr *ColdSetReader, checkpoint *Checkpoint, markSet

err := coldr.ForEach(func(c cid.Cid) error {
batch = append(batch, c)
if isTestCid(c) {
fmt.Printf("[COMPACT]: found test cid while purging\n\n")
}
if len(batch) == batchSize {
return deleteBatch()
}
Expand Down
31 changes: 31 additions & 0 deletions blockstore/splitstore/splitstore_prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package splitstore

import (
"bytes"
"fmt"
"os"
"runtime"
"sync"
Expand Down Expand Up @@ -97,6 +98,7 @@ func (s *SplitStore) PruneChain(opts map[string]interface{}) error {
}

if _, ok := s.cold.(bstore.BlockstoreIterator); !ok {
fmt.Printf("cold store type: %T\n", s.cold)
return xerrors.Errorf("coldstore does not support efficient iteration")
}

Expand Down Expand Up @@ -146,12 +148,22 @@ func (s *SplitStore) pruneChain(retainStateP func(int64) bool, doGC func() error
return nil
}

func isTestCid(c cid.Cid) bool {
testCid, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu")
if err != nil {
panic(err)
}

return c.Hash().HexString() == testCid.Hash().HexString()
}

func (s *SplitStore) prune(curTs *types.TipSet, retainStateP func(int64) bool, doGC func() error) {
log.Info("waiting for active views to complete")
start := time.Now()
s.viewWait()
log.Infow("waiting for active views done", "took", time.Since(start))

fmt.Printf("[PRUNE]: starting\n")
err := s.doPrune(curTs, retainStateP, doGC)
if err != nil {
log.Errorf("PRUNE ERROR: %s", err)
Expand Down Expand Up @@ -185,6 +197,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
// and chain headers; state and reciepts are retained only if it is within retention policy scope
log.Info("marking reachable objects")
startMark := time.Now()
fmt.Printf("[PRUNE]: marking\n")

count := new(int64)
err = s.walkChainDeep(curTs, retainStateP,
Expand Down Expand Up @@ -226,6 +239,8 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
return err
}

fmt.Printf("[PRUNE]: iter cold store to find dead\n")

// 2. iterate through the coldstore to collect dead objects
log.Info("collecting dead objects")
startCollect := time.Now()
Expand All @@ -241,13 +256,19 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,

err = s.cold.(bstore.BlockstoreIterator).ForEachKey(func(c cid.Cid) error {
// was it marked?
if isTestCid(c) {
fmt.Printf("[PRUNE] found test cid %s in deadset collection iteration\n", c)
}
mark, err := markSet.Has(c)
if err != nil {
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
}

if mark {
liveCnt++
if isTestCid(c) {
fmt.Printf("[PRUNE] huh found the test cid as marked, no prune for us...\n")
}
return nil
}

Expand Down Expand Up @@ -295,6 +316,8 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
}
defer deadr.Close() //nolint:errcheck

fmt.Printf("[PRUNE]: purge the dead\n")

// 3. Purge dead objects with checkpointing for recovery.
// This is the critical section of prune, whereby any dead object not in the markSet is
// considered already deleted.
Expand All @@ -305,6 +328,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
if err := s.beginCriticalSection(markSet); err != nil {
return xerrors.Errorf("error beginning critical section: %w", err)
}
fmt.Printf("[PRUNE]: critical\n")

if err := s.checkClosing(); err != nil {
return err
Expand All @@ -325,6 +349,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
log.Infow("purging dead objects from coldstore done", "took", time.Since(startPurge))

s.endCriticalSection()
fmt.Printf("[PRUNE]: done with critical\n")

if err := checkpoint.Close(); err != nil {
log.Warnf("error closing checkpoint: %s", err)
Expand All @@ -346,6 +371,12 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
log.Warnf("error garbage collecting cold store: %s", err)
}

s.pruneIndex++
err = s.ds.Put(s.ctx, pruneIndexKey, int64ToBytes(s.compactionIndex))
if err != nil {
return xerrors.Errorf("error saving compaction index: %w", err)
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion itests/kit/ensemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (n *Ensemble) Start() *Ensemble {
for i, full := range n.inactive.fullnodes {

var r repo.Repo
if full.options.fsrepo {
if !full.options.fsrepo {
rmem := repo.NewMemory(nil)
n.t.Cleanup(rmem.Cleanup)
r = rmem
Expand Down
Loading

0 comments on commit 638f2f6

Please sign in to comment.