Skip to content

Commit

Permalink
Merge branch 'master' into finalize-arbos-20
Browse files Browse the repository at this point in the history
  • Loading branch information
PlasmaPower authored Jan 30, 2024
2 parents d84f1ba + 28127e0 commit ab13ede
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 47 deletions.
12 changes: 10 additions & 2 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,8 +968,16 @@ func (s *TransactionStreamer) executeNextMsg(ctx context.Context, exec execution
log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos)
return false
}
err = s.exec.DigestMessage(pos, msg)
if err != nil {
var msgForPrefetch *arbostypes.MessageWithMetadata
if pos+1 < msgCount {
msg, err := s.GetMessage(pos + 1)
if err != nil {
log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos+1)
return false
}
msgForPrefetch = msg
}
if err = s.exec.DigestMessage(pos, msg, msgForPrefetch); err != nil {
logger := log.Warn
if prevMessageCount < msgCount {
logger = log.Debug
Expand Down
9 changes: 6 additions & 3 deletions arbstate/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
}
payload := data[40:]

// Stage 1: Extract the payload from any data availability header.
// It's important that multiple DAS strategies can't both be invoked in the same batch,
// as these headers are validated by the sequencer inbox and not other DASs.
if len(payload) > 0 && IsDASMessageHeaderByte(payload[0]) {
if dasReader == nil {
log.Error("No DAS Reader configured, but sequencer message found with DAS header")
Expand All @@ -88,9 +91,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
return parsedMsg, nil
}
}
}

if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) {
} else if len(payload) > 0 && IsBlobHashesHeaderByte(payload[0]) {
blobHashes := payload[1:]
if len(blobHashes)%len(common.Hash{}) != 0 {
return nil, fmt.Errorf("blob batch data is not a list of hashes as expected")
Expand All @@ -115,6 +116,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
}
}

// Stage 2: If enabled, decode the zero heavy payload (saves gas based on calldata charging).
if len(payload) > 0 && IsZeroheavyEncodedHeaderByte(payload[0]) {
pl, err := io.ReadAll(io.LimitReader(zeroheavy.NewZeroheavyDecoder(bytes.NewReader(payload[1:])), int64(maxZeroheavyDecompressedLen)))
if err != nil {
Expand All @@ -124,6 +126,7 @@ func parseSequencerMessage(ctx context.Context, batchNum uint64, batchBlockHash
payload = pl
}

// Stage 3: Decompress the brotli payload and fill the parsedMsg.segments list.
if len(payload) > 0 && IsBrotliMessageHeaderByte(payload[0]) {
decompressed, err := arbcompress.Decompress(payload[1:], MaxDecompressedLen)
if err == nil {
Expand Down
43 changes: 38 additions & 5 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type ExecutionEngine struct {
nextScheduledVersionCheck time.Time // protected by the createBlocksMutex

reorgSequencing bool

prefetchBlock bool
}

func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) {
Expand Down Expand Up @@ -71,6 +73,16 @@ func (s *ExecutionEngine) EnableReorgSequencing() {
s.reorgSequencing = true
}

func (s *ExecutionEngine) EnablePrefetchBlock() {
if s.Started() {
panic("trying to enable prefetch block after start")
}
if s.prefetchBlock {
panic("trying to enable prefetch block when already set")
}
s.prefetchBlock = true
}

func (s *ExecutionEngine) SetTransactionStreamer(streamer execution.TransactionStreamer) {
if s.Started() {
panic("trying to set transaction streamer after start")
Expand Down Expand Up @@ -107,7 +119,11 @@ func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbost
return err
}
for i := range newMessages {
err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i])
var msgForPrefetch *arbostypes.MessageWithMetadata
if i < len(newMessages)-1 {
msgForPrefetch = &newMessages[i]
}
err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i], msgForPrefetch)
if err != nil {
return err
}
Expand Down Expand Up @@ -489,15 +505,20 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess
return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos)))
}

func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
// DigestMessage is used to create a block by executing msg against the latest state and storing it.
// Also, while creating a block by executing msg against the latest state,
// in parallel, creates a block by executing msgForPrefetch (msg+1) against the latest state
// but does not store the block.
// This helps in filling the cache, so that the next block creation is faster.
func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
if !s.createBlocksMutex.TryLock() {
return errors.New("createBlock mutex held")
}
defer s.createBlocksMutex.Unlock()
return s.digestMessageWithBlockMutex(num, msg)
return s.digestMessageWithBlockMutex(num, msg, msgForPrefetch)
}

func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
currentHeader, err := s.getCurrentHeader()
if err != nil {
return err
Expand All @@ -511,11 +532,23 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex,
}

startTime := time.Now()
var wg sync.WaitGroup
if s.prefetchBlock && msgForPrefetch != nil {
wg.Add(1)
go func() {
defer wg.Done()
_, _, _, err := s.createBlockFromNextMessage(msgForPrefetch)
if err != nil {
return
}
}()
}

block, statedb, receipts, err := s.createBlockFromNextMessage(msg)
if err != nil {
return err
}

wg.Wait()
err = s.appendBlock(block, statedb, receipts, time.Since(startTime))
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ func (n *ExecutionNode) StopAndWait() {
// }
}

func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error {
return n.ExecEngine.DigestMessage(num, msg)
func (n *ExecutionNode) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error {
return n.ExecEngine.DigestMessage(num, msg, msgForPrefetch)
}
func (n *ExecutionNode) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error {
return n.ExecEngine.Reorg(count, newMessages, oldMessages)
Expand Down
7 changes: 7 additions & 0 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type SequencerConfig struct {
MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"`
NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"`
NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"`
EnablePrefetchBlock bool `koanf:"enable-prefetch-block"`
}

func (c *SequencerConfig) Validate() error {
Expand Down Expand Up @@ -97,6 +98,7 @@ var DefaultSequencerConfig = SequencerConfig{
MaxTxDataSize: 95000,
NonceFailureCacheSize: 1024,
NonceFailureCacheExpiry: time.Second,
EnablePrefetchBlock: false,
}

var TestSequencerConfig = SequencerConfig{
Expand All @@ -112,6 +114,7 @@ var TestSequencerConfig = SequencerConfig{
MaxTxDataSize: 95000,
NonceFailureCacheSize: 1024,
NonceFailureCacheExpiry: time.Second,
EnablePrefetchBlock: false,
}

func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
Expand All @@ -127,6 +130,7 @@ func SequencerConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".max-tx-data-size", DefaultSequencerConfig.MaxTxDataSize, "maximum transaction size the sequencer will accept")
f.Int(prefix+".nonce-failure-cache-size", DefaultSequencerConfig.NonceFailureCacheSize, "number of transactions with too high of a nonce to keep in memory while waiting for their predecessor")
f.Duration(prefix+".nonce-failure-cache-expiry", DefaultSequencerConfig.NonceFailureCacheExpiry, "maximum amount of time to wait for a predecessor before rejecting a tx with nonce too high")
f.Bool(prefix+".enable-prefetch-block", DefaultSequencerConfig.EnablePrefetchBlock, "enable prefetching of blocks")
}

type txQueueItem struct {
Expand Down Expand Up @@ -324,6 +328,9 @@ func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderRead
}
s.Pause()
execEngine.EnableReorgSequencing()
if config.EnablePrefetchBlock {
execEngine.EnablePrefetchBlock()
}
return s, nil
}

Expand Down
2 changes: 1 addition & 1 deletion execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var ErrSequencerInsertLockTaken = errors.New("insert lock taken")

// always needed
type ExecutionClient interface {
DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata) error
DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) error
Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadata, oldMessages []*arbostypes.MessageWithMetadata) error
HeadMessageNumber() (arbutil.MessageIndex, error)
HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error)
Expand Down
98 changes: 64 additions & 34 deletions system_tests/pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package arbtest
import (
"context"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/trie"
"github.com/offchainlabs/nitro/cmd/conf"
"github.com/offchainlabs/nitro/cmd/pruning"
"github.com/offchainlabs/nitro/execution/gethexec"
Expand All @@ -32,35 +34,34 @@ func TestPruning(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var dataDir string

func() {
builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
_ = builder.Build(t)
dataDir = builder.dataDir
l2cleanupDone := false
defer func() {
if !l2cleanupDone {
builder.L2.cleanup()
}
builder.L1.cleanup()
}()
builder.L2Info.GenerateAccount("User2")
var txs []*types.Transaction
for i := uint64(0); i < 200; i++ {
tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil)
txs = append(txs, tx)
err := builder.L2.Client.SendTransaction(ctx, tx)
Require(t, err)
}
for _, tx := range txs {
_, err := builder.L2.EnsureTxSucceeded(tx)
Require(t, err)
builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
_ = builder.Build(t)
l2cleanupDone := false
defer func() {
if !l2cleanupDone {
builder.L2.cleanup()
}
l2cleanupDone = true
builder.L2.cleanup()
t.Log("stopped l2 node")
builder.L1.cleanup()
}()
builder.L2Info.GenerateAccount("User2")
var txs []*types.Transaction
for i := uint64(0); i < 200; i++ {
tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil)
txs = append(txs, tx)
err := builder.L2.Client.SendTransaction(ctx, tx)
Require(t, err)
}
for _, tx := range txs {
_, err := builder.L2.EnsureTxSucceeded(tx)
Require(t, err)
}
lastBlock, err := builder.L2.Client.BlockNumber(ctx)
Require(t, err)
l2cleanupDone = true
builder.L2.cleanup()
t.Log("stopped l2 node")

func() {
stack, err := node.New(builder.l2StackConfig)
Require(t, err)
defer stack.Close()
Expand Down Expand Up @@ -105,15 +106,44 @@ func TestPruning(t *testing.T) {
Fatal(t, "The db doesn't have less entries after pruning then before. Before:", chainDbEntriesBeforePruning, "After:", chainDbEntriesAfterPruning)
}
}()
builder := NewNodeBuilder(ctx).DefaultConfig(t, true)
builder.dataDir = dataDir
cancel = builder.Build(t)
defer cancel()

builder.L2Info.GenerateAccount("User2")
testClient, cleanup := builder.Build2ndNode(t, &SecondNodeParams{stackConfig: builder.l2StackConfig})
defer cleanup()

currentBlock := uint64(0)
// wait for the chain to catch up
for currentBlock < lastBlock {
currentBlock, err = testClient.Client.BlockNumber(ctx)
Require(t, err)
time.Sleep(20 * time.Millisecond)
}

currentBlock, err = testClient.Client.BlockNumber(ctx)
Require(t, err)
bc := testClient.ExecNode.Backend.ArbInterface().BlockChain()
triedb := bc.StateCache().TrieDB()
var start uint64
if currentBlock+1 >= builder.execConfig.Caching.BlockCount {
start = currentBlock + 1 - builder.execConfig.Caching.BlockCount
} else {
start = 0
}
for i := start; i <= currentBlock; i++ {
header := bc.GetHeaderByNumber(i)
_, err := bc.StateAt(header.Root)
Require(t, err)
tr, err := trie.New(trie.TrieID(header.Root), triedb)
Require(t, err)
it, err := tr.NodeIterator(nil)
Require(t, err)
for it.Next(true) {
}
Require(t, it.Error())
}

tx := builder.L2Info.PrepareTx("Owner", "User2", builder.L2Info.TransferGas, common.Big1, nil)
err := builder.L2.Client.SendTransaction(ctx, tx)
err = testClient.Client.SendTransaction(ctx, tx)
Require(t, err)
_, err = builder.L2.EnsureTxSucceeded(tx)
_, err = testClient.EnsureTxSucceeded(tx)
Require(t, err)
}

0 comments on commit ab13ede

Please sign in to comment.