From 723cfeebd2dd5857d36a9778b2d1194a0bf7da7f Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 9 Jul 2021 22:30:13 -0700 Subject: [PATCH] final review You know, I'm actually pretty confident in this code. --- blockstore/splitstore/splitstore.go | 110 +++++++++++++++++++++++----- 1 file changed, 90 insertions(+), 20 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 6a13be19cc5..f689ca5e1d3 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -163,6 +163,12 @@ func init() { // attached to the ChainStore with Start in order to trigger compaction. func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Config) (*SplitStore, error) { // hot blockstore must support BlockstoreIterator + // nit: can we just define a local type? + // So we can store this instead of casting multiple times? + type hotStore interface { + bstore.Blockstore + bstore.BlockstoreIterator + } if _, ok := hot.(bstore.BlockstoreIterator); !ok { return nil, xerrors.Errorf("hot blockstore does not support efficient iteration: %T", hot) } @@ -181,6 +187,8 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co cold: cold, markSetEnv: markSetEnv, + // nit: this doesn't need to be allocated as long as we don't move it. + // fyi, you _can_ return &s.txnViews. txnViews: new(sync.WaitGroup), coldPurgeSize: defaultColdPurgeSize, @@ -263,6 +271,7 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) { blk, err = s.cold.Get(cid) if err == nil { + // nit: use the stored context? stats.Record(context.Background(), metrics.SplitstoreMiss.M(1)) } @@ -393,20 +402,46 @@ func (s *SplitStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return nil, err } - ch := make(chan cid.Cid) + // arbitrary but >= 1 helps avoid context switching in this case + ch := make(chan cid.Cid, 8) go func() { defer cancel() defer close(ch) - for _, in := range []<-chan cid.Cid{chHot, chCold} { - for cid := range in { - select { - case ch <- cid: - case <-ctx.Done(): - return + // uber nit unless we have a reason to do this in order? + // Of course, this code is more complicated and also probably overkill. + // I actually kind of prefer the old code. + // But I'll just leave this here because I already wrote it. + for chHot != nil && chCold != nil { + var c cid.Cid + var ok bool + select { + case c, ok = <-chHot: + if !ok { + chHot = nil + continue + } + case c, ok = <-chCold: + if !ok { + chCold = nil + continue } + case <-ctx.Done(): + return + } + + select { + case ch <- c: + case <-ctx.Done(): + return } } + + // Also note: We're _supposed_ to guarantee that CIDs are only returned once. We + // _probably_ need to put the CIDs in a set to check that. + // + // That might actually be a reason to read through the hotstore before the + // coldstore? }() return ch, nil @@ -435,14 +470,18 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error { // view can't have its data pointer deleted, which would be catastrophic. // Note that we can't just RLock for the duration of the view, as this could // lead to deadlock with recursive views. - wg := s.protectView(cid) - if wg != nil { - defer wg.Done() - } + + // What happens if the transaction ends while a view is open? What happens if a transaction + // ends then re-opens while a view is open? Highly unlikely... but I wonder if we should + // just use the wait-group regardless? + + // ultra-nit: you _can_ make this _slightly_ simpler by returning a possibly no-op function: + defer s.protectView(cid)() err := s.hot.View(cid, cb) switch err { case bstore.ErrNotFound: + // I see we repeat this pattern a lot. Can we put it in a helper? if s.debug != nil { s.mx.Lock() warm := s.warmupEpoch > 0 @@ -529,6 +568,7 @@ func (s *SplitStore) Start(chain ChainAccessor) error { func (s *SplitStore) Close() error { atomic.StoreInt32(&s.closing, 1) + // I'm _pretty_ sure we can just use a lock here now. Doesn't even need to be a rwmutex. if atomic.LoadInt32(&s.critsection) == 1 { log.Warn("ongoing compaction in critical section; waiting for it to finish...") for atomic.LoadInt32(&s.critsection) == 1 { @@ -551,6 +591,15 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) { // we are currently compacting -- protect the new tipset(s) + + // race: we call beginTxnProtect _after_ setting compacting. + // 1. HeadChange triggers a compaction but stops immediately after setting this + // flag. + // 2. HeadChange tries ties to trigger another compaction, sees one is already in + // progress, and calls protectTipSets. + // 3. protectTipSets doesn't do anything because txnActive is still false. + // + // Fix: use a lock and only release it once everything is "setup" correctly. s.protectTipSets(apply) return nil } @@ -676,6 +725,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) { s.txnRefs[c] = struct{}{} } + // defer? s.txnRefsMx.Unlock() return } @@ -729,6 +779,8 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error { close(workch) + // FYI, https://pkg.go.dev/golang.org/x/sync/errgroup is a wonderful package. + // Example: https://github.com/filecoin-project/specs-actors/blob/ab02f795d994cd9105eefe2760a1846cfcfef192/actors/migration/nv13/top.go#L62-L221 worker := func(wg *sync.WaitGroup) { if wg != nil { defer wg.Done() @@ -737,6 +789,9 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error { for c := range workch { err := s.doTxnProtect(c, markSet) if err != nil { + // I would _NOT_ proceed if we fail here. Any errors like + // this in GC should cause us to abort and (to avoid + // deleting anything). log.Warnf("error protecting transactional references: %s", err) return } @@ -751,6 +806,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error { } wg.Wait() } else { + // This really isn't a necessary optimization. worker(nil) } @@ -1063,6 +1119,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } log.Infow("sorting done", "took", time.Since(startSort)) + // Can we add a comment explaining why we might want to do this here instead of just waiting till we purge? // 4.1 protect transactional refs once more err = s.protectTxnRefs(markSet) if err != nil { @@ -1093,6 +1150,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge)) // we are done; do some housekeeping + // We have a bunch of ways to return early without invoking this. Maybe we need to defer it? Or set some kind of dirty bit? s.endTxnProtect() s.gcHotstore() @@ -1126,6 +1184,7 @@ func (s *SplitStore) beginTxnProtect() *sync.WaitGroup { } func (s *SplitStore) beginTxnMarking(markSet MarkSet) { + // No lock? Have we tested this with the golang race detector? markSet.SetConcurrent() s.txnLk.Lock() @@ -1259,6 +1318,7 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro return nil } +// Maybe check closing in here? This could run for a while. // like walkObject, but the object may be potentially incomplete (references missing) func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing func(cid.Cid) error) error { if !walked.Visit(c) { @@ -1522,24 +1582,31 @@ func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error { func(cids []cid.Cid) error { deadCids := deadCids[:0] - again: - if err := s.checkClosing(); err != nil { - return err - } + for { + if err := s.checkClosing(); err != nil { + return err + } - s.txnLk.Lock() - if len(s.txnRefs) > 0 { + s.txnLk.Lock() + if len(s.txnRefs) == 0 { + // keep the lock when we hit the desired condition. + break + } s.txnLk.Unlock() + // Doesn't really matter but... + // + // It's slightly strange that we're effectively looping _twice_. + // Maybe break protectTxnRefs into two functions? One for the loop, + // and one to do the actual thing? + // err := s.protectTxnRefs(markSet) if err != nil { return xerrors.Errorf("error protecting transactional refs: %w", err) } - - goto again } - defer s.txnLk.Unlock() + for _, c := range cids { live, err := markSet.Has(c) if err != nil { @@ -1635,6 +1702,8 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) { } } + // Do we really want to proceed here? Maybe we should just abort and try again later? + // Also, aren't these errors? if len(missing) > 0 { log.Warnf("still missing %d references", len(missing)) for c := range missing { @@ -1699,6 +1768,7 @@ func bytesToUint64(buf []byte) uint64 { return i } +// So, I get the name... I think? But maybe flip it and say `isStateObject` (or `notStateObject`)? func isUnitaryObject(c cid.Cid) bool { pre := c.Prefix() switch pre.Codec {