diff --git a/blockstore/context.go b/blockstore/context.go new file mode 100644 index 00000000000..ebb6fafe337 --- /dev/null +++ b/blockstore/context.go @@ -0,0 +1,21 @@ +package blockstore + +import ( + "context" +) + +type hotViewKey struct{} + +var hotView = hotViewKey{} + +// WithHotView constructs a new context with an option that provides a hint to the blockstore +// (e.g. the splitstore) that the object (and its ipld references) should be kept hot. +func WithHotView(ctx context.Context) context.Context { + return context.WithValue(ctx, hotView, struct{}{}) +} + +// IsHotView returns true if the hot view option is set in the context +func IsHotView(ctx context.Context) bool { + v := ctx.Value(hotView) + return v != nil +} diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 6a65e01df36..a351df76a46 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -161,6 +161,13 @@ type SplitStore struct { txnSyncCond sync.Cond txnSync bool + // background cold object reification + reifyWorkers sync.WaitGroup + reifyMx sync.Mutex + reifyCond sync.Cond + reifyPend map[cid.Cid]struct{} + reifyInProgress map[cid.Cid]struct{} + // registered protectors protectors []func(func(cid.Cid) error) error } @@ -202,6 +209,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co ss.txnSyncCond.L = &ss.txnSyncMx ss.ctx, ss.cancel = context.WithCancel(context.Background()) + ss.reifyCond.L = &ss.reifyMx + ss.reifyPend = make(map[cid.Cid]struct{}) + ss.reifyInProgress = make(map[cid.Cid]struct{}) + if enableDebugLog { ss.debug, err = openDebugLog(path) if err != nil { @@ -264,7 +275,13 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) { return true, nil } - return s.cold.Has(ctx, cid) + has, err = s.cold.Has(ctx, cid) + if has && bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + + return has, err + } func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { @@ -308,8 +325,11 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) blk, err = s.cold.Get(ctx, cid) if err == nil { - stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) + if bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) } return blk, err @@ -359,6 +379,10 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { size, err = s.cold.GetSize(ctx, cid) if err == nil { + if bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) } return size, err @@ -536,6 +560,10 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro err = s.cold.View(ctx, cid, cb) if err == nil { + if bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) } return err @@ -645,6 +673,9 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error } } + // spawn the reifier + go s.reifyOrchestrator() + // watch the chain chain.SubscribeHeadChanges(s.HeadChange) @@ -676,6 +707,8 @@ func (s *SplitStore) Close() error { } } + s.reifyCond.Broadcast() + s.reifyWorkers.Wait() s.cancel() return multierr.Combine(s.markSetEnv.Close(), s.debug.Close()) } diff --git a/blockstore/splitstore/splitstore_reify.go b/blockstore/splitstore/splitstore_reify.go new file mode 100644 index 00000000000..14648a65217 --- /dev/null +++ b/blockstore/splitstore/splitstore_reify.go @@ -0,0 +1,193 @@ +package splitstore + +import ( + "runtime" + "sync/atomic" + + "golang.org/x/xerrors" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" +) + +func (s *SplitStore) reifyColdObject(c cid.Cid) { + if !s.isWarm() { + return + } + + if isUnitaryObject(c) { + return + } + + s.reifyMx.Lock() + defer s.reifyMx.Unlock() + + _, ok := s.reifyInProgress[c] + if ok { + return + } + + s.reifyPend[c] = struct{}{} + s.reifyCond.Broadcast() +} + +func (s *SplitStore) reifyOrchestrator() { + workers := runtime.NumCPU() / 4 + if workers < 2 { + workers = 2 + } + + workch := make(chan cid.Cid, workers) + defer close(workch) + + for i := 0; i < workers; i++ { + s.reifyWorkers.Add(1) + go s.reifyWorker(workch) + } + + for { + s.reifyMx.Lock() + for len(s.reifyPend) == 0 && atomic.LoadInt32(&s.closing) == 0 { + s.reifyCond.Wait() + } + + if atomic.LoadInt32(&s.closing) != 0 { + s.reifyMx.Unlock() + return + } + + reifyPend := s.reifyPend + s.reifyPend = make(map[cid.Cid]struct{}) + s.reifyMx.Unlock() + + for c := range reifyPend { + select { + case workch <- c: + case <-s.ctx.Done(): + return + } + } + } +} + +func (s *SplitStore) reifyWorker(workch chan cid.Cid) { + defer s.reifyWorkers.Done() + for c := range workch { + s.doReify(c) + } +} + +func (s *SplitStore) doReify(c cid.Cid) { + var toreify, totrack, toforget []cid.Cid + + defer func() { + s.reifyMx.Lock() + defer s.reifyMx.Unlock() + + for _, c := range toreify { + delete(s.reifyInProgress, c) + } + for _, c := range totrack { + delete(s.reifyInProgress, c) + } + for _, c := range toforget { + delete(s.reifyInProgress, c) + } + }() + + s.txnLk.RLock() + defer s.txnLk.RUnlock() + + err := s.walkObject(c, newTmpVisitor(), + func(c cid.Cid) error { + if isUnitaryObject(c) { + return errStopWalk + } + + s.reifyMx.Lock() + _, inProgress := s.reifyInProgress[c] + if !inProgress { + s.reifyInProgress[c] = struct{}{} + } + s.reifyMx.Unlock() + + if inProgress { + return errStopWalk + } + + has, err := s.hot.Has(s.ctx, c) + if err != nil { + return xerrors.Errorf("error checking hotstore: %w", err) + } + + if has { + if s.txnMarkSet != nil { + hasMark, err := s.txnMarkSet.Has(c) + if err != nil { + log.Warnf("error checking markset: %s", err) + } else if hasMark { + toforget = append(toforget, c) + return errStopWalk + } + } else { + totrack = append(totrack, c) + return errStopWalk + } + } + + toreify = append(toreify, c) + return nil + }) + + if err != nil { + log.Warnf("error walking cold object for reification (cid: %s): %s", c, err) + return + } + + log.Debugf("reifying %d objects rooted at %s", len(toreify), c) + + // this should not get too big, maybe some 100s of objects. + batch := make([]blocks.Block, 0, len(toreify)) + for _, c := range toreify { + blk, err := s.cold.Get(s.ctx, c) + if err != nil { + log.Warnf("error retrieving cold object for reification (cid: %s): %s", c, err) + continue + } + + if err := s.checkClosing(); err != nil { + return + } + + batch = append(batch, blk) + } + + if len(batch) > 0 { + err = s.hot.PutMany(s.ctx, batch) + if err != nil { + log.Warnf("error reifying cold object (cid: %s): %s", c, err) + return + } + } + + if s.txnMarkSet != nil { + if len(toreify) > 0 { + if err := s.txnMarkSet.MarkMany(toreify); err != nil { + log.Warnf("error marking reified objects: %s", err) + } + } + if len(totrack) > 0 { + if err := s.txnMarkSet.MarkMany(totrack); err != nil { + log.Warnf("error marking tracked objects: %s", err) + } + } + } else { + // if txnActive is false these are noops + if len(toreify) > 0 { + s.trackTxnRefMany(toreify) + } + if len(totrack) > 0 { + s.trackTxnRefMany(totrack) + } + } +} diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index 27d58bf1043..6b7e60e6c50 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/ioutil" + "math/rand" "os" "sync" "sync/atomic" @@ -387,6 +388,135 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) { } } +func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + hot := newMockStore() + cold := newMockStore() + + mkRandomBlock := func() blocks.Block { + data := make([]byte, 128) + _, err := rand.Read(data) + if err != nil { + t.Fatal(err) + } + + return blocks.NewBlock(data) + } + + block1 := mkRandomBlock() + block2 := mkRandomBlock() + block3 := mkRandomBlock() + + hdr := mock.MkBlock(nil, 0, 0) + hdr.Messages = block1.Cid() + hdr.ParentMessageReceipts = block2.Cid() + hdr.ParentStateRoot = block3.Cid() + block4, err := hdr.ToStorageBlock() + if err != nil { + t.Fatal(err) + } + + allBlocks := []blocks.Block{block1, block2, block3, block4} + for _, blk := range allBlocks { + err := cold.Put(context.Background(), blk) + if err != nil { + t.Fatal(err) + } + } + + path, err := ioutil.TempDir("", "splitstore.*") + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(path) + }) + + ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"}) + if err != nil { + t.Fatal(err) + } + defer ss.Close() //nolint + + ss.warmupEpoch = 1 + go ss.reifyOrchestrator() + + waitForReification := func() { + for { + ss.reifyMx.Lock() + ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0 + ss.reifyMx.Unlock() + + if ready { + return + } + + time.Sleep(time.Millisecond) + } + } + + // first access using the standard view + err = f(context.Background(), ss, block4.Cid()) + if err != nil { + t.Fatal(err) + } + + // nothing should be reified + waitForReification() + for _, blk := range allBlocks { + has, err := hot.Has(context.Background(), blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if has { + t.Fatal("block unexpectedly reified") + } + } + + // now make the hot/reifying view and ensure access reifies + err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid()) + if err != nil { + t.Fatal(err) + } + + // everything should be reified + waitForReification() + for i, blk := range allBlocks { + has, err := hot.Has(context.Background(), blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if !has { + t.Fatalf("block%d was not reified", i+1) + } + } +} + +func TestSplitStoreReification(t *testing.T) { + t.Log("test reification with Has") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + _, err := s.Has(ctx, c) + return err + }) + t.Log("test reification with Get") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + _, err := s.Get(ctx, c) + return err + }) + t.Log("test reification with GetSize") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + _, err := s.GetSize(ctx, c) + return err + }) + t.Log("test reification with View") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + return s.View(ctx, c, func(_ []byte) error { return nil }) + }) +} + type mockChain struct { t testing.TB diff --git a/chain/consensus/filcns/compute_state.go b/chain/consensus/filcns/compute_state.go index f7f6284d0a0..44b79285420 100644 --- a/chain/consensus/filcns/compute_state.go +++ b/chain/consensus/filcns/compute_state.go @@ -32,6 +32,7 @@ import ( /* inline-gen end */ + "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin" @@ -92,6 +93,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, sm *stmgr.StateManager partDone() }() + ctx = blockstore.WithHotView(ctx) makeVmWithBaseStateAndEpoch := func(base cid.Cid, e abi.ChainEpoch) (*vm.VM, error) { vmopt := &vm.VMOpts{ StateBase: base,