Skip to content

Commit

Permalink
Merge pull request #2771 from Juliusan/wal
Browse files Browse the repository at this point in the history
WAL improvements
  • Loading branch information
Juliusan authored Aug 7, 2023
2 parents c614bdf + ab5baeb commit 9153530
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 47 deletions.
132 changes: 99 additions & 33 deletions packages/chain/statemanager/sm_gpa/sm_gpa_utils/block_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sm_gpa_utils

import (
"bufio"
"encoding/hex"
"fmt"
"os"
"path/filepath"
Expand All @@ -24,7 +25,10 @@ type blockWAL struct {
metrics *metrics.ChainBlockWALMetrics
}

const constBlockWALFileSuffix = ".blk"
const (
constBlockWALFileSuffix = ".blk"
constBlockWALTmpFileSuffix = ".tmp"
)

func NewBlockWAL(log *logger.Logger, baseDir string, chainID isc.ChainID, metrics *metrics.ChainBlockWALMetrics) (BlockWAL, error) {
dir := filepath.Join(baseDir, chainID.String())
Expand All @@ -45,37 +49,77 @@ func NewBlockWAL(log *logger.Logger, baseDir string, chainID isc.ChainID, metric
func (bwT *blockWAL) Write(block state.Block) error {
blockIndex := block.StateIndex()
commitment := block.L1Commitment()
fileName := blockWALFileName(commitment.BlockHash())
filePath := filepath.Join(bwT.dir, fileName)
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o666)
if err != nil {
bwT.metrics.IncFailedWrites()
return fmt.Errorf("opening file %s for writing block index %v failed: %w", fileName, blockIndex, err)
subfolderName := blockWALSubFolderName(commitment.BlockHash())
folderPath := filepath.Join(bwT.dir, subfolderName)
if err := ioutils.CreateDirectory(folderPath, 0o777); err != nil {
return fmt.Errorf("failed create folder %v for writing block index %v: %w", folderPath, blockIndex, err)
}
defer f.Close()
blockBytes := block.Bytes()
n, err := f.Write(blockBytes)
tmpFileName := blockWALTmpFileName(commitment.BlockHash())
tmpFilePath := filepath.Join(folderPath, tmpFileName)
err := func() error { // Function is used to make defered close occur when it is needed even if write is successful
f, err := os.OpenFile(tmpFilePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o666)
if err != nil {
bwT.metrics.IncFailedWrites()
return fmt.Errorf("failed to create temporary file %s for writing block index %v: %w", tmpFileName, blockIndex, err)
}
defer f.Close()
blockBytes := block.Bytes()
n, err := f.Write(blockBytes)
if err != nil {
bwT.metrics.IncFailedWrites()
return fmt.Errorf("writing block index %v data to temporary file %s failed: %w", blockIndex, tmpFileName, err)
}
if len(blockBytes) != n {
bwT.metrics.IncFailedWrites()
return fmt.Errorf("only %v of total %v bytes of block index %v were written to temporary file %s", n, len(blockBytes), blockIndex, tmpFileName)
}
return nil
}()
if err != nil {
bwT.metrics.IncFailedWrites()
return fmt.Errorf("writing block index %v data to file %s failed: %w", blockIndex, fileName, err)
return err
}
if len(blockBytes) != n {
bwT.metrics.IncFailedWrites()
return fmt.Errorf("only %v of total %v bytes of block index %v were written to file %s", n, len(blockBytes), blockIndex, fileName)
finalFileName := blockWALFileName(commitment.BlockHash())
finalFilePath := filepath.Join(folderPath, finalFileName)
err = os.Rename(tmpFilePath, finalFilePath)
if err != nil {
return fmt.Errorf("failed to move temporary WAL file %s to permanent location %s: %v",
tmpFilePath, finalFilePath, err)
}

bwT.metrics.BlockWritten(block.StateIndex())
bwT.LogDebugf("Block index %v %s written to wal; file name - %s", blockIndex, commitment, fileName)
bwT.LogDebugf("Block index %v %s written to wal; file name - %s", blockIndex, commitment, finalFileName)
return nil
}

func (bwT *blockWAL) blockFilepath(blockHash state.BlockHash) (string, bool) {
subfolderName := blockWALSubFolderName(blockHash)
fileName := blockWALFileName(blockHash)

pathWithSubFolder := filepath.Join(bwT.dir, subfolderName, fileName)
_, err := os.Stat(pathWithSubFolder)
if err == nil {
return pathWithSubFolder, true
}

// Checked for backward compatibility and for ease of adding some blocks from other sources
pathNoSubFolder := filepath.Join(bwT.dir, fileName)
_, err = os.Stat(pathNoSubFolder)
if err == nil {
return pathNoSubFolder, true
}
return "", false
}

func (bwT *blockWAL) Contains(blockHash state.BlockHash) bool {
_, err := os.Stat(filepath.Join(bwT.dir, blockWALFileName(blockHash)))
return err == nil
_, exists := bwT.blockFilepath(blockHash)
return exists
}

func (bwT *blockWAL) Read(blockHash state.BlockHash) (state.Block, error) {
fileName := blockWALFileName(blockHash)
filePath := filepath.Join(bwT.dir, fileName)
filePath, exists := bwT.blockFilepath(blockHash)
if !exists {
return nil, fmt.Errorf("block hash %s is not present in WAL", blockHash)
}
block, err := blockFromFilePath(filePath)
if err != nil {
bwT.metrics.IncFailedReads()
Expand All @@ -88,24 +132,16 @@ func (bwT *blockWAL) Read(blockHash state.BlockHash) (state.Block, error) {
// The blocks are provided ordered by the state index, so that they can be applied to the store.
// This function reads blocks twice, but tries to minimize the amount of memory required to load the WAL.
func (bwT *blockWAL) ReadAllByStateIndex(cb func(stateIndex uint32, block state.Block) bool) error {
dirEntries, err := os.ReadDir(bwT.dir)
if err != nil {
return err
}
blocksByStateIndex := map[uint32][]string{}
for _, dirEntry := range dirEntries {
if !dirEntry.Type().IsRegular() {
continue
checkFile := func(filePath string) {
if !strings.HasSuffix(filePath, constBlockWALFileSuffix) {
return
}
if !strings.HasSuffix(dirEntry.Name(), constBlockWALFileSuffix) {
continue
}
filePath := filepath.Join(bwT.dir, dirEntry.Name())
fileBlock, fileErr := blockFromFilePath(filePath)
if fileErr != nil {
bwT.metrics.IncFailedReads()
bwT.LogWarn("Unable to read %v: %v", filePath, err)
continue
bwT.LogWarn("Unable to read %v: %v", filePath, fileErr)
return
}
stateIndex := fileBlock.StateIndex()
stateIndexPaths, found := blocksByStateIndex[stateIndex]
Expand All @@ -116,6 +152,28 @@ func (bwT *blockWAL) ReadAllByStateIndex(cb func(stateIndex uint32, block state.
}
blocksByStateIndex[stateIndex] = stateIndexPaths
}

var checkDir func(dirPath string, dirEntries []os.DirEntry)
checkDir = func(dirPath string, dirEntries []os.DirEntry) {
for _, dirEntry := range dirEntries {
entryPath := filepath.Join(dirPath, dirEntry.Name())
if dirEntry.IsDir() {
subDirEntries, err := os.ReadDir(entryPath)
if err == nil {
checkDir(entryPath, subDirEntries)
}
} else {
checkFile(entryPath)
}
}
}

dirEntries, err := os.ReadDir(bwT.dir)
if err != nil {
return err
}
checkDir(bwT.dir, dirEntries)

allStateIndexes := lo.Keys(blocksByStateIndex)
sort.Slice(allStateIndexes, func(i, j int) bool { return allStateIndexes[i] < allStateIndexes[j] })
for _, stateIndex := range allStateIndexes {
Expand Down Expand Up @@ -160,6 +218,14 @@ func blockFromFilePath(filePath string) (state.Block, error) {
return block, nil
}

func blockWALSubFolderName(blockHash state.BlockHash) string {
return hex.EncodeToString(blockHash[:1])
}

func blockWALFileName(blockHash state.BlockHash) string {
return blockHash.String() + constBlockWALFileSuffix
}

func blockWALTmpFileName(blockHash state.BlockHash) string {
return blockWALFileName(blockHash) + constBlockWALTmpFileSuffix
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sm_gpa_utils
import (
"crypto/rand"
"os"
"path/filepath"
"testing"

"github.com/samber/lo"
Expand Down Expand Up @@ -44,7 +43,7 @@ func newBlockWALTestSM(t *rapid.T) *blockWALTestSM {
return bwtsmT
}

func (bwtsmT *blockWALTestSM) Cleanup() {
func (bwtsmT *blockWALTestSM) cleanup() {
bwtsmT.log.Sync()
os.RemoveAll(constTestFolder)
}
Expand Down Expand Up @@ -107,8 +106,8 @@ func (bwtsmT *blockWALTestSM) MoveBlock(t *rapid.T) {
if blockHashOrig.Equals(blockHashToDamage) {
t.Skip()
}
fileOrigPath := bwtsmT.pathFromHash(blockHashOrig)
fileToDamagePath := bwtsmT.pathFromHash(blockHashToDamage)
fileOrigPath := walPathFromHash(bwtsmT.factory.GetChainID(), blockHashOrig)
fileToDamagePath := walPathFromHash(bwtsmT.factory.GetChainID(), blockHashToDamage)
data, err := os.ReadFile(fileOrigPath)
require.NoError(t, err)
err = os.WriteFile(fileToDamagePath, data, 0o644)
Expand All @@ -124,7 +123,7 @@ func (bwtsmT *blockWALTestSM) DamageBlock(t *rapid.T) {
t.Skip()
}
blockHash := rapid.SampledFrom(blockHashes).Example()
filePath := bwtsmT.pathFromHash(blockHash)
filePath := walPathFromHash(bwtsmT.factory.GetChainID(), blockHash)
data := make([]byte, 50)
_, err := rand.Read(data)
require.NoError(t, err)
Expand Down Expand Up @@ -188,10 +187,6 @@ func (bwtsmT *blockWALTestSM) getGoodBlockHashes() []state.BlockHash {
return result
}

func (bwtsmT *blockWALTestSM) pathFromHash(blockHash state.BlockHash) string {
return filepath.Join(constTestFolder, bwtsmT.factory.GetChainID().String(), blockWALFileName(blockHash))
}

func (bwtsmT *blockWALTestSM) invariantAllWrittenBlocksExist(t *rapid.T) {
for blockHash := range bwtsmT.blocks {
require.True(t, bwtsmT.bw.Contains(blockHash))
Expand All @@ -202,6 +197,7 @@ func TestBlockWALPropBased(t *testing.T) {
rapid.Check(t, func(t *rapid.T) {
sm := newBlockWALTestSM(t)
t.Repeat(rapid.StateMachineActions(sm))
sm.cleanup()
})
}

Expand Down
42 changes: 37 additions & 5 deletions packages/chain/statemanager/sm_gpa/sm_gpa_utils/block_wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func TestBlockWALBasic(t *testing.T) {
require.Error(t, err)
}

// Check if existing WAL record is overwritten
func TestBlockWALOverwrite(t *testing.T) {
// Check if existing block in WAL is found even if it is not in a subfolder
func TestBlockWALNoSubfolder(t *testing.T) {
log := testlogger.NewLogger(t)
defer log.Sync()
defer cleanupAfterTest(t)
Expand All @@ -63,11 +63,39 @@ func TestBlockWALOverwrite(t *testing.T) {
err = wal.Write(blocks[i])
require.NoError(t, err)
}
pathFromHashFun := func(blockHash state.BlockHash) string {
pathNoSubfolderFromHashFun := func(blockHash state.BlockHash) string {
return filepath.Join(constTestFolder, factory.GetChainID().String(), blockWALFileName(blockHash))
}
file0Path := pathFromHashFun(blocks[0].Hash())
file1Path := pathFromHashFun(blocks[1].Hash())
for _, block := range blocks {
pathWithSubfolder := walPathFromHash(factory.GetChainID(), block.Hash())
pathNoSubfolder := pathNoSubfolderFromHashFun(block.Hash())
err = os.Rename(pathWithSubfolder, pathNoSubfolder)
require.NoError(t, err)
}
for _, block := range blocks {
require.True(t, wal.Contains(block.Hash()))
blockRead, err := wal.Read(block.Hash())
require.NoError(t, err)
CheckBlocksEqual(t, block, blockRead)
}
}

// Check if existing WAL record is overwritten
func TestBlockWALOverwrite(t *testing.T) {
log := testlogger.NewLogger(t)
defer log.Sync()
defer cleanupAfterTest(t)

factory := NewBlockFactory(t)
blocks := factory.GetBlocks(4, 1)
wal, err := NewBlockWAL(log, constTestFolder, factory.GetChainID(), mockBlockWALMetrics())
require.NoError(t, err)
for i := range blocks {
err = wal.Write(blocks[i])
require.NoError(t, err)
}
file0Path := walPathFromHash(factory.GetChainID(), blocks[0].Hash())
file1Path := walPathFromHash(factory.GetChainID(), blocks[1].Hash())
err = os.Rename(file1Path, file0Path)
require.NoError(t, err)
// block[1] is no longer in WAL
Expand Down Expand Up @@ -116,6 +144,10 @@ func TestBlockWALRestart(t *testing.T) {
}
}

func walPathFromHash(chainID isc.ChainID, blockHash state.BlockHash) string {
return filepath.Join(constTestFolder, chainID.String(), blockWALSubFolderName(blockHash), blockWALFileName(blockHash))
}

func cleanupAfterTest(t *testing.T) {
err := os.RemoveAll(constTestFolder)
require.NoError(t, err)
Expand Down

0 comments on commit 9153530

Please sign in to comment.