diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index e03266ab7f9..17ebbd7eab9 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -5,7 +5,7 @@ import ( "fmt" "io" "runtime" - "sync/atomic" + "sync" "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" @@ -73,20 +73,16 @@ func (b *badgerLogger) Warningf(format string, args ...interface{}) { } const ( - stateOpen int64 = iota + stateOpen = iota stateClosing stateClosed ) // Blockstore is a badger-backed IPLD blockstore. -// -// NOTE: once Close() is called, methods will try their best to return -// ErrBlockstoreClosed. This will guaranteed to happen for all subsequent -// operation calls after Close() has returned, but it may not happen for -// operations in progress. Those are likely to fail with a different error. type Blockstore struct { - // state is accessed atomically - state int64 + stateLk sync.RWMutex + state int + viewers sync.WaitGroup DB *badger.DB @@ -97,6 +93,7 @@ type Blockstore struct { var _ blockstore.Blockstore = (*Blockstore)(nil) var _ blockstore.Viewer = (*Blockstore)(nil) +var _ blockstore.BlockstoreIterator = (*Blockstore)(nil) var _ io.Closer = (*Blockstore)(nil) // Open creates a new badger-backed blockstore, with the supplied options. @@ -124,19 +121,51 @@ func Open(opts Options) (*Blockstore, error) { // Close closes the store. If the store has already been closed, this noops and // returns an error, even if the first closure resulted in error. func (b *Blockstore) Close() error { - if !atomic.CompareAndSwapInt64(&b.state, stateOpen, stateClosing) { + b.stateLk.Lock() + if b.state != stateOpen { + b.stateLk.Unlock() return nil } + b.state = stateClosing + b.stateLk.Unlock() + + defer func() { + b.stateLk.Lock() + b.state = stateClosed + b.stateLk.Unlock() + }() + + // wait for all accesses to complete + b.viewers.Wait() - defer atomic.StoreInt64(&b.state, stateClosed) return b.DB.Close() } +func (b *Blockstore) access() error { + b.stateLk.RLock() + defer b.stateLk.RUnlock() + + if b.state != stateOpen { + return ErrBlockstoreClosed + } + + b.viewers.Add(1) + return nil +} + +func (b *Blockstore) isOpen() bool { + b.stateLk.RLock() + defer b.stateLk.RUnlock() + + return b.state == stateOpen +} + // CollectGarbage runs garbage collection on the value log func (b *Blockstore) CollectGarbage() error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() var err error for err == nil { @@ -153,9 +182,10 @@ func (b *Blockstore) CollectGarbage() error { // Compact runs a synchronous compaction func (b *Blockstore) Compact() error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() nworkers := runtime.NumCPU() / 2 if nworkers < 2 { @@ -168,9 +198,10 @@ func (b *Blockstore) Compact() error { // View implements blockstore.Viewer, which leverages zero-copy read-only // access to values. func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(cid) if pooled { @@ -191,9 +222,10 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error { // Has implements Blockstore.Has. 
func (b *Blockstore) Has(cid cid.Cid) (bool, error) { - if atomic.LoadInt64(&b.state) != stateOpen { - return false, ErrBlockstoreClosed + if err := b.access(); err != nil { + return false, err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(cid) if pooled { @@ -221,9 +253,10 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) { return nil, blockstore.ErrNotFound } - if atomic.LoadInt64(&b.state) != stateOpen { - return nil, ErrBlockstoreClosed + if err := b.access(); err != nil { + return nil, err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(cid) if pooled { @@ -250,9 +283,10 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) { // GetSize implements Blockstore.GetSize. func (b *Blockstore) GetSize(cid cid.Cid) (int, error) { - if atomic.LoadInt64(&b.state) != stateOpen { - return -1, ErrBlockstoreClosed + if err := b.access(); err != nil { + return 0, err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(cid) if pooled { @@ -279,9 +313,10 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) { // Put implements Blockstore.Put. func (b *Blockstore) Put(block blocks.Block) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(block.Cid()) if pooled { @@ -299,9 +334,10 @@ func (b *Blockstore) Put(block blocks.Block) error { // PutMany implements Blockstore.PutMany. func (b *Blockstore) PutMany(blocks []blocks.Block) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() // toReturn tracks the byte slices to return to the pool, if we're using key // prefixing. we can't return each slice to the pool after each Set, because @@ -338,9 +374,10 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error { // DeleteBlock implements Blockstore.DeleteBlock. func (b *Blockstore) DeleteBlock(cid cid.Cid) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(cid) if pooled { @@ -353,9 +390,10 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error { } func (b *Blockstore) DeleteMany(cids []cid.Cid) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() // toReturn tracks the byte slices to return to the pool, if we're using key // prefixing. we can't return each slice to the pool after each Set, because @@ -392,8 +430,8 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error { // AllKeysChan implements Blockstore.AllKeysChan. func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - if atomic.LoadInt64(&b.state) != stateOpen { - return nil, ErrBlockstoreClosed + if err := b.access(); err != nil { + return nil, err } txn := b.DB.NewTransaction(false) @@ -405,6 +443,7 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { ch := make(chan cid.Cid) go func() { + defer b.viewers.Done() defer close(ch) defer iter.Close() @@ -415,7 +454,7 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { if ctx.Err() != nil { return // context has fired. } - if atomic.LoadInt64(&b.state) != stateOpen { + if !b.isOpen() { // open iterators will run even after the database is closed... 
return // closing, yield. } @@ -442,6 +481,56 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return ch, nil } +// Implementation of BlockstoreIterator interface +func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { + if err := b.access(); err != nil { + return err + } + defer b.viewers.Done() + + txn := b.DB.NewTransaction(false) + defer txn.Discard() + + opts := badger.IteratorOptions{PrefetchSize: 100} + if b.prefixing { + opts.Prefix = b.prefix + } + + iter := txn.NewIterator(opts) + defer iter.Close() + + var buf []byte + for iter.Rewind(); iter.Valid(); iter.Next() { + if !b.isOpen() { + return ErrBlockstoreClosed + } + + k := iter.Item().Key() + if b.prefixing { + k = k[b.prefixLen:] + } + + klen := base32.RawStdEncoding.DecodedLen(len(k)) + if klen > len(buf) { + buf = make([]byte, klen) + } + + n, err := base32.RawStdEncoding.Decode(buf, k) + if err != nil { + return err + } + + c := cid.NewCidV1(cid.Raw, buf[:n]) + + err = f(c) + if err != nil { + return err + } + } + + return nil +} + // HashOnRead implements Blockstore.HashOnRead. It is not supported by this // blockstore. func (b *Blockstore) HashOnRead(_ bool) { diff --git a/blockstore/blockstore.go b/blockstore/blockstore.go index 23f0bd7546c..084bbaecc57 100644 --- a/blockstore/blockstore.go +++ b/blockstore/blockstore.go @@ -30,6 +30,11 @@ type BatchDeleter interface { DeleteMany(cids []cid.Cid) error } +// BlockstoreIterator is a trait for efficient iteration +type BlockstoreIterator interface { + ForEachKey(func(cid.Cid) error) error +} + // WrapIDStore wraps the underlying blockstore in an "identity" blockstore. // The ID store filters out all puts for blocks with CIDs using the "identity" // hash function. It also extracts inlined blocks from CIDs using the identity diff --git a/blockstore/discard.go b/blockstore/discard.go new file mode 100644 index 00000000000..afd0651bc07 --- /dev/null +++ b/blockstore/discard.go @@ -0,0 +1,66 @@ +package blockstore + +import ( + "context" + "io" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" +) + +var _ Blockstore = (*discardstore)(nil) + +type discardstore struct { + bs Blockstore +} + +func NewDiscardStore(bs Blockstore) Blockstore { + return &discardstore{bs: bs} +} + +func (b *discardstore) Has(cid cid.Cid) (bool, error) { + return b.bs.Has(cid) +} + +func (b *discardstore) HashOnRead(hor bool) { + b.bs.HashOnRead(hor) +} + +func (b *discardstore) Get(cid cid.Cid) (blocks.Block, error) { + return b.bs.Get(cid) +} + +func (b *discardstore) GetSize(cid cid.Cid) (int, error) { + return b.bs.GetSize(cid) +} + +func (b *discardstore) View(cid cid.Cid, f func([]byte) error) error { + return b.bs.View(cid, f) +} + +func (b *discardstore) Put(blk blocks.Block) error { + return nil +} + +func (b *discardstore) PutMany(blks []blocks.Block) error { + return nil +} + +func (b *discardstore) DeleteBlock(cid cid.Cid) error { + return nil +} + +func (b *discardstore) DeleteMany(cids []cid.Cid) error { + return nil +} + +func (b *discardstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return b.bs.AllKeysChan(ctx) +} + +func (b *discardstore) Close() error { + if c, ok := b.bs.(io.Closer); ok { + return c.Close() + } + return nil +} diff --git a/blockstore/splitstore/README.md b/blockstore/splitstore/README.md new file mode 100644 index 00000000000..1c6569a34e7 --- /dev/null +++ b/blockstore/splitstore/README.md @@ -0,0 +1,72 @@ +# SplitStore: An actively scalable blockstore for the Filecoin chain + 
+The SplitStore was first introduced in lotus v1.5.1 as an experiment +in reducing the performance impact of large blockstores. + +With lotus v1.11.1, we introduce the next iteration in design and +implementation, which we call SplitStore v1. + +The new design (see [#6474](https://github.com/filecoin-project/lotus/pull/6474)) +evolves the splitstore to be a freestanding compacting blockstore that +allows us to keep a small (60-100GB) working set in a hot blockstore +and reliably archive out-of-scope objects in a coldstore. The +coldstore can be a regular badger blockstore (the default), which can +be periodically garbage collected according to configurable user +retention policies, or a discard store, whereby out-of-scope objects +are simply discarded. + +To enable the splitstore, edit `.lotus/config.toml` and add the following: +``` +[Chainstore] + EnableSplitstore = true +``` + +If you intend to use the discard coldstore, you also need to add the following: +``` + [Chainstore.Splitstore] + ColdStoreType = "discard" +``` +In general you _should not_ have to use the discard store, unless you +are running a network booster or have very constrained hardware with +not enough disk space to maintain a coldstore, even with garbage +collection. + + +## Operation + +When the splitstore is first enabled, the existing blockstore becomes +the coldstore and a fresh hotstore is initialized. + +The hotstore is warmed up on first startup so as to load all chain +headers and state roots in the current head. This allows us to +immediately gain the performance benefits of a smaller blockstore, +which can be substantial for full archival nodes. + +All new writes are directed to the hotstore, while reads first hit the +hotstore, with fallback to the coldstore. + +Once 5 finalities have elapsed, and every finality henceforth, the +blockstore _compacts_. Compaction is the process of moving all +objects that are unreachable within the last 4 finalities from the +hotstore to the coldstore. If the system is configured with a discard +coldstore, these objects are discarded. Note that chain headers, all +the way to genesis, are considered reachable. State roots and messages +are considered reachable only within the last 4 finalities, unless +there is a live reference to them. + +## Compaction + +Compaction works transactionally with the following algorithm: +- We prepare a transaction, whereby all objects referenced through API I/O are tracked. +- We walk the chain and mark reachable objects, keeping 4 finalities of state roots and messages and all headers all the way to genesis. +- Once the chain walk is complete, we begin full transaction protection with concurrent marking; we walk and mark all references created during the chain walk. At the same time, all I/O through the API concurrently marks objects as live references. +- We collect cold objects by iterating through the hotstore and checking the mark set; if an object is not marked, then it is a candidate for purge. +- When running with a coldstore, we next copy all cold objects to the coldstore. +- At this point we are ready to begin purging: + - We sort cold objects heaviest first, so as to never delete the constituents of a DAG before the DAG itself (which would leave dangling references) + - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live (see the sketch below) +- We then end the transaction and compact/gc the hotstore.
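+
+To make the batched purge concrete, here is a minimal, self-contained Go
+sketch of the idea: candidates are deleted in small batches, and each batch
+is re-checked against the concurrent transactional mark set under the lock
+before deletion. The names (`MarkSet`, `purgeBatched`, the `del` callback)
+are illustrative stand-ins for this sketch, not the actual lotus types:
+
+```go
+package main
+
+import (
+	"fmt"
+	"sync"
+)
+
+// MarkSet is a hypothetical stand-in for the splitstore's mark set;
+// only Has is needed here.
+type MarkSet interface {
+	Has(key string) (bool, error)
+}
+
+type memMarkSet map[string]struct{}
+
+func (m memMarkSet) Has(k string) (bool, error) {
+	_, ok := m[k]
+	return ok, nil
+}
+
+// purgeBatched deletes cold keys in small batches, re-checking the
+// concurrent transactional mark set under the lock so that nothing
+// that became live during compaction is ever deleted.
+func purgeBatched(cold []string, txn MarkSet, txnLk *sync.Mutex, del func([]string) error) error {
+	const batchSize = 2 // tiny for the example; the real store uses thousands
+	for start := 0; start < len(cold); start += batchSize {
+		end := start + batchSize
+		if end > len(cold) {
+			end = len(cold)
+		}
+
+		txnLk.Lock()
+		var dead []string
+		for _, k := range cold[start:end] {
+			mark, err := txn.Has(k)
+			if err != nil {
+				txnLk.Unlock()
+				return err
+			}
+			if !mark { // not referenced since marking; safe to delete
+				dead = append(dead, k)
+			}
+		}
+		err := del(dead)
+		txnLk.Unlock()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func main() {
+	var txnLk sync.Mutex
+	live := memMarkSet{"b": {}} // "b" was referenced concurrently; it survives
+	cold := []string{"a", "b", "c", "d", "e"}
+	_ = purgeBatched(cold, live, &txnLk, func(dead []string) error {
+		fmt.Println("deleting", dead)
+		return nil
+	})
+}
+```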
+ +## Coldstore Garbage Collection + +TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577) diff --git a/blockstore/splitstore/debug.go b/blockstore/splitstore/debug.go new file mode 100644 index 00000000000..2be85ebfe8d --- /dev/null +++ b/blockstore/splitstore/debug.go @@ -0,0 +1,273 @@ +package splitstore + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "os" + "os/exec" + "path/filepath" + "runtime/debug" + "strings" + "sync" + "time" + + "go.uber.org/multierr" + "golang.org/x/xerrors" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" +) + +type debugLog struct { + readLog, writeLog, deleteLog, stackLog *debugLogOp + + stackMx sync.Mutex + stackMap map[string]string +} + +type debugLogOp struct { + path string + mx sync.Mutex + log *os.File + count int +} + +func openDebugLog(path string) (*debugLog, error) { + basePath := filepath.Join(path, "debug") + err := os.MkdirAll(basePath, 0755) + if err != nil { + return nil, err + } + + readLog, err := openDebugLogOp(basePath, "read.log") + if err != nil { + return nil, err + } + + writeLog, err := openDebugLogOp(basePath, "write.log") + if err != nil { + _ = readLog.Close() + return nil, err + } + + deleteLog, err := openDebugLogOp(basePath, "delete.log") + if err != nil { + _ = readLog.Close() + _ = writeLog.Close() + return nil, err + } + + stackLog, err := openDebugLogOp(basePath, "stack.log") + if err != nil { + _ = readLog.Close() + _ = writeLog.Close() + _ = deleteLog.Close() + return nil, xerrors.Errorf("error opening stack log: %w", err) + } + + return &debugLog{ + readLog: readLog, + writeLog: writeLog, + deleteLog: deleteLog, + stackLog: stackLog, + stackMap: make(map[string]string), + }, nil +} + +func (d *debugLog) LogReadMiss(cid cid.Cid) { + if d == nil { + return + } + + stack := d.getStack() + err := d.readLog.Log("%s %s %s\n", d.timestamp(), cid, stack) + if err != nil { + log.Warnf("error writing read log: %s", err) + } +} + +func (d *debugLog) LogWrite(blk blocks.Block) { + if d == nil { + return + } + + var stack string + if enableDebugLogWriteTraces { + stack = " " + d.getStack() + } + + err := d.writeLog.Log("%s %s%s\n", d.timestamp(), blk.Cid(), stack) + if err != nil { + log.Warnf("error writing write log: %s", err) + } +} + +func (d *debugLog) LogWriteMany(blks []blocks.Block) { + if d == nil { + return + } + + var stack string + if enableDebugLogWriteTraces { + stack = " " + d.getStack() + } + + now := d.timestamp() + for _, blk := range blks { + err := d.writeLog.Log("%s %s%s\n", now, blk.Cid(), stack) + if err != nil { + log.Warnf("error writing write log: %s", err) + break + } + } +} + +func (d *debugLog) LogDelete(cids []cid.Cid) { + if d == nil { + return + } + + now := d.timestamp() + for _, c := range cids { + err := d.deleteLog.Log("%s %s\n", now, c) + if err != nil { + log.Warnf("error writing delete log: %s", err) + break + } + } +} + +func (d *debugLog) Flush() { + if d == nil { + return + } + + // rotate non-empty logs + d.readLog.Rotate() + d.writeLog.Rotate() + d.deleteLog.Rotate() + d.stackLog.Rotate() +} + +func (d *debugLog) Close() error { + if d == nil { + return nil + } + + err1 := d.readLog.Close() + err2 := d.writeLog.Close() + err3 := d.deleteLog.Close() + err4 := d.stackLog.Close() + + return multierr.Combine(err1, err2, err3, err4) +} + +func (d *debugLog) getStack() string { + sk := d.getNormalizedStackTrace() + hash := sha256.Sum256([]byte(sk)) + key := string(hash[:]) + + d.stackMx.Lock() + repr, ok := d.stackMap[key] + if !ok { + repr 
= hex.EncodeToString(hash[:]) + d.stackMap[key] = repr + + err := d.stackLog.Log("%s\n%s\n", repr, sk) + if err != nil { + log.Warnf("error writing stack trace for %s: %s", repr, err) + } + } + d.stackMx.Unlock() + + return repr +} + +func (d *debugLog) getNormalizedStackTrace() string { + sk := string(debug.Stack()) + + // Normalization for deduplication + // skip first line -- it's the goroutine + // for each line that ends in a ), remove the call args -- these are the registers + lines := strings.Split(sk, "\n")[1:] + for i, line := range lines { + if len(line) > 0 && line[len(line)-1] == ')' { + idx := strings.LastIndex(line, "(") + if idx < 0 { + continue + } + lines[i] = line[:idx] + } + } + + return strings.Join(lines, "\n") +} + +func (d *debugLog) timestamp() string { + ts, _ := time.Now().MarshalText() + return string(ts) +} + +func openDebugLogOp(basePath, name string) (*debugLogOp, error) { + path := filepath.Join(basePath, name) + file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return nil, xerrors.Errorf("error opening %s: %w", name, err) + } + + return &debugLogOp{path: path, log: file}, nil +} + +func (d *debugLogOp) Close() error { + d.mx.Lock() + defer d.mx.Unlock() + + return d.log.Close() +} + +func (d *debugLogOp) Log(template string, arg ...interface{}) error { + d.mx.Lock() + defer d.mx.Unlock() + + d.count++ + _, err := fmt.Fprintf(d.log, template, arg...) + return err +} + +func (d *debugLogOp) Rotate() { + d.mx.Lock() + defer d.mx.Unlock() + + if d.count == 0 { + return + } + + err := d.log.Close() + if err != nil { + log.Warnf("error closing log (file: %s): %s", d.path, err) + return + } + + arxivPath := fmt.Sprintf("%s-%d", d.path, time.Now().Unix()) + err = os.Rename(d.path, arxivPath) + if err != nil { + log.Warnf("error moving log (file: %s): %s", d.path, err) + return + } + + go func() { + cmd := exec.Command("gzip", arxivPath) + err := cmd.Run() + if err != nil { + log.Warnf("error compressing log (file: %s): %s", arxivPath, err) + } + }() + + d.count = 0 + d.log, err = os.OpenFile(d.path, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + log.Warnf("error opening log (file: %s): %s", d.path, err) + return + } +} diff --git a/blockstore/splitstore/markset.go b/blockstore/splitstore/markset.go index ef14a2fc668..a644e727955 100644 --- a/blockstore/splitstore/markset.go +++ b/blockstore/splitstore/markset.go @@ -1,26 +1,26 @@ package splitstore import ( - "path/filepath" + "errors" "golang.org/x/xerrors" cid "github.com/ipfs/go-cid" ) +var errMarkSetClosed = errors.New("markset closed") + // MarkSet is a utility to keep track of seen CID, and later query for them. // // * If the expected dataset is large, it can be backed by a datastore (e.g. bbolt). -// * If a probabilistic result is acceptable, it can be backed by a bloom filter (default). +// * If a probabilistic result is acceptable, it can be backed by a bloom filter type MarkSet interface { Mark(cid.Cid) error Has(cid.Cid) (bool, error) Close() error + SetConcurrent() } -// markBytes is deliberately a non-nil empty byte slice for serialization. 
-var markBytes = []byte{} - type MarkSetEnv interface { Create(name string, sizeHint int64) (MarkSet, error) Close() error @@ -28,10 +28,10 @@ type MarkSetEnv interface { func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) { switch mtype { - case "", "bloom": + case "bloom": return NewBloomMarkSetEnv() - case "bolt": - return NewBoltMarkSetEnv(filepath.Join(path, "markset.bolt")) + case "map": + return NewMapMarkSetEnv() default: return nil, xerrors.Errorf("unknown mark set type %s", mtype) } diff --git a/blockstore/splitstore/markset_bloom.go b/blockstore/splitstore/markset_bloom.go index c213436c898..9261de7c753 100644 --- a/blockstore/splitstore/markset_bloom.go +++ b/blockstore/splitstore/markset_bloom.go @@ -3,6 +3,7 @@ package splitstore import ( "crypto/rand" "crypto/sha256" + "sync" "golang.org/x/xerrors" @@ -21,7 +22,9 @@ var _ MarkSetEnv = (*BloomMarkSetEnv)(nil) type BloomMarkSet struct { salt []byte + mx sync.RWMutex bf *bbloom.Bloom + ts bool } var _ MarkSet = (*BloomMarkSet)(nil) @@ -64,14 +67,41 @@ func (s *BloomMarkSet) saltedKey(cid cid.Cid) []byte { } func (s *BloomMarkSet) Mark(cid cid.Cid) error { + if s.ts { + s.mx.Lock() + defer s.mx.Unlock() + } + + if s.bf == nil { + return errMarkSetClosed + } + s.bf.Add(s.saltedKey(cid)) return nil } func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) { + if s.ts { + s.mx.RLock() + defer s.mx.RUnlock() + } + + if s.bf == nil { + return false, errMarkSetClosed + } + return s.bf.Has(s.saltedKey(cid)), nil } func (s *BloomMarkSet) Close() error { + if s.ts { + s.mx.Lock() + defer s.mx.Unlock() + } + s.bf = nil return nil } + +func (s *BloomMarkSet) SetConcurrent() { + s.ts = true +} diff --git a/blockstore/splitstore/markset_bolt.go b/blockstore/splitstore/markset_bolt.go deleted file mode 100644 index cab0dd74af9..00000000000 --- a/blockstore/splitstore/markset_bolt.go +++ /dev/null @@ -1,81 +0,0 @@ -package splitstore - -import ( - "time" - - "golang.org/x/xerrors" - - cid "github.com/ipfs/go-cid" - bolt "go.etcd.io/bbolt" -) - -type BoltMarkSetEnv struct { - db *bolt.DB -} - -var _ MarkSetEnv = (*BoltMarkSetEnv)(nil) - -type BoltMarkSet struct { - db *bolt.DB - bucketId []byte -} - -var _ MarkSet = (*BoltMarkSet)(nil) - -func NewBoltMarkSetEnv(path string) (*BoltMarkSetEnv, error) { - db, err := bolt.Open(path, 0644, - &bolt.Options{ - Timeout: 1 * time.Second, - NoSync: true, - }) - if err != nil { - return nil, err - } - - return &BoltMarkSetEnv{db: db}, nil -} - -func (e *BoltMarkSetEnv) Create(name string, hint int64) (MarkSet, error) { - bucketId := []byte(name) - err := e.db.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(bucketId) - if err != nil { - return xerrors.Errorf("error creating bolt db bucket %s: %w", name, err) - } - return nil - }) - - if err != nil { - return nil, err - } - - return &BoltMarkSet{db: e.db, bucketId: bucketId}, nil -} - -func (e *BoltMarkSetEnv) Close() error { - return e.db.Close() -} - -func (s *BoltMarkSet) Mark(cid cid.Cid) error { - return s.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucketId) - return b.Put(cid.Hash(), markBytes) - }) -} - -func (s *BoltMarkSet) Has(cid cid.Cid) (result bool, err error) { - err = s.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucketId) - v := b.Get(cid.Hash()) - result = v != nil - return nil - }) - - return result, err -} - -func (s *BoltMarkSet) Close() error { - return s.db.Update(func(tx *bolt.Tx) error { - return tx.DeleteBucket(s.bucketId) - }) -} diff --git 
a/blockstore/splitstore/markset_map.go b/blockstore/splitstore/markset_map.go new file mode 100644 index 00000000000..197c824242a --- /dev/null +++ b/blockstore/splitstore/markset_map.go @@ -0,0 +1,75 @@ +package splitstore + +import ( + "sync" + + cid "github.com/ipfs/go-cid" +) + +type MapMarkSetEnv struct{} + +var _ MarkSetEnv = (*MapMarkSetEnv)(nil) + +type MapMarkSet struct { + mx sync.RWMutex + set map[string]struct{} + + ts bool +} + +var _ MarkSet = (*MapMarkSet)(nil) + +func NewMapMarkSetEnv() (*MapMarkSetEnv, error) { + return &MapMarkSetEnv{}, nil +} + +func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { + return &MapMarkSet{ + set: make(map[string]struct{}, sizeHint), + }, nil +} + +func (e *MapMarkSetEnv) Close() error { + return nil +} + +func (s *MapMarkSet) Mark(cid cid.Cid) error { + if s.ts { + s.mx.Lock() + defer s.mx.Unlock() + } + + if s.set == nil { + return errMarkSetClosed + } + + s.set[string(cid.Hash())] = struct{}{} + return nil +} + +func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) { + if s.ts { + s.mx.RLock() + defer s.mx.RUnlock() + } + + if s.set == nil { + return false, errMarkSetClosed + } + + _, ok := s.set[string(cid.Hash())] + return ok, nil +} + +func (s *MapMarkSet) Close() error { + if s.ts { + s.mx.Lock() + defer s.mx.Unlock() + } + s.set = nil + return nil +} + +func (s *MapMarkSet) SetConcurrent() { + s.ts = true +} diff --git a/blockstore/splitstore/markset_test.go b/blockstore/splitstore/markset_test.go index 367ab8d06e7..d5c01e22029 100644 --- a/blockstore/splitstore/markset_test.go +++ b/blockstore/splitstore/markset_test.go @@ -8,8 +8,8 @@ import ( "github.com/multiformats/go-multihash" ) -func TestBoltMarkSet(t *testing.T) { - testMarkSet(t, "bolt") +func TestMapMarkSet(t *testing.T) { + testMarkSet(t, "map") } func TestBloomMarkSet(t *testing.T) { diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index f6d26bbdd60..75989b53c7e 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -1,20 +1,27 @@ package splitstore import ( + "bytes" "context" "encoding/binary" "errors" + "os" + "runtime" + "sort" "sync" "sync/atomic" "time" "go.uber.org/multierr" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" dstore "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" + mh "github.com/multiformats/go-multihash" + cbg "github.com/whyrusleeping/cbor-gen" "github.com/filecoin-project/go-state-types/abi" @@ -31,25 +38,24 @@ var ( // from the previously compacted epoch to trigger a new compaction. // // |················· CompactionThreshold ··················| - // | | - // =======‖≡≡≡≡≡≡≡‖-----------------------|------------------------» - // | | | chain --> ↑__ current epoch - // |·······| | - // ↑________ CompactionCold ↑________ CompactionBoundary + // | | + // =======‖≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡≡‖------------------------» + // | | chain --> ↑__ current epoch + // | archived epochs ___↑ + // ↑________ CompactionBoundary // // === :: cold (already archived) // ≡≡≡ :: to be archived in this compaction // --- :: hot CompactionThreshold = 5 * build.Finality - // CompactionCold is the number of epochs that will be archived to the - // cold store on compaction. See diagram on CompactionThreshold for a - // better sense. 
- CompactionCold = build.Finality - // CompactionBoundary is the number of epochs from the current epoch at which - we will walk the chain for live objects - CompactionBoundary = 2 * build.Finality + we will walk the chain for live objects. + CompactionBoundary = 4 * build.Finality + + // SyncGapTime is the time delay from a tipset's min timestamp before we decide + there is a sync gap + SyncGapTime = time.Minute ) var ( @@ -67,36 +73,34 @@ var ( markSetSizeKey = dstore.NewKey("/splitstore/markSetSize") log = logging.Logger("splitstore") + + // used to signal end of walk + errStopWalk = errors.New("stop walk") + + // set this to true if you are debugging the splitstore to enable debug logging + enableDebugLog = false + // set this to true if you want to track origin stack traces in the write log + enableDebugLogWriteTraces = false ) const ( batchSize = 16384 defaultColdPurgeSize = 7_000_000 - defaultDeadPurgeSize = 1_000_000 ) type Config struct { - // TrackingStore is the type of tracking store to use. - // - // Supported values are: "bolt" (default if omitted), "mem" (for tests and readonly access). TrackingStoreType string - // MarkSetType is the type of mark set to use. // - // Supported values are: "bloom" (default if omitted), "bolt". + // The only currently sane value is "map", but we may add an option for a disk-backed + // markset for memory-constrained situations. MarkSetType string - // perform full reachability analysis (expensive) for compaction - You should enable this option if you plan to use the splitstore without a backing coldstore - EnableFullCompaction bool - // EXPERIMENTAL enable pruning of unreachable objects. - // This has not been sufficiently tested yet; only enable if you know what you are doing. - // Only applies if you enable full compaction. - EnableGC bool - // full archival nodes should enable this if EnableFullCompaction is enabled - do NOT enable this if you synced from a snapshot. - // Only applies if you enabled full compaction - Archival bool + + // DiscardColdBlocks indicates whether to skip moving cold blocks to the coldstore. + // If the splitstore is running with a noop coldstore, then this option is set to true, + // which skips the move (it is a noop, but still takes time to read all the cold objects) + // and directly purges cold blocks. + DiscardColdBlocks bool } // ChainAccessor allows the Splitstore to access the chain. It will most likely @@ -105,76 +109,107 @@ type ChainAccessor interface { GetTipsetByHeight(context.Context, abi.ChainEpoch, *types.TipSet, bool) (*types.TipSet, error) GetHeaviestTipSet() *types.TipSet SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) - WalkSnapshot(context.Context, *types.TipSet, abi.ChainEpoch, bool, bool, func(cid.Cid) error) error } -type SplitStore struct { - compacting int32 // compaction (or warmp up) in progress - critsection int32 // compaction critical section - closing int32 // the split store is closing +// hotstore is the interface that must be satisfied by the hot blockstore; it is an extension +// of the Blockstore interface with the traits we need for compaction.
+type hotstore interface { + bstore.Blockstore + bstore.BlockstoreIterator +} - fullCompaction bool - enableGC bool - skipOldMsgs bool - skipMsgReceipts bool +type SplitStore struct { + compacting int32 // compaction (or warm up) in progress + closing int32 // the splitstore is closing - baseEpoch abi.ChainEpoch - warmupEpoch abi.ChainEpoch + cfg *Config - coldPurgeSize int - deadPurgeSize int + mx sync.Mutex + warmupEpoch abi.ChainEpoch // protected by mx + baseEpoch abi.ChainEpoch // protected by compaction lock - mx sync.Mutex - curTs *types.TipSet + headChangeMx sync.Mutex - chain ChainAccessor - ds dstore.Datastore - hot bstore.Blockstore - cold bstore.Blockstore - tracker TrackingStore + coldPurgeSize int - env MarkSetEnv + chain ChainAccessor + ds dstore.Datastore + cold bstore.Blockstore + hot hotstore + markSetEnv MarkSetEnv markSetSize int64 + + ctx context.Context + cancel func() + + debug *debugLog + + // transactional protection for concurrent read/writes during compaction + txnLk sync.RWMutex + txnViewsMx sync.Mutex + txnViewsCond sync.Cond + txnViews int + txnViewsWaiting bool + txnActive bool + txnProtect MarkSet + txnRefsMx sync.Mutex + txnRefs map[cid.Cid]struct{} + txnMissing map[cid.Cid]struct{} } var _ bstore.Blockstore = (*SplitStore)(nil) +func init() { + if os.Getenv("LOTUS_SPLITSTORE_DEBUG_LOG") == "1" { + enableDebugLog = true + } + + if os.Getenv("LOTUS_SPLITSTORE_DEBUG_LOG_WRITE_TRACES") == "1" { + enableDebugLogWriteTraces = true + } +} + // Open opens an existing splistore, or creates a new splitstore. The splitstore // is backed by the provided hot and cold stores. The returned SplitStore MUST be // attached to the ChainStore with Start in order to trigger compaction. func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Config) (*SplitStore, error) { - // the tracking store - tracker, err := OpenTrackingStore(path, cfg.TrackingStoreType) - if err != nil { - return nil, err + // hot blockstore must support the hotstore interface + hots, ok := hot.(hotstore) + if !ok { + // be specific about what is missing + if _, ok := hot.(bstore.BlockstoreIterator); !ok { + return nil, xerrors.Errorf("hot blockstore does not support efficient iteration: %T", hot) + } + + return nil, xerrors.Errorf("hot blockstore does not support the necessary traits: %T", hot) } // the markset env - env, err := OpenMarkSetEnv(path, cfg.MarkSetType) + markSetEnv, err := OpenMarkSetEnv(path, cfg.MarkSetType) if err != nil { - _ = tracker.Close() return nil, err } // and now we can make a SplitStore ss := &SplitStore{ - ds: ds, - hot: hot, - cold: cold, - tracker: tracker, - env: env, - - fullCompaction: cfg.EnableFullCompaction, - enableGC: cfg.EnableGC, - skipOldMsgs: !(cfg.EnableFullCompaction && cfg.Archival), - skipMsgReceipts: !(cfg.EnableFullCompaction && cfg.Archival), + cfg: cfg, + ds: ds, + cold: cold, + hot: hots, + markSetEnv: markSetEnv, coldPurgeSize: defaultColdPurgeSize, } - if cfg.EnableGC { - ss.deadPurgeSize = defaultDeadPurgeSize + ss.txnViewsCond.L = &ss.txnViewsMx + ss.ctx, ss.cancel = context.WithCancel(context.Background()) + + if enableDebugLog { + ss.debug, err = openDebugLog(path) + if err != nil { + return nil, err + } } return ss, nil @@ -192,26 +227,56 @@ func (s *SplitStore) DeleteMany(_ []cid.Cid) error { } func (s *SplitStore) Has(cid cid.Cid) (bool, error) { + if isIdentiyCid(cid) { + return true, nil + } + + s.txnLk.RLock() + defer s.txnLk.RUnlock() + has, err := s.hot.Has(cid) - if err != nil || has { + if err != nil { + return has,
err } + if has { + s.trackTxnRef(cid) + return true, nil + } + return s.cold.Has(cid) } func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) { + if isIdentiyCid(cid) { + data, err := decodeIdentityCid(cid) + if err != nil { + return nil, err + } + + return blocks.NewBlockWithCid(data, cid) + } + + s.txnLk.RLock() + defer s.txnLk.RUnlock() + blk, err := s.hot.Get(cid) switch err { case nil: + s.trackTxnRef(cid) return blk, nil case bstore.ErrNotFound: + if s.isWarm() { + s.debug.LogReadMiss(cid) + } + blk, err = s.cold.Get(cid) if err == nil { - stats.Record(context.Background(), metrics.SplitstoreMiss.M(1)) + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) + } return blk, err @@ -221,16 +286,33 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) { } func (s *SplitStore) GetSize(cid cid.Cid) (int, error) { + if isIdentiyCid(cid) { + data, err := decodeIdentityCid(cid) + if err != nil { + return 0, err + } + + return len(data), nil + } + + s.txnLk.RLock() + defer s.txnLk.RUnlock() + size, err := s.hot.GetSize(cid) switch err { case nil: + s.trackTxnRef(cid) return size, nil case bstore.ErrNotFound: + if s.isWarm() { + s.debug.LogReadMiss(cid) + } + size, err = s.cold.GetSize(cid) if err == nil { - stats.Record(context.Background(), metrics.SplitstoreMiss.M(1)) + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) } return size, err @@ -240,46 +322,67 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) { } func (s *SplitStore) Put(blk blocks.Block) error { - s.mx.Lock() - if s.curTs == nil { - s.mx.Unlock() - return s.cold.Put(blk) + if isIdentiyCid(blk.Cid()) { + return nil } - epoch := s.curTs.Height() - s.mx.Unlock() + s.txnLk.RLock() + defer s.txnLk.RUnlock() - err := s.tracker.Put(blk.Cid(), epoch) + err := s.hot.Put(blk) if err != nil { - log.Errorf("error tracking CID in hotstore: %s; falling back to coldstore", err) - return s.cold.Put(blk) + return err } - return s.hot.Put(blk) + s.debug.LogWrite(blk) + + s.trackTxnRef(blk.Cid()) + return nil } func (s *SplitStore) PutMany(blks []blocks.Block) error { - s.mx.Lock() - if s.curTs == nil { - s.mx.Unlock() - return s.cold.PutMany(blks) + // filter identities + idcids := 0 + for _, blk := range blks { + if isIdentiyCid(blk.Cid()) { + idcids++ + } } - epoch := s.curTs.Height() - s.mx.Unlock() + if idcids > 0 { + if idcids == len(blks) { + // it's all identities + return nil + } + + filtered := make([]blocks.Block, 0, len(blks)-idcids) + for _, blk := range blks { + if isIdentiyCid(blk.Cid()) { + continue + } + filtered = append(filtered, blk) + } + + blks = filtered + } batch := make([]cid.Cid, 0, len(blks)) for _, blk := range blks { batch = append(batch, blk.Cid()) } - err := s.tracker.PutBatch(batch, epoch) + s.txnLk.RLock() + defer s.txnLk.RUnlock() + + err := s.hot.PutMany(blks) if err != nil { - log.Errorf("error tracking CIDs in hotstore: %s; falling back to coldstore", err) - return s.cold.PutMany(blks) + return err } - return s.hot.PutMany(blks) + s.debug.LogWriteMany(blks) + + s.trackTxnRefMany(batch) + return nil } func (s *SplitStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { @@ -297,15 +400,21 @@ func (s *SplitStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return nil, err } - ch := make(chan cid.Cid) + seen := cid.NewSet() + ch := make(chan cid.Cid, 8) // buffer is arbitrary, just enough to avoid context switches go func() { defer cancel() defer close(ch) for _, in := range []<-chan cid.Cid{chHot, chCold} { - for cid := range in { + for c := range in { + // ensure we
only emit each key once + if !seen.Visit(c) { + continue + } + select { - case ch <- cid: + case ch <- c: case <-ctx.Done(): return } @@ -322,20 +431,54 @@ func (s *SplitStore) HashOnRead(enabled bool) { } func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error { + if isIdentiyCid(cid) { + data, err := decodeIdentityCid(cid) + if err != nil { + return err + } + + return cb(data) + } + + // views are (optimistically) protected two-fold: + // - if there is an active transaction, then the reference is protected. + // - if there is no active transaction, active views are tracked in a + // wait group and compaction is inhibited from starting until they + // have all completed. this is necessary to ensure that a (very) long-running + // view can't have its data pointer deleted, which would be catastrophic. + // Note that we can't just RLock for the duration of the view, as this could + // lead to deadlock with recursive views. + s.protectView(cid) + defer s.viewDone() + err := s.hot.View(cid, cb) switch err { case bstore.ErrNotFound: - return s.cold.View(cid, cb) + if s.isWarm() { + s.debug.LogReadMiss(cid) + } + + err = s.cold.View(cid, cb) + if err == nil { + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) + } + return err default: return err } } +func (s *SplitStore) isWarm() bool { + s.mx.Lock() + defer s.mx.Unlock() + return s.warmupEpoch > 0 +} + // State tracking func (s *SplitStore) Start(chain ChainAccessor) error { s.chain = chain - s.curTs = chain.GetHeaviestTipSet() + curTs := chain.GetHeaviestTipSet() // load base epoch from metadata ds // if none, then use current epoch because it's a fresh start @@ -345,12 +488,12 @@ func (s *SplitStore) Start(chain ChainAccessor) error { s.baseEpoch = bytesToEpoch(bs) case dstore.ErrNotFound: - if s.curTs == nil { + if curTs == nil { // this can happen in some tests break } - err = s.setBaseEpoch(s.curTs.Height()) + err = s.setBaseEpoch(curTs.Height()) if err != nil { return xerrors.Errorf("error saving base epoch: %w", err) } @@ -360,20 +503,23 @@ func (s *SplitStore) Start(chain ChainAccessor) error { } // load warmup epoch from metadata ds - // if none, then the splitstore will warm up the hotstore at first head change notif - // by walking the current tipset bs, err = s.ds.Get(warmupEpochKey) switch err { case nil: s.warmupEpoch = bytesToEpoch(bs) case dstore.ErrNotFound: + // the hotstore hasn't warmed up, start a concurrent warm up + err = s.warmup(curTs) + if err != nil { + return xerrors.Errorf("error warming up: %w", err) + } + default: return xerrors.Errorf("error loading warmup epoch: %w", err) } - // load markSetSize from metadata ds - // if none, the splitstore will compute it during warmup and update in every compaction + // load markSetSize from metadata ds to provide a size hint for marksets bs, err = s.ds.Get(markSetSizeKey) switch err { case nil: @@ -393,55 +539,67 @@ func (s *SplitStore) Start(chain ChainAccessor) error { } func (s *SplitStore) Close() error { - atomic.StoreInt32(&s.closing, 1) + if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) { + // already closing + return nil + } - if atomic.LoadInt32(&s.critsection) == 1 { - log.Warn("ongoing compaction in critical section; waiting for it to finish...") - for atomic.LoadInt32(&s.critsection) == 1 { + if atomic.LoadInt32(&s.compacting) == 1 { + log.Warn("close with ongoing compaction in progress; waiting for it to finish...") + for atomic.LoadInt32(&s.compacting) == 1 { time.Sleep(time.Second) } } - return multierr.Combine(s.tracker.Close(), s.env.Close()) + 
s.cancel() + return multierr.Combine(s.markSetEnv.Close(), s.debug.Close()) } func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { + s.headChangeMx.Lock() + defer s.headChangeMx.Unlock() + // Revert only. if len(apply) == 0 { return nil } - s.mx.Lock() curTs := apply[len(apply)-1] epoch := curTs.Height() - s.curTs = curTs - s.mx.Unlock() + + // NOTE: there is an implicit invariant assumption that HeadChange is invoked + synchronously and no other HeadChange can be invoked while one is in + progress. + this is guaranteed by the chainstore, and it is pervasive throughout lotus + -- if that ever changes then all hell will break loose in general and + we will have a race to protectTipSets here. + Regardless, we put a mutex in HeadChange just to be safe if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) { - // we are currently compacting, do nothing and wait for the next head change + // we are currently compacting -- protect the new tipset(s) + s.protectTipSets(apply) return nil } - if s.warmupEpoch == 0 { - // splitstore needs to warm up - go func() { - defer atomic.StoreInt32(&s.compacting, 0) - - log.Info("warming up hotstore") - start := time.Now() - - s.warmup(curTs) - - log.Infow("warm up done", "took", time.Since(start)) - }() + // check if we are actually closing first + if atomic.LoadInt32(&s.closing) == 1 { + atomic.StoreInt32(&s.compacting, 0) + return nil + } + timestamp := time.Unix(int64(curTs.MinTimestamp()), 0) + if time.Since(timestamp) > SyncGapTime { + // don't attempt compaction before we have caught up syncing + atomic.StoreInt32(&s.compacting, 0) return nil } if epoch-s.baseEpoch > CompactionThreshold { - // it's time to compact + // it's time to compact -- prepare the transaction and go! + s.beginTxnProtect() go func() { defer atomic.StoreInt32(&s.compacting, 0) + defer s.endTxnProtect() log.Info("compacting splitstore") start := time.Now() @@ -458,266 +616,528 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { return nil } -func (s *SplitStore) warmup(curTs *types.TipSet) { - epoch := curTs.Height() - - batchHot := make([]blocks.Block, 0, batchSize) - batchSnoop := make([]cid.Cid, 0, batchSize) - - count := int64(0) - err := s.chain.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts, - func(cid cid.Cid) error { - count++ +// transactionally protect incoming tipsets +func (s *SplitStore) protectTipSets(apply []*types.TipSet) { + s.txnLk.RLock() + defer s.txnLk.RUnlock() - has, err := s.hot.Has(cid) - if err != nil { - return err - } + if !s.txnActive { + return + } - if has { - return nil - } + var cids []cid.Cid + for _, ts := range apply { + cids = append(cids, ts.Cids()...)
+ } - blk, err := s.cold.Get(cid) - if err != nil { - return err - } + s.trackTxnRefMany(cids) +} - batchHot = append(batchHot, blk) - batchSnoop = append(batchSnoop, cid) +// transactionally protect a view +func (s *SplitStore) protectView(c cid.Cid) { + s.txnLk.RLock() + defer s.txnLk.RUnlock() - if len(batchHot) == batchSize { - err = s.tracker.PutBatch(batchSnoop, epoch) - if err != nil { - return err - } - batchSnoop = batchSnoop[:0] + if s.txnActive { + s.trackTxnRef(c) + } - err = s.hot.PutMany(batchHot) - if err != nil { - return err - } - batchHot = batchHot[:0] - } + s.txnViewsMx.Lock() + s.txnViews++ + s.txnViewsMx.Unlock() +} - return nil - }) +func (s *SplitStore) viewDone() { + s.txnViewsMx.Lock() + defer s.txnViewsMx.Unlock() - if err != nil { - log.Errorf("error warming up splitstore: %s", err) - return + s.txnViews-- + if s.txnViews == 0 && s.txnViewsWaiting { + s.txnViewsCond.Signal() } +} - if len(batchHot) > 0 { - err = s.tracker.PutBatch(batchSnoop, epoch) - if err != nil { - log.Errorf("error warming up splitstore: %s", err) - return - } - - err = s.hot.PutMany(batchHot) - if err != nil { - log.Errorf("error warming up splitstore: %s", err) - return - } - } +func (s *SplitStore) viewWait() { + s.txnViewsMx.Lock() + defer s.txnViewsMx.Unlock() - if count > s.markSetSize { - s.markSetSize = count + count>>2 // overestimate a bit + s.txnViewsWaiting = true + for s.txnViews > 0 { + s.txnViewsCond.Wait() } + s.txnViewsWaiting = false +} - // save the warmup epoch - s.warmupEpoch = epoch - err = s.ds.Put(warmupEpochKey, epochToBytes(epoch)) - if err != nil { - log.Errorf("error saving warmup epoch: %s", err) +// transactionally protect a reference to an object +func (s *SplitStore) trackTxnRef(c cid.Cid) { + if !s.txnActive { + // not compacting + return } - err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize)) - if err != nil { - log.Errorf("error saving mark set size: %s", err) + if isUnitaryObject(c) { + return } -} -// Compaction/GC Algorithm -func (s *SplitStore) compact(curTs *types.TipSet) { - var err error - if s.markSetSize == 0 { - start := time.Now() - log.Info("estimating mark set size") - err = s.estimateMarkSetSize(curTs) + if s.txnProtect != nil { + mark, err := s.txnProtect.Has(c) if err != nil { - log.Errorf("error estimating mark set size: %s; aborting compaction", err) + log.Warnf("error checking markset: %s", err) + // track it anyways + } else if mark { return } - log.Infow("estimating mark set size done", "took", time.Since(start), "size", s.markSetSize) - } else { - log.Infow("current mark set size estimate", "size", s.markSetSize) - } - - start := time.Now() - if s.fullCompaction { - err = s.compactFull(curTs) - } else { - err = s.compactSimple(curTs) } - took := time.Since(start).Milliseconds() - stats.Record(context.Background(), metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3)) - if err != nil { - log.Errorf("COMPACTION ERROR: %s", err) - } + s.txnRefsMx.Lock() + s.txnRefs[c] = struct{}{} + s.txnRefsMx.Unlock() } -func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) error { - var count int64 - err := s.chain.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts, - func(cid cid.Cid) error { - count++ - return nil - }) - - if err != nil { - return err +// transactionally protect a batch of references +func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) { + if !s.txnActive { + // not compacting + return } - s.markSetSize = count + count>>2 // overestimate a bit - return nil -} - -func (s 
*SplitStore) compactSimple(curTs *types.TipSet) error { - coldEpoch := s.baseEpoch + CompactionCold - currentEpoch := curTs.Height() - boundaryEpoch := currentEpoch - CompactionBoundary + s.txnRefsMx.Lock() + defer s.txnRefsMx.Unlock() - log.Infow("running simple compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch, "boundaryEpoch", boundaryEpoch) + quiet := false + for _, c := range cids { + if isUnitaryObject(c) { + continue + } - coldSet, err := s.env.Create("cold", s.markSetSize) - if err != nil { - return xerrors.Errorf("error creating mark set: %w", err) - } - defer coldSet.Close() //nolint:errcheck + if s.txnProtect != nil { + mark, err := s.txnProtect.Has(c) + if err != nil { + if !quiet { + quiet = true + log.Warnf("error checking markset: %s", err) + } + continue + } - // 1. mark reachable cold objects by looking at the objects reachable only from the cold epoch - log.Infow("marking reachable cold blocks", "boundaryEpoch", boundaryEpoch) - startMark := time.Now() + if mark { + continue + } + } - boundaryTs, err := s.chain.GetTipsetByHeight(context.Background(), boundaryEpoch, curTs, true) - if err != nil { - return xerrors.Errorf("error getting tipset at boundary epoch: %w", err) + s.txnRefs[c] = struct{}{} } - var count int64 - err = s.chain.WalkSnapshot(context.Background(), boundaryTs, 1, s.skipOldMsgs, s.skipMsgReceipts, - func(cid cid.Cid) error { - count++ - return coldSet.Mark(cid) - }) + return +} - if err != nil { - return xerrors.Errorf("error marking cold blocks: %w", err) - } +// protect all pending transactional references +func (s *SplitStore) protectTxnRefs(markSet MarkSet) error { + for { + var txnRefs map[cid.Cid]struct{} - if count > s.markSetSize { - s.markSetSize = count + count>>2 // overestimate a bit - } + s.txnRefsMx.Lock() + if len(s.txnRefs) > 0 { + txnRefs = s.txnRefs + s.txnRefs = make(map[cid.Cid]struct{}) + } + s.txnRefsMx.Unlock() - log.Infow("marking done", "took", time.Since(startMark)) + if len(txnRefs) == 0 { + return nil + } - // 2. move cold unreachable objects to the coldstore - log.Info("collecting cold objects") - startCollect := time.Now() + log.Infow("protecting transactional references", "refs", len(txnRefs)) + count := 0 + workch := make(chan cid.Cid, len(txnRefs)) + startProtect := time.Now() - cold := make([]cid.Cid, 0, s.coldPurgeSize) + for c := range txnRefs { + mark, err := markSet.Has(c) + if err != nil { + return xerrors.Errorf("error checking markset: %w", err) + } - // some stats for logging - var hotCnt, coldCnt int + if mark { + continue + } - // 2.1 iterate through the tracking store and collect unreachable cold objects - err = s.tracker.ForEach(func(cid cid.Cid, writeEpoch abi.ChainEpoch) error { - // is the object still hot? 
- if writeEpoch > coldEpoch { - // yes, stay in the hotstore - hotCnt++ + workch <- c + count++ + } + close(workch) + + if count == 0 { return nil } - // check whether it is reachable in the cold boundary - mark, err := coldSet.Has(cid) - if err != nil { - return xerrors.Errorf("error checkiing cold set for %s: %w", cid, err) + workers := runtime.NumCPU() / 2 + if workers < 2 { + workers = 2 + } + if workers > count { + workers = count } - if mark { - hotCnt++ + worker := func() error { + for c := range workch { + err := s.doTxnProtect(c, markSet) + if err != nil { + return xerrors.Errorf("error protecting transactional references to %s: %w", c, err) + } + } return nil } - // it's cold, mark it for move - cold = append(cold, cid) - coldCnt++ - return nil - }) + g := new(errgroup.Group) + for i := 0; i < workers; i++ { + g.Go(worker) + } - if err != nil { - return xerrors.Errorf("error collecting cold objects: %w", err) + if err := g.Wait(); err != nil { + return err + } + + log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count) } +} - if coldCnt > 0 { - s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit +// transactionally protect a reference by walking the object and marking. +// concurrent markings are short-circuited by checking the markset. +func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error { + if err := s.checkClosing(); err != nil { + return err } - log.Infow("collection done", "took", time.Since(startCollect)) - log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt) - stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt))) - stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt))) + // Note: cold objects are deleted heaviest first, so the constituents of an object + // cannot be deleted before the object itself. + return s.walkObjectIncomplete(root, cid.NewSet(), + func(c cid.Cid) error { + if isUnitaryObject(c) { + return errStopWalk + } - // Enter critical section - atomic.StoreInt32(&s.critsection, 1) - defer atomic.StoreInt32(&s.critsection, 0) + mark, err := markSet.Has(c) + if err != nil { + return xerrors.Errorf("error checking markset: %w", err) + } - // check to see if we are closing first; if that's the case just return - if atomic.LoadInt32(&s.closing) == 1 { - log.Info("splitstore is closing; aborting compaction") - return xerrors.Errorf("compaction aborted") - } + // it's marked, nothing to do + if mark { + return errStopWalk + } - // 2.2 copy the cold objects to the coldstore - log.Info("moving cold blocks to the coldstore") - startMove := time.Now() - err = s.moveColdBlocks(cold) - if err != nil { - return xerrors.Errorf("error moving cold blocks: %w", err) - } - log.Infow("moving done", "took", time.Since(startMove)) + return markSet.Mark(c) + }, + func(c cid.Cid) error { + if s.txnMissing != nil { + log.Warnf("missing object reference %s in %s", c, root) + s.txnRefsMx.Lock() + s.txnMissing[c] = struct{}{} + s.txnRefsMx.Unlock() + } + return errStopWalk + }) +} + +// warmup acquires the compaction lock and spawns a goroutine to warm up the hotstore; +// this is necessary when we sync from a snapshot or when we enable the splitstore +// on top of an existing blockstore (which becomes the coldstore).
+func (s *SplitStore) warmup(curTs *types.TipSet) error { + if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) { + return xerrors.Errorf("error locking compaction") + } + + go func() { + defer atomic.StoreInt32(&s.compacting, 0) + + log.Info("warming up hotstore") + start := time.Now() + + err := s.doWarmup(curTs) + if err != nil { + log.Errorf("error warming up hotstore: %s", err) + return + } + + log.Infow("warm up done", "took", time.Since(start)) + }() + + return nil +} + +// the actual warmup procedure; it walks the chain loading all state roots at the boundary +// and headers all the way up to genesis. +// objects are written in batches so as to minimize overhead. +func (s *SplitStore) doWarmup(curTs *types.TipSet) error { + epoch := curTs.Height() + batchHot := make([]blocks.Block, 0, batchSize) + count := int64(0) + xcount := int64(0) + missing := int64(0) + err := s.walkChain(curTs, epoch, false, + func(c cid.Cid) error { + if isUnitaryObject(c) { + return errStopWalk + } + + count++ + + has, err := s.hot.Has(c) + if err != nil { + return err + } + + if has { + return nil + } + + blk, err := s.cold.Get(c) + if err != nil { + if err == bstore.ErrNotFound { + missing++ + return nil + } + return err + } + + xcount++ + + batchHot = append(batchHot, blk) + if len(batchHot) == batchSize { + err = s.hot.PutMany(batchHot) + if err != nil { + return err + } + batchHot = batchHot[:0] + } + + return nil + }) if err != nil { + return err + } + + if len(batchHot) > 0 { + err = s.hot.PutMany(batchHot) + if err != nil { + return err + } } log.Infow("warmup stats", "visited", count, "warm", xcount, "missing", missing) + + s.markSetSize = count + count>>2 // overestimate a bit + err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize)) + if err != nil { + log.Warnf("error saving mark set size: %s", err) + } + + // save the warmup epoch + err = s.ds.Put(warmupEpochKey, epochToBytes(epoch)) + if err != nil { + return xerrors.Errorf("error saving warm up epoch: %w", err) + } + s.mx.Lock() + s.warmupEpoch = epoch + s.mx.Unlock() + + return nil +} + +// --- Compaction --- +// Compaction works transactionally with the following algorithm: +// - We prepare a transaction, whereby all objects referenced through API I/O are tracked. +// - We walk the chain and mark reachable objects, keeping 4 finalities of state roots and messages and all headers all the way to genesis. +// - Once the chain walk is complete, we begin full transaction protection with concurrent marking; we walk and mark all references created during the chain walk. At the same time, all I/O through the API concurrently marks objects as live references. +// - We collect cold objects by iterating through the hotstore and checking the mark set; if an object is not marked, then it is a candidate for purge.
+// - When running with a coldstore, we next copy all cold objects to the coldstore.
+// - At this point we are ready to begin purging:
+// - We sort cold objects heaviest first, so as to never delete the constituents of a DAG before the DAG itself (which would leave dangling references)
+// - We delete in small batches taking a lock; each batch is checked again for marks from the concurrent transactional marking, so as to never delete anything live
+// - We then end the transaction and compact/gc the hotstore.
+func (s *SplitStore) compact(curTs *types.TipSet) {
+	log.Info("waiting for active views to complete")
+	start := time.Now()
+	s.viewWait()
+	log.Infow("waiting for active views done", "took", time.Since(start))
+
+	start = time.Now()
+	err := s.doCompact(curTs)
+	took := time.Since(start).Milliseconds()
+	stats.Record(s.ctx, metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3))
+
+	if err != nil {
+		log.Errorf("COMPACTION ERROR: %s", err)
+	}
+}
+
+func (s *SplitStore) doCompact(curTs *types.TipSet) error {
+	currentEpoch := curTs.Height()
+	boundaryEpoch := currentEpoch - CompactionBoundary
+
+	log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch)
+
+	markSet, err := s.markSetEnv.Create("live", s.markSetSize)
+	if err != nil {
+		return xerrors.Errorf("error creating mark set: %w", err)
+	}
+	defer markSet.Close() //nolint:errcheck
+	defer s.debug.Flush()
+
+	if err := s.checkClosing(); err != nil {
+		return err
+	}
+
+	// we are ready for concurrent marking
+	s.beginTxnMarking(markSet)
+
+	// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
+	// and messages until the boundary epoch.
+	log.Info("marking reachable objects")
+	startMark := time.Now()
+
+	var count int64
+	err = s.walkChain(curTs, boundaryEpoch, true,
+		func(c cid.Cid) error {
+			if isUnitaryObject(c) {
+				return errStopWalk
+			}
+
+			count++
+			return markSet.Mark(c)
+		})
+
+	if err != nil {
+		return xerrors.Errorf("error marking: %w", err)
+	}
+
+	s.markSetSize = count + count>>2 // overestimate a bit
+
+	log.Infow("marking done", "took", time.Since(startMark), "marked", count)
+
+	if err := s.checkClosing(); err != nil {
+		return err
+	}
+
+	// 1.1 protect transactional refs
+	err = s.protectTxnRefs(markSet)
+	if err != nil {
+		return xerrors.Errorf("error protecting transactional refs: %w", err)
+	}
+
+	if err := s.checkClosing(); err != nil {
+		return err
+	}
+
+	// 2. iterate through the hotstore to collect cold objects
+	log.Info("collecting cold objects")
+	startCollect := time.Now()
+
+	// some stats for logging
+	var hotCnt, coldCnt int
+
+	cold := make([]cid.Cid, 0, s.coldPurgeSize)
+	err = s.hot.ForEachKey(func(c cid.Cid) error {
+		// was it marked?
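+		// (the mark set contains everything reachable by the chain walk, plus any
+		// transactionally protected references)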
+		mark, err := markSet.Has(c)
+		if err != nil {
+			return xerrors.Errorf("error checking mark set for %s: %w", c, err)
+		}
+
+		if mark {
+			hotCnt++
+			return nil
+		}
+
+		// it's cold, mark it as a candidate for move
+		cold = append(cold, c)
+		coldCnt++
+
+		return nil
+	})
+
+	if err != nil {
+		return xerrors.Errorf("error collecting cold objects: %w", err)
+	}
+
+	log.Infow("cold collection done", "took", time.Since(startCollect))
+
+	if coldCnt > 0 {
+		s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
+	}
+
+	log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
+	stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
+	stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
+
+	if err := s.checkClosing(); err != nil {
+		return err
+	}
+
+	// now that we have collected cold objects, check for missing references from transactional i/o
+	// and disable further collection of such references (they will not be acted upon, as we can't
+	// possibly delete objects we didn't have when we were collecting cold objects)
+	s.waitForMissingRefs(markSet)
+
+	if err := s.checkClosing(); err != nil {
+		return err
+	}
+
+	// 3. copy the cold objects to the coldstore -- if we have one
+	if !s.cfg.DiscardColdBlocks {
+		log.Info("moving cold objects to the coldstore")
+		startMove := time.Now()
+		err = s.moveColdBlocks(cold)
+		if err != nil {
+			return xerrors.Errorf("error moving cold objects: %w", err)
+		}
+		log.Infow("moving done", "took", time.Since(startMove))
+
+		if err := s.checkClosing(); err != nil {
+			return err
+		}
+	}
+
+	// 4. sort cold objects so that the dags with most references are deleted first
+	// this ensures that we can't refer to a dag with its constituents already deleted, i.e.
+	// we leave no dangling references.
+	log.Info("sorting cold objects")
+	startSort := time.Now()
+	err = s.sortObjects(cold)
 	if err != nil {
-		return xerrors.Errorf("error syncing tracker: %w", err)
+		return xerrors.Errorf("error sorting objects: %w", err)
 	}
+	log.Infow("sorting done", "took", time.Since(startSort))

+	// 4.1 protect transactional refs once more
+	// strictly speaking, this is not necessary as purge will do it before deleting each
+	// batch. however, there is likely a largish number of references accumulated during
+	// the sort, and this protects us before entering the purge context.
+	err = s.protectTxnRefs(markSet)
+	if err != nil {
+		return xerrors.Errorf("error protecting transactional refs: %w", err)
+	}
+
+	if err := s.checkClosing(); err != nil {
+		return err
+	}
+
+	// 5.
purge cold objects from the hotstore, taking protected references into account + log.Info("purging cold objects from the hotstore") + startPurge := time.Now() + err = s.purge(cold, markSet) + if err != nil { + return xerrors.Errorf("error purging cold blocks: %w", err) + } + log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge)) + + // we are done; do some housekeeping + s.endTxnProtect() s.gcHotstore() - err = s.setBaseEpoch(coldEpoch) + err = s.setBaseEpoch(boundaryEpoch) if err != nil { return xerrors.Errorf("error saving base epoch: %w", err) } @@ -730,330 +1150,579 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) error { return nil } -func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { - batch := make([]blocks.Block, 0, batchSize) +func (s *SplitStore) beginTxnProtect() { + log.Info("preparing compaction transaction") + + s.txnLk.Lock() + defer s.txnLk.Unlock() + + s.txnActive = true + s.txnRefs = make(map[cid.Cid]struct{}) + s.txnMissing = make(map[cid.Cid]struct{}) +} + +func (s *SplitStore) beginTxnMarking(markSet MarkSet) { + markSet.SetConcurrent() + + s.txnLk.Lock() + s.txnProtect = markSet + s.txnLk.Unlock() +} + +func (s *SplitStore) endTxnProtect() { + s.txnLk.Lock() + defer s.txnLk.Unlock() + + if !s.txnActive { + return + } + + // release markset memory + if s.txnProtect != nil { + _ = s.txnProtect.Close() + } + + s.txnActive = false + s.txnProtect = nil + s.txnRefs = nil + s.txnMissing = nil +} + +func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool, + f func(cid.Cid) error) error { + visited := cid.NewSet() + walked := cid.NewSet() + toWalk := ts.Cids() + walkCnt := 0 + scanCnt := 0 + + walkBlock := func(c cid.Cid) error { + if !visited.Visit(c) { + return nil + } + + walkCnt++ + + if err := f(c); err != nil { + return err + } + + var hdr types.BlockHeader + err := s.view(c, func(data []byte) error { + return hdr.UnmarshalCBOR(bytes.NewBuffer(data)) + }) - for _, cid := range cold { - blk, err := s.hot.Get(cid) if err != nil { - if err == dstore.ErrNotFound { - // this can happen if the node is killed after we have deleted the block from the hotstore - // but before we have deleted it from the tracker; just delete the tracker. - err = s.tracker.Delete(cid) - if err != nil { - return xerrors.Errorf("error deleting unreachable cid %s from tracker: %w", cid, err) + return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err) + } + + // we only scan the block if it is at or above the boundary + if hdr.Height >= boundary || hdr.Height == 0 { + scanCnt++ + if inclMsgs && hdr.Height > 0 { + if err := s.walkObject(hdr.Messages, walked, f); err != nil { + return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) + } + + if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil { + return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) } - } else { - return xerrors.Errorf("error retrieving tracked block %s from hotstore: %w", cid, err) } - continue + if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil { + return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err) + } } - batch = append(batch, blk) - if len(batch) == batchSize { - err = s.cold.PutMany(batch) - if err != nil { - return xerrors.Errorf("error putting batch to coldstore: %w", err) - } - batch = batch[:0] + if hdr.Height > 0 { + toWalk = append(toWalk, hdr.Parents...) 
} + + return nil } - if len(batch) > 0 { - err := s.cold.PutMany(batch) - if err != nil { - return xerrors.Errorf("error putting cold to coldstore: %w", err) + for len(toWalk) > 0 { + // walking can take a while, so check this with every opportunity + if err := s.checkClosing(); err != nil { + return err + } + + walking := toWalk + toWalk = nil + for _, c := range walking { + if err := walkBlock(c); err != nil { + return xerrors.Errorf("error walking block (cid: %s): %w", c, err) + } } } + log.Infow("chain walk done", "walked", walkCnt, "scanned", scanCnt) + return nil } -func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error { - if len(cids) == 0 { +func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error { + if !walked.Visit(c) { return nil } - // don't delete one giant batch of 7M objects, but rather do smaller batches - done := false - for i := 0; !done; i++ { - start := i * batchSize - end := start + batchSize - if end >= len(cids) { - end = len(cids) - done = true + if err := f(c); err != nil { + if err == errStopWalk { + return nil } - err := deleteBatch(cids[start:end]) - if err != nil { - return xerrors.Errorf("error deleting batch: %w", err) - } + return err } - return nil -} + if c.Prefix().Codec != cid.DagCBOR { + return nil + } -func (s *SplitStore) purgeBlocks(cids []cid.Cid) error { - return s.purgeBatch(cids, s.hot.DeleteMany) -} + // check this before recursing + if err := s.checkClosing(); err != nil { + return err + } -func (s *SplitStore) purgeTracking(cids []cid.Cid) error { - return s.purgeBatch(cids, s.tracker.DeleteBatch) -} + var links []cid.Cid + err := s.view(c, func(data []byte) error { + return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) { + links = append(links, c) + }) + }) -func (s *SplitStore) gcHotstore() { - if compact, ok := s.hot.(interface{ Compact() error }); ok { - log.Infof("compacting hotstore") - startCompact := time.Now() - err := compact.Compact() - if err != nil { - log.Warnf("error compacting hotstore: %s", err) - return - } - log.Infow("hotstore compaction done", "took", time.Since(startCompact)) + if err != nil { + return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err) } - if gc, ok := s.hot.(interface{ CollectGarbage() error }); ok { - log.Infof("garbage collecting hotstore") - startGC := time.Now() - err := gc.CollectGarbage() + for _, c := range links { + err := s.walkObject(c, walked, f) if err != nil { - log.Warnf("error garbage collecting hotstore: %s", err) - return + return xerrors.Errorf("error walking link (cid: %s): %w", c, err) } - log.Infow("hotstore garbage collection done", "took", time.Since(startGC)) } + + return nil } -func (s *SplitStore) compactFull(curTs *types.TipSet) error { - currentEpoch := curTs.Height() - coldEpoch := s.baseEpoch + CompactionCold - boundaryEpoch := currentEpoch - CompactionBoundary +// like walkObject, but the object may be potentially incomplete (references missing) +func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing func(cid.Cid) error) error { + if !walked.Visit(c) { + return nil + } + + // occurs check -- only for DAGs + if c.Prefix().Codec == cid.DagCBOR { + has, err := s.has(c) + if err != nil { + return xerrors.Errorf("error occur checking %s: %w", c, err) + } - log.Infow("running full compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch, "boundaryEpoch", boundaryEpoch) + if !has { + err = missing(c) + if err == errStopWalk { + 
return nil + } - // create two mark sets, one for marking the cold finality region - // and one for marking the hot region - hotSet, err := s.env.Create("hot", s.markSetSize) - if err != nil { - return xerrors.Errorf("error creating hot mark set: %w", err) + return err + } } - defer hotSet.Close() //nolint:errcheck - coldSet, err := s.env.Create("cold", s.markSetSize) - if err != nil { - return xerrors.Errorf("error creating cold mark set: %w", err) + if err := f(c); err != nil { + if err == errStopWalk { + return nil + } + + return err } - defer coldSet.Close() //nolint:errcheck - // Phase 1: marking - log.Info("marking live blocks") - startMark := time.Now() + if c.Prefix().Codec != cid.DagCBOR { + return nil + } - // Phase 1a: mark all reachable CIDs in the hot range - boundaryTs, err := s.chain.GetTipsetByHeight(context.Background(), boundaryEpoch, curTs, true) - if err != nil { - return xerrors.Errorf("error getting tipset at boundary epoch: %w", err) + // check this before recursing + if err := s.checkClosing(); err != nil { + return err } - count := int64(0) - err = s.chain.WalkSnapshot(context.Background(), boundaryTs, boundaryEpoch-coldEpoch, s.skipOldMsgs, s.skipMsgReceipts, - func(cid cid.Cid) error { - count++ - return hotSet.Mark(cid) + var links []cid.Cid + err := s.view(c, func(data []byte) error { + return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) { + links = append(links, c) }) + }) if err != nil { - return xerrors.Errorf("error marking hot blocks: %w", err) + return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err) } - if count > s.markSetSize { - s.markSetSize = count + count>>2 // overestimate a bit + for _, c := range links { + err := s.walkObjectIncomplete(c, walked, f, missing) + if err != nil { + return xerrors.Errorf("error walking link (cid: %s): %w", c, err) + } } - // Phase 1b: mark all reachable CIDs in the cold range - coldTs, err := s.chain.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true) - if err != nil { - return xerrors.Errorf("error getting tipset at cold epoch: %w", err) + return nil +} + +// internal version used by walk +func (s *SplitStore) view(c cid.Cid, cb func([]byte) error) error { + if isIdentiyCid(c) { + data, err := decodeIdentityCid(c) + if err != nil { + return err + } + + return cb(data) } - count = 0 - err = s.chain.WalkSnapshot(context.Background(), coldTs, CompactionCold, s.skipOldMsgs, s.skipMsgReceipts, - func(cid cid.Cid) error { - count++ - return coldSet.Mark(cid) - }) + err := s.hot.View(c, cb) + switch err { + case bstore.ErrNotFound: + return s.cold.View(c, cb) - if err != nil { - return xerrors.Errorf("error marking cold blocks: %w", err) + default: + return err } +} - if count > s.markSetSize { - s.markSetSize = count + count>>2 // overestimate a bit +func (s *SplitStore) has(c cid.Cid) (bool, error) { + if isIdentiyCid(c) { + return true, nil } - log.Infow("marking done", "took", time.Since(startMark)) + has, err := s.hot.Has(c) - // Phase 2: sweep cold objects: - // - If a cold object is reachable in the hot range, it stays in the hotstore. - // - If a cold object is reachable in the cold range, it is moved to the coldstore. - // - If a cold object is unreachable, it is deleted if GC is enabled, otherwise moved to the coldstore. 
- log.Info("collecting cold objects") - startCollect := time.Now() + if has || err != nil { + return has, err + } - // some stats for logging - var hotCnt, coldCnt, deadCnt int + return s.cold.Has(c) +} - cold := make([]cid.Cid, 0, s.coldPurgeSize) - dead := make([]cid.Cid, 0, s.deadPurgeSize) +func (s *SplitStore) checkClosing() error { + if atomic.LoadInt32(&s.closing) == 1 { + log.Info("splitstore is closing; aborting compaction") + return xerrors.Errorf("compaction aborted") + } - // 2.1 iterate through the tracker and collect cold and dead objects - err = s.tracker.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error { - // is the object stil hot? - if wrEpoch > coldEpoch { - // yes, stay in the hotstore - hotCnt++ - return nil + return nil +} + +func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { + batch := make([]blocks.Block, 0, batchSize) + + for _, c := range cold { + if err := s.checkClosing(); err != nil { + return err } - // the object is cold -- check whether it is reachable in the hot range - mark, err := hotSet.Has(cid) + blk, err := s.hot.Get(c) if err != nil { - return xerrors.Errorf("error checking live mark for %s: %w", cid, err) + if err == bstore.ErrNotFound { + log.Warnf("hotstore missing block %s", c) + continue + } + + return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err) } - if mark { - // the object is reachable in the hot range, stay in the hotstore - hotCnt++ - return nil + batch = append(batch, blk) + if len(batch) == batchSize { + err = s.cold.PutMany(batch) + if err != nil { + return xerrors.Errorf("error putting batch to coldstore: %w", err) + } + batch = batch[:0] } + } - // check whether it is reachable in the cold range - mark, err = coldSet.Has(cid) + if len(batch) > 0 { + err := s.cold.PutMany(batch) if err != nil { - return xerrors.Errorf("error checkiing cold set for %s: %w", cid, err) + return xerrors.Errorf("error putting batch to coldstore: %w", err) } + } - if s.enableGC { - if mark { - // the object is reachable in the cold range, move it to the cold store - cold = append(cold, cid) - coldCnt++ - } else { - // the object is dead and will be deleted - dead = append(dead, cid) - deadCnt++ - } - } else { - // if GC is disabled, we move both cold and dead objects to the coldstore - cold = append(cold, cid) - if mark { - coldCnt++ - } else { - deadCnt++ - } + return nil +} + +// sorts a slice of objects heaviest first -- it's a little expensive but worth the +// guarantee that we don't leave dangling references behind, e.g. if we die in the middle +// of a purge. +func (s *SplitStore) sortObjects(cids []cid.Cid) error { + // we cache the keys to avoid making a gazillion of strings + keys := make(map[cid.Cid]string) + key := func(c cid.Cid) string { + s, ok := keys[c] + if !ok { + s = string(c.Hash()) + keys[c] = s } + return s + } - return nil - }) + // compute sorting weights as the cumulative number of DAG links + weights := make(map[string]int) + for _, c := range cids { + // this can take quite a while, so check for shutdown with every opportunity + if err := s.checkClosing(); err != nil { + return err + } - if err != nil { - return xerrors.Errorf("error collecting cold objects: %w", err) + w := s.getObjectWeight(c, weights, key) + weights[key(c)] = w } - if coldCnt > 0 { - s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit - } - if deadCnt > 0 { - s.deadPurgeSize = deadCnt + deadCnt>>2 // overestimate a bit + // sort! 
+ sort.Slice(cids, func(i, j int) bool { + wi := weights[key(cids[i])] + wj := weights[key(cids[j])] + if wi == wj { + return bytes.Compare(cids[i].Hash(), cids[j].Hash()) > 0 + } + + return wi > wj + }) + + return nil +} + +func (s *SplitStore) getObjectWeight(c cid.Cid, weights map[string]int, key func(cid.Cid) string) int { + w, ok := weights[key(c)] + if ok { + return w } - log.Infow("collection done", "took", time.Since(startCollect)) - log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "dead", deadCnt) - stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt))) - stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt))) - stats.Record(context.Background(), metrics.SplitstoreCompactionDead.M(int64(deadCnt))) + // we treat block headers specially to avoid walking the entire chain + var hdr types.BlockHeader + err := s.view(c, func(data []byte) error { + return hdr.UnmarshalCBOR(bytes.NewBuffer(data)) + }) + if err == nil { + w1 := s.getObjectWeight(hdr.ParentStateRoot, weights, key) + weights[key(hdr.ParentStateRoot)] = w1 - // Enter critical section - atomic.StoreInt32(&s.critsection, 1) - defer atomic.StoreInt32(&s.critsection, 0) + w2 := s.getObjectWeight(hdr.Messages, weights, key) + weights[key(hdr.Messages)] = w2 - // check to see if we are closing first; if that's the case just return - if atomic.LoadInt32(&s.closing) == 1 { - log.Info("splitstore is closing; aborting compaction") - return xerrors.Errorf("compaction aborted") + return 1 + w1 + w2 } - // 2.2 copy the cold objects to the coldstore - log.Info("moving cold objects to the coldstore") - startMove := time.Now() - err = s.moveColdBlocks(cold) + var links []cid.Cid + err = s.view(c, func(data []byte) error { + return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) { + links = append(links, c) + }) + }) if err != nil { - return xerrors.Errorf("error moving cold blocks: %w", err) + return 1 } - log.Infow("moving done", "took", time.Since(startMove)) - // 2.3 delete cold objects from the hotstore - log.Info("purging cold objects from the hotstore") - startPurge := time.Now() - err = s.purgeBlocks(cold) - if err != nil { - return xerrors.Errorf("error purging cold blocks: %w", err) + w = 1 + for _, c := range links { + // these are internal refs, so dags will be dags + if c.Prefix().Codec != cid.DagCBOR { + w++ + continue + } + + wc := s.getObjectWeight(c, weights, key) + weights[key(c)] = wc + + w += wc } - log.Infow("purging cold from hotstore done", "took", time.Since(startPurge)) - // 2.4 remove the tracker tracking for cold objects - startPurge = time.Now() - log.Info("purging cold objects from tracker") - err = s.purgeTracking(cold) - if err != nil { - return xerrors.Errorf("error purging tracking for cold blocks: %w", err) + return w +} + +func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error { + if len(cids) == 0 { + return nil } - log.Infow("purging cold from tracker done", "took", time.Since(startPurge)) - // 3. 
if we have dead objects, delete them from the hotstore and remove the tracking - if len(dead) > 0 { - log.Info("deleting dead objects") - err = s.purgeBlocks(dead) - if err != nil { - return xerrors.Errorf("error purging dead blocks: %w", err) + // we don't delete one giant batch of millions of objects, but rather do smaller batches + // so that we don't stop the world for an extended period of time + done := false + for i := 0; !done; i++ { + start := i * batchSize + end := start + batchSize + if end >= len(cids) { + end = len(cids) + done = true } - // remove the tracker tracking - startPurge := time.Now() - log.Info("purging dead objects from tracker") - err = s.purgeTracking(dead) + err := deleteBatch(cids[start:end]) if err != nil { - return xerrors.Errorf("error purging tracking for dead blocks: %w", err) + return xerrors.Errorf("error deleting batch: %w", err) } - log.Infow("purging dead from tracker done", "took", time.Since(startPurge)) } - // we are done; do some housekeeping - err = s.tracker.Sync() - if err != nil { - return xerrors.Errorf("error syncing tracker: %w", err) + return nil +} + +func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error { + deadCids := make([]cid.Cid, 0, batchSize) + var purgeCnt, liveCnt int + defer func() { + log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt) + }() + + return s.purgeBatch(cids, + func(cids []cid.Cid) error { + deadCids := deadCids[:0] + + for { + if err := s.checkClosing(); err != nil { + return err + } + + s.txnLk.Lock() + if len(s.txnRefs) == 0 { + // keep the lock! + break + } + + // unlock and protect + s.txnLk.Unlock() + + err := s.protectTxnRefs(markSet) + if err != nil { + return xerrors.Errorf("error protecting transactional refs: %w", err) + } + } + + defer s.txnLk.Unlock() + + for _, c := range cids { + live, err := markSet.Has(c) + if err != nil { + return xerrors.Errorf("error checking for liveness: %w", err) + } + + if live { + liveCnt++ + continue + } + + deadCids = append(deadCids, c) + } + + err := s.hot.DeleteMany(deadCids) + if err != nil { + return xerrors.Errorf("error purging cold objects: %w", err) + } + + s.debug.LogDelete(deadCids) + + purgeCnt += len(deadCids) + return nil + }) +} + +// I really don't like having this code, but we seem to have some occasional DAG references with +// missing constituents. During testing in mainnet *some* of these references *sometimes* appeared +// after a little bit. +// We need to figure out where they are coming from and eliminate that vector, but until then we +// have this gem[TM]. +// My best guess is that they are parent message receipts or yet to be computed state roots; magik +// thinks the cause may be block validation. 
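+// waitForMissingRefs retries the missing references a few times with increasing waits,
+// marking whatever becomes resolvable; anything still unresolved afterwards is logged
+// and ignored.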
+func (s *SplitStore) waitForMissingRefs(markSet MarkSet) { + s.txnLk.Lock() + missing := s.txnMissing + s.txnMissing = nil + s.txnLk.Unlock() + + if len(missing) == 0 { + return } - s.gcHotstore() + log.Info("waiting for missing references") + start := time.Now() + count := 0 + defer func() { + log.Infow("waiting for missing references done", "took", time.Since(start), "marked", count) + }() - err = s.setBaseEpoch(coldEpoch) - if err != nil { - return xerrors.Errorf("error saving base epoch: %w", err) + for i := 0; i < 3 && len(missing) > 0; i++ { + if err := s.checkClosing(); err != nil { + return + } + + wait := time.Duration(i) * time.Minute + log.Infof("retrying for %d missing references in %s (attempt: %d)", len(missing), wait, i+1) + if wait > 0 { + time.Sleep(wait) + } + + towalk := missing + walked := cid.NewSet() + missing = make(map[cid.Cid]struct{}) + + for c := range towalk { + err := s.walkObjectIncomplete(c, walked, + func(c cid.Cid) error { + if isUnitaryObject(c) { + return errStopWalk + } + + mark, err := markSet.Has(c) + if err != nil { + return xerrors.Errorf("error checking markset for %s: %w", c, err) + } + + if mark { + return errStopWalk + } + + count++ + return markSet.Mark(c) + }, + func(c cid.Cid) error { + missing[c] = struct{}{} + return errStopWalk + }) + + if err != nil { + log.Warnf("error marking: %s", err) + } + } } - err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize)) - if err != nil { - return xerrors.Errorf("error saving mark set size: %w", err) + if len(missing) > 0 { + log.Warnf("still missing %d references", len(missing)) + for c := range missing { + log.Warnf("unresolved missing reference: %s", c) + } } +} - return nil +func (s *SplitStore) gcHotstore() { + if compact, ok := s.hot.(interface{ Compact() error }); ok { + log.Infof("compacting hotstore") + startCompact := time.Now() + err := compact.Compact() + if err != nil { + log.Warnf("error compacting hotstore: %s", err) + return + } + log.Infow("hotstore compaction done", "took", time.Since(startCompact)) + } + + if gc, ok := s.hot.(interface{ CollectGarbage() error }); ok { + log.Infof("garbage collecting hotstore") + startGC := time.Now() + err := gc.CollectGarbage() + if err != nil { + log.Warnf("error garbage collecting hotstore: %s", err) + return + } + log.Infow("hotstore garbage collection done", "took", time.Since(startGC)) + } } func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error { s.baseEpoch = epoch - // write to datastore return s.ds.Put(baseEpochKey, epochToBytes(epoch)) } @@ -1083,3 +1752,31 @@ func bytesToUint64(buf []byte) uint64 { i, _ := binary.Uvarint(buf) return i } + +func isUnitaryObject(c cid.Cid) bool { + pre := c.Prefix() + switch pre.Codec { + case cid.FilCommitmentSealed, cid.FilCommitmentUnsealed: + return true + default: + return pre.MhType == mh.IDENTITY + } +} + +func isIdentiyCid(c cid.Cid) bool { + return c.Prefix().MhType == mh.IDENTITY +} + +func decodeIdentityCid(c cid.Cid) ([]byte, error) { + dmh, err := mh.Decode(c.Hash()) + if err != nil { + return nil, xerrors.Errorf("error decoding identity cid %s: %w", c, err) + } + + // sanity check + if dmh.Code != mh.IDENTITY { + return nil, xerrors.Errorf("error decoding identity cid %s: hash type is not identity", c) + } + + return dmh.Digest, nil +} diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index dcaf276474d..423a765368c 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -2,6 +2,7 @@ package 
splitstore import ( "context" + "errors" "fmt" "sync" "sync/atomic" @@ -13,6 +14,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/mock" + blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" @@ -21,23 +23,35 @@ import ( func init() { CompactionThreshold = 5 - CompactionCold = 1 CompactionBoundary = 2 logging.SetLogLevel("splitstore", "DEBUG") } func testSplitStore(t *testing.T, cfg *Config) { chain := &mockChain{t: t} + + // the myriads of stores + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + hot := newMockStore() + cold := newMockStore() + + // this is necessary to avoid the garbage mock puts in the blocks + garbage := blocks.NewBlock([]byte{1, 2, 3}) + err := cold.Put(garbage) + if err != nil { + t.Fatal(err) + } + // genesis genBlock := mock.MkBlock(nil, 0, 0) + genBlock.Messages = garbage.Cid() + genBlock.ParentMessageReceipts = garbage.Cid() + genBlock.ParentStateRoot = garbage.Cid() + genBlock.Timestamp = uint64(time.Now().Unix()) + genTs := mock.TipSet(genBlock) chain.push(genTs) - // the myriads of stores - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - hot := blockstore.NewMemorySync() - cold := blockstore.NewMemorySync() - // put the genesis block to cold store blk, err := genBlock.ToStorageBlock() if err != nil { @@ -62,12 +76,22 @@ func testSplitStore(t *testing.T, cfg *Config) { } // make some tipsets, but not enough to cause compaction - mkBlock := func(curTs *types.TipSet, i int) *types.TipSet { + mkBlock := func(curTs *types.TipSet, i int, stateRoot blocks.Block) *types.TipSet { blk := mock.MkBlock(curTs, uint64(i), uint64(i)) + + blk.Messages = garbage.Cid() + blk.ParentMessageReceipts = garbage.Cid() + blk.ParentStateRoot = stateRoot.Cid() + blk.Timestamp = uint64(time.Now().Unix()) + sblk, err := blk.ToStorageBlock() if err != nil { t.Fatal(err) } + err = ss.Put(stateRoot) + if err != nil { + t.Fatal(err) + } err = ss.Put(sblk) if err != nil { t.Fatal(err) @@ -78,18 +102,6 @@ func testSplitStore(t *testing.T, cfg *Config) { return ts } - mkGarbageBlock := func(curTs *types.TipSet, i int) { - blk := mock.MkBlock(curTs, uint64(i), uint64(i)) - sblk, err := blk.ToStorageBlock() - if err != nil { - t.Fatal(err) - } - err = ss.Put(sblk) - if err != nil { - t.Fatal(err) - } - } - waitForCompaction := func() { for atomic.LoadInt32(&ss.compacting) == 1 { time.Sleep(100 * time.Millisecond) @@ -98,105 +110,63 @@ func testSplitStore(t *testing.T, cfg *Config) { curTs := genTs for i := 1; i < 5; i++ { - curTs = mkBlock(curTs, i) + stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7}) + curTs = mkBlock(curTs, i, stateRoot) waitForCompaction() } - mkGarbageBlock(genTs, 1) - // count objects in the cold and hot stores - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - countBlocks := func(bs blockstore.Blockstore) int { count := 0 - ch, err := bs.AllKeysChan(ctx) - if err != nil { - t.Fatal(err) - } - for range ch { + _ = bs.(blockstore.BlockstoreIterator).ForEachKey(func(_ cid.Cid) error { count++ - } + return nil + }) return count } coldCnt := countBlocks(cold) hotCnt := countBlocks(hot) - if coldCnt != 1 { - t.Errorf("expected %d blocks, but got %d", 1, coldCnt) + if coldCnt != 2 { + t.Errorf("expected %d blocks, but got %d", 2, coldCnt) } - if hotCnt != 5 { - t.Errorf("expected %d blocks, but got %d", 5, hotCnt) + if hotCnt != 10 { + t.Errorf("expected %d blocks, but got %d", 10, 
hotCnt) } // trigger a compaction for i := 5; i < 10; i++ { - curTs = mkBlock(curTs, i) + stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7}) + curTs = mkBlock(curTs, i, stateRoot) waitForCompaction() } coldCnt = countBlocks(cold) hotCnt = countBlocks(hot) - if !cfg.EnableFullCompaction { - if coldCnt != 5 { - t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt) - } - - if hotCnt != 5 { - t.Errorf("expected %d hot blocks, but got %d", 5, hotCnt) - } + if coldCnt != 5 { + t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt) } - if cfg.EnableFullCompaction && !cfg.EnableGC { - if coldCnt != 3 { - t.Errorf("expected %d cold blocks, but got %d", 3, coldCnt) - } - - if hotCnt != 7 { - t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt) - } - } - - if cfg.EnableFullCompaction && cfg.EnableGC { - if coldCnt != 2 { - t.Errorf("expected %d cold blocks, but got %d", 2, coldCnt) - } - - if hotCnt != 7 { - t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt) - } + if hotCnt != 17 { + t.Errorf("expected %d hot blocks, but got %d", 17, hotCnt) } // Make sure we can revert without panicking. chain.revert(2) } -func TestSplitStoreSimpleCompaction(t *testing.T) { - testSplitStore(t, &Config{TrackingStoreType: "mem"}) -} - -func TestSplitStoreFullCompactionWithoutGC(t *testing.T) { - testSplitStore(t, &Config{ - TrackingStoreType: "mem", - EnableFullCompaction: true, - }) -} - -func TestSplitStoreFullCompactionWithGC(t *testing.T) { - testSplitStore(t, &Config{ - TrackingStoreType: "mem", - EnableFullCompaction: true, - EnableGC: true, - }) +func TestSplitStoreCompaction(t *testing.T) { + testSplitStore(t, &Config{MarkSetType: "map"}) } type mockChain struct { t testing.TB sync.Mutex + genesis *types.BlockHeader tipsets []*types.TipSet listener func(revert []*types.TipSet, apply []*types.TipSet) error } @@ -204,6 +174,9 @@ type mockChain struct { func (c *mockChain) push(ts *types.TipSet) { c.Lock() c.tipsets = append(c.tipsets, ts) + if c.genesis == nil { + c.genesis = ts.Blocks()[0] + } c.Unlock() if c.listener != nil { @@ -242,7 +215,7 @@ func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _ return nil, fmt.Errorf("bad epoch %d", epoch) } - return c.tipsets[iEpoch-1], nil + return c.tipsets[iEpoch], nil } func (c *mockChain) GetHeaviestTipSet() *types.TipSet { @@ -256,24 +229,105 @@ func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, app c.listener = change } -func (c *mockChain) WalkSnapshot(_ context.Context, ts *types.TipSet, epochs abi.ChainEpoch, _ bool, _ bool, f func(cid.Cid) error) error { - c.Lock() - defer c.Unlock() +type mockStore struct { + mx sync.Mutex + set map[cid.Cid]blocks.Block +} + +func newMockStore() *mockStore { + return &mockStore{set: make(map[cid.Cid]blocks.Block)} +} + +func (b *mockStore) Has(cid cid.Cid) (bool, error) { + b.mx.Lock() + defer b.mx.Unlock() + _, ok := b.set[cid] + return ok, nil +} + +func (b *mockStore) HashOnRead(hor bool) {} + +func (b *mockStore) Get(cid cid.Cid) (blocks.Block, error) { + b.mx.Lock() + defer b.mx.Unlock() - start := int(ts.Height()) - 1 - end := start - int(epochs) - if end < 0 { - end = -1 + blk, ok := b.set[cid] + if !ok { + return nil, blockstore.ErrNotFound } - for i := start; i > end; i-- { - ts := c.tipsets[i] - for _, cid := range ts.Cids() { - err := f(cid) - if err != nil { - return err - } + return blk, nil +} + +func (b *mockStore) GetSize(cid cid.Cid) (int, error) { + blk, err := b.Get(cid) + if err != nil { + return 0, err + } + + return 
len(blk.RawData()), nil +} + +func (b *mockStore) View(cid cid.Cid, f func([]byte) error) error { + blk, err := b.Get(cid) + if err != nil { + return err + } + return f(blk.RawData()) +} + +func (b *mockStore) Put(blk blocks.Block) error { + b.mx.Lock() + defer b.mx.Unlock() + + b.set[blk.Cid()] = blk + return nil +} + +func (b *mockStore) PutMany(blks []blocks.Block) error { + b.mx.Lock() + defer b.mx.Unlock() + + for _, blk := range blks { + b.set[blk.Cid()] = blk + } + return nil +} + +func (b *mockStore) DeleteBlock(cid cid.Cid) error { + b.mx.Lock() + defer b.mx.Unlock() + + delete(b.set, cid) + return nil +} + +func (b *mockStore) DeleteMany(cids []cid.Cid) error { + b.mx.Lock() + defer b.mx.Unlock() + + for _, c := range cids { + delete(b.set, c) + } + return nil +} + +func (b *mockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, errors.New("not implemented") +} + +func (b *mockStore) ForEachKey(f func(cid.Cid) error) error { + b.mx.Lock() + defer b.mx.Unlock() + + for c := range b.set { + err := f(c) + if err != nil { + return err } } + return nil +} +func (b *mockStore) Close() error { return nil } diff --git a/blockstore/splitstore/tracking.go b/blockstore/splitstore/tracking.go deleted file mode 100644 index d57fd45ef6a..00000000000 --- a/blockstore/splitstore/tracking.go +++ /dev/null @@ -1,109 +0,0 @@ -package splitstore - -import ( - "path/filepath" - "sync" - - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-state-types/abi" - cid "github.com/ipfs/go-cid" -) - -// TrackingStore is a persistent store that tracks blocks that are added -// to the hotstore, tracking the epoch at which they are written. -type TrackingStore interface { - Put(cid.Cid, abi.ChainEpoch) error - PutBatch([]cid.Cid, abi.ChainEpoch) error - Get(cid.Cid) (abi.ChainEpoch, error) - Delete(cid.Cid) error - DeleteBatch([]cid.Cid) error - ForEach(func(cid.Cid, abi.ChainEpoch) error) error - Sync() error - Close() error -} - -// OpenTrackingStore opens a tracking store of the specified type in the -// specified path. -func OpenTrackingStore(path string, ttype string) (TrackingStore, error) { - switch ttype { - case "", "bolt": - return OpenBoltTrackingStore(filepath.Join(path, "tracker.bolt")) - case "mem": - return NewMemTrackingStore(), nil - default: - return nil, xerrors.Errorf("unknown tracking store type %s", ttype) - } -} - -// NewMemTrackingStore creates an in-memory tracking store. 
-// This is only useful for test or situations where you don't want to open the -// real tracking store (eg concurrent read only access on a node's datastore) -func NewMemTrackingStore() *MemTrackingStore { - return &MemTrackingStore{tab: make(map[cid.Cid]abi.ChainEpoch)} -} - -// MemTrackingStore is a simple in-memory tracking store -type MemTrackingStore struct { - sync.Mutex - tab map[cid.Cid]abi.ChainEpoch -} - -var _ TrackingStore = (*MemTrackingStore)(nil) - -func (s *MemTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error { - s.Lock() - defer s.Unlock() - s.tab[cid] = epoch - return nil -} - -func (s *MemTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error { - s.Lock() - defer s.Unlock() - for _, cid := range cids { - s.tab[cid] = epoch - } - return nil -} - -func (s *MemTrackingStore) Get(cid cid.Cid) (abi.ChainEpoch, error) { - s.Lock() - defer s.Unlock() - epoch, ok := s.tab[cid] - if ok { - return epoch, nil - } - return 0, xerrors.Errorf("missing tracking epoch for %s", cid) -} - -func (s *MemTrackingStore) Delete(cid cid.Cid) error { - s.Lock() - defer s.Unlock() - delete(s.tab, cid) - return nil -} - -func (s *MemTrackingStore) DeleteBatch(cids []cid.Cid) error { - s.Lock() - defer s.Unlock() - for _, cid := range cids { - delete(s.tab, cid) - } - return nil -} - -func (s *MemTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error { - s.Lock() - defer s.Unlock() - for cid, epoch := range s.tab { - err := f(cid, epoch) - if err != nil { - return err - } - } - return nil -} - -func (s *MemTrackingStore) Sync() error { return nil } -func (s *MemTrackingStore) Close() error { return nil } diff --git a/blockstore/splitstore/tracking_bolt.go b/blockstore/splitstore/tracking_bolt.go deleted file mode 100644 index c5c451e1570..00000000000 --- a/blockstore/splitstore/tracking_bolt.go +++ /dev/null @@ -1,120 +0,0 @@ -package splitstore - -import ( - "time" - - "golang.org/x/xerrors" - - cid "github.com/ipfs/go-cid" - bolt "go.etcd.io/bbolt" - - "github.com/filecoin-project/go-state-types/abi" -) - -type BoltTrackingStore struct { - db *bolt.DB - bucketId []byte -} - -var _ TrackingStore = (*BoltTrackingStore)(nil) - -func OpenBoltTrackingStore(path string) (*BoltTrackingStore, error) { - opts := &bolt.Options{ - Timeout: 1 * time.Second, - NoSync: true, - } - db, err := bolt.Open(path, 0644, opts) - if err != nil { - return nil, err - } - - bucketId := []byte("tracker") - err = db.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(bucketId) - if err != nil { - return xerrors.Errorf("error creating bolt db bucket %s: %w", string(bucketId), err) - } - return nil - }) - - if err != nil { - _ = db.Close() - return nil, err - } - - return &BoltTrackingStore{db: db, bucketId: bucketId}, nil -} - -func (s *BoltTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error { - val := epochToBytes(epoch) - return s.db.Batch(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucketId) - return b.Put(cid.Hash(), val) - }) -} - -func (s *BoltTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error { - val := epochToBytes(epoch) - return s.db.Batch(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucketId) - for _, cid := range cids { - err := b.Put(cid.Hash(), val) - if err != nil { - return err - } - } - return nil - }) -} - -func (s *BoltTrackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) { - err = s.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucketId) - val := b.Get(cid.Hash()) - if val == nil { - return 
xerrors.Errorf("missing tracking epoch for %s", cid) - } - epoch = bytesToEpoch(val) - return nil - }) - return epoch, err -} - -func (s *BoltTrackingStore) Delete(cid cid.Cid) error { - return s.db.Batch(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucketId) - return b.Delete(cid.Hash()) - }) -} - -func (s *BoltTrackingStore) DeleteBatch(cids []cid.Cid) error { - return s.db.Batch(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucketId) - for _, cid := range cids { - err := b.Delete(cid.Hash()) - if err != nil { - return xerrors.Errorf("error deleting %s", cid) - } - } - return nil - }) -} - -func (s *BoltTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error { - return s.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucketId) - return b.ForEach(func(k, v []byte) error { - cid := cid.NewCidV1(cid.Raw, k) - epoch := bytesToEpoch(v) - return f(cid, epoch) - }) - }) -} - -func (s *BoltTrackingStore) Sync() error { - return s.db.Sync() -} - -func (s *BoltTrackingStore) Close() error { - return s.db.Close() -} diff --git a/blockstore/splitstore/tracking_test.go b/blockstore/splitstore/tracking_test.go deleted file mode 100644 index afd475da5a5..00000000000 --- a/blockstore/splitstore/tracking_test.go +++ /dev/null @@ -1,130 +0,0 @@ -package splitstore - -import ( - "io/ioutil" - "testing" - - cid "github.com/ipfs/go-cid" - "github.com/multiformats/go-multihash" - - "github.com/filecoin-project/go-state-types/abi" -) - -func TestBoltTrackingStore(t *testing.T) { - testTrackingStore(t, "bolt") -} - -func testTrackingStore(t *testing.T, tsType string) { - t.Helper() - - makeCid := func(key string) cid.Cid { - h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1) - if err != nil { - t.Fatal(err) - } - - return cid.NewCidV1(cid.Raw, h) - } - - mustHave := func(s TrackingStore, cid cid.Cid, epoch abi.ChainEpoch) { - val, err := s.Get(cid) - if err != nil { - t.Fatal(err) - } - - if val != epoch { - t.Fatal("epoch mismatch") - } - } - - mustNotHave := func(s TrackingStore, cid cid.Cid) { - _, err := s.Get(cid) - if err == nil { - t.Fatal("expected error") - } - } - - path, err := ioutil.TempDir("", "snoop-test.*") - if err != nil { - t.Fatal(err) - } - - s, err := OpenTrackingStore(path, tsType) - if err != nil { - t.Fatal(err) - } - - k1 := makeCid("a") - k2 := makeCid("b") - k3 := makeCid("c") - k4 := makeCid("d") - - s.Put(k1, 1) //nolint - s.Put(k2, 2) //nolint - s.Put(k3, 3) //nolint - s.Put(k4, 4) //nolint - - mustHave(s, k1, 1) - mustHave(s, k2, 2) - mustHave(s, k3, 3) - mustHave(s, k4, 4) - - s.Delete(k1) // nolint - s.Delete(k2) // nolint - - mustNotHave(s, k1) - mustNotHave(s, k2) - mustHave(s, k3, 3) - mustHave(s, k4, 4) - - s.PutBatch([]cid.Cid{k1}, 1) //nolint - s.PutBatch([]cid.Cid{k2}, 2) //nolint - - mustHave(s, k1, 1) - mustHave(s, k2, 2) - mustHave(s, k3, 3) - mustHave(s, k4, 4) - - allKeys := map[string]struct{}{ - k1.String(): {}, - k2.String(): {}, - k3.String(): {}, - k4.String(): {}, - } - - err = s.ForEach(func(k cid.Cid, _ abi.ChainEpoch) error { - _, ok := allKeys[k.String()] - if !ok { - t.Fatal("unexpected key") - } - - delete(allKeys, k.String()) - return nil - }) - - if err != nil { - t.Fatal(err) - } - - if len(allKeys) != 0 { - t.Fatal("not all keys were returned") - } - - // no close and reopen and ensure the keys still exist - err = s.Close() - if err != nil { - t.Fatal(err) - } - - s, err = OpenTrackingStore(path, tsType) - if err != nil { - t.Fatal(err) - } - - mustHave(s, k1, 1) - mustHave(s, k2, 2) - mustHave(s, k3, 3) - mustHave(s, k4, 
4) - - s.Close() //nolint:errcheck -} diff --git a/go.mod b/go.mod index 6f18bfa757c..6bddcd2b4d6 100644 --- a/go.mod +++ b/go.mod @@ -144,7 +144,6 @@ require ( github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/whyrusleeping/pubsub v0.0.0-20190708150250-92bcb0691325 github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 - go.etcd.io/bbolt v1.3.4 go.opencensus.io v0.23.0 go.uber.org/dig v1.10.0 // indirect go.uber.org/fx v1.9.0 diff --git a/node/builder.go b/node/builder.go index 884261a89b9..45957c4dd49 100644 --- a/node/builder.go +++ b/node/builder.go @@ -643,6 +643,10 @@ func Repo(r repo.Repo) Option { Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore), If(cfg.EnableSplitstore, + If(cfg.Splitstore.ColdStoreType == "universal", + Override(new(dtypes.ColdBlockstore), From(new(dtypes.UniversalBlockstore)))), + If(cfg.Splitstore.ColdStoreType == "discard", + Override(new(dtypes.ColdBlockstore), modules.DiscardColdBlockstore)), If(cfg.Splitstore.HotStoreType == "badger", Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore)), Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore(cfg)), diff --git a/node/config/def.go b/node/config/def.go index 240fadbd93f..9bbf8375c51 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -229,12 +229,9 @@ type Chainstore struct { } type Splitstore struct { - HotStoreType string - TrackingStoreType string - MarkSetType string - EnableFullCompaction bool - EnableGC bool // EXPERIMENTAL - Archival bool + ColdStoreType string + HotStoreType string + MarkSetType string } // // Full Node @@ -305,7 +302,9 @@ func DefaultFullNode() *FullNode { Chainstore: Chainstore{ EnableSplitstore: false, Splitstore: Splitstore{ - HotStoreType: "badger", + ColdStoreType: "universal", + HotStoreType: "badger", + MarkSetType: "map", }, }, } diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index 787d782b7ea..9e1293c8e08 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -37,6 +37,10 @@ func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locked return bs, err } +func DiscardColdBlockstore(lc fx.Lifecycle, bs dtypes.UniversalBlockstore) (dtypes.ColdBlockstore, error) { + return blockstore.NewDiscardStore(bs), nil +} + func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) { path, err := r.SplitstorePath() if err != nil { @@ -66,19 +70,16 @@ func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlocksto return bs, nil } -func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.UniversalBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { - return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.UniversalBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { +func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { + return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { path, err := r.SplitstorePath() if err != nil { return nil, err } cfg := &splitstore.Config{ - TrackingStoreType: cfg.Splitstore.TrackingStoreType, - MarkSetType: cfg.Splitstore.MarkSetType, - EnableFullCompaction: cfg.Splitstore.EnableFullCompaction, 
- EnableGC: cfg.Splitstore.EnableGC, - Archival: cfg.Splitstore.Archival, + MarkSetType: cfg.Splitstore.MarkSetType, + DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard", } ss, err := splitstore.Open(path, ds, hot, cold, cfg) if err != nil { diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index e35d02811a7..8d82006b752 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -24,9 +24,12 @@ import ( type MetadataDS datastore.Batching type ( - // UniversalBlockstore is the cold blockstore. + // UniversalBlockstore is the universal blockstore backend. UniversalBlockstore blockstore.Blockstore + // ColdBlockstore is the Cold blockstore abstraction for the splitstore + ColdBlockstore blockstore.Blockstore + // HotBlockstore is the Hot blockstore abstraction for the splitstore HotBlockstore blockstore.Blockstore