From 638f2f64e13ffb882bc781f9ba768a0a8dee4054 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Wed, 3 Aug 2022 12:45:11 -0400 Subject: [PATCH] More testing support --- blockstore/idstore.go | 8 ++++ blockstore/splitstore/splitstore.go | 25 ++---------- blockstore/splitstore/splitstore_check.go | 1 + blockstore/splitstore/splitstore_compact.go | 42 --------------------- blockstore/splitstore/splitstore_prune.go | 31 +++++++++++++++ itests/kit/ensemble.go | 2 +- itests/splitstore_test.go | 25 +++++++++++- 7 files changed, 69 insertions(+), 65 deletions(-) diff --git a/blockstore/idstore.go b/blockstore/idstore.go index 5d3f40740b4..5eb25972bd4 100644 --- a/blockstore/idstore.go +++ b/blockstore/idstore.go @@ -103,6 +103,14 @@ func (b *idstore) Put(ctx context.Context, blk blocks.Block) error { return b.bs.Put(ctx, blk) } +func (b *idstore) ForEachKey(f func(cid.Cid) error) error { + iterBstore, ok := b.bs.(BlockstoreIterator) + if !ok { + return xerrors.Errorf("underlying blockstore (type %T) doesn't support fast iteration", b.bs) + } + return iterBstore.ForEachKey(f) +} + func isTestCid(c cid.Cid) bool { testCid, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu") if err != nil { diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 5b8ebc5faaf..f3dae1df1ad 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -3,7 +3,6 @@ package splitstore import ( "context" "errors" - "fmt" "os" "sync" "sync/atomic" @@ -11,7 +10,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - cidtype "github.com/ipfs/go-cid" dstore "github.com/ipfs/go-datastore" ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" @@ -45,6 +43,9 @@ var ( // compactionIndexKey stores the compaction index (serial number) compactionIndexKey = dstore.NewKey("/splitstore/compactionIndex") + // stores the prune index (serial number) + pruneIndexKey = dstore.NewKey("/splitstore/pruneIndex") + log = logging.Logger("splitstore") errClosing = errors.New("splitstore is closing") @@ -155,6 +156,7 @@ type SplitStore struct { markSetSize int64 compactionIndex int64 + pruneIndex int64 ctx context.Context cancel func() @@ -316,12 +318,6 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) { } func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { - if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid { - fmt.Printf("[SS] Get cid: %s\n", cid) - } - if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid { - defer fmt.Printf("[SS] finish get: %s\n", cid) - } if isIdentiyCid(cid) { data, err := decodeIdentityCid(cid) if err != nil { @@ -336,10 +332,6 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) // critical section if s.txnMarkSet != nil { - if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid { - fmt.Printf("[SS] Get in critical section\n") - } - has, err := s.txnMarkSet.Has(cid) if err != nil { return nil, err @@ -362,10 +354,6 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) switch { case err == nil: - if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid { - - fmt.Printf("[SS] hot had it\n") - } s.trackTxnRef(cid) return blk, nil @@ -376,11 +364,6 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) blk, err = s.cold.Get(ctx, cid) if err == nil { - if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid { - - fmt.Printf("[SS] cold had it\n") - } - s.trackTxnRef(cid) if bstore.IsHotView(ctx) { s.reifyColdObject(cid) diff --git a/blockstore/splitstore/splitstore_check.go b/blockstore/splitstore/splitstore_check.go index 585b8391fd3..6452b3ee2f6 100644 --- a/blockstore/splitstore/splitstore_check.go +++ b/blockstore/splitstore/splitstore_check.go @@ -147,6 +147,7 @@ func (s *SplitStore) Info() map[string]interface{} { info["base epoch"] = s.baseEpoch info["warmup epoch"] = s.warmupEpoch info["compactions"] = s.compactionIndex + info["prunes"] = s.pruneIndex info["compacting"] = s.compacting == 1 sizer, ok := s.hot.(bstore.BlockstoreSize) diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index 8f75b85c7ec..d5201e23899 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -70,20 +70,6 @@ const ( batchSize = 16384 ) -func isTestCid(c cid.Cid) bool { - testCid, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu") - if err != nil { - panic(err) - } - - return c.Hash().HexString() == testCid.Hash().HexString() -} - -func (s *SplitStore) fmtDebug(extra string) { - hHas, cHas := s.lookupTestCid() - fmt.Printf("[COMPACT]: %s, has test cid (hot %t) (cold %t))", extra, hHas, cHas) -} - func (s *SplitStore) lookupTestCid() (bool, bool) { c, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu") if err != nil { @@ -149,9 +135,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { } if epoch-s.baseEpoch > CompactionThreshold { - fmt.Printf("YYYYYYYYY\n Begin compaction \nYYYYYYY\n") // it's time to compact -- prepare the transaction and go! - s.fmtDebug("right before compaction") s.beginTxnProtect() s.compactType = hot go func() { @@ -571,8 +555,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return err } - s.fmtDebug("before chain walk") - // 1. mark reachable objects by walking the chain from the current epoch; we keep state roots // and messages until the boundary epoch. log.Info("marking reachable objects") @@ -581,9 +563,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { count := new(int64) err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, func(c cid.Cid) error { - if isTestCid(c) { - fmt.Printf("\n\n[COMPACT] found the cid %s during chain traversal!\n\n", c.String()) - } if isUnitaryObject(c) { return errStopWalk } @@ -605,8 +584,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return xerrors.Errorf("error marking: %w", err) } - s.fmtDebug("after chain walk") - s.markSetSize = *count + *count>>2 // overestimate a bit log.Infow("marking done", "took", time.Since(startMark), "marked", *count) @@ -637,11 +614,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { // some stats for logging var hotCnt, coldCnt int - fmt.Printf("\n\n[COMPACT]: traversing hot store of type %T\n\n", s.hot) err = s.hot.ForEachKey(func(c cid.Cid) error { - if isTestCid(c) { - fmt.Printf("\n\n[COMPACT]: found %s(%s) in hot store during compact\n\n", c, c.String()) - } // was it marked? mark, err := markSet.Has(c) if err != nil { @@ -661,8 +634,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return nil }) - - s.fmtDebug("after traversing hot store") if err != nil { return xerrors.Errorf("error collecting cold objects: %w", err) } @@ -714,7 +685,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return xerrors.Errorf("error resetting coldset: %w", err) } } - s.fmtDebug("after moving to cold store") // 4. Purge cold objects with checkpointing for recovery. // This is the critical section of compaction, whereby any cold object not in the markSet is @@ -738,8 +708,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return err } - s.fmtDebug("inside critical section") - checkpoint, err := NewCheckpoint(s.checkpointPath()) if err != nil { return xerrors.Errorf("error creating checkpoint: %w", err) @@ -755,7 +723,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge)) - s.fmtDebug("after purge") s.endCriticalSection() if err := checkpoint.Close(); err != nil { @@ -773,9 +740,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { // we are done; do some housekeeping s.endTxnProtect() - s.fmtDebug("critical section done") s.gcHotstore() - s.fmtDebug("hot store gc") err = s.setBaseEpoch(boundaryEpoch) if err != nil { @@ -1205,10 +1170,6 @@ func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error { if err := s.checkClosing(); err != nil { return err } - if isTestCid(c) { - fmt.Printf("[COMPACT]: foudn test cid while moving cold blocks %s\n\n", c) - } - blk, err := s.hot.Get(s.ctx, c) if err != nil { if ipld.IsNotFound(err) { @@ -1276,9 +1237,6 @@ func (s *SplitStore) purge(coldr *ColdSetReader, checkpoint *Checkpoint, markSet err := coldr.ForEach(func(c cid.Cid) error { batch = append(batch, c) - if isTestCid(c) { - fmt.Printf("[COMPACT]: found test cid while purging\n\n") - } if len(batch) == batchSize { return deleteBatch() } diff --git a/blockstore/splitstore/splitstore_prune.go b/blockstore/splitstore/splitstore_prune.go index 4934a331a33..6f92f13a867 100644 --- a/blockstore/splitstore/splitstore_prune.go +++ b/blockstore/splitstore/splitstore_prune.go @@ -2,6 +2,7 @@ package splitstore import ( "bytes" + "fmt" "os" "runtime" "sync" @@ -97,6 +98,7 @@ func (s *SplitStore) PruneChain(opts map[string]interface{}) error { } if _, ok := s.cold.(bstore.BlockstoreIterator); !ok { + fmt.Printf("cold store type: %T\n", s.cold) return xerrors.Errorf("coldstore does not support efficient iteration") } @@ -146,12 +148,22 @@ func (s *SplitStore) pruneChain(retainStateP func(int64) bool, doGC func() error return nil } +func isTestCid(c cid.Cid) bool { + testCid, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu") + if err != nil { + panic(err) + } + + return c.Hash().HexString() == testCid.Hash().HexString() +} + func (s *SplitStore) prune(curTs *types.TipSet, retainStateP func(int64) bool, doGC func() error) { log.Info("waiting for active views to complete") start := time.Now() s.viewWait() log.Infow("waiting for active views done", "took", time.Since(start)) + fmt.Printf("[PRUNE]: starting\n") err := s.doPrune(curTs, retainStateP, doGC) if err != nil { log.Errorf("PRUNE ERROR: %s", err) @@ -185,6 +197,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, // and chain headers; state and reciepts are retained only if it is within retention policy scope log.Info("marking reachable objects") startMark := time.Now() + fmt.Printf("[PRUNE]: marking\n") count := new(int64) err = s.walkChainDeep(curTs, retainStateP, @@ -226,6 +239,8 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, return err } + fmt.Printf("[PRUNE]: iter cold store to find dead\n") + // 2. iterate through the coldstore to collect dead objects log.Info("collecting dead objects") startCollect := time.Now() @@ -241,6 +256,9 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, err = s.cold.(bstore.BlockstoreIterator).ForEachKey(func(c cid.Cid) error { // was it marked? + if isTestCid(c) { + fmt.Printf("[PRUNE] found test cid %s in deadset collection iteration\n", c) + } mark, err := markSet.Has(c) if err != nil { return xerrors.Errorf("error checking mark set for %s: %w", c, err) @@ -248,6 +266,9 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, if mark { liveCnt++ + if isTestCid(c) { + fmt.Printf("[PRUNE] huh found the test cid as marked, no prune for us...\n") + } return nil } @@ -295,6 +316,8 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, } defer deadr.Close() //nolint:errcheck + fmt.Printf("[PRUNE]: purge the dead\n") + // 3. Purge dead objects with checkpointing for recovery. // This is the critical section of prune, whereby any dead object not in the markSet is // considered already deleted. @@ -305,6 +328,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, if err := s.beginCriticalSection(markSet); err != nil { return xerrors.Errorf("error beginning critical section: %w", err) } + fmt.Printf("[PRUNE]: critical\n") if err := s.checkClosing(); err != nil { return err @@ -325,6 +349,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, log.Infow("purging dead objects from coldstore done", "took", time.Since(startPurge)) s.endCriticalSection() + fmt.Printf("[PRUNE]: done with critical\n") if err := checkpoint.Close(); err != nil { log.Warnf("error closing checkpoint: %s", err) @@ -346,6 +371,12 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, log.Warnf("error garbage collecting cold store: %s", err) } + s.pruneIndex++ + err = s.ds.Put(s.ctx, pruneIndexKey, int64ToBytes(s.compactionIndex)) + if err != nil { + return xerrors.Errorf("error saving compaction index: %w", err) + } + return nil } diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 7957e5a9ee7..2185485d376 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -338,7 +338,7 @@ func (n *Ensemble) Start() *Ensemble { for i, full := range n.inactive.fullnodes { var r repo.Repo - if full.options.fsrepo { + if !full.options.fsrepo { rmem := repo.NewMemory(nil) n.t.Cleanup(rmem.Cleanup) r = rmem diff --git a/itests/splitstore_test.go b/itests/splitstore_test.go index 28a5f8cf88d..df812dc58b3 100644 --- a/itests/splitstore_test.go +++ b/itests/splitstore_test.go @@ -99,7 +99,7 @@ func TestHotstoreCompactCleansGarbage(t *testing.T) { // Create unreachable state // Check that it moves to coldstore // Prune coldstore and check that it is deleted -func TestColdStore(t *testing.T) { +func TestColdStorePrune(t *testing.T) { ctx := context.Background() // disable sync checking because efficient itests require that the node is out of sync : / splitstore.CheckSyncGap = false @@ -150,6 +150,10 @@ func TestColdStore(t *testing.T) { assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore") bm.Restart() + require.NoError(t, full.ChainPrune(ctx, nil)) + waitForPrune(ctx, t, 1, full) + assert.False(g.t, g.Exists(ctx, garbage), "Garbage should be removed from cold store after prune but it's still there") + } func waitForCompaction(ctx context.Context, t *testing.T, cIdx int64, n *kit.TestFullNode) { @@ -161,6 +165,15 @@ func waitForCompaction(ctx context.Context, t *testing.T, cIdx int64, n *kit.Tes } } +func waitForPrune(ctx context.Context, t *testing.T, pIdx int64, n *kit.TestFullNode) { + for { + if splitStorePruneIndex(ctx, t, n) >= pIdx { + break + } + time.Sleep(1 * time.Second) + } +} + func splitStoreCompacting(ctx context.Context, t *testing.T, n *kit.TestFullNode) bool { info, err := n.ChainBlockstoreInfo(ctx) require.NoError(t, err) @@ -191,6 +204,16 @@ func splitStoreCompactionIndex(ctx context.Context, t *testing.T, n *kit.TestFul return compactionIndex } +func splitStorePruneIndex(ctx context.Context, t *testing.T, n *kit.TestFullNode) int64 { + info, err := n.ChainBlockstoreInfo(ctx) + require.NoError(t, err) + prune, ok := info["prunes"] + require.True(t, ok, "prunes not on blockstore info") + pruneIndex, ok := prune.(int64) + require.True(t, ok, "prune key on blockstore info wrong type") + return pruneIndex +} + // Create on chain unreachable garbage for a network to exercise splitstore // one garbage cid created at a time //