From 4d1dd9c6be9b133b6a4ea74f87f30c9fa75c6a89 Mon Sep 17 00:00:00 2001 From: hexoscott <70711990+hexoscott@users.noreply.github.com> Date: Tue, 20 Aug 2024 18:04:01 +0100 Subject: [PATCH 01/10] datastream repopulation logic changes and removal of batch partially processed (#993) partially processed removed in favour of simply sealing the WIP batch on a restart --- eth/backend.go | 3 - zk/hermez_db/db.go | 25 ------- zk/stages/stage_dataStreamCatchup.go | 21 ++++-- zk/stages/stage_sequence_execute.go | 70 ++++++++++++++----- zk/stages/stage_sequence_execute_batch.go | 29 +------- zk/stages/stage_sequence_execute_blocks.go | 6 -- .../stage_sequence_execute_data_stream.go | 32 ++++++--- .../stage_sequence_execute_injected_batch.go | 3 +- zk/stages/stage_sequence_execute_unwind.go | 9 --- 9 files changed, 95 insertions(+), 103 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 52b61ba7622..471a1ed9eeb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -753,9 +753,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { latestHeader := backend.dataStream.GetHeader() if latestHeader.TotalEntries == 0 { log.Info("[dataStream] setting the stream progress to 0") - if err := stages.SaveStageProgress(tx, stages.DataStream, 0); err != nil { - return nil, err - } backend.preStartTasks.WarmUpDataStream = true } } diff --git a/zk/hermez_db/db.go b/zk/hermez_db/db.go index 3130db81570..2adbf23cee7 100644 --- a/zk/hermez_db/db.go +++ b/zk/hermez_db/db.go @@ -1610,31 +1610,6 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) { return len(v) > 0, nil } -func (db *HermezDb) WriteIsBatchPartiallyProcessed(batchNo uint64) error { - return db.tx.Put(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo), []byte{1}) -} - -func (db *HermezDb) DeleteIsBatchPartiallyProcessed(batchNo uint64) error { - return db.tx.Delete(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo)) -} - -func (db *HermezDbReader) GetIsBatchPartiallyProcessed(batchNo uint64) (bool, error) { - v, err := db.tx.GetOne(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo)) - if err != nil { - return false, err - } - return len(v) > 0, nil -} - -func (db *HermezDb) TruncateIsBatchPartiallyProcessed(fromBatch, toBatch uint64) error { - for batch := fromBatch; batch <= toBatch; batch++ { - if err := db.DeleteIsBatchPartiallyProcessed(batch); err != nil { - return err - } - } - return nil -} - func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error { return db.tx.Put(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo), root.Bytes()) } diff --git a/zk/stages/stage_dataStreamCatchup.go b/zk/stages/stage_dataStreamCatchup.go index f3cae1b3ca3..199fe4d69d4 100644 --- a/zk/stages/stage_dataStreamCatchup.go +++ b/zk/stages/stage_dataStreamCatchup.go @@ -12,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/zk/datastream/server" "github.com/ledgerwatch/erigon/zk/hermez_db" "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon/zk/sequencer" ) type DataStreamCatchupCfg struct { @@ -80,12 +81,24 @@ func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream srv := server.NewDataStreamServer(stream, chainId) reader := hermez_db.NewHermezDbReader(tx) - finalBlockNumber, err := stages.GetStageProgress(tx, stages.Execution) - if err != nil { - return 0, err + var ( + err error + finalBlockNumber uint64 + ) + + if sequencer.IsSequencer() { + finalBlockNumber, err = stages.GetStageProgress(tx, stages.DataStream) + if err != nil { + return 0, err + } + } else { + 
finalBlockNumber, err = stages.GetStageProgress(tx, stages.Execution) + if err != nil { + return 0, err + } } - previousProgress, err := stages.GetStageProgress(tx, stages.DataStream) + previousProgress, err := srv.GetHighestBlockNumber() if err != nil { return 0, err } diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 2aa293410e1..8687593d66a 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -16,6 +16,7 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk" "github.com/ledgerwatch/erigon/zk/utils" + "github.com/ledgerwatch/erigon/core/vm" ) func SpawnSequencingStage( @@ -46,11 +47,6 @@ func SpawnSequencingStage( return err } - isLastBatchPariallyProcessed, err := sdb.hermezDb.GetIsBatchPartiallyProcessed(lastBatch) - if err != nil { - return err - } - forkId, err := prepareForkId(lastBatch, executionAt, sdb.hermezDb) if err != nil { return err @@ -66,7 +62,7 @@ func SpawnSequencingStage( var block *types.Block runLoopBlocks := true batchContext := newBatchContext(ctx, &cfg, &historyCfg, s, sdb) - batchState := newBatchState(forkId, prepareBatchNumber(lastBatch, isLastBatchPariallyProcessed), !isLastBatchPariallyProcessed && cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool) + batchState := newBatchState(forkId, lastBatch+1, cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool) blockDataSizeChecker := newBlockDataChecker() streamWriter := newSequencerBatchStreamWriter(batchContext, batchState, lastBatch) // using lastBatch (rather than batchState.batchNumber) is not mistake @@ -79,29 +75,69 @@ func SpawnSequencingStage( if err = cfg.datastreamServer.WriteWholeBatchToStream(logPrefix, sdb.tx, sdb.hermezDb.HermezDbReader, lastBatch, injectedBatchBatchNumber); err != nil { return err } + if err = stages.SaveStageProgress(sdb.tx, stages.DataStream, 1); err != nil { + return err + } return sdb.tx.Commit() } - tryHaltSequencer(batchContext, batchState.batchNumber) - - if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil { - return err + if batchState.hasExecutorForThisBatch { + // identify a stream gap i.e. a sequencer restart without an ack back from an executor. 
+ // in this case we need to unwind the state so that we match the datastream height + streamProgress, err := stages.GetStageProgress(sdb.tx, stages.DataStream) + if err != nil { + return err + } + if streamProgress > 0 && streamProgress < executionAt { + block, err := rawdb.ReadBlockByNumber(sdb.tx, streamProgress) + if err != nil { + return err + } + log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", logPrefix), + "streamHeight", streamProgress, + "sequencerHeight", executionAt, + ) + u.UnwindTo(streamProgress, block.Hash()) + return nil + } } - batchCounters, err := prepareBatchCounters(batchContext, batchState, isLastBatchPariallyProcessed) + lastBatchSealed, err := checkIfLastBatchIsSealed(batchContext) if err != nil { return err } - if !isLastBatchPariallyProcessed { - // handle case where batch wasn't closed properly - // close it before starting a new one - // this occurs when sequencer was switched from syncer or sequencer datastream files were deleted - // and datastream was regenerated - if err = finalizeLastBatchInDatastreamIfNotFinalized(batchContext, batchState, executionAt); err != nil { + if !lastBatchSealed { + log.Warn(fmt.Sprintf("[%s] Closing batch early due to partial processing", logPrefix), "batch", lastBatch) + + // we are in a state where the sequencer was perhaps restarted or unwound and the last batch + // that was partially processed needed to be closed, and we will have at least one block in it (because the last + // entry wasn't a batch end) + rawCounters, _, err := sdb.hermezDb.GetLatestBatchCounters(lastBatch) + if err != nil { + return err + } + latestCounters := vm.NewCountersFromUsedMap(rawCounters) + + endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters) + + if err = runBatchLastSteps(batchContext, lastBatch, executionAt, endBatchCounters); err != nil { return err } + + return sdb.tx.Commit() + } + + tryHaltSequencer(batchContext, batchState.batchNumber) + + if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil { + return err + } + + batchCounters, err := prepareBatchCounters(batchContext, batchState, nil) + if err != nil { + return err } if batchState.isL1Recovery() { diff --git a/zk/stages/stage_sequence_execute_batch.go b/zk/stages/stage_sequence_execute_batch.go index 6f7dc4275da..ec0fec521c6 100644 --- a/zk/stages/stage_sequence_execute_batch.go +++ b/zk/stages/stage_sequence_execute_batch.go @@ -12,28 +12,7 @@ import ( "github.com/ledgerwatch/log/v3" ) -func prepareBatchNumber(lastBatch uint64, isLastBatchPariallyProcessed bool) uint64 { - if isLastBatchPariallyProcessed { - return lastBatch - } - - return lastBatch + 1 -} - -func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, isLastBatchPariallyProcessed bool) (*vm.BatchCounterCollector, error) { - var intermediateUsedCounters *vm.Counters - if isLastBatchPariallyProcessed { - intermediateCountersMap, found, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(batchState.batchNumber) - if err != nil { - return nil, err - } - if found { - intermediateUsedCounters = vm.NewCountersFromUsedMap(intermediateCountersMap) - } else { - log.Warn("intermediate counters not found for batch, initialising with empty counters", "batch", batchState.batchNumber) - } - } - +func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, intermediateUsedCounters *vm.Counters) (*vm.BatchCounterCollector, error) { return vm.NewBatchCounterCollector(batchContext.sdb.smt.GetDepth(), 
uint16(batchState.forkId), batchContext.cfg.zk.VirtualCountersSmtReduction, batchContext.cfg.zk.ShouldCountersBeUnlimited(batchState.isL1Recovery()), intermediateUsedCounters), nil } @@ -66,9 +45,6 @@ func doCheckForBadBatch(batchContext *BatchContext, batchState *BatchState, this if err = batchContext.sdb.hermezDb.WriteBatchCounters(currentBlock.NumberU64(), map[string]int{}); err != nil { return false, err } - if err = batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(batchState.batchNumber); err != nil { - return false, err - } if err = stages.SaveStageProgress(batchContext.sdb.tx, stages.HighestSeenBatchNumber, batchState.batchNumber); err != nil { return false, err } @@ -158,9 +134,6 @@ func runBatchLastSteps( if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil { return err } - if err := batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(thisBatch); err != nil { - return err - } // Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) diff --git a/zk/stages/stage_sequence_execute_blocks.go b/zk/stages/stage_sequence_execute_blocks.go index 495e9114846..060c753a26d 100644 --- a/zk/stages/stage_sequence_execute_blocks.go +++ b/zk/stages/stage_sequence_execute_blocks.go @@ -247,12 +247,6 @@ func finaliseBlock( return nil, err } - // write partially processed - err = batchContext.sdb.hermezDb.WriteIsBatchPartiallyProcessed(batchState.batchNumber) - if err != nil { - return nil, err - } - // this is actually account + storage indices stages quitCh := batchContext.ctx.Done() from := newNum.Uint64() diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go index 373511a1212..23f8c480e1f 100644 --- a/zk/stages/stage_sequence_execute_data_stream.go +++ b/zk/stages/stage_sequence_execute_data_stream.go @@ -9,6 +9,7 @@ import ( verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" ) type SequencerBatchStreamWriter struct { @@ -62,6 +63,10 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, err } + if err = stages.SaveStageProgress(sbc.sdb.tx, stages.DataStream, block.NumberU64()); err != nil { + return checkedVerifierBundles, err + } + // once we have handled the very first block we can update the last batch to be the current batch safely so that // we don't keep adding batch bookmarks in between blocks sbc.lastBatch = request.BatchNumber @@ -78,29 +83,38 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, nil } -func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) error { +func checkIfLastBatchIsSealed(batchContext *BatchContext) (bool, error) { + isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd() + if err != nil { + return false, err + } + + return isLastEntryBatchEnd, nil +} + +func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) (bool, error) { isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd() if err != nil { - return err + return false, err } 
if isLastEntryBatchEnd { - return nil + return false, nil } - log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber)) - ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) + log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber-1)) + ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber-1, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) if err != nil { - return err + return true, err } lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, thisBlock) if err != nil { - return err + return true, err } root := lastBlock.Root() if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchState.batchNumber-1, &root, &ler); err != nil { - return err + return true, err } - return nil + return true, nil } diff --git a/zk/stages/stage_sequence_execute_injected_batch.go b/zk/stages/stage_sequence_execute_injected_batch.go index 323b7a0f2f9..e1917b7748a 100644 --- a/zk/stages/stage_sequence_execute_injected_batch.go +++ b/zk/stages/stage_sequence_execute_injected_batch.go @@ -80,8 +80,7 @@ func processInjectedInitialBatch( return err } - // deleting the partially processed flag - return batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(injectedBatchBatchNumber) + return err } func handleInjectedBatch( diff --git a/zk/stages/stage_sequence_execute_unwind.go b/zk/stages/stage_sequence_execute_unwind.go index 46c0a58846f..b8918aa33d7 100644 --- a/zk/stages/stage_sequence_execute_unwind.go +++ b/zk/stages/stage_sequence_execute_unwind.go @@ -137,15 +137,6 @@ func UnwindSequenceExecutionStageDbWrites(ctx context.Context, u *stagedsync.Unw if err = hermezDb.DeleteBatchCounters(u.UnwindPoint+1, s.BlockNumber); err != nil { return fmt.Errorf("truncate block batches error: %v", err) } - // only seq - if err = hermezDb.TruncateIsBatchPartiallyProcessed(fromBatch, toBatch); err != nil { - return fmt.Errorf("truncate fork id error: %v", err) - } - if lastBatchToKeepBeforeFrom == fromBatch { - if err = hermezDb.WriteIsBatchPartiallyProcessed(lastBatchToKeepBeforeFrom); err != nil { - return fmt.Errorf("truncate fork id error: %v", err) - } - } return nil } From e7b7bc0299ba142358cd62a1c4ce1c618559da91 Mon Sep 17 00:00:00 2001 From: hexoscott <70711990+hexoscott@users.noreply.github.com> Date: Tue, 20 Aug 2024 20:02:59 +0100 Subject: [PATCH 02/10] fix for GER updates in the stream (#994) --- zk/datastream/client/stream_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 43f24eb599a..17f849c8fc1 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -317,7 +317,7 @@ LOOP: case *types.BatchStart: c.currentFork = parsedProto.ForkId c.entryChan <- parsedProto - case *types.GerUpdateProto: + case *types.GerUpdate: c.entryChan <- parsedProto case *types.BatchEnd: c.entryChan <- parsedProto From eb9c2729d61b9d25ba094814e9214cd4bf071caf Mon Sep 17 00:00:00 2001 From: hexoscott <70711990+hexoscott@users.noreply.github.com> Date: Wed, 21 Aug 2024 10:40:28 +0100 Subject: [PATCH 03/10] Revert "datastream repopulation logic changes and removal of batch partially processed (#993)" (#996) This reverts commit 4d1dd9c6be9b133b6a4ea74f87f30c9fa75c6a89. 
--- eth/backend.go | 3 + zk/hermez_db/db.go | 25 +++++++ zk/stages/stage_dataStreamCatchup.go | 21 ++---- zk/stages/stage_sequence_execute.go | 70 +++++-------------- zk/stages/stage_sequence_execute_batch.go | 29 +++++++- zk/stages/stage_sequence_execute_blocks.go | 6 ++ .../stage_sequence_execute_data_stream.go | 32 +++------ .../stage_sequence_execute_injected_batch.go | 3 +- zk/stages/stage_sequence_execute_unwind.go | 9 +++ 9 files changed, 103 insertions(+), 95 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 471a1ed9eeb..52b61ba7622 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -753,6 +753,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { latestHeader := backend.dataStream.GetHeader() if latestHeader.TotalEntries == 0 { log.Info("[dataStream] setting the stream progress to 0") + if err := stages.SaveStageProgress(tx, stages.DataStream, 0); err != nil { + return nil, err + } backend.preStartTasks.WarmUpDataStream = true } } diff --git a/zk/hermez_db/db.go b/zk/hermez_db/db.go index 2adbf23cee7..3130db81570 100644 --- a/zk/hermez_db/db.go +++ b/zk/hermez_db/db.go @@ -1610,6 +1610,31 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) { return len(v) > 0, nil } +func (db *HermezDb) WriteIsBatchPartiallyProcessed(batchNo uint64) error { + return db.tx.Put(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo), []byte{1}) +} + +func (db *HermezDb) DeleteIsBatchPartiallyProcessed(batchNo uint64) error { + return db.tx.Delete(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo)) +} + +func (db *HermezDbReader) GetIsBatchPartiallyProcessed(batchNo uint64) (bool, error) { + v, err := db.tx.GetOne(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo)) + if err != nil { + return false, err + } + return len(v) > 0, nil +} + +func (db *HermezDb) TruncateIsBatchPartiallyProcessed(fromBatch, toBatch uint64) error { + for batch := fromBatch; batch <= toBatch; batch++ { + if err := db.DeleteIsBatchPartiallyProcessed(batch); err != nil { + return err + } + } + return nil +} + func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error { return db.tx.Put(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo), root.Bytes()) } diff --git a/zk/stages/stage_dataStreamCatchup.go b/zk/stages/stage_dataStreamCatchup.go index 199fe4d69d4..f3cae1b3ca3 100644 --- a/zk/stages/stage_dataStreamCatchup.go +++ b/zk/stages/stage_dataStreamCatchup.go @@ -12,7 +12,6 @@ import ( "github.com/ledgerwatch/erigon/zk/datastream/server" "github.com/ledgerwatch/erigon/zk/hermez_db" "github.com/ledgerwatch/log/v3" - "github.com/ledgerwatch/erigon/zk/sequencer" ) type DataStreamCatchupCfg struct { @@ -81,24 +80,12 @@ func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream srv := server.NewDataStreamServer(stream, chainId) reader := hermez_db.NewHermezDbReader(tx) - var ( - err error - finalBlockNumber uint64 - ) - - if sequencer.IsSequencer() { - finalBlockNumber, err = stages.GetStageProgress(tx, stages.DataStream) - if err != nil { - return 0, err - } - } else { - finalBlockNumber, err = stages.GetStageProgress(tx, stages.Execution) - if err != nil { - return 0, err - } + finalBlockNumber, err := stages.GetStageProgress(tx, stages.Execution) + if err != nil { + return 0, err } - previousProgress, err := srv.GetHighestBlockNumber() + previousProgress, err := stages.GetStageProgress(tx, stages.DataStream) if err != nil { return 0, err } diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 
8687593d66a..2aa293410e1 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -16,7 +16,6 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk" "github.com/ledgerwatch/erigon/zk/utils" - "github.com/ledgerwatch/erigon/core/vm" ) func SpawnSequencingStage( @@ -47,6 +46,11 @@ func SpawnSequencingStage( return err } + isLastBatchPariallyProcessed, err := sdb.hermezDb.GetIsBatchPartiallyProcessed(lastBatch) + if err != nil { + return err + } + forkId, err := prepareForkId(lastBatch, executionAt, sdb.hermezDb) if err != nil { return err @@ -62,7 +66,7 @@ func SpawnSequencingStage( var block *types.Block runLoopBlocks := true batchContext := newBatchContext(ctx, &cfg, &historyCfg, s, sdb) - batchState := newBatchState(forkId, lastBatch+1, cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool) + batchState := newBatchState(forkId, prepareBatchNumber(lastBatch, isLastBatchPariallyProcessed), !isLastBatchPariallyProcessed && cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool) blockDataSizeChecker := newBlockDataChecker() streamWriter := newSequencerBatchStreamWriter(batchContext, batchState, lastBatch) // using lastBatch (rather than batchState.batchNumber) is not mistake @@ -75,56 +79,6 @@ func SpawnSequencingStage( if err = cfg.datastreamServer.WriteWholeBatchToStream(logPrefix, sdb.tx, sdb.hermezDb.HermezDbReader, lastBatch, injectedBatchBatchNumber); err != nil { return err } - if err = stages.SaveStageProgress(sdb.tx, stages.DataStream, 1); err != nil { - return err - } - - return sdb.tx.Commit() - } - - if batchState.hasExecutorForThisBatch { - // identify a stream gap i.e. a sequencer restart without an ack back from an executor. - // in this case we need to unwind the state so that we match the datastream height - streamProgress, err := stages.GetStageProgress(sdb.tx, stages.DataStream) - if err != nil { - return err - } - if streamProgress > 0 && streamProgress < executionAt { - block, err := rawdb.ReadBlockByNumber(sdb.tx, streamProgress) - if err != nil { - return err - } - log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", logPrefix), - "streamHeight", streamProgress, - "sequencerHeight", executionAt, - ) - u.UnwindTo(streamProgress, block.Hash()) - return nil - } - } - - lastBatchSealed, err := checkIfLastBatchIsSealed(batchContext) - if err != nil { - return err - } - - if !lastBatchSealed { - log.Warn(fmt.Sprintf("[%s] Closing batch early due to partial processing", logPrefix), "batch", lastBatch) - - // we are in a state where the sequencer was perhaps restarted or unwound and the last batch - // that was partially processed needed to be closed, and we will have at least one block in it (because the last - // entry wasn't a batch end) - rawCounters, _, err := sdb.hermezDb.GetLatestBatchCounters(lastBatch) - if err != nil { - return err - } - latestCounters := vm.NewCountersFromUsedMap(rawCounters) - - endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters) - - if err = runBatchLastSteps(batchContext, lastBatch, executionAt, endBatchCounters); err != nil { - return err - } return sdb.tx.Commit() } @@ -135,11 +89,21 @@ func SpawnSequencingStage( return err } - batchCounters, err := prepareBatchCounters(batchContext, batchState, nil) + batchCounters, err := prepareBatchCounters(batchContext, batchState, isLastBatchPariallyProcessed) if err != nil { return err } + if !isLastBatchPariallyProcessed { + // handle case where batch wasn't 
closed properly + // close it before starting a new one + // this occurs when sequencer was switched from syncer or sequencer datastream files were deleted + // and datastream was regenerated + if err = finalizeLastBatchInDatastreamIfNotFinalized(batchContext, batchState, executionAt); err != nil { + return err + } + } + if batchState.isL1Recovery() { if cfg.zk.L1SyncStopBatch > 0 && batchState.batchNumber > cfg.zk.L1SyncStopBatch { log.Info(fmt.Sprintf("[%s] L1 recovery has completed!", logPrefix), "batch", batchState.batchNumber) diff --git a/zk/stages/stage_sequence_execute_batch.go b/zk/stages/stage_sequence_execute_batch.go index ec0fec521c6..6f7dc4275da 100644 --- a/zk/stages/stage_sequence_execute_batch.go +++ b/zk/stages/stage_sequence_execute_batch.go @@ -12,7 +12,28 @@ import ( "github.com/ledgerwatch/log/v3" ) -func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, intermediateUsedCounters *vm.Counters) (*vm.BatchCounterCollector, error) { +func prepareBatchNumber(lastBatch uint64, isLastBatchPariallyProcessed bool) uint64 { + if isLastBatchPariallyProcessed { + return lastBatch + } + + return lastBatch + 1 +} + +func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, isLastBatchPariallyProcessed bool) (*vm.BatchCounterCollector, error) { + var intermediateUsedCounters *vm.Counters + if isLastBatchPariallyProcessed { + intermediateCountersMap, found, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(batchState.batchNumber) + if err != nil { + return nil, err + } + if found { + intermediateUsedCounters = vm.NewCountersFromUsedMap(intermediateCountersMap) + } else { + log.Warn("intermediate counters not found for batch, initialising with empty counters", "batch", batchState.batchNumber) + } + } + return vm.NewBatchCounterCollector(batchContext.sdb.smt.GetDepth(), uint16(batchState.forkId), batchContext.cfg.zk.VirtualCountersSmtReduction, batchContext.cfg.zk.ShouldCountersBeUnlimited(batchState.isL1Recovery()), intermediateUsedCounters), nil } @@ -45,6 +66,9 @@ func doCheckForBadBatch(batchContext *BatchContext, batchState *BatchState, this if err = batchContext.sdb.hermezDb.WriteBatchCounters(currentBlock.NumberU64(), map[string]int{}); err != nil { return false, err } + if err = batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(batchState.batchNumber); err != nil { + return false, err + } if err = stages.SaveStageProgress(batchContext.sdb.tx, stages.HighestSeenBatchNumber, batchState.batchNumber); err != nil { return false, err } @@ -134,6 +158,9 @@ func runBatchLastSteps( if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil { return err } + if err := batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(thisBatch); err != nil { + return err + } // Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) diff --git a/zk/stages/stage_sequence_execute_blocks.go b/zk/stages/stage_sequence_execute_blocks.go index 060c753a26d..495e9114846 100644 --- a/zk/stages/stage_sequence_execute_blocks.go +++ b/zk/stages/stage_sequence_execute_blocks.go @@ -247,6 +247,12 @@ func finaliseBlock( return nil, err } + // write partially processed + err = batchContext.sdb.hermezDb.WriteIsBatchPartiallyProcessed(batchState.batchNumber) + if err != nil { + return nil, err + } + // this is actually account + storage indices 
stages quitCh := batchContext.ctx.Done() from := newNum.Uint64() diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go index 23f8c480e1f..373511a1212 100644 --- a/zk/stages/stage_sequence_execute_data_stream.go +++ b/zk/stages/stage_sequence_execute_data_stream.go @@ -9,7 +9,6 @@ import ( verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/log/v3" - "github.com/ledgerwatch/erigon/eth/stagedsync/stages" ) type SequencerBatchStreamWriter struct { @@ -63,10 +62,6 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, err } - if err = stages.SaveStageProgress(sbc.sdb.tx, stages.DataStream, block.NumberU64()); err != nil { - return checkedVerifierBundles, err - } - // once we have handled the very first block we can update the last batch to be the current batch safely so that // we don't keep adding batch bookmarks in between blocks sbc.lastBatch = request.BatchNumber @@ -83,38 +78,29 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, nil } -func checkIfLastBatchIsSealed(batchContext *BatchContext) (bool, error) { - isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd() - if err != nil { - return false, err - } - - return isLastEntryBatchEnd, nil -} - -func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) (bool, error) { +func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) error { isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd() if err != nil { - return false, err + return err } if isLastEntryBatchEnd { - return false, nil + return nil } - log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber-1)) - ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber-1, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) + log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber)) + ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) if err != nil { - return true, err + return err } lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, thisBlock) if err != nil { - return true, err + return err } root := lastBlock.Root() if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchState.batchNumber-1, &root, &ler); err != nil { - return true, err + return err } - return true, nil + return nil } diff --git a/zk/stages/stage_sequence_execute_injected_batch.go b/zk/stages/stage_sequence_execute_injected_batch.go index e1917b7748a..323b7a0f2f9 100644 --- a/zk/stages/stage_sequence_execute_injected_batch.go +++ b/zk/stages/stage_sequence_execute_injected_batch.go @@ -80,7 +80,8 @@ func processInjectedInitialBatch( return err } - return err + // deleting the partially processed flag + return batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(injectedBatchBatchNumber) } func handleInjectedBatch( diff --git a/zk/stages/stage_sequence_execute_unwind.go b/zk/stages/stage_sequence_execute_unwind.go index b8918aa33d7..46c0a58846f 100644 --- 
a/zk/stages/stage_sequence_execute_unwind.go +++ b/zk/stages/stage_sequence_execute_unwind.go @@ -137,6 +137,15 @@ func UnwindSequenceExecutionStageDbWrites(ctx context.Context, u *stagedsync.Unw if err = hermezDb.DeleteBatchCounters(u.UnwindPoint+1, s.BlockNumber); err != nil { return fmt.Errorf("truncate block batches error: %v", err) } + // only seq + if err = hermezDb.TruncateIsBatchPartiallyProcessed(fromBatch, toBatch); err != nil { + return fmt.Errorf("truncate fork id error: %v", err) + } + if lastBatchToKeepBeforeFrom == fromBatch { + if err = hermezDb.WriteIsBatchPartiallyProcessed(lastBatchToKeepBeforeFrom); err != nil { + return fmt.Errorf("truncate fork id error: %v", err) + } + } return nil } From f1beb0d1605cf37f52da2d2f2ca1715189b87602 Mon Sep 17 00:00:00 2001 From: hexoscott <70711990+hexoscott@users.noreply.github.com> Date: Wed, 21 Aug 2024 14:50:57 +0100 Subject: [PATCH 04/10] fix the long delay after datastream repopulation (#1004) --- zk/utils/utils.go | 1 + 1 file changed, 1 insertion(+) diff --git a/zk/utils/utils.go b/zk/utils/utils.go index 5d4038337f4..3376b863c61 100644 --- a/zk/utils/utils.go +++ b/zk/utils/utils.go @@ -153,6 +153,7 @@ func GetBatchLocalExitRootFromSCStorage(batchNo uint64, db DbReader, tx kv.Tx) ( } stateReader := state.NewPlainState(tx, blockNo+1, systemcontracts.SystemContractCodeLookup["hermez"]) + defer stateReader.Close() rawLer, err := stateReader.ReadAccountStorage(state.GER_MANAGER_ADDRESS, 1, &state.GLOBAL_EXIT_ROOT_POS_1) if err != nil { return libcommon.Hash{}, err From 23e91c649be51012eacbba7bdd77ea66579b437b Mon Sep 17 00:00:00 2001 From: Kamen Stoykov <24619432+kstoykov@users.noreply.github.com> Date: Thu, 22 Aug 2024 14:18:43 +0300 Subject: [PATCH 05/10] fix verification response processing (#1009) * fix verification response processing * update verifier test --- zk/legacy_executor_verifier/executor.go | 22 +++++++++---------- zk/legacy_executor_verifier/executor_test.go | 8 +++---- .../legacy_executor_verifier.go | 10 +++++++-- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/zk/legacy_executor_verifier/executor.go b/zk/legacy_executor_verifier/executor.go index f60b16a6471..c86d8be8cc7 100644 --- a/zk/legacy_executor_verifier/executor.go +++ b/zk/legacy_executor_verifier/executor.go @@ -158,7 +158,7 @@ func (e *Executor) CheckOnline() bool { return true } -func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot common.Hash) (bool, *executor.ProcessBatchResponseV2, error) { +func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot common.Hash) (bool, *executor.ProcessBatchResponseV2, error, error) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() @@ -183,12 +183,12 @@ func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot com if e.outputLocation != "" { asJson, err := json.Marshal(grpcRequest) if err != nil { - return false, nil, err + return false, nil, nil, err } file := path.Join(e.outputLocation, fmt.Sprintf("payload_%d.json", request.BatchNumber)) err = os.WriteFile(file, asJson, 0644) if err != nil { - return false, nil, err + return false, nil, nil, err } // now save the witness as a hex string along with the datastream @@ -197,20 +197,23 @@ func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot com witnessAsHex := "0x" + hex.EncodeToString(p.Witness) err = os.WriteFile(witnessHexFile, []byte(witnessAsHex), 0644) if err != nil { - return false, nil, err + return false, nil, nil, err 
} dataStreamHexFile := path.Join(e.outputLocation, fmt.Sprintf("datastream_%d.hex", request.BatchNumber)) dataStreamAsHex := "0x" + hex.EncodeToString(p.DataStream) err = os.WriteFile(dataStreamHexFile, []byte(dataStreamAsHex), 0644) if err != nil { - return false, nil, err + return false, nil, nil, err } } resp, err := e.client.ProcessStatelessBatchV2(ctx, grpcRequest, grpc.MaxCallSendMsgSize(size), grpc.MaxCallRecvMsgSize(size)) if err != nil { - return false, nil, fmt.Errorf("failed to process stateless batch: %w", err) + return false, nil, nil, fmt.Errorf("failed to process stateless batch: %w", err) + } + if resp == nil { + return false, nil, nil, fmt.Errorf("nil response") } counters := map[string]int{ @@ -266,14 +269,11 @@ func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot com log.Debug("Received response from executor", "grpcUrl", e.grpcUrl, "response", resp) - return responseCheck(resp, request) + ok, executorResponse, executorErr := responseCheck(resp, request) + return ok, executorResponse, executorErr, nil } func responseCheck(resp *executor.ProcessBatchResponseV2, request *VerifierRequest) (bool, *executor.ProcessBatchResponseV2, error) { - if resp == nil { - return false, nil, fmt.Errorf("nil response") - } - if resp.ForkId != request.ForkId { log.Warn("Executor fork id mismatch", "executor", resp.ForkId, "our", request.ForkId) } diff --git a/zk/legacy_executor_verifier/executor_test.go b/zk/legacy_executor_verifier/executor_test.go index 60d5db18579..253a5286a14 100644 --- a/zk/legacy_executor_verifier/executor_test.go +++ b/zk/legacy_executor_verifier/executor_test.go @@ -51,7 +51,7 @@ func TestExecutor_Verify(t *testing.T) { } for _, tt := range tests { - go func(tt struct { + func(tt struct { name string expectedStateRoot *common.Hash shouldError bool @@ -81,9 +81,9 @@ func TestExecutor_Verify(t *testing.T) { ContextId: "cdk-erigon-test", } - _, _, err := executor.Verify(payload, &VerifierRequest{StateRoot: *tt.expectedStateRoot}, common.Hash{}) - if (err != nil) != tt.wantErr { - t.Errorf("Executor.Verify() error = %v, wantErr %v", err, tt.wantErr) + _, _, executorErr, generalErr := executor.Verify(payload, &VerifierRequest{StateRoot: *tt.expectedStateRoot}, common.Hash{}) + if (executorErr != nil || generalErr != nil) != tt.wantErr { + t.Errorf("Executor.Verify() executorErr = %v, generalErr = %v, wantErr %v", executorErr, generalErr, tt.wantErr) } }) }(tt) diff --git a/zk/legacy_executor_verifier/legacy_executor_verifier.go b/zk/legacy_executor_verifier/legacy_executor_verifier.go index 4f48e58877d..c2717fd9860 100644 --- a/zk/legacy_executor_verifier/legacy_executor_verifier.go +++ b/zk/legacy_executor_verifier/legacy_executor_verifier.go @@ -182,7 +182,10 @@ func (v *LegacyExecutorVerifier) VerifySync(tx kv.Tx, request *VerifierRequest, return err } - _, _, executorErr := e.Verify(payload, request, previousBlock.Root()) + _, _, executorErr, generalErr := e.Verify(payload, request, previousBlock.Root()) + if generalErr != nil { + return generalErr + } return executorErr } @@ -263,7 +266,10 @@ func (v *LegacyExecutorVerifier) VerifyAsync(request *VerifierRequest, blockNumb return verifierBundle, err } - ok, executorResponse, executorErr := e.Verify(payload, request, previousBlock.Root()) + ok, executorResponse, executorErr, generalErr := e.Verify(payload, request, previousBlock.Root()) + if generalErr != nil { + return verifierBundle, generalErr + } if executorErr != nil { if errors.Is(executorErr, ErrExecutorStateRootMismatch) { From 
ed9b447687bd12fcc8a60a85689cb2bd49b45598 Mon Sep 17 00:00:00 2001 From: hexoscott <70711990+hexoscott@users.noreply.github.com> Date: Thu, 22 Aug 2024 16:30:39 +0100 Subject: [PATCH 06/10] =?UTF-8?q?datastream=20repopulation=20logic=20chang?= =?UTF-8?q?es=20and=20removal=20of=20batch=20partially=20=E2=80=A6=20(#992?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * datastream repopulation logic changes and removal of batch partially processed partially processed removed in favour of simply sealing the WIP batch on a restart * refactor of checking for stream gap in sequencing * do not connect to datastream during startup this causes timeout problems on new nodes that could spend a long time running L1 sync * refactor of batch end logic in sequencing * tidy up and comments around new datastream handling in sequencer --- eth/backend.go | 21 +------ zk/hermez_db/db.go | 25 --------- zk/stages/stage_dataStreamCatchup.go | 21 +++++-- zk/stages/stage_sequence_execute.go | 33 ++++++----- zk/stages/stage_sequence_execute_batch.go | 29 +--------- zk/stages/stage_sequence_execute_blocks.go | 6 -- .../stage_sequence_execute_data_stream.go | 55 ++++++++++++++----- .../stage_sequence_execute_injected_batch.go | 3 +- zk/stages/stage_sequence_execute_unwind.go | 9 --- 9 files changed, 78 insertions(+), 124 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 52b61ba7622..9091febfd71 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -753,9 +753,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { latestHeader := backend.dataStream.GetHeader() if latestHeader.TotalEntries == 0 { log.Info("[dataStream] setting the stream progress to 0") - if err := stages.SaveStageProgress(tx, stages.DataStream, 0); err != nil { - return nil, err - } backend.preStartTasks.WarmUpDataStream = true } } @@ -1048,23 +1045,7 @@ func newEtherMan(cfg *ethconfig.Config, l2ChainName, url string) *etherman.Clien // creates a datastream client with default parameters func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16) *client.StreamClient { - // datastream - // Create client - log.Info("Starting datastream client...") - // retry connection - datastreamClient := client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId) - - for i := 0; i < 30; i++ { - // Start client (connect to the server) - if err := datastreamClient.Start(); err != nil { - log.Warn(fmt.Sprintf("Error when starting datastream client, retrying... 
Error: %s", err)) - time.Sleep(1 * time.Second) - } else { - log.Info("Datastream client initialized...") - return datastreamClient - } - } - panic("datastream client could not be initialized") + return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId) } func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error { diff --git a/zk/hermez_db/db.go b/zk/hermez_db/db.go index 3130db81570..2adbf23cee7 100644 --- a/zk/hermez_db/db.go +++ b/zk/hermez_db/db.go @@ -1610,31 +1610,6 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) { return len(v) > 0, nil } -func (db *HermezDb) WriteIsBatchPartiallyProcessed(batchNo uint64) error { - return db.tx.Put(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo), []byte{1}) -} - -func (db *HermezDb) DeleteIsBatchPartiallyProcessed(batchNo uint64) error { - return db.tx.Delete(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo)) -} - -func (db *HermezDbReader) GetIsBatchPartiallyProcessed(batchNo uint64) (bool, error) { - v, err := db.tx.GetOne(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo)) - if err != nil { - return false, err - } - return len(v) > 0, nil -} - -func (db *HermezDb) TruncateIsBatchPartiallyProcessed(fromBatch, toBatch uint64) error { - for batch := fromBatch; batch <= toBatch; batch++ { - if err := db.DeleteIsBatchPartiallyProcessed(batch); err != nil { - return err - } - } - return nil -} - func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error { return db.tx.Put(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo), root.Bytes()) } diff --git a/zk/stages/stage_dataStreamCatchup.go b/zk/stages/stage_dataStreamCatchup.go index f3cae1b3ca3..199fe4d69d4 100644 --- a/zk/stages/stage_dataStreamCatchup.go +++ b/zk/stages/stage_dataStreamCatchup.go @@ -12,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/zk/datastream/server" "github.com/ledgerwatch/erigon/zk/hermez_db" "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon/zk/sequencer" ) type DataStreamCatchupCfg struct { @@ -80,12 +81,24 @@ func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream srv := server.NewDataStreamServer(stream, chainId) reader := hermez_db.NewHermezDbReader(tx) - finalBlockNumber, err := stages.GetStageProgress(tx, stages.Execution) - if err != nil { - return 0, err + var ( + err error + finalBlockNumber uint64 + ) + + if sequencer.IsSequencer() { + finalBlockNumber, err = stages.GetStageProgress(tx, stages.DataStream) + if err != nil { + return 0, err + } + } else { + finalBlockNumber, err = stages.GetStageProgress(tx, stages.Execution) + if err != nil { + return 0, err + } } - previousProgress, err := stages.GetStageProgress(tx, stages.DataStream) + previousProgress, err := srv.GetHighestBlockNumber() if err != nil { return 0, err } diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 2aa293410e1..7bfdacaa6e3 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -46,11 +46,6 @@ func SpawnSequencingStage( return err } - isLastBatchPariallyProcessed, err := sdb.hermezDb.GetIsBatchPartiallyProcessed(lastBatch) - if err != nil { - return err - } - forkId, err := prepareForkId(lastBatch, executionAt, sdb.hermezDb) if err != nil { return err @@ -66,7 +61,7 @@ func SpawnSequencingStage( var block *types.Block runLoopBlocks := true batchContext := newBatchContext(ctx, &cfg, &historyCfg, s, sdb) - batchState := newBatchState(forkId, 
prepareBatchNumber(lastBatch, isLastBatchPariallyProcessed), !isLastBatchPariallyProcessed && cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool) + batchState := newBatchState(forkId, lastBatch+1, cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool) blockDataSizeChecker := newBlockDataChecker() streamWriter := newSequencerBatchStreamWriter(batchContext, batchState, lastBatch) // using lastBatch (rather than batchState.batchNumber) is not mistake @@ -79,31 +74,35 @@ func SpawnSequencingStage( if err = cfg.datastreamServer.WriteWholeBatchToStream(logPrefix, sdb.tx, sdb.hermezDb.HermezDbReader, lastBatch, injectedBatchBatchNumber); err != nil { return err } + if err = stages.SaveStageProgress(sdb.tx, stages.DataStream, 1); err != nil { + return err + } return sdb.tx.Commit() } + // handle cases where the last batch wasn't committed to the data stream. + // this could occur because we're migrating from an RPC node to a sequencer + // or because the sequencer was restarted and not all processes completed (like waiting from remote executor) + // we consider the data stream as verified by the executor so treat it as "safe" and unwind blocks beyond there + // if we identify any. During normal operation this function will simply check and move on without performing + // any action. + isUnwinding, err := handleBatchEndChecks(batchContext, batchState, executionAt, u) + if err != nil || isUnwinding { + return err + } + tryHaltSequencer(batchContext, batchState.batchNumber) if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil { return err } - batchCounters, err := prepareBatchCounters(batchContext, batchState, isLastBatchPariallyProcessed) + batchCounters, err := prepareBatchCounters(batchContext, batchState, nil) if err != nil { return err } - if !isLastBatchPariallyProcessed { - // handle case where batch wasn't closed properly - // close it before starting a new one - // this occurs when sequencer was switched from syncer or sequencer datastream files were deleted - // and datastream was regenerated - if err = finalizeLastBatchInDatastreamIfNotFinalized(batchContext, batchState, executionAt); err != nil { - return err - } - } - if batchState.isL1Recovery() { if cfg.zk.L1SyncStopBatch > 0 && batchState.batchNumber > cfg.zk.L1SyncStopBatch { log.Info(fmt.Sprintf("[%s] L1 recovery has completed!", logPrefix), "batch", batchState.batchNumber) diff --git a/zk/stages/stage_sequence_execute_batch.go b/zk/stages/stage_sequence_execute_batch.go index 6f7dc4275da..ec0fec521c6 100644 --- a/zk/stages/stage_sequence_execute_batch.go +++ b/zk/stages/stage_sequence_execute_batch.go @@ -12,28 +12,7 @@ import ( "github.com/ledgerwatch/log/v3" ) -func prepareBatchNumber(lastBatch uint64, isLastBatchPariallyProcessed bool) uint64 { - if isLastBatchPariallyProcessed { - return lastBatch - } - - return lastBatch + 1 -} - -func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, isLastBatchPariallyProcessed bool) (*vm.BatchCounterCollector, error) { - var intermediateUsedCounters *vm.Counters - if isLastBatchPariallyProcessed { - intermediateCountersMap, found, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(batchState.batchNumber) - if err != nil { - return nil, err - } - if found { - intermediateUsedCounters = vm.NewCountersFromUsedMap(intermediateCountersMap) - } else { - log.Warn("intermediate counters not found for batch, initialising with empty counters", "batch", batchState.batchNumber) - } - } - +func 
prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, intermediateUsedCounters *vm.Counters) (*vm.BatchCounterCollector, error) { return vm.NewBatchCounterCollector(batchContext.sdb.smt.GetDepth(), uint16(batchState.forkId), batchContext.cfg.zk.VirtualCountersSmtReduction, batchContext.cfg.zk.ShouldCountersBeUnlimited(batchState.isL1Recovery()), intermediateUsedCounters), nil } @@ -66,9 +45,6 @@ func doCheckForBadBatch(batchContext *BatchContext, batchState *BatchState, this if err = batchContext.sdb.hermezDb.WriteBatchCounters(currentBlock.NumberU64(), map[string]int{}); err != nil { return false, err } - if err = batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(batchState.batchNumber); err != nil { - return false, err - } if err = stages.SaveStageProgress(batchContext.sdb.tx, stages.HighestSeenBatchNumber, batchState.batchNumber); err != nil { return false, err } @@ -158,9 +134,6 @@ func runBatchLastSteps( if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil { return err } - if err := batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(thisBatch); err != nil { - return err - } // Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) diff --git a/zk/stages/stage_sequence_execute_blocks.go b/zk/stages/stage_sequence_execute_blocks.go index 495e9114846..060c753a26d 100644 --- a/zk/stages/stage_sequence_execute_blocks.go +++ b/zk/stages/stage_sequence_execute_blocks.go @@ -247,12 +247,6 @@ func finaliseBlock( return nil, err } - // write partially processed - err = batchContext.sdb.hermezDb.WriteIsBatchPartiallyProcessed(batchState.batchNumber) - if err != nil { - return nil, err - } - // this is actually account + storage indices stages quitCh := batchContext.ctx.Done() from := newNum.Uint64() diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go index 373511a1212..5e2849ce44f 100644 --- a/zk/stages/stage_sequence_execute_data_stream.go +++ b/zk/stages/stage_sequence_execute_data_stream.go @@ -7,8 +7,10 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/zk/datastream/server" verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" - "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/eth/stagedsync" + "github.com/ledgerwatch/erigon/core/vm" ) type SequencerBatchStreamWriter struct { @@ -62,6 +64,10 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, err } + if err = stages.SaveStageProgress(sbc.sdb.tx, stages.DataStream, block.NumberU64()); err != nil { + return checkedVerifierBundles, err + } + // once we have handled the very first block we can update the last batch to be the current batch safely so that // we don't keep adding batch bookmarks in between blocks sbc.lastBatch = request.BatchNumber @@ -78,29 +84,52 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, nil } -func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) error { +func handleBatchEndChecks(batchContext *BatchContext, batchState *BatchState, thisBlock uint64, u 
stagedsync.Unwinder) (bool, error) { isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd() if err != nil { - return err + return false, err } if isLastEntryBatchEnd { - return nil + return false, nil } - log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber)) - ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) + lastBatch := batchState.batchNumber - 1 + + log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), lastBatch)) + + rawCounters, _, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(lastBatch) if err != nil { - return err + return false, err + } + + latestCounters := vm.NewCountersFromUsedMap(rawCounters) + + endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters) + + if err = runBatchLastSteps(batchContext, lastBatch, thisBlock, endBatchCounters); err != nil { + return false, err } - lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, thisBlock) + // now check if there is a gap in the stream vs the state db + streamProgress, err := stages.GetStageProgress(batchContext.sdb.tx, stages.DataStream) if err != nil { - return err + return false, err } - root := lastBlock.Root() - if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchState.batchNumber-1, &root, &ler); err != nil { - return err + + unwinding := false + if streamProgress > 0 && streamProgress < thisBlock { + block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, streamProgress) + if err != nil { + return true, err + } + log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", batchContext.s.LogPrefix()), + "streamHeight", streamProgress, + "sequencerHeight", thisBlock, + ) + u.UnwindTo(streamProgress, block.Hash()) + unwinding = true } - return nil + + return unwinding, nil } diff --git a/zk/stages/stage_sequence_execute_injected_batch.go b/zk/stages/stage_sequence_execute_injected_batch.go index 323b7a0f2f9..e1917b7748a 100644 --- a/zk/stages/stage_sequence_execute_injected_batch.go +++ b/zk/stages/stage_sequence_execute_injected_batch.go @@ -80,8 +80,7 @@ func processInjectedInitialBatch( return err } - // deleting the partially processed flag - return batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(injectedBatchBatchNumber) + return err } func handleInjectedBatch( diff --git a/zk/stages/stage_sequence_execute_unwind.go b/zk/stages/stage_sequence_execute_unwind.go index 46c0a58846f..b8918aa33d7 100644 --- a/zk/stages/stage_sequence_execute_unwind.go +++ b/zk/stages/stage_sequence_execute_unwind.go @@ -137,15 +137,6 @@ func UnwindSequenceExecutionStageDbWrites(ctx context.Context, u *stagedsync.Unw if err = hermezDb.DeleteBatchCounters(u.UnwindPoint+1, s.BlockNumber); err != nil { return fmt.Errorf("truncate block batches error: %v", err) } - // only seq - if err = hermezDb.TruncateIsBatchPartiallyProcessed(fromBatch, toBatch); err != nil { - return fmt.Errorf("truncate fork id error: %v", err) - } - if lastBatchToKeepBeforeFrom == fromBatch { - if err = hermezDb.WriteIsBatchPartiallyProcessed(lastBatchToKeepBeforeFrom); err != nil { - return fmt.Errorf("truncate fork id error: %v", err) - } - } return nil } From b277b26404b71f9bbbcaa0a4276f31bcab4c52e2 Mon Sep 17 00:00:00 2001 From: Igor Mandrigin Date: Thu, 22 Aug 2024 16:15:05 +0300 Subject: [PATCH 07/10] fix 
jumpn --- core/vm/contract.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/vm/contract.go b/core/vm/contract.go index e69d0b547de..a68887ef18d 100644 --- a/core/vm/contract.go +++ b/core/vm/contract.go @@ -102,7 +102,10 @@ func (c *Contract) validJumpdest(dest *uint256.Int) (bool, bool) { if c.skipAnalysis { return true, false } - return c.isCode(udest), true + /* + * zkEVM doesn't do dynamic jumpdest analysis. So PUSHN is not considered. + */ + return true, false } func isCodeFromAnalysis(analysis []uint64, udest uint64) bool { From ff806a46f2961dbbb9b22fdb9d435568b5cf1b9a Mon Sep 17 00:00:00 2001 From: Igor Mandrigin Date: Thu, 22 Aug 2024 16:15:40 +0300 Subject: [PATCH 08/10] hack --- core/vm/common.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/vm/common.go b/core/vm/common.go index 46009cd3ffb..2c916683356 100644 --- a/core/vm/common.go +++ b/core/vm/common.go @@ -67,6 +67,9 @@ func getData(data []byte, start uint64, size uint64) []byte { // getDataBig returns a slice from the data based on the start and size and pads // up to size with zero's. This function is overflow safe. func getDataBig(data []byte, start *uint256.Int, size uint64) []byte { + if size >= 1*1024*1024*1024 { + size = 1 * 1024 * 1024 * 1024 + } start64, overflow := start.Uint64WithOverflow() if overflow { start64 = ^uint64(0) From a61220db72cbe3b24555af2962d761f844abb044 Mon Sep 17 00:00:00 2001 From: hexoscott <70711990+hexoscott@users.noreply.github.com> Date: Fri, 23 Aug 2024 13:45:34 +0100 Subject: [PATCH 09/10] speed up L1 recovery process (#1017) --- zk/stages/stage_sequence_execute.go | 38 +++++++++++++++++------------ 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 7bfdacaa6e3..545ee386864 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -87,9 +87,11 @@ func SpawnSequencingStage( // we consider the data stream as verified by the executor so treat it as "safe" and unwind blocks beyond there // if we identify any. During normal operation this function will simply check and move on without performing // any action. 
- isUnwinding, err := handleBatchEndChecks(batchContext, batchState, executionAt, u) - if err != nil || isUnwinding { - return err + if !batchState.isL1Recovery() { + isUnwinding, err := handleBatchEndChecks(batchContext, batchState, executionAt, u) + if err != nil || isUnwinding { + return err + } } tryHaltSequencer(batchContext, batchState.batchNumber) @@ -220,12 +222,11 @@ func SpawnSequencingStage( if err != nil { return err } - } - - if len(batchState.blockState.transactionsForInclusion) == 0 { - time.Sleep(250 * time.Millisecond) - } else { - log.Trace(fmt.Sprintf("[%s] Yielded transactions from the pool", logPrefix), "txCount", len(batchState.blockState.transactionsForInclusion)) + if len(batchState.blockState.transactionsForInclusion) == 0 { + time.Sleep(250 * time.Millisecond) + } else { + log.Trace(fmt.Sprintf("[%s] Yielded transactions from the pool", logPrefix), "txCount", len(batchState.blockState.transactionsForInclusion)) + } } for i, transaction := range batchState.blockState.transactionsForInclusion { @@ -334,21 +335,26 @@ func SpawnSequencingStage( batchState.onBuiltBlock(blockNumber) // commit block data here so it is accessible in other threads - if errCommitAndStart := sdb.CommitAndStart(); errCommitAndStart != nil { - return errCommitAndStart + if !batchState.isL1Recovery() { + if errCommitAndStart := sdb.CommitAndStart(); errCommitAndStart != nil { + return errCommitAndStart + } + defer sdb.tx.Rollback() } - defer sdb.tx.Rollback() cfg.legacyVerifier.StartAsyncVerification(batchState.forkId, batchState.batchNumber, block.Root(), batchCounters.CombineCollectorsNoChanges().UsedAsMap(), batchState.builtBlocks, batchState.hasExecutorForThisBatch, batchContext.cfg.zk.SequencerBatchVerificationTimeout) // check for new responses from the verifier needsUnwind, err := updateStreamAndCheckRollback(batchContext, batchState, streamWriter, u) - // lets commit everything after updateStreamAndCheckRollback no matter of its result - if errCommitAndStart := sdb.CommitAndStart(); errCommitAndStart != nil { - return errCommitAndStart + // lets commit everything after updateStreamAndCheckRollback no matter of its result unless + // we're in L1 recovery where losing some blocks on restart doesn't matter + if !batchState.isL1Recovery() { + if errCommitAndStart := sdb.CommitAndStart(); errCommitAndStart != nil { + return errCommitAndStart + } + defer sdb.tx.Rollback() } - defer sdb.tx.Rollback() // check the return values of updateStreamAndCheckRollback if err != nil || needsUnwind { From 8c3b1fa7706e03295143dca6990806b6745fc3e1 Mon Sep 17 00:00:00 2001 From: Valentin Staykov <79150443+V-Staykov@users.noreply.github.com> Date: Mon, 26 Aug 2024 13:08:43 +0300 Subject: [PATCH 10/10] get block height for batchend from datastream (#1028) --- zk/datastream/client/stream_client.go | 4 +- .../datastream-correctness-check/main.go | 181 +++++++++--------- zk/stages/stage_batches.go | 5 + zk/stages/stage_sequence_execute.go | 3 +- zk/stages/stage_sequence_execute_batch.go | 12 +- .../stage_sequence_execute_data_stream.go | 9 +- 6 files changed, 109 insertions(+), 105 deletions(-) diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 17f849c8fc1..f7362743284 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -186,10 +186,10 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu } file, err := c.readFileEntry() if err != nil { - return fmt.Errorf("error reading file entry: %v", err) 
+ return fmt.Errorf("reading file entry: %v", err) } if err := function(file); err != nil { - return fmt.Errorf("error executing function: %v", err) + return fmt.Errorf("executing function: %v", err) } count++ diff --git a/zk/debug_tools/datastream-correctness-check/main.go b/zk/debug_tools/datastream-correctness-check/main.go index 5d637753700..4def0d775d7 100644 --- a/zk/debug_tools/datastream-correctness-check/main.go +++ b/zk/debug_tools/datastream-correctness-check/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/gateway-fm/cdk-erigon-lib/common" "github.com/ledgerwatch/erigon/zk/datastream/client" "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" "github.com/ledgerwatch/erigon/zk/datastream/types" @@ -27,123 +28,115 @@ func main() { } // create bookmark - bookmark := types.NewBookmarkProto(5191325, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK) + bookmark := types.NewBookmarkProto(0, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK) - // var previousFile *types.FileEntry + var previousFile *types.FileEntry + var lastBlockRoot common.Hash progressBatch := uint64(0) progressBlock := uint64(0) - printFunction := func(file *types.FileEntry) error { + function := func(file *types.FileEntry) error { switch file.EntryType { - case types.EntryTypeL2Block: - l2Block, err := types.UnmarshalL2Block(file.Data) + case types.EntryTypeL2BlockEnd: + if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Block && previousFile.EntryType != types.EntryTypeL2Tx { + return fmt.Errorf("unexpected entry type before l2 block end: %v", previousFile.EntryType) + } + case types.BookmarkEntryType: + bookmark, err := types.UnmarshalBookmark(file.Data) if err != nil { return err } - fmt.Println("L2Block: ", l2Block.L2BlockNumber, "batch", l2Block.BatchNumber, "stateRoot", l2Block.StateRoot.Hex()) - if l2Block.L2BlockNumber > 5191335 { - return fmt.Errorf("stop") + if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH { + progressBatch = bookmark.Value + if previousFile != nil && previousFile.EntryType != types.EntryTypeBatchEnd { + return fmt.Errorf("unexpected entry type before batch bookmark type: %v, bookmark batch number: %d", previousFile.EntryType, bookmark.Value) + } + } + if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { + progressBlock = bookmark.Value + if previousFile != nil && + previousFile.EntryType != types.EntryTypeBatchStart && + previousFile.EntryType != types.EntryTypeL2BlockEnd { + return fmt.Errorf("unexpected entry type before block bookmark type: %v, bookmark block number: %d", previousFile.EntryType, bookmark.Value) + } + } + case types.EntryTypeBatchStart: + batchStart, err := types.UnmarshalBatchStart(file.Data) + if err != nil { + return err + } + progressBatch = batchStart.Number + if previousFile != nil { + if previousFile.EntryType != types.BookmarkEntryType { + return fmt.Errorf("unexpected entry type before batch start: %v, batchStart Batch number: %d", previousFile.EntryType, batchStart.Number) + } else { + bookmark, err := types.UnmarshalBookmark(previousFile.Data) + if err != nil { + return err + } + if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_BATCH { + return fmt.Errorf("unexpected bookmark type before batch start: %v, batchStart Batch number: %d", bookmark.BookmarkType(), batchStart.Number) + } + } } case types.EntryTypeBatchEnd: + if previousFile != nil && + previousFile.EntryType != types.EntryTypeL2BlockEnd && + 
previousFile.EntryType != types.EntryTypeL2Tx && + previousFile.EntryType != types.EntryTypeL2Block && + previousFile.EntryType != types.EntryTypeBatchStart { + return fmt.Errorf("unexpected entry type before batch end: %v", previousFile.EntryType) + } batchEnd, err := types.UnmarshalBatchEnd(file.Data) if err != nil { return err } - fmt.Println("BatchEnd: ", batchEnd.Number, "stateRoot", batchEnd.StateRoot.Hex()) + if batchEnd.Number != progressBatch { + return fmt.Errorf("batch end number mismatch: %d, expected: %d", batchEnd.Number, progressBatch) + } + if batchEnd.StateRoot != lastBlockRoot { + return fmt.Errorf("batch end state root mismatch: %x, expected: %x", batchEnd.StateRoot, lastBlockRoot) + } + case types.EntryTypeL2Tx: + if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Tx && previousFile.EntryType != types.EntryTypeL2Block { + return fmt.Errorf("unexpected entry type before l2 tx: %v", previousFile.EntryType) + } + case types.EntryTypeL2Block: + l2Block, err := types.UnmarshalL2Block(file.Data) + if err != nil { + return err + } + progressBlock = l2Block.L2BlockNumber + if previousFile != nil { + if previousFile.EntryType != types.BookmarkEntryType && !previousFile.IsL2BlockEnd() { + return fmt.Errorf("unexpected entry type before l2 block: %v, block number: %d", previousFile.EntryType, l2Block.L2BlockNumber) + } else { + bookmark, err := types.UnmarshalBookmark(previousFile.Data) + if err != nil { + return err + } + if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { + return fmt.Errorf("unexpected bookmark type before l2 block: %v, block number: %d", bookmark.BookmarkType(), l2Block.L2BlockNumber) + } + } + } + lastBlockRoot = l2Block.StateRoot + case types.EntryTypeGerUpdate: + return nil + default: + return fmt.Errorf("unexpected entry type: %v", file.EntryType) } + previousFile = file return nil } - // function := func(file *types.FileEntry) error { - // switch file.EntryType { - // case types.EntryTypeL2BlockEnd: - // if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Block && previousFile.EntryType != types.EntryTypeL2Tx { - // return fmt.Errorf("unexpected entry type before l2 block end: %v", previousFile.EntryType) - // } - // case types.BookmarkEntryType: - // bookmark, err := types.UnmarshalBookmark(file.Data) - // if err != nil { - // return err - // } - // if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH { - // progressBatch = bookmark.Value - // if previousFile != nil && previousFile.EntryType != types.EntryTypeBatchEnd { - // return fmt.Errorf("unexpected entry type before batch bookmark type: %v, bookmark batch number: %d", previousFile.EntryType, bookmark.Value) - // } - // } - // if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { - // progressBlock = bookmark.Value - // if previousFile != nil && - // previousFile.EntryType != types.EntryTypeBatchStart && - // previousFile.EntryType != types.EntryTypeL2BlockEnd { - // return fmt.Errorf("unexpected entry type before block bookmark type: %v, bookmark block number: %d", previousFile.EntryType, bookmark.Value) - // } - // } - // case types.EntryTypeBatchStart: - // batchStart, err := types.UnmarshalBatchStart(file.Data) - // if err != nil { - // return err - // } - // progressBatch = batchStart.Number - // if previousFile != nil { - // if previousFile.EntryType != types.BookmarkEntryType { - // return fmt.Errorf("unexpected entry type before batch start: %v, batchStart Batch number: %d", 
previousFile.EntryType, batchStart.Number) - // } else { - // bookmark, err := types.UnmarshalBookmark(previousFile.Data) - // if err != nil { - // return err - // } - // if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_BATCH { - // return fmt.Errorf("unexpected bookmark type before batch start: %v, batchStart Batch number: %d", bookmark.BookmarkType(), batchStart.Number) - // } - // } - // } - // case types.EntryTypeBatchEnd: - // if previousFile != nil && - // previousFile.EntryType != types.EntryTypeL2BlockEnd && - // previousFile.EntryType != types.EntryTypeBatchStart { - // return fmt.Errorf("unexpected entry type before batch end: %v", previousFile.EntryType) - // } - // case types.EntryTypeL2Tx: - // if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Tx && previousFile.EntryType != types.EntryTypeL2Block { - // return fmt.Errorf("unexpected entry type before l2 tx: %v", previousFile.EntryType) - // } - // case types.EntryTypeL2Block: - // l2Block, err := types.UnmarshalL2Block(file.Data) - // if err != nil { - // return err - // } - // progressBlock = l2Block.L2BlockNumber - // if previousFile != nil { - // if previousFile.EntryType != types.BookmarkEntryType && !previousFile.IsL2BlockEnd() { - // return fmt.Errorf("unexpected entry type before l2 block: %v, block number: %d", previousFile.EntryType, l2Block.L2BlockNumber) - // } else { - // bookmark, err := types.UnmarshalBookmark(previousFile.Data) - // if err != nil { - // return err - // } - // if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { - // return fmt.Errorf("unexpected bookmark type before l2 block: %v, block number: %d", bookmark.BookmarkType(), l2Block.L2BlockNumber) - // } - - // } - // } - // case types.EntryTypeGerUpdate: - // return nil - // default: - // return fmt.Errorf("unexpected entry type: %v", file.EntryType) - // } - - // previousFile = file - // return nil - // } // send start command - err = client.ExecutePerFile(bookmark, printFunction) + err = client.ExecutePerFile(bookmark, function) fmt.Println("progress block: ", progressBlock) fmt.Println("progress batch: ", progressBatch) if err != nil { - panic(fmt.Sprintf("found an error: %s", err)) + panic(err) } } diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index 454f6f71703..cbce1c28be9 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -218,6 +218,7 @@ func SpawnStageBatches( } lastHash := emptyHash + lastBlockRoot := emptyHash atLeastOneBlockWritten := false startTime := time.Now() @@ -252,6 +253,9 @@ LOOP: } } case *types.BatchEnd: + if entry.StateRoot != lastBlockRoot { + log.Warn(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", logPrefix, entry.StateRoot, lastBlockRoot)) + } if err := writeBatchEnd(hermezDb, entry); err != nil { return fmt.Errorf("write batch end error: %v", err) } @@ -360,6 +364,7 @@ LOOP: } lastHash = entry.L2Blockhash + lastBlockRoot = entry.StateRoot atLeastOneBlockWritten = true lastBlockHeight = entry.L2BlockNumber diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 545ee386864..1e18d5ff129 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -307,8 +307,7 @@ func SpawnSequencingStage( } } - block, err = doFinishBlockAndUpdateState(batchContext, ibs, header, parentBlock, batchState, ger, l1BlockHash, l1TreeUpdateIndex, infoTreeIndexProgress, batchCounters) - if err != nil { + if block, err = 
doFinishBlockAndUpdateState(batchContext, ibs, header, parentBlock, batchState, ger, l1BlockHash, l1TreeUpdateIndex, infoTreeIndexProgress, batchCounters); err != nil { return err } diff --git a/zk/stages/stage_sequence_execute_batch.go b/zk/stages/stage_sequence_execute_batch.go index ec0fec521c6..23bb025ef01 100644 --- a/zk/stages/stage_sequence_execute_batch.go +++ b/zk/stages/stage_sequence_execute_batch.go @@ -145,11 +145,16 @@ func runBatchLastSteps( return err } - lastBlock, err := batchContext.sdb.hermezDb.GetHighestBlockInBatch(thisBatch) + // get the last block number written to batch + // we should match it's state root in batch end entry + // if we get the last block in DB errors may occur since we have DB unwinds AFTER we commit batch end to datastream + // the last block written to the datastream before batch end should be the correct one once we are here + // if it is not, we have a bigger problem + lastBlockNumber, err := batchContext.cfg.datastreamServer.GetHighestBlockNumber() if err != nil { return err } - block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastBlock) + block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastBlockNumber) if err != nil { return err } @@ -159,8 +164,7 @@ func runBatchLastSteps( } // the unwind of this value is handed by UnwindExecutionStageDbWrites - _, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, lastBlock) - if err != nil { + if _, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, lastBlockNumber); err != nil { return fmt.Errorf("writing plain state version: %w", err) } diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go index 5e2849ce44f..c57e91b04d5 100644 --- a/zk/stages/stage_sequence_execute_data_stream.go +++ b/zk/stages/stage_sequence_execute_data_stream.go @@ -5,12 +5,12 @@ import ( "fmt" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/erigon/eth/stagedsync" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk/datastream/server" verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" "github.com/ledgerwatch/log/v3" - "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/erigon/eth/stagedsync" - "github.com/ledgerwatch/erigon/core/vm" ) type SequencerBatchStreamWriter struct { @@ -106,6 +106,9 @@ func handleBatchEndChecks(batchContext *BatchContext, batchState *BatchState, th latestCounters := vm.NewCountersFromUsedMap(rawCounters) endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters) + if err != nil { + return false, err + } if err = runBatchLastSteps(batchContext, lastBatch, thisBlock, endBatchCounters); err != nil { return false, err
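The invariant that patch 10/10 enforces in both the datastream-correctness-check tool and stage_batches.go is that a BatchEnd entry must close the batch that was opened and must carry the same state root as the last L2 block written to the stream before it. Below is a minimal, self-contained sketch of that check in Go; the names here (l2Block, batchEnd, checker, onBatchEnd) are simplified stand-ins for illustration only and are not the real definitions in zk/datastream/types.

package main

import "fmt"

// Simplified stand-ins for the stream entry types; the real ones live in
// zk/datastream/types and carry many more fields.
type l2Block struct {
	Number    uint64
	StateRoot [32]byte
}

type batchEnd struct {
	Number    uint64
	StateRoot [32]byte
}

// checker keeps the rolling state needed to validate a batch end entry:
// the batch currently open and the state root of the last block seen.
type checker struct {
	currentBatch  uint64
	lastBlockRoot [32]byte
}

func (c *checker) onBatchStart(batch uint64) { c.currentBatch = batch }

func (c *checker) onBlock(b l2Block) { c.lastBlockRoot = b.StateRoot }

// onBatchEnd enforces the two consistency rules: the entry closes the
// batch that was opened, and its state root matches the last block's.
func (c *checker) onBatchEnd(e batchEnd) error {
	if e.Number != c.currentBatch {
		return fmt.Errorf("batch end number mismatch: %d, expected: %d", e.Number, c.currentBatch)
	}
	if e.StateRoot != c.lastBlockRoot {
		return fmt.Errorf("batch end state root mismatch: %x, expected: %x", e.StateRoot, c.lastBlockRoot)
	}
	return nil
}

func main() {
	c := &checker{}
	c.onBatchStart(7)
	c.onBlock(l2Block{Number: 100, StateRoot: [32]byte{0x01}})
	c.onBlock(l2Block{Number: 101, StateRoot: [32]byte{0x02}})
	if err := c.onBatchEnd(batchEnd{Number: 7, StateRoot: [32]byte{0x02}}); err != nil {
		fmt.Println("stream inconsistent:", err)
		return
	}
	fmt.Println("batch end consistent with last block")
}

This is also why runBatchLastSteps now asks the datastream server for its highest block number instead of reading the highest block of the batch from the DB: the DB can be unwound after a batch end has already been committed to the stream, so the stream itself is the authority on which block the batch end entry must match.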