Skip to content

Commit

Permalink
Merge pull request #2659 from Juliusan/snapshot
Browse files Browse the repository at this point in the history
Snapshots
  • Loading branch information
jorgemmsilva authored Jul 11, 2023
2 parents aca984f + 5755a1b commit e2c6f94
Show file tree
Hide file tree
Showing 45 changed files with 2,542 additions and 426 deletions.
5 changes: 5 additions & 0 deletions components/chains/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func provide(c *dig.Container) error {
deps.NetworkProvider,
deps.TrustedNetworkManager,
deps.ChainStateDatabaseManager.ChainStateKVStore,
ParamsWAL.LoadToStore,
ParamsWAL.Enabled,
ParamsWAL.Path,
ParamsStateManager.BlockCacheMaxSize,
Expand All @@ -111,6 +112,10 @@ func provide(c *dig.Container) error {
ParamsStateManager.StateManagerTimerTickPeriod,
ParamsStateManager.PruningMinStatesToKeep,
ParamsStateManager.PruningMaxStatesToDelete,
ParamsSnapshotManager.Period,
ParamsSnapshotManager.LocalPath,
ParamsSnapshotManager.NetworkPaths,
ParamsSnapshotManager.UpdatePeriod,
deps.ChainRecordRegistryProvider,
deps.DKShareRegistryProvider,
deps.NodeIdentityProvider,
Expand Down
22 changes: 16 additions & 6 deletions components/chains/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ type ParametersChains struct {
}

type ParametersWAL struct {
Enabled bool `default:"true" usage:"whether the \"write-ahead logging\" is enabled"`
Path string `default:"waspdb/wal" usage:"the path to the \"write-ahead logging\" folder"`
LoadToStore bool `default:"false" usage:"load blocks from \"write-ahead log\" to the store on node start-up"`
Enabled bool `default:"true" usage:"whether the \"write-ahead logging\" is enabled"`
Path string `default:"waspdb/wal" usage:"the path to the \"write-ahead logging\" folder"`
}

type ParametersValidator struct {
Expand All @@ -36,11 +37,19 @@ type ParametersStateManager struct {
PruningMaxStatesToDelete int `default:"1000" usage:"on single store pruning attempt at most this number of states will be deleted"`
}

type ParametersSnapshotManager struct {
Period uint32 `default:"0" usage:"how often state snapshots should be made: 1000 meaning \"every 1000th state\", 0 meaning \"making snapshots is disabled\""`
LocalPath string `default:"waspdb/snap" usage:"the path to the snapshots folder in this node's disk"`
NetworkPaths []string `default:"" usage:"the list of paths to the remote (http(s)) snapshot locations; each of listed locations must contain 'INDEX' file with list of snapshot files"`
UpdatePeriod time.Duration `default:"5m" usage:"how often known snapshots list should be updated"`
}

var (
ParamsChains = &ParametersChains{}
ParamsWAL = &ParametersWAL{}
ParamsValidator = &ParametersValidator{}
ParamsStateManager = &ParametersStateManager{}
ParamsChains = &ParametersChains{}
ParamsWAL = &ParametersWAL{}
ParamsValidator = &ParametersValidator{}
ParamsStateManager = &ParametersStateManager{}
ParamsSnapshotManager = &ParametersSnapshotManager{}
)

var params = &app.ComponentParams{
Expand All @@ -49,6 +58,7 @@ var params = &app.ComponentParams{
"wal": ParamsWAL,
"validator": ParamsValidator,
"stateManager": ParamsStateManager,
"snapshots": ParamsSnapshotManager,
},
Masked: nil,
}
25 changes: 9 additions & 16 deletions packages/chain/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/iotaledger/wasp/packages/chain/statemanager"
"github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa"
"github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa/sm_gpa_utils"
"github.com/iotaledger/wasp/packages/chain/statemanager/sm_snapshots"
"github.com/iotaledger/wasp/packages/cryptolib"
"github.com/iotaledger/wasp/packages/gpa"
"github.com/iotaledger/wasp/packages/isc"
Expand Down Expand Up @@ -262,7 +263,9 @@ func New(
processorConfig *processors.Config,
dkShareRegistryProvider registry.DKShareRegistryProvider,
consensusStateRegistry cmt_log.ConsensusStateRegistry,
recoverFromWAL bool,
blockWAL sm_gpa_utils.BlockWAL,
snapshotManager sm_snapshots.SnapshotManager,
listener ChainListener,
accessNodesFromNode []*cryptolib.PublicKey,
net peering.NetworkProvider,
Expand Down Expand Up @@ -339,7 +342,9 @@ func New(
cni.chainMetrics.Pipe.TrackPipeLen("node-serversUpdatedPipe", cni.serversUpdatedPipe.Len)
cni.chainMetrics.Pipe.TrackPipeLen("node-netRecvPipe", cni.netRecvPipe.Len)

cni.tryRecoverStoreFromWAL(chainStore, blockWAL)
if recoverFromWAL {
cni.recoverStoreFromWAL(chainStore, blockWAL)
}
cni.me = cni.pubKeyAsNodeID(nodeIdentity.GetPublicKey())
//
// Create sub-components.
Expand Down Expand Up @@ -398,11 +403,12 @@ func New(
peerPubKeys,
net,
blockWAL,
snapshotManager,
chainStore,
shutdownCoordinator.Nested("StateMgr"),
chainMetrics.StateManager,
chainMetrics.Pipe,
cni.log.Named("SM"),
cni.log,
smParameters,
)
if err != nil {
Expand Down Expand Up @@ -1222,20 +1228,7 @@ func (cni *chainNodeImpl) GetConsensusWorkflowStatus() ConsensusWorkflowStatus {
return &consensusWorkflowStatusImpl{}
}

func (cni *chainNodeImpl) tryRecoverStoreFromWAL(chainStore indexedstore.IndexedStore, chainWAL sm_gpa_utils.BlockWAL) {
defer func() {
if r := recover(); r != nil {
// Don't fail, if this crashes for some reason, that's an optional step.
cni.log.Warnf("TryRecoverStoreFromWAL: Failed to populate chain store from WAL: %v", r)
}
}()
//
// Check, if store is empty.
if _, err := chainStore.BlockByIndex(0); err == nil {
cni.log.Infof("TryRecoverStoreFromWAL: Skipping, because the state is not empty.")
return // Store is not empty, so we skip this.
}
cni.log.Infof("TryRecoverStoreFromWAL: Chain store is empty, will try to load blocks from the WAL.")
func (cni *chainNodeImpl) recoverStoreFromWAL(chainStore indexedstore.IndexedStore, chainWAL sm_gpa_utils.BlockWAL) {
//
// Load all the existing blocks from the WAL.
blocksAdded := 0
Expand Down
3 changes: 3 additions & 0 deletions packages/chain/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/iotaledger/wasp/packages/chain"
"github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa"
"github.com/iotaledger/wasp/packages/chain/statemanager/sm_gpa/sm_gpa_utils"
"github.com/iotaledger/wasp/packages/chain/statemanager/sm_snapshots"
"github.com/iotaledger/wasp/packages/cryptolib"
"github.com/iotaledger/wasp/packages/isc"
"github.com/iotaledger/wasp/packages/kv/dict"
Expand Down Expand Up @@ -451,7 +452,9 @@ func newEnv(t *testing.T, n, f int, reliable bool) *testEnv {
coreprocessors.NewConfigWithCoreContracts().WithNativeContracts(inccounter.Processor),
dkShareProviders[i],
testutil.NewConsensusStateRegistry(),
false,
sm_gpa_utils.NewMockedTestBlockWAL(),
sm_snapshots.NewEmptySnapshotManager(),
chain.NewEmptyChainListener(),
[]*cryptolib.PublicKey{}, // Access nodes.
te.networkProviders[i],
Expand Down
37 changes: 24 additions & 13 deletions packages/chain/statemanager/sm_gpa/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,41 @@ import (

type blockFetcherImpl struct {
start time.Time
stateIndex uint32
commitment *state.L1Commitment
callbacks []blockRequestCallback
related []blockFetcher
}

var _ blockFetcher = &blockFetcherImpl{}

func newBlockFetcher(commitment *state.L1Commitment) blockFetcher {
func newBlockFetcher(stateIndex uint32, commitment *state.L1Commitment) blockFetcher {
return &blockFetcherImpl{
start: time.Now(),
stateIndex: stateIndex,
commitment: commitment,
callbacks: make([]blockRequestCallback, 0),
related: make([]blockFetcher, 0),
}
}

func newBlockFetcherWithCallback(commitment *state.L1Commitment, callback blockRequestCallback) blockFetcher {
result := newBlockFetcher(commitment)
func newBlockFetcherWithCallback(stateIndex uint32, commitment *state.L1Commitment, callback blockRequestCallback) blockFetcher {
result := newBlockFetcher(stateIndex, commitment)
result.addCallback(callback)
return result
}

func newBlockFetcherWithRelatedFetcher(commitment *state.L1Commitment, fetcher blockFetcher) blockFetcher {
result := newBlockFetcher(commitment)
newStateIndex := fetcher.getStateIndex() - 1
result := newBlockFetcher(newStateIndex, commitment)
result.addRelatedFetcher(fetcher)
return result
}

func (bfiT *blockFetcherImpl) getStateIndex() uint32 {
return bfiT.stateIndex
}

func (bfiT *blockFetcherImpl) getCommitment() *state.L1Commitment {
return bfiT.commitment
}
Expand All @@ -52,17 +59,21 @@ func (bfiT *blockFetcherImpl) addRelatedFetcher(fetcher blockFetcher) {
bfiT.related = append(bfiT.related, fetcher)
}

func (bfiT *blockFetcherImpl) notifyFetched(notifyFun func(blockFetcher) bool) {
if notifyFun(bfiT) {
for _, callback := range bfiT.callbacks {
if callback.isValid() {
callback.requestCompleted()
}
}
for _, fetcher := range bfiT.related {
fetcher.notifyFetched(notifyFun)
func (bfiT *blockFetcherImpl) commitAndNotifyFetched(commitFun func(blockFetcher) bool) {
if commitFun(bfiT) {
bfiT.notifyFetched(commitFun)
}
}

func (bfiT *blockFetcherImpl) notifyFetched(commitFun func(blockFetcher) bool) {
for _, callback := range bfiT.callbacks {
if callback.isValid() {
callback.requestCompleted()
}
}
for _, fetcher := range bfiT.related {
fetcher.commitAndNotifyFetched(commitFun)
}
}

func (bfiT *blockFetcherImpl) cleanCallbacks() {
Expand Down
16 changes: 15 additions & 1 deletion packages/chain/statemanager/sm_gpa/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,32 @@ package sm_gpa
import (
"time"

"github.com/iotaledger/wasp/packages/chain/statemanager/sm_snapshots"
"github.com/iotaledger/wasp/packages/state"
)

type StateManagerOutput interface {
addBlockCommitted(uint32, *state.L1Commitment)
addSnapshotToLoad(uint32, *state.L1Commitment)
setUpdateSnapshots()
TakeBlocksCommitted() []sm_snapshots.SnapshotInfo
TakeSnapshotToLoad() sm_snapshots.SnapshotInfo
TakeUpdateSnapshots() bool
}

type SnapshotExistsFun func(uint32, *state.L1Commitment) bool

type blockRequestCallback interface {
isValid() bool
requestCompleted()
}

type blockFetcher interface {
getStateIndex() uint32
getCommitment() *state.L1Commitment
getCallbacksCount() int
notifyFetched(func(blockFetcher) bool) // calls fun for this fetcher and each related recursively; fun for parent block is always called before fun for related block
commitAndNotifyFetched(func(blockFetcher) bool) // calls fun for this block, notifies waiting callbacks of this fetcher and does the same for each related fetcher recursively; fun for parent block is always called before fun for related block
notifyFetched(func(blockFetcher) bool) // notifies waiting callbacks of this fetcher, then calls fun and notifies waiting callbacks of all related fetchers recursively; fun for parent block is always called before fun for related block
addCallback(blockRequestCallback)
addRelatedFetcher(blockFetcher)
cleanCallbacks()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var _ BlockCache = &blockCache{}

func NewBlockCache(tp TimeProvider, maxCacheSize int, wal BlockWAL, metrics *metrics.ChainStateManagerMetrics, log *logger.Logger) (BlockCache, error) {
return &blockCache{
log: log.Named("bc"),
log: log.Named("BC"),
blocks: shrinkingmap.New[BlockKey, state.Block](),
maxCacheSize: maxCacheSize,
wal: wal,
Expand Down
16 changes: 8 additions & 8 deletions packages/chain/statemanager/sm_gpa/sm_gpa_utils/block_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type blockWAL struct {
metrics *metrics.ChainBlockWALMetrics
}

const constFileSuffix = ".blk"
const constBlockWALFileSuffix = ".blk"

func NewBlockWAL(log *logger.Logger, baseDir string, chainID isc.ChainID, metrics *metrics.ChainBlockWALMetrics) (BlockWAL, error) {
dir := filepath.Join(baseDir, chainID.String())
Expand All @@ -33,7 +33,7 @@ func NewBlockWAL(log *logger.Logger, baseDir string, chainID isc.ChainID, metric
}

result := &blockWAL{
WrappedLogger: logger.NewWrappedLogger(log),
WrappedLogger: logger.NewWrappedLogger(log.Named("WAL")),
dir: dir,
metrics: metrics,
}
Expand All @@ -45,7 +45,7 @@ func NewBlockWAL(log *logger.Logger, baseDir string, chainID isc.ChainID, metric
func (bwT *blockWAL) Write(block state.Block) error {
blockIndex := block.StateIndex()
commitment := block.L1Commitment()
fileName := fileName(commitment.BlockHash())
fileName := blockWALFileName(commitment.BlockHash())
filePath := filepath.Join(bwT.dir, fileName)
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o666)
if err != nil {
Expand All @@ -69,12 +69,12 @@ func (bwT *blockWAL) Write(block state.Block) error {
}

func (bwT *blockWAL) Contains(blockHash state.BlockHash) bool {
_, err := os.Stat(filepath.Join(bwT.dir, fileName(blockHash)))
_, err := os.Stat(filepath.Join(bwT.dir, blockWALFileName(blockHash)))
return err == nil
}

func (bwT *blockWAL) Read(blockHash state.BlockHash) (state.Block, error) {
fileName := fileName(blockHash)
fileName := blockWALFileName(blockHash)
filePath := filepath.Join(bwT.dir, fileName)
block, err := blockFromFilePath(filePath)
if err != nil {
Expand All @@ -97,7 +97,7 @@ func (bwT *blockWAL) ReadAllByStateIndex(cb func(stateIndex uint32, block state.
if !dirEntry.Type().IsRegular() {
continue
}
if !strings.HasSuffix(dirEntry.Name(), constFileSuffix) {
if !strings.HasSuffix(dirEntry.Name(), constBlockWALFileSuffix) {
continue
}
filePath := filepath.Join(bwT.dir, dirEntry.Name())
Expand Down Expand Up @@ -160,6 +160,6 @@ func blockFromFilePath(filePath string) (state.Block, error) {
return block, nil
}

func fileName(blockHash state.BlockHash) string {
return blockHash.String() + constFileSuffix
func blockWALFileName(blockHash state.BlockHash) string {
return blockHash.String() + constBlockWALFileSuffix
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (bwtsmT *blockWALTestSM) getGoodBlockHashes() []state.BlockHash {
}

func (bwtsmT *blockWALTestSM) pathFromHash(blockHash state.BlockHash) string {
return filepath.Join(constTestFolder, bwtsmT.factory.GetChainID().String(), fileName(blockHash))
return filepath.Join(constTestFolder, bwtsmT.factory.GetChainID().String(), blockWALFileName(blockHash))
}

func (bwtsmT *blockWALTestSM) invariantAllWrittenBlocksExist(t *rapid.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestBlockWALOverwrite(t *testing.T) {
require.NoError(t, err)
}
pathFromHashFun := func(blockHash state.BlockHash) string {
return filepath.Join(constTestFolder, factory.GetChainID().String(), fileName(blockHash))
return filepath.Join(constTestFolder, factory.GetChainID().String(), blockWALFileName(blockHash))
}
file0Path := pathFromHashFun(blocks[0].Hash())
file1Path := pathFromHashFun(blocks[1].Hash())
Expand Down
Loading

0 comments on commit e2c6f94

Please sign in to comment.