Skip to content

Commit

Permalink
Merge branch 'master' into ondemand-module-compile
Browse files Browse the repository at this point in the history
  • Loading branch information
magicxyyz authored Nov 7, 2024
2 parents 0c4ffe2 + b8cf6ea commit bf5cc81
Show file tree
Hide file tree
Showing 39 changed files with 738 additions and 408 deletions.
8 changes: 4 additions & 4 deletions arbitrator/bench/src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use prover::prepare::prepare_machine;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Path to a preimages text file
/// Path to a preimages json file
#[arg(short, long)]
preimages_path: PathBuf,
json_inputs: PathBuf,

/// Path to a machine.wavm.br
#[arg(short, long)]
machine_path: PathBuf,
binary: PathBuf,
}

fn main() -> eyre::Result<()> {
Expand All @@ -33,7 +33,7 @@ fn main() -> eyre::Result<()> {

println!("Running benchmark with always merkleize feature on");
for step_size in step_sizes {
let mut machine = prepare_machine(args.preimages_path.clone(), args.machine_path.clone())?;
let mut machine = prepare_machine(args.json_inputs.clone(), args.binary.clone())?;
let _ = machine.hash();
let mut hash_times = vec![];
let mut step_times = vec![];
Expand Down
9 changes: 7 additions & 2 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ type BatchPosterConfig struct {
Dangerous BatchPosterDangerousConfig `koanf:"dangerous"`
ReorgResistanceMargin time.Duration `koanf:"reorg-resistance-margin" reload:"hot"`
CheckBatchCorrectness bool `koanf:"check-batch-correctness"`
MaxEmptyBatchDelay time.Duration `koanf:"max-empty-batch-delay"`

gasRefunder common.Address
l1BlockBound l1BlockBound
Expand Down Expand Up @@ -224,6 +225,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Uint64(prefix+".gas-estimate-base-fee-multiple-bips", uint64(DefaultBatchPosterConfig.GasEstimateBaseFeeMultipleBips), "for gas estimation, use this multiple of the basefee (measured in basis points) as the max fee per gas")
f.Duration(prefix+".reorg-resistance-margin", DefaultBatchPosterConfig.ReorgResistanceMargin, "do not post batch if its within this duration from layer 1 minimum bounds. Requires l1-block-bound option not be set to \"ignore\"")
f.Bool(prefix+".check-batch-correctness", DefaultBatchPosterConfig.CheckBatchCorrectness, "setting this to true will run the batch against an inbox multiplexer and verifies that it produces the correct set of messages")
f.Duration(prefix+".max-empty-batch-delay", DefaultBatchPosterConfig.MaxEmptyBatchDelay, "maximum empty batch posting delay, batch poster will only be able to post an empty batch if this time period building a batch has passed")
redislock.AddConfigOptions(prefix+".redis-lock", f)
dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfig)
genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultBatchPosterConfig.ParentChainWallet.Pathname)
Expand Down Expand Up @@ -255,6 +257,7 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
GasEstimateBaseFeeMultipleBips: arbmath.OneInUBips * 3 / 2,
ReorgResistanceMargin: 10 * time.Minute,
CheckBatchCorrectness: true,
MaxEmptyBatchDelay: 3 * 24 * time.Hour,
}

var DefaultBatchPosterL1WalletConfig = genericconf.WalletConfig{
Expand All @@ -277,7 +280,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
DASRetentionPeriod: daprovider.DefaultDASRetentionPeriod,
GasRefunderAddress: "",
ExtraBatchGas: 10_000,
Post4844Blobs: true,
Post4844Blobs: false,
IgnoreBlobPrice: false,
DataPoster: dataposter.TestDataPosterConfig,
ParentChainWallet: DefaultBatchPosterL1WalletConfig,
Expand Down Expand Up @@ -1303,7 +1306,9 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
b.building.muxBackend.delayedInbox = append(b.building.muxBackend.delayedInbox, msg)
}
}
if msg.Message.Header.Kind != arbostypes.L1MessageType_BatchPostingReport {
// #nosec G115
timeSinceMsg := time.Since(time.Unix(int64(msg.Message.Header.Timestamp), 0))
if (msg.Message.Header.Kind != arbostypes.L1MessageType_BatchPostingReport) || (timeSinceMsg >= config.MaxEmptyBatchDelay) {
b.building.haveUsefulMessage = true
if b.building.firstUsefulMsg == nil {
b.building.firstUsefulMsg = msg
Expand Down
14 changes: 11 additions & 3 deletions arbnode/dataposter/data_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/ethclient"
Expand Down Expand Up @@ -1087,7 +1088,7 @@ func (p *DataPoster) updateBalance(ctx context.Context) error {
return nil
}

const maxConsecutiveIntermittentErrors = 10
const maxConsecutiveIntermittentErrors = 20

func (p *DataPoster) maybeLogError(err error, tx *storage.QueuedTransaction, msg string) {
nonce := tx.FullTx.Nonce()
Expand All @@ -1096,10 +1097,17 @@ func (p *DataPoster) maybeLogError(err error, tx *storage.QueuedTransaction, msg
return
}
logLevel := log.Error
if errors.Is(err, storage.ErrStorageRace) {
isStorageRace := errors.Is(err, storage.ErrStorageRace)
if isStorageRace || strings.Contains(err.Error(), txpool.ErrFutureReplacePending.Error()) {
p.errorCount[nonce]++
if p.errorCount[nonce] <= maxConsecutiveIntermittentErrors {
logLevel = log.Debug
if isStorageRace {
logLevel = log.Debug
} else {
logLevel = log.Info
}
} else if isStorageRace {
logLevel = log.Warn
}
} else {
delete(p.errorCount, nonce)
Expand Down
85 changes: 74 additions & 11 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,26 @@ func (r *InboxReader) CaughtUp() chan struct{} {
return r.caughtUpChan
}

type lazyHashLogging struct {
f func() common.Hash
}

func (l lazyHashLogging) String() string {
return l.f().String()
}

func (l lazyHashLogging) TerminalString() string {
return l.f().TerminalString()
}

func (l lazyHashLogging) MarshalText() ([]byte, error) {
return l.f().MarshalText()
}

func (l lazyHashLogging) Format(s fmt.State, c rune) {
l.f().Format(s, c)
}

func (r *InboxReader) run(ctx context.Context, hadError bool) error {
readMode := r.config().ReadMode
from, err := r.getNextBlockToRead(ctx)
Expand Down Expand Up @@ -334,6 +354,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
if ourLatestDelayedCount < checkingDelayedCount {
log.Debug("Expecting to find delayed messages", "checkingDelayedCount", checkingDelayedCount, "ourLatestDelayedCount", ourLatestDelayedCount, "currentHeight", currentHeight)
checkingDelayedCount = ourLatestDelayedCount
missingDelayed = true
} else if ourLatestDelayedCount > checkingDelayedCount {
Expand All @@ -354,6 +375,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
if dbDelayedAcc != l1DelayedAcc {
log.Debug("Latest delayed accumulator mismatch", "delayedSeqNum", checkingDelayedSeqNum, "dbDelayedAcc", dbDelayedAcc, "l1DelayedAcc", l1DelayedAcc)
reorgingDelayed = true
}
}
Expand All @@ -371,6 +393,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
if ourLatestBatchCount < checkingBatchCount {
log.Debug("Expecting to find sequencer batches", "checkingBatchCount", checkingBatchCount, "ourLatestBatchCount", ourLatestBatchCount, "currentHeight", currentHeight)
checkingBatchCount = ourLatestBatchCount
missingSequencer = true
} else if ourLatestBatchCount > checkingBatchCount && config.HardReorg {
Expand All @@ -390,6 +413,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
if dbBatchAcc != l1BatchAcc {
log.Debug("Latest sequencer batch accumulator mismatch", "batchSeqNum", checkingBatchSeqNum, "dbBatchAcc", dbBatchAcc, "l1BatchAcc", l1BatchAcc)
reorgingSequencer = true
}
}
Expand Down Expand Up @@ -432,6 +456,15 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if to.Cmp(currentHeight) > 0 {
to.Set(currentHeight)
}
log.Debug(
"Looking up messages",
"from", from.String(),
"to", to.String(),
"missingDelayed", missingDelayed,
"missingSequencer", missingSequencer,
"reorgingDelayed", reorgingDelayed,
"reorgingSequencer", reorgingSequencer,
)
sequencerBatches, err := r.sequencerInbox.LookupBatchesInRange(ctx, from, to)
if err != nil {
return err
Expand All @@ -457,6 +490,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if len(sequencerBatches) > 0 {
missingSequencer = false
reorgingSequencer = false
var havePrevAcc common.Hash
firstBatch := sequencerBatches[0]
if firstBatch.SequenceNumber > 0 {
haveAcc, err := r.tracker.GetBatchAcc(firstBatch.SequenceNumber - 1)
Expand All @@ -467,7 +501,10 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
} else if haveAcc != firstBatch.BeforeInboxAcc {
reorgingSequencer = true
}
havePrevAcc = haveAcc
}
readLastAcc := sequencerBatches[len(sequencerBatches)-1].AfterInboxAcc
var duplicateBatches int
if !reorgingSequencer {
// Skip any batches we already have in the database
for len(sequencerBatches) > 0 {
Expand All @@ -482,14 +519,26 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
} else if haveAcc == batch.AfterInboxAcc {
// Skip this batch, as we already have it in the database
sequencerBatches = sequencerBatches[1:]
duplicateBatches++
} else {
// The first batch AfterInboxAcc matches, but this batch doesn't,
// so we'll successfully reorg it when we hit the addMessages
break
}
}
}
log.Debug(
"Found sequencer batches",
"firstSequenceNumber", firstBatch.SequenceNumber,
"newBatchesCount", len(sequencerBatches),
"duplicateBatches", duplicateBatches,
"reorgingSequencer", reorgingSequencer,
"readBeforeAcc", firstBatch.BeforeInboxAcc,
"haveBeforeAcc", havePrevAcc,
"readLastAcc", readLastAcc,
)
} else if missingSequencer && to.Cmp(currentHeight) >= 0 {
log.Debug("Didn't find expected sequencer batches", "from", from, "to", to, "currentHeight", currentHeight)
// We were missing sequencer batches but didn't find any.
// This must mean that the sequencer batches are in the past.
reorgingSequencer = true
Expand All @@ -504,6 +553,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if err != nil {
return err
}
var havePrevAcc common.Hash
if beforeCount > 0 {
haveAcc, err := r.tracker.GetDelayedAcc(beforeCount - 1)
if errors.Is(err, AccumulatorNotFoundErr) {
Expand All @@ -513,14 +563,27 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
} else if haveAcc != beforeAcc {
reorgingDelayed = true
}
havePrevAcc = haveAcc
}
log.Debug(
"Found delayed messages",
"firstSequenceNumber", beforeCount,
"count", len(delayedMessages),
"reorgingDelayed", reorgingDelayed,
"readBeforeAcc", beforeAcc,
"haveBeforeAcc", havePrevAcc,
"readLastAcc", lazyHashLogging{func() common.Hash {
// Only compute this if we need to log it, as it's somewhat expensive
return delayedMessages[len(delayedMessages)-1].AfterInboxAcc()
}},
)
} else if missingDelayed && to.Cmp(currentHeight) >= 0 {
log.Debug("Didn't find expected delayed messages", "from", from, "to", to, "currentHeight", currentHeight)
// We were missing delayed messages but didn't find any.
// This must mean that the delayed messages are in the past.
reorgingDelayed = true
}

log.Trace("looking up messages", "from", from.String(), "to", to.String(), "missingDelayed", missingDelayed, "missingSequencer", missingSequencer, "reorgingDelayed", reorgingDelayed, "reorgingSequencer", reorgingSequencer)
if !reorgingDelayed && !reorgingSequencer && (len(delayedMessages) != 0 || len(sequencerBatches) != 0) {
delayedMismatch, err := r.addMessages(ctx, sequencerBatches, delayedMessages)
if err != nil {
Expand All @@ -535,14 +598,6 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
storeSeenBatchCount()
}
}
if reorgingDelayed || reorgingSequencer {
from, err = r.getPrevBlockForReorg(from)
if err != nil {
return err
}
} else {
from = arbmath.BigAddByUint(to, 1)
}
// #nosec G115
haveMessages := uint64(len(delayedMessages) + len(sequencerBatches))
if haveMessages <= (config.TargetMessagesRead / 2) {
Expand All @@ -556,6 +611,14 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
} else if blocksToFetch > config.MaxBlocksToRead {
blocksToFetch = config.MaxBlocksToRead
}
if reorgingDelayed || reorgingSequencer {
from, err = r.getPrevBlockForReorg(from, blocksToFetch)
if err != nil {
return err
}
} else {
from = arbmath.BigAddByUint(to, 1)
}
}

if !readAnyBatches {
Expand All @@ -579,11 +642,11 @@ func (r *InboxReader) addMessages(ctx context.Context, sequencerBatches []*Seque
return false, nil
}

func (r *InboxReader) getPrevBlockForReorg(from *big.Int) (*big.Int, error) {
func (r *InboxReader) getPrevBlockForReorg(from *big.Int, maxBlocksBackwards uint64) (*big.Int, error) {
if from.Cmp(r.firstMessageBlock) <= 0 {
return nil, errors.New("can't get older messages")
}
newFrom := arbmath.BigSub(from, big.NewInt(10))
newFrom := arbmath.BigSub(from, new(big.Int).SetUint64(maxBlocksBackwards))
if newFrom.Cmp(r.firstMessageBlock) < 0 {
newFrom = new(big.Int).Set(r.firstMessageBlock)
}
Expand Down
20 changes: 12 additions & 8 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,22 +697,26 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client *ethclien

for _, batch := range batches {
if batch.SequenceNumber != pos {
return errors.New("unexpected batch sequence number")
return fmt.Errorf("unexpected batch sequence number %v expected %v", batch.SequenceNumber, pos)
}
if nextAcc != batch.BeforeInboxAcc {
return errors.New("previous batch accumulator mismatch")
return fmt.Errorf("previous batch accumulator %v mismatch expected %v", batch.BeforeInboxAcc, nextAcc)
}

if batch.AfterDelayedCount > 0 {
haveDelayedAcc, err := t.GetDelayedAcc(batch.AfterDelayedCount - 1)
if errors.Is(err, AccumulatorNotFoundErr) {
// We somehow missed a referenced delayed message; go back and look for it
return delayedMessagesMismatch
}
if err != nil {
notFound := errors.Is(err, AccumulatorNotFoundErr)
if err != nil && !notFound {
return err
}
if haveDelayedAcc != batch.AfterDelayedAcc {
if notFound || haveDelayedAcc != batch.AfterDelayedAcc {
log.Debug(
"Delayed message accumulator doesn't match sequencer batch",
"batch", batch.SequenceNumber,
"delayedPosition", batch.AfterDelayedCount-1,
"haveDelayedAcc", haveDelayedAcc,
"batchDelayedAcc", batch.AfterDelayedAcc,
)
// We somehow missed a delayed message reorg; go back and look for it
return delayedMessagesMismatch
}
Expand Down
4 changes: 4 additions & 0 deletions arbnode/message_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g
}
msgCount := endBatchMetadata.MessageCount
delayedCount := endBatchMetadata.DelayedMessageCount
if delayedCount > 0 {
// keep an extra delayed message for the inbox reader to use
delayedCount--
}

return m.deleteOldMessagesFromDB(ctx, msgCount, delayedCount)
}
Expand Down
1 change: 0 additions & 1 deletion arbos/arbosState/initialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func tryMarshalUnmarshal(input *statetransfer.ArbosInitializationInfo, t *testin
cacheConfig := core.DefaultCacheConfigWithScheme(env.GetTestStateScheme())
stateroot, err := InitializeArbosInDatabase(raw, cacheConfig, initReader, chainConfig, arbostypes.TestInitMessage, 0, 0)
Require(t, err)

triedbConfig := cacheConfig.TriedbConfig()
stateDb, err := state.New(stateroot, state.NewDatabaseWithConfig(raw, triedbConfig), nil)
Require(t, err)
Expand Down
7 changes: 4 additions & 3 deletions arbos/arbosState/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package arbosState

import (
"errors"
"github.com/ethereum/go-ethereum/core/tracing"
"math/big"
"regexp"
"sort"
Expand Down Expand Up @@ -159,7 +160,7 @@ func InitializeArbosInDatabase(db ethdb.Database, cacheConfig *core.CacheConfig,
if err != nil {
return common.Hash{}, err
}
statedb.SetBalance(account.Addr, uint256.MustFromBig(account.EthBalance))
statedb.SetBalance(account.Addr, uint256.MustFromBig(account.EthBalance), tracing.BalanceChangeUnspecified)
statedb.SetNonce(account.Addr, account.Nonce)
if account.ContractInfo != nil {
statedb.SetCode(account.Addr, account.ContractInfo.Code)
Expand Down Expand Up @@ -190,7 +191,7 @@ func initializeRetryables(statedb *state.StateDB, rs *retryables.RetryableState,
return err
}
if r.Timeout <= currentTimestamp {
statedb.AddBalance(r.Beneficiary, uint256.MustFromBig(r.Callvalue))
statedb.AddBalance(r.Beneficiary, uint256.MustFromBig(r.Callvalue), tracing.BalanceChangeUnspecified)
continue
}
retryablesList = append(retryablesList, r)
Expand All @@ -209,7 +210,7 @@ func initializeRetryables(statedb *state.StateDB, rs *retryables.RetryableState,
addr := r.To
to = &addr
}
statedb.AddBalance(retryables.RetryableEscrowAddress(r.Id), uint256.MustFromBig(r.Callvalue))
statedb.AddBalance(retryables.RetryableEscrowAddress(r.Id), uint256.MustFromBig(r.Callvalue), tracing.BalanceChangeUnspecified)
_, err := rs.CreateRetryable(r.Id, r.Timeout, r.From, to, r.Callvalue, r.Beneficiary, r.Calldata)
if err != nil {
return err
Expand Down
Loading

0 comments on commit bf5cc81

Please sign in to comment.