Skip to content

Commit

Permalink
Dirty from debug but tests almost done and passing
Browse files Browse the repository at this point in the history
  • Loading branch information
ZenGround0 committed Aug 2, 2022
1 parent 2b941b4 commit 4dff48a
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 21 deletions.
13 changes: 13 additions & 0 deletions blockstore/idstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockstore

import (
"context"
"fmt"
"io"

blocks "github.com/ipfs/go-block-format"
Expand Down Expand Up @@ -102,9 +103,21 @@ func (b *idstore) Put(ctx context.Context, blk blocks.Block) error {
return b.bs.Put(ctx, blk)
}

func isTestCid(c cid.Cid) bool {
testCid, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu")
if err != nil {
panic(err)
}

return c.Hash().HexString() == testCid.Hash().HexString()
}

func (b *idstore) PutMany(ctx context.Context, blks []blocks.Block) error {
toPut := make([]blocks.Block, 0, len(blks))
for _, blk := range blks {
if isTestCid(blk.Cid()) {
fmt.Printf("[ID STORE]: putting the test cid\n\n")
}
inline, _, err := decodeCid(blk.Cid())
if err != nil {
return xerrors.Errorf("error decoding Cid: %w", err)
Expand Down
23 changes: 12 additions & 11 deletions blockstore/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,36 @@ func NewMemory() MemBlockstore {
}

// MemBlockstore is a terminal blockstore that keeps blocks in memory.
type MemBlockstore map[cid.Cid]blocks.Block
// To match behavior of badger blockstore we index by multihash only.
type MemBlockstore map[string]blocks.Block

func (m MemBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
delete(m, k)
delete(m, k.Hash().HexString())
return nil
}

func (m MemBlockstore) DeleteMany(ctx context.Context, ks []cid.Cid) error {
for _, k := range ks {
delete(m, k)
delete(m, k.Hash().HexString())
}
return nil
}

func (m MemBlockstore) Has(ctx context.Context, k cid.Cid) (bool, error) {
_, ok := m[k]
_, ok := m[k.Hash().HexString()]
return ok, nil
}

func (m MemBlockstore) View(ctx context.Context, k cid.Cid, callback func([]byte) error) error {
b, ok := m[k]
b, ok := m[k.Hash().HexString()]
if !ok {
return ipld.ErrNotFound{Cid: k}
}
return callback(b.RawData())
}

func (m MemBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
b, ok := m[k]
b, ok := m[k.Hash().HexString()]
if !ok {
return nil, ipld.ErrNotFound{Cid: k}
}
Expand All @@ -51,7 +52,7 @@ func (m MemBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error)

// GetSize returns the CIDs mapped BlockSize
func (m MemBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
b, ok := m[k]
b, ok := m[k.Hash().HexString()]
if !ok {
return 0, ipld.ErrNotFound{Cid: k}
}
Expand All @@ -62,7 +63,7 @@ func (m MemBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
func (m MemBlockstore) Put(ctx context.Context, b blocks.Block) error {
// Convert to a basic block for safety, but try to reuse the existing
// block if it's already a basic block.
k := b.Cid()
k := b.Cid().Hash().HexString()
if _, ok := b.(*blocks.BasicBlock); !ok {
// If we already have the block, abort.
if _, ok := m[k]; ok {
Expand All @@ -71,7 +72,7 @@ func (m MemBlockstore) Put(ctx context.Context, b blocks.Block) error {
// the error is only for debugging.
b, _ = blocks.NewBlockWithCid(b.RawData(), b.Cid())
}
m[b.Cid()] = b
m[k] = b
return nil
}

Expand All @@ -89,8 +90,8 @@ func (m MemBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
// the given context, closing the channel if it becomes Done.
func (m MemBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
ch := make(chan cid.Cid, len(m))
for k := range m {
ch <- k
for _, b := range m {
ch <- b.Cid()
}
close(ch)
return ch, nil
Expand Down
21 changes: 21 additions & 0 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package splitstore
import (
"context"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
cidtype "github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -314,6 +316,12 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
}

func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid {
fmt.Printf("[SS] Get cid: %s\n", cid)
}
if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid {
defer fmt.Printf("[SS] finish get: %s\n", cid)
}
if isIdentiyCid(cid) {
data, err := decodeIdentityCid(cid)
if err != nil {
Expand All @@ -328,6 +336,10 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

// critical section
if s.txnMarkSet != nil {
if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid {
fmt.Printf("[SS] Get in critical section\n")
}

has, err := s.txnMarkSet.Has(cid)
if err != nil {
return nil, err
Expand All @@ -350,6 +362,10 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

switch {
case err == nil:
if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid {

fmt.Printf("[SS] hot had it\n")
}
s.trackTxnRef(cid)
return blk, nil

Expand All @@ -360,6 +376,11 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)

blk, err = s.cold.Get(ctx, cid)
if err == nil {
if compare, ok := ctx.Value("verbose-cid").(cidtype.Cid); ok && compare == cid {

fmt.Printf("[SS] cold had it\n")
}

s.trackTxnRef(cid)
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
Expand Down
94 changes: 92 additions & 2 deletions blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,36 @@ const (
batchSize = 16384
)

func isTestCid(c cid.Cid) bool {
testCid, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu")
if err != nil {
panic(err)
}

return c.Hash().HexString() == testCid.Hash().HexString()
}

func (s *SplitStore) fmtDebug(extra string) {
hHas, cHas := s.lookupTestCid()
fmt.Printf("[COMPACT]: %s, has test cid (hot %t) (cold %t))", extra, hHas, cHas)
}

func (s *SplitStore) lookupTestCid() (bool, bool) {
c, err := cid.Decode("bafy2bzacec4ek45pyx2ihisbmbhbit5htk2ovrry4mpmxkhjasbln3ikvzanu")
if err != nil {
panic(err)
}
hHas, err := s.hot.Has(s.ctx, c)
if err != nil {
panic(err)
}
cHas, err := s.cold.Has(s.ctx, c)
if err != nil {
panic(err)
}
return hHas, cHas
}

func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
s.headChangeMx.Lock()
defer s.headChangeMx.Unlock()
Expand Down Expand Up @@ -121,6 +151,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
if epoch-s.baseEpoch > CompactionThreshold {
fmt.Printf("YYYYYYYYY\n Begin compaction \nYYYYYYY\n")
// it's time to compact -- prepare the transaction and go!
s.fmtDebug("right before compaction")
s.beginTxnProtect()
s.compactType = hot
go func() {
Expand Down Expand Up @@ -540,6 +571,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return err
}

s.fmtDebug("before chain walk")

// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
// and messages until the boundary epoch.
log.Info("marking reachable objects")
Expand All @@ -548,6 +581,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
count := new(int64)
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
func(c cid.Cid) error {
if isTestCid(c) {
fmt.Printf("\n\n[COMPACT] found the cid %s during chain traversal!\n\n", c.String())
}
if isUnitaryObject(c) {
return errStopWalk
}
Expand All @@ -569,6 +605,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error marking: %w", err)
}

s.fmtDebug("after chain walk")

s.markSetSize = *count + *count>>2 // overestimate a bit

log.Infow("marking done", "took", time.Since(startMark), "marked", *count)
Expand Down Expand Up @@ -599,8 +637,11 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {

// some stats for logging
var hotCnt, coldCnt int

fmt.Printf("\n\n[COMPACT]: traversing hot store of type %T\n\n", s.hot)
err = s.hot.ForEachKey(func(c cid.Cid) error {
if isTestCid(c) {
fmt.Printf("\n\n[COMPACT]: found %s(%s) in hot store during compact\n\n", c, c.String())
}
// was it marked?
mark, err := markSet.Has(c)
if err != nil {
Expand All @@ -621,6 +662,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return nil
})

s.fmtDebug("after traversing hot store")
if err != nil {
return xerrors.Errorf("error collecting cold objects: %w", err)
}
Expand Down Expand Up @@ -672,6 +714,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error resetting coldset: %w", err)
}
}
s.fmtDebug("after moving to cold store")

// 4. Purge cold objects with checkpointing for recovery.
// This is the critical section of compaction, whereby any cold object not in the markSet is
Expand All @@ -695,6 +738,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return err
}

s.fmtDebug("inside critical section")

checkpoint, err := NewCheckpoint(s.checkpointPath())
if err != nil {
return xerrors.Errorf("error creating checkpoint: %w", err)
Expand All @@ -710,6 +755,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))

s.fmtDebug("after purge")
s.endCriticalSection()

if err := checkpoint.Close(); err != nil {
Expand All @@ -727,7 +773,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {

// we are done; do some housekeeping
s.endTxnProtect()
s.fmtDebug("critical section done")
s.gcHotstore()
s.fmtDebug("hot store gc")

err = s.setBaseEpoch(boundaryEpoch)
if err != nil {
Expand Down Expand Up @@ -1124,13 +1172,42 @@ func (s *SplitStore) getSize(c cid.Cid) (int, error) {
}
}

type codecPreservingBlock struct {
blocks.Block
codec uint64
}

func (b *codecPreservingBlock) RawData() []byte {
return b.Block.RawData()
}

func (b *codecPreservingBlock) Cid() cid.Cid {
raw := b.Block.Cid()
if raw.Version() == 0 {
return raw
}
return cid.NewCidV1(b.codec, raw.Hash())
}

func (b *codecPreservingBlock) String() string {
c := b.Cid()
return fmt.Sprintf("[Block %s]", c)
}

func (b *codecPreservingBlock) Loggable() map[string]interface{} {
return b.Block.Loggable()
}

func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
batch := make([]blocks.Block, 0, batchSize)

err := coldr.ForEach(func(c cid.Cid) error {
if err := s.checkClosing(); err != nil {
return err
}
if isTestCid(c) {
fmt.Printf("[COMPACT]: foudn test cid while moving cold blocks %s\n\n", c)
}

blk, err := s.hot.Get(s.ctx, c)
if err != nil {
Expand All @@ -1142,13 +1219,23 @@ func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err)
}

batch = append(batch, blk)
batch = append(batch, &codecPreservingBlock{blk, c.Prefix().Codec})
if len(batch) == batchSize {
err = s.cold.PutMany(s.ctx, batch)
if err != nil {
return xerrors.Errorf("error putting batch to coldstore: %w", err)
}
for _, b := range batch {
h, err := s.cold.Has(s.ctx, c)
if err != nil {
return err
}
if !h {
fmt.Printf("[COMPACT] cold store has %s(%s) failed immediately after writing to cold store/n", b.Cid(), c)
}
}
batch = batch[:0]

}

return nil
Expand Down Expand Up @@ -1189,6 +1276,9 @@ func (s *SplitStore) purge(coldr *ColdSetReader, checkpoint *Checkpoint, markSet

err := coldr.ForEach(func(c cid.Cid) error {
batch = append(batch, c)
if isTestCid(c) {
fmt.Printf("[COMPACT]: found test cid while purging\n\n")
}
if len(batch) == batchSize {
return deleteBatch()
}
Expand Down
Loading

0 comments on commit 4dff48a

Please sign in to comment.