diff --git a/core/vm/common.go b/core/vm/common.go index 46009cd3ffb..2c916683356 100644 --- a/core/vm/common.go +++ b/core/vm/common.go @@ -67,6 +67,9 @@ func getData(data []byte, start uint64, size uint64) []byte { // getDataBig returns a slice from the data based on the start and size and pads // up to size with zero's. This function is overflow safe. func getDataBig(data []byte, start *uint256.Int, size uint64) []byte { + if size >= 1*1024*1024*1024 { + size = 1 * 1024 * 1024 * 1024 + } start64, overflow := start.Uint64WithOverflow() if overflow { start64 = ^uint64(0) diff --git a/core/vm/contract.go b/core/vm/contract.go index e69d0b547de..a68887ef18d 100644 --- a/core/vm/contract.go +++ b/core/vm/contract.go @@ -102,7 +102,10 @@ func (c *Contract) validJumpdest(dest *uint256.Int) (bool, bool) { if c.skipAnalysis { return true, false } - return c.isCode(udest), true + /* + * zkEVM doesn't do dynamic jumpdest analysis. So PUSHN is not considered. + */ + return true, false } func isCodeFromAnalysis(analysis []uint64, udest uint64) bool { diff --git a/eth/backend.go b/eth/backend.go index a2f44f44d44..f8dea7c4439 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -754,9 +754,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { latestHeader := backend.dataStream.GetHeader() if latestHeader.TotalEntries == 0 { log.Info("[dataStream] setting the stream progress to 0") - if err := stages.SaveStageProgress(tx, stages.DataStream, 0); err != nil { - return nil, err - } backend.preStartTasks.WarmUpDataStream = true } } @@ -1054,23 +1051,7 @@ func newEtherMan(cfg *ethconfig.Config, l2ChainName, url string) *etherman.Clien // creates a datastream client with default parameters func initDataStreamClient(ctx context.Context, cfg *ethconfig.Zk, latestForkId uint16) *client.StreamClient { - // datastream - // Create client - log.Info("Starting datastream client...") - // retry connection - datastreamClient := client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId) - - for i := 0; i < 30; i++ { - // Start client (connect to the server) - if err := datastreamClient.Start(); err != nil { - log.Warn(fmt.Sprintf("Error when starting datastream client, retrying... Error: %s", err)) - time.Sleep(1 * time.Second) - } else { - log.Info("Datastream client initialized...") - return datastreamClient - } - } - panic("datastream client could not be initialized") + return client.NewClient(ctx, cfg.L2DataStreamerUrl, cfg.DatastreamVersion, cfg.L2DataStreamerTimeout, latestForkId) } func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error { diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index 43f24eb599a..f7362743284 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -186,10 +186,10 @@ func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function fu } file, err := c.readFileEntry() if err != nil { - return fmt.Errorf("error reading file entry: %v", err) + return fmt.Errorf("reading file entry: %v", err) } if err := function(file); err != nil { - return fmt.Errorf("error executing function: %v", err) + return fmt.Errorf("executing function: %v", err) } count++ @@ -317,7 +317,7 @@ LOOP: case *types.BatchStart: c.currentFork = parsedProto.ForkId c.entryChan <- parsedProto - case *types.GerUpdateProto: + case *types.GerUpdate: c.entryChan <- parsedProto case *types.BatchEnd: c.entryChan <- parsedProto diff --git a/zk/debug_tools/datastream-correctness-check/main.go b/zk/debug_tools/datastream-correctness-check/main.go index 5d637753700..4def0d775d7 100644 --- a/zk/debug_tools/datastream-correctness-check/main.go +++ b/zk/debug_tools/datastream-correctness-check/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/gateway-fm/cdk-erigon-lib/common" "github.com/ledgerwatch/erigon/zk/datastream/client" "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" "github.com/ledgerwatch/erigon/zk/datastream/types" @@ -27,123 +28,115 @@ func main() { } // create bookmark - bookmark := types.NewBookmarkProto(5191325, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK) + bookmark := types.NewBookmarkProto(0, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK) - // var previousFile *types.FileEntry + var previousFile *types.FileEntry + var lastBlockRoot common.Hash progressBatch := uint64(0) progressBlock := uint64(0) - printFunction := func(file *types.FileEntry) error { + function := func(file *types.FileEntry) error { switch file.EntryType { - case types.EntryTypeL2Block: - l2Block, err := types.UnmarshalL2Block(file.Data) + case types.EntryTypeL2BlockEnd: + if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Block && previousFile.EntryType != types.EntryTypeL2Tx { + return fmt.Errorf("unexpected entry type before l2 block end: %v", previousFile.EntryType) + } + case types.BookmarkEntryType: + bookmark, err := types.UnmarshalBookmark(file.Data) if err != nil { return err } - fmt.Println("L2Block: ", l2Block.L2BlockNumber, "batch", l2Block.BatchNumber, "stateRoot", l2Block.StateRoot.Hex()) - if l2Block.L2BlockNumber > 5191335 { - return fmt.Errorf("stop") + if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH { + progressBatch = bookmark.Value + if previousFile != nil && previousFile.EntryType != types.EntryTypeBatchEnd { + return fmt.Errorf("unexpected entry type before batch bookmark type: %v, bookmark batch number: %d", previousFile.EntryType, bookmark.Value) + } + } + if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { + progressBlock = bookmark.Value + if previousFile != nil && + previousFile.EntryType != types.EntryTypeBatchStart && + previousFile.EntryType != types.EntryTypeL2BlockEnd { + return fmt.Errorf("unexpected entry type before block bookmark type: %v, bookmark block number: %d", previousFile.EntryType, bookmark.Value) + } + } + case types.EntryTypeBatchStart: + batchStart, err := types.UnmarshalBatchStart(file.Data) + if err != nil { + return err + } + progressBatch = batchStart.Number + if previousFile != nil { + if previousFile.EntryType != types.BookmarkEntryType { + return fmt.Errorf("unexpected entry type before batch start: %v, batchStart Batch number: %d", previousFile.EntryType, batchStart.Number) + } else { + bookmark, err := types.UnmarshalBookmark(previousFile.Data) + if err != nil { + return err + } + if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_BATCH { + return fmt.Errorf("unexpected bookmark type before batch start: %v, batchStart Batch number: %d", bookmark.BookmarkType(), batchStart.Number) + } + } } case types.EntryTypeBatchEnd: + if previousFile != nil && + previousFile.EntryType != types.EntryTypeL2BlockEnd && + previousFile.EntryType != types.EntryTypeL2Tx && + previousFile.EntryType != types.EntryTypeL2Block && + previousFile.EntryType != types.EntryTypeBatchStart { + return fmt.Errorf("unexpected entry type before batch end: %v", previousFile.EntryType) + } batchEnd, err := types.UnmarshalBatchEnd(file.Data) if err != nil { return err } - fmt.Println("BatchEnd: ", batchEnd.Number, "stateRoot", batchEnd.StateRoot.Hex()) + if batchEnd.Number != progressBatch { + return fmt.Errorf("batch end number mismatch: %d, expected: %d", batchEnd.Number, progressBatch) + } + if batchEnd.StateRoot != lastBlockRoot { + return fmt.Errorf("batch end state root mismatch: %x, expected: %x", batchEnd.StateRoot, lastBlockRoot) + } + case types.EntryTypeL2Tx: + if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Tx && previousFile.EntryType != types.EntryTypeL2Block { + return fmt.Errorf("unexpected entry type before l2 tx: %v", previousFile.EntryType) + } + case types.EntryTypeL2Block: + l2Block, err := types.UnmarshalL2Block(file.Data) + if err != nil { + return err + } + progressBlock = l2Block.L2BlockNumber + if previousFile != nil { + if previousFile.EntryType != types.BookmarkEntryType && !previousFile.IsL2BlockEnd() { + return fmt.Errorf("unexpected entry type before l2 block: %v, block number: %d", previousFile.EntryType, l2Block.L2BlockNumber) + } else { + bookmark, err := types.UnmarshalBookmark(previousFile.Data) + if err != nil { + return err + } + if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { + return fmt.Errorf("unexpected bookmark type before l2 block: %v, block number: %d", bookmark.BookmarkType(), l2Block.L2BlockNumber) + } + } + } + lastBlockRoot = l2Block.StateRoot + case types.EntryTypeGerUpdate: + return nil + default: + return fmt.Errorf("unexpected entry type: %v", file.EntryType) } + previousFile = file return nil } - // function := func(file *types.FileEntry) error { - // switch file.EntryType { - // case types.EntryTypeL2BlockEnd: - // if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Block && previousFile.EntryType != types.EntryTypeL2Tx { - // return fmt.Errorf("unexpected entry type before l2 block end: %v", previousFile.EntryType) - // } - // case types.BookmarkEntryType: - // bookmark, err := types.UnmarshalBookmark(file.Data) - // if err != nil { - // return err - // } - // if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH { - // progressBatch = bookmark.Value - // if previousFile != nil && previousFile.EntryType != types.EntryTypeBatchEnd { - // return fmt.Errorf("unexpected entry type before batch bookmark type: %v, bookmark batch number: %d", previousFile.EntryType, bookmark.Value) - // } - // } - // if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { - // progressBlock = bookmark.Value - // if previousFile != nil && - // previousFile.EntryType != types.EntryTypeBatchStart && - // previousFile.EntryType != types.EntryTypeL2BlockEnd { - // return fmt.Errorf("unexpected entry type before block bookmark type: %v, bookmark block number: %d", previousFile.EntryType, bookmark.Value) - // } - // } - // case types.EntryTypeBatchStart: - // batchStart, err := types.UnmarshalBatchStart(file.Data) - // if err != nil { - // return err - // } - // progressBatch = batchStart.Number - // if previousFile != nil { - // if previousFile.EntryType != types.BookmarkEntryType { - // return fmt.Errorf("unexpected entry type before batch start: %v, batchStart Batch number: %d", previousFile.EntryType, batchStart.Number) - // } else { - // bookmark, err := types.UnmarshalBookmark(previousFile.Data) - // if err != nil { - // return err - // } - // if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_BATCH { - // return fmt.Errorf("unexpected bookmark type before batch start: %v, batchStart Batch number: %d", bookmark.BookmarkType(), batchStart.Number) - // } - // } - // } - // case types.EntryTypeBatchEnd: - // if previousFile != nil && - // previousFile.EntryType != types.EntryTypeL2BlockEnd && - // previousFile.EntryType != types.EntryTypeBatchStart { - // return fmt.Errorf("unexpected entry type before batch end: %v", previousFile.EntryType) - // } - // case types.EntryTypeL2Tx: - // if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Tx && previousFile.EntryType != types.EntryTypeL2Block { - // return fmt.Errorf("unexpected entry type before l2 tx: %v", previousFile.EntryType) - // } - // case types.EntryTypeL2Block: - // l2Block, err := types.UnmarshalL2Block(file.Data) - // if err != nil { - // return err - // } - // progressBlock = l2Block.L2BlockNumber - // if previousFile != nil { - // if previousFile.EntryType != types.BookmarkEntryType && !previousFile.IsL2BlockEnd() { - // return fmt.Errorf("unexpected entry type before l2 block: %v, block number: %d", previousFile.EntryType, l2Block.L2BlockNumber) - // } else { - // bookmark, err := types.UnmarshalBookmark(previousFile.Data) - // if err != nil { - // return err - // } - // if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { - // return fmt.Errorf("unexpected bookmark type before l2 block: %v, block number: %d", bookmark.BookmarkType(), l2Block.L2BlockNumber) - // } - - // } - // } - // case types.EntryTypeGerUpdate: - // return nil - // default: - // return fmt.Errorf("unexpected entry type: %v", file.EntryType) - // } - - // previousFile = file - // return nil - // } // send start command - err = client.ExecutePerFile(bookmark, printFunction) + err = client.ExecutePerFile(bookmark, function) fmt.Println("progress block: ", progressBlock) fmt.Println("progress batch: ", progressBatch) if err != nil { - panic(fmt.Sprintf("found an error: %s", err)) + panic(err) } } diff --git a/zk/hermez_db/db.go b/zk/hermez_db/db.go index 14c87088a62..f545e35a7dd 100644 --- a/zk/hermez_db/db.go +++ b/zk/hermez_db/db.go @@ -1614,31 +1614,6 @@ func (db *HermezDbReader) GetInvalidBatch(batchNo uint64) (bool, error) { return len(v) > 0, nil } -func (db *HermezDb) WriteIsBatchPartiallyProcessed(batchNo uint64) error { - return db.tx.Put(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo), []byte{1}) -} - -func (db *HermezDb) DeleteIsBatchPartiallyProcessed(batchNo uint64) error { - return db.tx.Delete(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo)) -} - -func (db *HermezDbReader) GetIsBatchPartiallyProcessed(batchNo uint64) (bool, error) { - v, err := db.tx.GetOne(BATCH_PARTIALLY_PROCESSED, Uint64ToBytes(batchNo)) - if err != nil { - return false, err - } - return len(v) > 0, nil -} - -func (db *HermezDb) TruncateIsBatchPartiallyProcessed(fromBatch, toBatch uint64) error { - for batch := fromBatch; batch <= toBatch; batch++ { - if err := db.DeleteIsBatchPartiallyProcessed(batch); err != nil { - return err - } - } - return nil -} - func (db *HermezDb) WriteLocalExitRootForBatchNo(batchNo uint64, root common.Hash) error { return db.tx.Put(LOCAL_EXIT_ROOTS, Uint64ToBytes(batchNo), root.Bytes()) } diff --git a/zk/legacy_executor_verifier/executor.go b/zk/legacy_executor_verifier/executor.go index f60b16a6471..c86d8be8cc7 100644 --- a/zk/legacy_executor_verifier/executor.go +++ b/zk/legacy_executor_verifier/executor.go @@ -158,7 +158,7 @@ func (e *Executor) CheckOnline() bool { return true } -func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot common.Hash) (bool, *executor.ProcessBatchResponseV2, error) { +func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot common.Hash) (bool, *executor.ProcessBatchResponseV2, error, error) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() @@ -183,12 +183,12 @@ func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot com if e.outputLocation != "" { asJson, err := json.Marshal(grpcRequest) if err != nil { - return false, nil, err + return false, nil, nil, err } file := path.Join(e.outputLocation, fmt.Sprintf("payload_%d.json", request.BatchNumber)) err = os.WriteFile(file, asJson, 0644) if err != nil { - return false, nil, err + return false, nil, nil, err } // now save the witness as a hex string along with the datastream @@ -197,20 +197,23 @@ func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot com witnessAsHex := "0x" + hex.EncodeToString(p.Witness) err = os.WriteFile(witnessHexFile, []byte(witnessAsHex), 0644) if err != nil { - return false, nil, err + return false, nil, nil, err } dataStreamHexFile := path.Join(e.outputLocation, fmt.Sprintf("datastream_%d.hex", request.BatchNumber)) dataStreamAsHex := "0x" + hex.EncodeToString(p.DataStream) err = os.WriteFile(dataStreamHexFile, []byte(dataStreamAsHex), 0644) if err != nil { - return false, nil, err + return false, nil, nil, err } } resp, err := e.client.ProcessStatelessBatchV2(ctx, grpcRequest, grpc.MaxCallSendMsgSize(size), grpc.MaxCallRecvMsgSize(size)) if err != nil { - return false, nil, fmt.Errorf("failed to process stateless batch: %w", err) + return false, nil, nil, fmt.Errorf("failed to process stateless batch: %w", err) + } + if resp == nil { + return false, nil, nil, fmt.Errorf("nil response") } counters := map[string]int{ @@ -266,14 +269,11 @@ func (e *Executor) Verify(p *Payload, request *VerifierRequest, oldStateRoot com log.Debug("Received response from executor", "grpcUrl", e.grpcUrl, "response", resp) - return responseCheck(resp, request) + ok, executorResponse, executorErr := responseCheck(resp, request) + return ok, executorResponse, executorErr, nil } func responseCheck(resp *executor.ProcessBatchResponseV2, request *VerifierRequest) (bool, *executor.ProcessBatchResponseV2, error) { - if resp == nil { - return false, nil, fmt.Errorf("nil response") - } - if resp.ForkId != request.ForkId { log.Warn("Executor fork id mismatch", "executor", resp.ForkId, "our", request.ForkId) } diff --git a/zk/legacy_executor_verifier/executor_test.go b/zk/legacy_executor_verifier/executor_test.go index 60d5db18579..253a5286a14 100644 --- a/zk/legacy_executor_verifier/executor_test.go +++ b/zk/legacy_executor_verifier/executor_test.go @@ -51,7 +51,7 @@ func TestExecutor_Verify(t *testing.T) { } for _, tt := range tests { - go func(tt struct { + func(tt struct { name string expectedStateRoot *common.Hash shouldError bool @@ -81,9 +81,9 @@ func TestExecutor_Verify(t *testing.T) { ContextId: "cdk-erigon-test", } - _, _, err := executor.Verify(payload, &VerifierRequest{StateRoot: *tt.expectedStateRoot}, common.Hash{}) - if (err != nil) != tt.wantErr { - t.Errorf("Executor.Verify() error = %v, wantErr %v", err, tt.wantErr) + _, _, executorErr, generalErr := executor.Verify(payload, &VerifierRequest{StateRoot: *tt.expectedStateRoot}, common.Hash{}) + if (executorErr != nil || generalErr != nil) != tt.wantErr { + t.Errorf("Executor.Verify() executorErr = %v, generalErr = %v, wantErr %v", executorErr, generalErr, tt.wantErr) } }) }(tt) diff --git a/zk/legacy_executor_verifier/legacy_executor_verifier.go b/zk/legacy_executor_verifier/legacy_executor_verifier.go index 4f48e58877d..c2717fd9860 100644 --- a/zk/legacy_executor_verifier/legacy_executor_verifier.go +++ b/zk/legacy_executor_verifier/legacy_executor_verifier.go @@ -182,7 +182,10 @@ func (v *LegacyExecutorVerifier) VerifySync(tx kv.Tx, request *VerifierRequest, return err } - _, _, executorErr := e.Verify(payload, request, previousBlock.Root()) + _, _, executorErr, generalErr := e.Verify(payload, request, previousBlock.Root()) + if generalErr != nil { + return generalErr + } return executorErr } @@ -263,7 +266,10 @@ func (v *LegacyExecutorVerifier) VerifyAsync(request *VerifierRequest, blockNumb return verifierBundle, err } - ok, executorResponse, executorErr := e.Verify(payload, request, previousBlock.Root()) + ok, executorResponse, executorErr, generalErr := e.Verify(payload, request, previousBlock.Root()) + if generalErr != nil { + return verifierBundle, generalErr + } if executorErr != nil { if errors.Is(executorErr, ErrExecutorStateRootMismatch) { diff --git a/zk/stages/stage_batches.go b/zk/stages/stage_batches.go index 454f6f71703..cbce1c28be9 100644 --- a/zk/stages/stage_batches.go +++ b/zk/stages/stage_batches.go @@ -218,6 +218,7 @@ func SpawnStageBatches( } lastHash := emptyHash + lastBlockRoot := emptyHash atLeastOneBlockWritten := false startTime := time.Now() @@ -252,6 +253,9 @@ LOOP: } } case *types.BatchEnd: + if entry.StateRoot != lastBlockRoot { + log.Warn(fmt.Sprintf("[%s] batch end state root mismatches last block's: %x, expected: %x", logPrefix, entry.StateRoot, lastBlockRoot)) + } if err := writeBatchEnd(hermezDb, entry); err != nil { return fmt.Errorf("write batch end error: %v", err) } @@ -360,6 +364,7 @@ LOOP: } lastHash = entry.L2Blockhash + lastBlockRoot = entry.StateRoot atLeastOneBlockWritten = true lastBlockHeight = entry.L2BlockNumber diff --git a/zk/stages/stage_dataStreamCatchup.go b/zk/stages/stage_dataStreamCatchup.go index f3cae1b3ca3..199fe4d69d4 100644 --- a/zk/stages/stage_dataStreamCatchup.go +++ b/zk/stages/stage_dataStreamCatchup.go @@ -12,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/zk/datastream/server" "github.com/ledgerwatch/erigon/zk/hermez_db" "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon/zk/sequencer" ) type DataStreamCatchupCfg struct { @@ -80,12 +81,24 @@ func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream srv := server.NewDataStreamServer(stream, chainId) reader := hermez_db.NewHermezDbReader(tx) - finalBlockNumber, err := stages.GetStageProgress(tx, stages.Execution) - if err != nil { - return 0, err + var ( + err error + finalBlockNumber uint64 + ) + + if sequencer.IsSequencer() { + finalBlockNumber, err = stages.GetStageProgress(tx, stages.DataStream) + if err != nil { + return 0, err + } + } else { + finalBlockNumber, err = stages.GetStageProgress(tx, stages.Execution) + if err != nil { + return 0, err + } } - previousProgress, err := stages.GetStageProgress(tx, stages.DataStream) + previousProgress, err := srv.GetHighestBlockNumber() if err != nil { return 0, err } diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index ca888b2567f..c2e02108f76 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -46,11 +46,6 @@ func SpawnSequencingStage( return err } - isLastBatchPariallyProcessed, err := sdb.hermezDb.GetIsBatchPartiallyProcessed(lastBatch) - if err != nil { - return err - } - forkId, err := prepareForkId(lastBatch, executionAt, sdb.hermezDb) if err != nil { return err @@ -66,7 +61,7 @@ func SpawnSequencingStage( var block *types.Block runLoopBlocks := true batchContext := newBatchContext(ctx, &cfg, &historyCfg, s, sdb) - batchState := newBatchState(forkId, prepareBatchNumber(lastBatch, isLastBatchPariallyProcessed), !isLastBatchPariallyProcessed && cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool) + batchState := newBatchState(forkId, lastBatch+1, cfg.zk.HasExecutors(), cfg.zk.L1SyncStartBlock > 0, cfg.txPool) blockDataSizeChecker := newBlockDataChecker() streamWriter := newSequencerBatchStreamWriter(batchContext, batchState, lastBatch) // using lastBatch (rather than batchState.batchNumber) is not mistake @@ -79,31 +74,37 @@ func SpawnSequencingStage( if err = cfg.datastreamServer.WriteWholeBatchToStream(logPrefix, sdb.tx, sdb.hermezDb.HermezDbReader, lastBatch, injectedBatchBatchNumber); err != nil { return err } + if err = stages.SaveStageProgress(sdb.tx, stages.DataStream, 1); err != nil { + return err + } return sdb.tx.Commit() } + // handle cases where the last batch wasn't committed to the data stream. + // this could occur because we're migrating from an RPC node to a sequencer + // or because the sequencer was restarted and not all processes completed (like waiting from remote executor) + // we consider the data stream as verified by the executor so treat it as "safe" and unwind blocks beyond there + // if we identify any. During normal operation this function will simply check and move on without performing + // any action. + if !batchState.isL1Recovery() { + isUnwinding, err := handleBatchEndChecks(batchContext, batchState, executionAt, u) + if err != nil || isUnwinding { + return err + } + } + tryHaltSequencer(batchContext, batchState.batchNumber) if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil { return err } - batchCounters, err := prepareBatchCounters(batchContext, batchState, isLastBatchPariallyProcessed) + batchCounters, err := prepareBatchCounters(batchContext, batchState, nil) if err != nil { return err } - if !isLastBatchPariallyProcessed { - // handle case where batch wasn't closed properly - // close it before starting a new one - // this occurs when sequencer was switched from syncer or sequencer datastream files were deleted - // and datastream was regenerated - if err = finalizeLastBatchInDatastreamIfNotFinalized(batchContext, batchState, executionAt); err != nil { - return err - } - } - if batchState.isL1Recovery() { if cfg.zk.L1SyncStopBatch > 0 && batchState.batchNumber > cfg.zk.L1SyncStopBatch { log.Info(fmt.Sprintf("[%s] L1 recovery has completed!", logPrefix), "batch", batchState.batchNumber) @@ -221,12 +222,11 @@ func SpawnSequencingStage( if err != nil { return err } - } - - if len(batchState.blockState.transactionsForInclusion) == 0 { - time.Sleep(250 * time.Millisecond) - } else { - log.Trace(fmt.Sprintf("[%s] Yielded transactions from the pool", logPrefix), "txCount", len(batchState.blockState.transactionsForInclusion)) + if len(batchState.blockState.transactionsForInclusion) == 0 { + time.Sleep(250 * time.Millisecond) + } else { + log.Trace(fmt.Sprintf("[%s] Yielded transactions from the pool", logPrefix), "txCount", len(batchState.blockState.transactionsForInclusion)) + } } for i, transaction := range batchState.blockState.transactionsForInclusion { @@ -307,8 +307,7 @@ func SpawnSequencingStage( } } - block, err = doFinishBlockAndUpdateState(batchContext, ibs, header, parentBlock, batchState, ger, l1BlockHash, l1TreeUpdateIndex, infoTreeIndexProgress, batchCounters) - if err != nil { + if block, err = doFinishBlockAndUpdateState(batchContext, ibs, header, parentBlock, batchState, ger, l1BlockHash, l1TreeUpdateIndex, infoTreeIndexProgress, batchCounters); err != nil { return err } @@ -335,21 +334,26 @@ func SpawnSequencingStage( batchState.onBuiltBlock(blockNumber) // commit block data here so it is accessible in other threads - if errCommitAndStart := sdb.CommitAndStart(); errCommitAndStart != nil { - return errCommitAndStart + if !batchState.isL1Recovery() { + if errCommitAndStart := sdb.CommitAndStart(); errCommitAndStart != nil { + return errCommitAndStart + } + defer sdb.tx.Rollback() } - defer sdb.tx.Rollback() cfg.legacyVerifier.StartAsyncVerification(batchState.forkId, batchState.batchNumber, block.Root(), batchCounters.CombineCollectorsNoChanges().UsedAsMap(), batchState.builtBlocks, batchState.hasExecutorForThisBatch, batchContext.cfg.zk.SequencerBatchVerificationTimeout) // check for new responses from the verifier needsUnwind, err := updateStreamAndCheckRollback(batchContext, batchState, streamWriter, u) - // lets commit everything after updateStreamAndCheckRollback no matter of its result - if errCommitAndStart := sdb.CommitAndStart(); errCommitAndStart != nil { - return errCommitAndStart + // lets commit everything after updateStreamAndCheckRollback no matter of its result unless + // we're in L1 recovery where losing some blocks on restart doesn't matter + if !batchState.isL1Recovery() { + if errCommitAndStart := sdb.CommitAndStart(); errCommitAndStart != nil { + return errCommitAndStart + } + defer sdb.tx.Rollback() } - defer sdb.tx.Rollback() // check the return values of updateStreamAndCheckRollback if err != nil || needsUnwind { diff --git a/zk/stages/stage_sequence_execute_batch.go b/zk/stages/stage_sequence_execute_batch.go index 6f7dc4275da..23bb025ef01 100644 --- a/zk/stages/stage_sequence_execute_batch.go +++ b/zk/stages/stage_sequence_execute_batch.go @@ -12,28 +12,7 @@ import ( "github.com/ledgerwatch/log/v3" ) -func prepareBatchNumber(lastBatch uint64, isLastBatchPariallyProcessed bool) uint64 { - if isLastBatchPariallyProcessed { - return lastBatch - } - - return lastBatch + 1 -} - -func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, isLastBatchPariallyProcessed bool) (*vm.BatchCounterCollector, error) { - var intermediateUsedCounters *vm.Counters - if isLastBatchPariallyProcessed { - intermediateCountersMap, found, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(batchState.batchNumber) - if err != nil { - return nil, err - } - if found { - intermediateUsedCounters = vm.NewCountersFromUsedMap(intermediateCountersMap) - } else { - log.Warn("intermediate counters not found for batch, initialising with empty counters", "batch", batchState.batchNumber) - } - } - +func prepareBatchCounters(batchContext *BatchContext, batchState *BatchState, intermediateUsedCounters *vm.Counters) (*vm.BatchCounterCollector, error) { return vm.NewBatchCounterCollector(batchContext.sdb.smt.GetDepth(), uint16(batchState.forkId), batchContext.cfg.zk.VirtualCountersSmtReduction, batchContext.cfg.zk.ShouldCountersBeUnlimited(batchState.isL1Recovery()), intermediateUsedCounters), nil } @@ -66,9 +45,6 @@ func doCheckForBadBatch(batchContext *BatchContext, batchState *BatchState, this if err = batchContext.sdb.hermezDb.WriteBatchCounters(currentBlock.NumberU64(), map[string]int{}); err != nil { return false, err } - if err = batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(batchState.batchNumber); err != nil { - return false, err - } if err = stages.SaveStageProgress(batchContext.sdb.tx, stages.HighestSeenBatchNumber, batchState.batchNumber); err != nil { return false, err } @@ -158,9 +134,6 @@ func runBatchLastSteps( if err = batchContext.sdb.hermezDb.WriteBatchCounters(blockNumber, counters.UsedAsMap()); err != nil { return err } - if err := batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(thisBatch); err != nil { - return err - } // Local Exit Root (ler): read s/c storage every batch to store the LER for the highest block in the batch ler, err := utils.GetBatchLocalExitRootFromSCStorage(thisBatch, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) @@ -172,11 +145,16 @@ func runBatchLastSteps( return err } - lastBlock, err := batchContext.sdb.hermezDb.GetHighestBlockInBatch(thisBatch) + // get the last block number written to batch + // we should match it's state root in batch end entry + // if we get the last block in DB errors may occur since we have DB unwinds AFTER we commit batch end to datastream + // the last block written to the datastream before batch end should be the correct one once we are here + // if it is not, we have a bigger problem + lastBlockNumber, err := batchContext.cfg.datastreamServer.GetHighestBlockNumber() if err != nil { return err } - block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastBlock) + block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastBlockNumber) if err != nil { return err } @@ -186,8 +164,7 @@ func runBatchLastSteps( } // the unwind of this value is handed by UnwindExecutionStageDbWrites - _, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, lastBlock) - if err != nil { + if _, err = rawdb.IncrementStateVersionByBlockNumberIfNeeded(batchContext.sdb.tx, lastBlockNumber); err != nil { return fmt.Errorf("writing plain state version: %w", err) } diff --git a/zk/stages/stage_sequence_execute_blocks.go b/zk/stages/stage_sequence_execute_blocks.go index 495e9114846..060c753a26d 100644 --- a/zk/stages/stage_sequence_execute_blocks.go +++ b/zk/stages/stage_sequence_execute_blocks.go @@ -247,12 +247,6 @@ func finaliseBlock( return nil, err } - // write partially processed - err = batchContext.sdb.hermezDb.WriteIsBatchPartiallyProcessed(batchState.batchNumber) - if err != nil { - return nil, err - } - // this is actually account + storage indices stages quitCh := batchContext.ctx.Done() from := newNum.Uint64() diff --git a/zk/stages/stage_sequence_execute_data_stream.go b/zk/stages/stage_sequence_execute_data_stream.go index 373511a1212..c57e91b04d5 100644 --- a/zk/stages/stage_sequence_execute_data_stream.go +++ b/zk/stages/stage_sequence_execute_data_stream.go @@ -5,9 +5,11 @@ import ( "fmt" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/erigon/eth/stagedsync" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" "github.com/ledgerwatch/erigon/zk/datastream/server" verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier" - "github.com/ledgerwatch/erigon/zk/utils" "github.com/ledgerwatch/log/v3" ) @@ -62,6 +64,10 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, err } + if err = stages.SaveStageProgress(sbc.sdb.tx, stages.DataStream, block.NumberU64()); err != nil { + return checkedVerifierBundles, err + } + // once we have handled the very first block we can update the last batch to be the current batch safely so that // we don't keep adding batch bookmarks in between blocks sbc.lastBatch = request.BatchNumber @@ -78,29 +84,55 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun return checkedVerifierBundles, nil } -func finalizeLastBatchInDatastreamIfNotFinalized(batchContext *BatchContext, batchState *BatchState, thisBlock uint64) error { +func handleBatchEndChecks(batchContext *BatchContext, batchState *BatchState, thisBlock uint64, u stagedsync.Unwinder) (bool, error) { isLastEntryBatchEnd, err := batchContext.cfg.datastreamServer.IsLastEntryBatchEnd() if err != nil { - return err + return false, err } if isLastEntryBatchEnd { - return nil + return false, nil + } + + lastBatch := batchState.batchNumber - 1 + + log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), lastBatch)) + + rawCounters, _, err := batchContext.sdb.hermezDb.GetLatestBatchCounters(lastBatch) + if err != nil { + return false, err } - log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", batchContext.s.LogPrefix(), batchState.batchNumber)) - ler, err := utils.GetBatchLocalExitRootFromSCStorage(batchState.batchNumber, batchContext.sdb.hermezDb.HermezDbReader, batchContext.sdb.tx) + latestCounters := vm.NewCountersFromUsedMap(rawCounters) + + endBatchCounters, err := prepareBatchCounters(batchContext, batchState, latestCounters) if err != nil { - return err + return false, err + } + + if err = runBatchLastSteps(batchContext, lastBatch, thisBlock, endBatchCounters); err != nil { + return false, err } - lastBlock, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, thisBlock) + // now check if there is a gap in the stream vs the state db + streamProgress, err := stages.GetStageProgress(batchContext.sdb.tx, stages.DataStream) if err != nil { - return err + return false, err } - root := lastBlock.Root() - if err = batchContext.cfg.datastreamServer.WriteBatchEnd(batchContext.sdb.hermezDb, batchState.batchNumber-1, &root, &ler); err != nil { - return err + + unwinding := false + if streamProgress > 0 && streamProgress < thisBlock { + block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, streamProgress) + if err != nil { + return true, err + } + log.Warn(fmt.Sprintf("[%s] Unwinding due to a datastream gap", batchContext.s.LogPrefix()), + "streamHeight", streamProgress, + "sequencerHeight", thisBlock, + ) + u.UnwindTo(streamProgress, block.Hash()) + unwinding = true } - return nil + + return unwinding, nil } diff --git a/zk/stages/stage_sequence_execute_injected_batch.go b/zk/stages/stage_sequence_execute_injected_batch.go index 323b7a0f2f9..e1917b7748a 100644 --- a/zk/stages/stage_sequence_execute_injected_batch.go +++ b/zk/stages/stage_sequence_execute_injected_batch.go @@ -80,8 +80,7 @@ func processInjectedInitialBatch( return err } - // deleting the partially processed flag - return batchContext.sdb.hermezDb.DeleteIsBatchPartiallyProcessed(injectedBatchBatchNumber) + return err } func handleInjectedBatch( diff --git a/zk/stages/stage_sequence_execute_unwind.go b/zk/stages/stage_sequence_execute_unwind.go index 46c0a58846f..b8918aa33d7 100644 --- a/zk/stages/stage_sequence_execute_unwind.go +++ b/zk/stages/stage_sequence_execute_unwind.go @@ -137,15 +137,6 @@ func UnwindSequenceExecutionStageDbWrites(ctx context.Context, u *stagedsync.Unw if err = hermezDb.DeleteBatchCounters(u.UnwindPoint+1, s.BlockNumber); err != nil { return fmt.Errorf("truncate block batches error: %v", err) } - // only seq - if err = hermezDb.TruncateIsBatchPartiallyProcessed(fromBatch, toBatch); err != nil { - return fmt.Errorf("truncate fork id error: %v", err) - } - if lastBatchToKeepBeforeFrom == fromBatch { - if err = hermezDb.WriteIsBatchPartiallyProcessed(lastBatchToKeepBeforeFrom); err != nil { - return fmt.Errorf("truncate fork id error: %v", err) - } - } return nil } diff --git a/zk/utils/utils.go b/zk/utils/utils.go index 5d4038337f4..3376b863c61 100644 --- a/zk/utils/utils.go +++ b/zk/utils/utils.go @@ -153,6 +153,7 @@ func GetBatchLocalExitRootFromSCStorage(batchNo uint64, db DbReader, tx kv.Tx) ( } stateReader := state.NewPlainState(tx, blockNo+1, systemcontracts.SystemContractCodeLookup["hermez"]) + defer stateReader.Close() rawLer, err := stateReader.ReadAccountStorage(state.GER_MANAGER_ADDRESS, 1, &state.GLOBAL_EXIT_ROOT_POS_1) if err != nil { return libcommon.Hash{}, err