From a459d967c7005323020d0df9045ba09661abdf92 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Thu, 9 May 2024 08:28:39 -0500 Subject: [PATCH 1/2] Add BatchPreload to decode slabs in parallel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The intended use for BatchPreload is to speed up migrations. BatchPreload decodes slabs in parallel and stores decoded slabs in cache for later retrieval. This is useful for migration programs when most or all slabs are expected to be migrated. │ before.txt │ after.txt │ │ sec/op │ sec/op vs base │ StorageRetrieve/10-12 36.23µ ± 3% 35.33µ ± 4% ~ (p=0.075 n=10) StorageRetrieve/100-12 469.6µ ± 8% 124.3µ ± 0% -73.52% (p=0.000 n=10) StorageRetrieve/1000-12 6.678m ± 7% 2.303m ± 20% -65.51% (p=0.000 n=10) StorageRetrieve/10000-12 29.81m ± 2% 12.26m ± 5% -58.86% (p=0.000 n=10) StorageRetrieve/100000-12 303.33m ± 1% 88.40m ± 1% -70.86% (p=0.000 n=10) StorageRetrieve/1000000-12 3.442 ± 1% 1.137 ± 3% -66.96% (p=0.000 n=10) geomean 12.34m 4.816m -60.98% │ before.txt │ after.txt │ │ B/op │ B/op vs base │ StorageRetrieve/10-12 21.59Ki ± 0% 21.59Ki ± 0% ~ (p=1.000 n=10) StorageRetrieve/100-12 219.8Ki ± 0% 224.7Ki ± 0% +2.24% (p=0.000 n=10) StorageRetrieve/1000-12 2.266Mi ± 0% 2.272Mi ± 0% +0.27% (p=0.000 n=10) StorageRetrieve/10000-12 21.94Mi ± 0% 22.14Mi ± 0% +0.91% (p=0.000 n=10) StorageRetrieve/100000-12 215.3Mi ± 0% 218.5Mi ± 0% +1.50% (p=0.000 n=10) StorageRetrieve/1000000-12 2.211Gi ± 0% 2.212Gi ± 0% +0.05% (p=0.000 n=10) geomean 6.919Mi 6.976Mi +0.82% │ before.txt │ after.txt │ │ allocs/op │ allocs/op vs base │ StorageRetrieve/10-12 76.00 ± 0% 76.00 ± 0% ~ (p=1.000 n=10) ¹ StorageRetrieve/100-12 745.0 ± 0% 759.0 ± 0% +1.88% (p=0.000 n=10) StorageRetrieve/1000-12 7.161k ± 0% 7.153k ± 0% -0.11% (p=0.000 n=10) StorageRetrieve/10000-12 70.77k ± 0% 70.58k ± 0% -0.27% (p=0.000 n=10) StorageRetrieve/100000-12 711.9k ± 0% 709.7k ± 0% -0.31% (p=0.000 n=10) 
StorageRetrieve/1000000-12 7.115M ± 0% 7.077M ± 0% -0.54% (p=0.000 n=10) geomean 22.93k 22.95k +0.11% --- storage.go | 148 ++++++++++++++++++++++++++++++++++++ storage_bench_test.go | 119 +++++++++++++++++++++++++++++ storage_test.go | 170 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 437 insertions(+) diff --git a/storage.go b/storage.go index 19d0c9c..34b3760 100644 --- a/storage.go +++ b/storage.go @@ -1560,3 +1560,151 @@ func (s *PersistentSlabStorage) getAllChildReferences(slab Slab) ( return references, brokenReferences, nil } + +func (s *PersistentSlabStorage) BatchPreload(ids []SlabID, numWorkers int) error { + if len(ids) == 0 { + return nil + } + + minCountForBatchPreload := 11 + if len(ids) < minCountForBatchPreload { + + for _, id := range ids { + // fetch from base storage last + data, ok, err := s.baseStorage.Retrieve(id) + if err != nil { + // Wrap err as external error (if needed) because err is returned by BaseStorage interface. + return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id)) + } + if !ok { + continue + } + + slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo) + if err != nil { + // err is already categorized by DecodeSlab(). + return err + } + + // save decoded slab to cache + s.cache[id] = slab + } + + return nil + } + + type slabToBeDecoded struct { + slabID SlabID + data []byte + } + + type decodedSlab struct { + slabID SlabID + slab Slab + err error + } + + // Define decoder (worker) to decode slabs in parallel + decoder := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan slabToBeDecoded, results chan<- decodedSlab) { + defer wg.Done() + + for slabData := range jobs { + // Check if goroutine is signaled to stop before proceeding. 
+ select { + case <-done: + return + default: + } + + id := slabData.slabID + data := slabData.data + + slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo) + // err is already categorized by DecodeSlab(). + results <- decodedSlab{ + slabID: id, + slab: slab, + err: err, + } + } + } + + if numWorkers > len(ids) { + numWorkers = len(ids) + } + + var wg sync.WaitGroup + + // Construct done signal channel + done := make(chan struct{}) + + // Construct job queue + jobs := make(chan slabToBeDecoded, len(ids)) + + // Construct result queue + results := make(chan decodedSlab, len(ids)) + + defer func() { + // This ensures that all goroutines are stopped before output channel is closed. + + // Wait for all goroutines to finish + wg.Wait() + + // Close output channel + close(results) + }() + + // Preallocate cache map if empty + if len(s.cache) == 0 { + s.cache = make(map[SlabID]Slab, len(ids)) + } + + // Launch workers + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go decoder(&wg, done, jobs, results) + } + + // Send jobs + jobCount := 0 + { + // Need to close input channel (jobs) here because + // if there isn't any job in jobs channel, + // done is never processed inside loop "for slabData := range jobs". + defer close(jobs) + + for _, id := range ids { + // fetch from base storage last + data, ok, err := s.baseStorage.Retrieve(id) + if err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // Wrap err as external error (if needed) because err is returned by BaseStorage interface. + return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id)) + } + if !ok { + continue + } + + jobs <- slabToBeDecoded{id, data} + jobCount++ + } + } + + // Process results + for i := 0; i < jobCount; i++ { + result := <-results + + if result.err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // result.err is already categorized by DecodeSlab(). 
+ return result.err + } + + // save decoded slab to cache + s.cache[result.slabID] = result.slab + } + + return nil +} diff --git a/storage_bench_test.go b/storage_bench_test.go index ae76f26..5fb80f1 100644 --- a/storage_bench_test.go +++ b/storage_bench_test.go @@ -132,3 +132,122 @@ func BenchmarkStorageNondeterministicFastCommit(b *testing.B) { benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000) benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000) } + +func benchmarkRetrieve(b *testing.B, seed int64, numberOfSlabs int) { + + r := rand.New(rand.NewSource(seed)) + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(b, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(b, err) + + encodedSlabs := make(map[SlabID][]byte) + ids := make([]SlabID, 0, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + addr := generateRandomAddress(r) + + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := SlabID{addr, index} + + slab := generateLargeSlab(id) + + data, err := EncodeSlab(slab, encMode) + require.NoError(b, err) + + encodedSlabs[id] = data + ids = append(ids, id) + } + + b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + baseStorage := NewInMemBaseStorageFromMap(encodedSlabs) + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo) + + b.StartTimer() + + for _, id := range ids { + _, found, err := storage.Retrieve(id) + require.True(b, found) + require.NoError(b, err) + } + } + }) +} + +func benchmarkBatchPreload(b *testing.B, seed int64, numberOfSlabs int) { + + r := rand.New(rand.NewSource(seed)) + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(b, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(b, err) + + encodedSlabs := make(map[SlabID][]byte) + ids := make([]SlabID, 0, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + addr := 
generateRandomAddress(r) + + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := SlabID{addr, index} + + slab := generateLargeSlab(id) + + data, err := EncodeSlab(slab, encMode) + require.NoError(b, err) + + encodedSlabs[id] = data + ids = append(ids, id) + } + + b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + baseStorage := NewInMemBaseStorageFromMap(encodedSlabs) + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo) + + b.StartTimer() + + err = storage.BatchPreload(ids, runtime.NumCPU()) + require.NoError(b, err) + + for _, id := range ids { + _, found, err := storage.Retrieve(id) + require.True(b, found) + require.NoError(b, err) + } + } + }) +} + +func BenchmarkStorageRetrieve(b *testing.B) { + fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc. + + benchmarkRetrieve(b, fixedSeed, 10) + benchmarkRetrieve(b, fixedSeed, 100) + benchmarkRetrieve(b, fixedSeed, 1_000) + benchmarkRetrieve(b, fixedSeed, 10_000) + benchmarkRetrieve(b, fixedSeed, 100_000) + benchmarkRetrieve(b, fixedSeed, 1_000_000) +} + +func BenchmarkStorageBatchPreload(b *testing.B) { + fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc. 
+ + benchmarkBatchPreload(b, fixedSeed, 10) + benchmarkBatchPreload(b, fixedSeed, 100) + benchmarkBatchPreload(b, fixedSeed, 1_000) + benchmarkBatchPreload(b, fixedSeed, 10_000) + benchmarkBatchPreload(b, fixedSeed, 100_000) + benchmarkBatchPreload(b, fixedSeed, 1_000_000) +} diff --git a/storage_test.go b/storage_test.go index 12ebdf4..a808e47 100644 --- a/storage_test.go +++ b/storage_test.go @@ -19,6 +19,7 @@ package atree import ( + "encoding/binary" "errors" "math/rand" "runtime" @@ -4598,3 +4599,172 @@ func testStorageNondeterministicFastCommit(t *testing.T, numberOfAccounts int, n require.Nil(t, storedValue) } } + +func TestStorageBatchPreload(t *testing.T) { + t.Run("0 slab", func(t *testing.T) { + numberOfAccounts := 0 + numberOfSlabsPerAccount := 0 + testStorageBatchPreload(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("1 slab", func(t *testing.T) { + numberOfAccounts := 1 + numberOfSlabsPerAccount := 1 + testStorageBatchPreload(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("10 slab", func(t *testing.T) { + numberOfAccounts := 1 + numberOfSlabsPerAccount := 10 + testStorageBatchPreload(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("100 slabs", func(t *testing.T) { + numberOfAccounts := 10 + numberOfSlabsPerAccount := 10 + testStorageBatchPreload(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("10_000 slabs", func(t *testing.T) { + numberOfAccounts := 10 + numberOfSlabsPerAccount := 1_000 + testStorageBatchPreload(t, numberOfAccounts, numberOfSlabsPerAccount) + }) +} + +func testStorageBatchPreload(t *testing.T, numberOfAccounts int, numberOfSlabsPerAccount int) { + + indexesByAddress := make(map[Address]uint64) + + generateSlabID := func(address Address) SlabID { + nextIndex := indexesByAddress[address] + 1 + + var idx SlabIndex + binary.BigEndian.PutUint64(idx[:], nextIndex) + + indexesByAddress[address] = nextIndex + + return NewSlabID(address, idx) + } + + encMode, err := 
cbor.EncOptions{}.EncMode() + require.NoError(t, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(t, err) + + r := newRand(t) + + encodedSlabs := make(map[SlabID][]byte) + + // Generate and encode slabs + for i := 0; i < numberOfAccounts; i++ { + + addr := generateRandomAddress(r) + + for j := 0; j < numberOfSlabsPerAccount; j++ { + + slabID := generateSlabID(addr) + + slab := generateRandomSlab(slabID, r) + + encodedSlabs[slabID], err = EncodeSlab(slab, encMode) + require.NoError(t, err) + } + } + + baseStorage := NewInMemBaseStorageFromMap(encodedSlabs) + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo) + + ids := make([]SlabID, 0, len(encodedSlabs)) + for id := range encodedSlabs { + ids = append(ids, id) + } + + // Batch preload slabs from base storage + err = storage.BatchPreload(ids, runtime.NumCPU()) + require.NoError(t, err) + require.Equal(t, len(encodedSlabs), len(storage.cache)) + require.Equal(t, 0, len(storage.deltas)) + + // Compare encoded data + for id, data := range encodedSlabs { + cachedData, err := EncodeSlab(storage.cache[id], encMode) + require.NoError(t, err) + + require.Equal(t, cachedData, data) + } +} + +func TestStorageBatchPreloadNotFoundSlabs(t *testing.T) { + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(t, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(t, err) + + r := newRand(t) + + t.Run("empty storage", func(t *testing.T) { + const numberOfSlabs = 10 + + ids := make([]SlabID, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + ids[i] = NewSlabID(generateRandomAddress(r), index) + } + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo) + + err := storage.BatchPreload(ids, runtime.NumCPU()) + require.NoError(t, err) + + require.Equal(t, 0, len(storage.cache)) 
+ require.Equal(t, 0, len(storage.deltas)) + }) + + t.Run("non-empty storage", func(t *testing.T) { + const numberOfSlabs = 10 + + ids := make([]SlabID, numberOfSlabs) + encodedSlabs := make(map[SlabID][]byte) + + for i := 0; i < numberOfSlabs; i++ { + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := NewSlabID(generateRandomAddress(r), index) + + slab := generateRandomSlab(id, r) + + encodedSlabs[id], err = EncodeSlab(slab, encMode) + require.NoError(t, err) + + ids[i] = id + } + + // Append a slab ID that doesn't exist in storage. + ids = append(ids, NewSlabID(generateRandomAddress(r), SlabIndex{numberOfSlabs})) + + baseStorage := NewInMemBaseStorageFromMap(encodedSlabs) + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo) + + err := storage.BatchPreload(ids, runtime.NumCPU()) + require.NoError(t, err) + + require.Equal(t, len(encodedSlabs), len(storage.cache)) + require.Equal(t, 0, len(storage.deltas)) + + // Compare encoded data + for id, data := range encodedSlabs { + cachedData, err := EncodeSlab(storage.cache[id], encMode) + require.NoError(t, err) + + require.Equal(t, cachedData, data) + } + }) +} From aa2ee903d629a0347d2cd418a857d14d09f8d667 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 13 May 2024 14:58:30 -0500 Subject: [PATCH 2/2] Add more comments in BatchPreload --- storage.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/storage.go b/storage.go index 34b3760..045e568 100644 --- a/storage.go +++ b/storage.go @@ -1561,12 +1561,16 @@ func (s *PersistentSlabStorage) getAllChildReferences(slab Slab) ( return references, brokenReferences, nil } +// BatchPreload decodes and caches slabs of given ids in parallel. +// This is useful for storage health or data validation in migration programs. 
func (s *PersistentSlabStorage) BatchPreload(ids []SlabID, numWorkers int) error { if len(ids) == 0 { return nil } - minCountForBatchPreload := 11 + // Use 11 for min slab count for parallel decoding because micro benchmarks showed + // performance regression for <= 10 slabs when decoding slabs in parallel. + const minCountForBatchPreload = 11 if len(ids) < minCountForBatchPreload { for _, id := range ids {