Merge pull request #404 from onflow/fxamacker/add-batch-preload
Add BatchPreload to decode slabs in parallel and cache
fxamacker authored May 14, 2024
2 parents 12929f5 + aa2ee90 commit 73e00ec
Showing 3 changed files with 441 additions and 0 deletions.
152 changes: 152 additions & 0 deletions storage.go
@@ -1585,3 +1585,155 @@ func (s *PersistentSlabStorage) getAllChildReferences(slab Slab) (

	return references, brokenReferences, nil
}

// BatchPreload decodes and caches slabs of given ids in parallel.
// This is useful for storage health or data validation in migration programs.
func (s *PersistentSlabStorage) BatchPreload(ids []SlabID, numWorkers int) error {
	if len(ids) == 0 {
		return nil
	}

	// Use 11 for min slab count for parallel decoding because micro benchmarks showed
	// performance regression for <= 10 slabs when decoding slabs in parallel.
	const minCountForBatchPreload = 11
	if len(ids) < minCountForBatchPreload {

		for _, id := range ids {
			// fetch from base storage last
			data, ok, err := s.baseStorage.Retrieve(id)
			if err != nil {
				// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
				return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id))
			}
			if !ok {
				continue
			}

			slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo)
			if err != nil {
				// err is already categorized by DecodeSlab().
				return err
			}

			// save decoded slab to cache
			s.cache[id] = slab
		}

		return nil
	}

	type slabToBeDecoded struct {
		slabID SlabID
		data   []byte
	}

	type decodedSlab struct {
		slabID SlabID
		slab   Slab
		err    error
	}

	// Define decoder (worker) to decode slabs in parallel
	decoder := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan slabToBeDecoded, results chan<- decodedSlab) {
		defer wg.Done()

		for slabData := range jobs {
			// Check if goroutine is signaled to stop before proceeding.
			select {
			case <-done:
				return
			default:
			}

			id := slabData.slabID
			data := slabData.data

			slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo)
			// err is already categorized by DecodeSlab().
			results <- decodedSlab{
				slabID: id,
				slab:   slab,
				err:    err,
			}
		}
	}

	if numWorkers > len(ids) {
		numWorkers = len(ids)
	}

	var wg sync.WaitGroup

	// Construct done signal channel
	done := make(chan struct{})

	// Construct job queue
	jobs := make(chan slabToBeDecoded, len(ids))

	// Construct result queue
	results := make(chan decodedSlab, len(ids))

	defer func() {
		// This ensures that all goroutines are stopped before output channel is closed.

		// Wait for all goroutines to finish
		wg.Wait()

		// Close output channel
		close(results)
	}()

	// Preallocate cache map if empty
	if len(s.cache) == 0 {
		s.cache = make(map[SlabID]Slab, len(ids))
	}

	// Launch workers
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go decoder(&wg, done, jobs, results)
	}

	// Send jobs
	jobCount := 0
	{
		// Need to close input channel (jobs) here because
		// if there isn't any job in jobs channel,
		// done is never processed inside loop "for slabData := range jobs".
		defer close(jobs)

		for _, id := range ids {
			// fetch from base storage last
			data, ok, err := s.baseStorage.Retrieve(id)
			if err != nil {
				// Closing done channel signals goroutines to stop.
				close(done)
				// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
				return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id))
			}
			if !ok {
				continue
			}

			jobs <- slabToBeDecoded{id, data}
			jobCount++
		}
	}

	// Process results
	for i := 0; i < jobCount; i++ {
		result := <-results

		if result.err != nil {
			// Closing done channel signals goroutines to stop.
			close(done)
			// result.err is already categorized by DecodeSlab().
			return result.err
		}

		// save decoded slab to cache
		s.cache[result.slabID] = result.slab
	}

	return nil
}
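The concurrency shape above (bounded workers, buffered job and result channels, a done channel for early cancellation, and a deferred wg.Wait before the output channel is closed) is a reusable pattern. Below is a stripped-down sketch of that pattern, not code from this commit: processAll, process, and items are placeholder names, process stands in for DecodeSlab plus caching, and only the standard sync package is assumed.

// Stripped-down sketch of the worker-pool / done-channel pattern used by
// BatchPreload above. processAll and process are placeholder names; as in
// BatchPreload, the first failure cancels the remaining work.
func processAll(items [][]byte, numWorkers int, process func([]byte) error) error {
	if numWorkers > len(items) {
		numWorkers = len(items)
	}

	done := make(chan struct{})             // closed to tell workers to stop early
	jobs := make(chan []byte, len(items))   // buffered: sending jobs never blocks
	results := make(chan error, len(items)) // buffered: workers never block on send

	var wg sync.WaitGroup
	defer func() {
		wg.Wait()      // all workers have returned ...
		close(results) // ... before the output channel is closed
	}()

	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer wg.Done()
			for item := range jobs {
				select {
				case <-done: // early cancellation requested
					return
				default:
				}
				results <- process(item)
			}
		}()
	}

	for _, item := range items {
		jobs <- item
	}
	close(jobs) // lets each worker's "for ... range jobs" loop terminate

	for i := 0; i < len(items); i++ {
		if err := <-results; err != nil {
			close(done) // signal remaining workers to stop
			return err
		}
	}
	return nil
}

BatchPreload adds two refinements to this skeleton: slabs missing from base storage are skipped (hence the separate jobCount), and the jobs channel is closed via defer so that the workers' range loop terminates even when no job was queued.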
119 changes: 119 additions & 0 deletions storage_bench_test.go
@@ -132,3 +132,122 @@ func BenchmarkStorageNondeterministicFastCommit(b *testing.B) {
	benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000)
	benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000)
}

func benchmarkRetrieve(b *testing.B, seed int64, numberOfSlabs int) {

	r := rand.New(rand.NewSource(seed))

	encMode, err := cbor.EncOptions{}.EncMode()
	require.NoError(b, err)

	decMode, err := cbor.DecOptions{}.DecMode()
	require.NoError(b, err)

	encodedSlabs := make(map[SlabID][]byte)
	ids := make([]SlabID, 0, numberOfSlabs)
	for i := 0; i < numberOfSlabs; i++ {
		addr := generateRandomAddress(r)

		var index SlabIndex
		binary.BigEndian.PutUint64(index[:], uint64(i))

		id := SlabID{addr, index}

		slab := generateLargeSlab(id)

		data, err := EncodeSlab(slab, encMode)
		require.NoError(b, err)

		encodedSlabs[id] = data
		ids = append(ids, id)
	}

	b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			b.StopTimer()

			baseStorage := NewInMemBaseStorageFromMap(encodedSlabs)
			storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo)

			b.StartTimer()

			for _, id := range ids {
				_, found, err := storage.Retrieve(id)
				require.True(b, found)
				require.NoError(b, err)
			}
		}
	})
}

func benchmarkBatchPreload(b *testing.B, seed int64, numberOfSlabs int) {

	r := rand.New(rand.NewSource(seed))

	encMode, err := cbor.EncOptions{}.EncMode()
	require.NoError(b, err)

	decMode, err := cbor.DecOptions{}.DecMode()
	require.NoError(b, err)

	encodedSlabs := make(map[SlabID][]byte)
	ids := make([]SlabID, 0, numberOfSlabs)
	for i := 0; i < numberOfSlabs; i++ {
		addr := generateRandomAddress(r)

		var index SlabIndex
		binary.BigEndian.PutUint64(index[:], uint64(i))

		id := SlabID{addr, index}

		slab := generateLargeSlab(id)

		data, err := EncodeSlab(slab, encMode)
		require.NoError(b, err)

		encodedSlabs[id] = data
		ids = append(ids, id)
	}

	b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			b.StopTimer()

			baseStorage := NewInMemBaseStorageFromMap(encodedSlabs)
			storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo)

			b.StartTimer()

			err = storage.BatchPreload(ids, runtime.NumCPU())
			require.NoError(b, err)

			for _, id := range ids {
				_, found, err := storage.Retrieve(id)
				require.True(b, found)
				require.NoError(b, err)
			}
		}
	})
}

func BenchmarkStorageRetrieve(b *testing.B) {
	fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

	benchmarkRetrieve(b, fixedSeed, 10)
	benchmarkRetrieve(b, fixedSeed, 100)
	benchmarkRetrieve(b, fixedSeed, 1_000)
	benchmarkRetrieve(b, fixedSeed, 10_000)
	benchmarkRetrieve(b, fixedSeed, 100_000)
	benchmarkRetrieve(b, fixedSeed, 1_000_000)
}

func BenchmarkStorageBatchPreload(b *testing.B) {
	fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

	benchmarkBatchPreload(b, fixedSeed, 10)
	benchmarkBatchPreload(b, fixedSeed, 100)
	benchmarkBatchPreload(b, fixedSeed, 1_000)
	benchmarkBatchPreload(b, fixedSeed, 10_000)
	benchmarkBatchPreload(b, fixedSeed, 100_000)
	benchmarkBatchPreload(b, fixedSeed, 1_000_000)
}
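Outside the benchmark harness, a migration or data-validation program might use the new method as in the minimal sketch below. checkAllSlabs is a hypothetical function name used only for illustration; it assumes the caller already has the slab IDs to verify, and it relies only on BatchPreload, Retrieve, runtime.NumCPU, and fmt as used in the code above.

// Minimal usage sketch (not part of this commit): warm the cache with
// BatchPreload, then verify that every listed slab can be retrieved.
func checkAllSlabs(storage *PersistentSlabStorage, ids []SlabID) error {
	// Decode all slabs in parallel and populate the storage cache.
	if err := storage.BatchPreload(ids, runtime.NumCPU()); err != nil {
		return err
	}

	// Subsequent Retrieve calls for these IDs should be served from the
	// cache populated above.
	for _, id := range ids {
		_, found, err := storage.Retrieve(id)
		if err != nil {
			return err
		}
		if !found {
			return fmt.Errorf("slab %s not found", id)
		}
	}
	return nil
}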
