diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go
index 92d216f765..73aba6a760 100644
--- a/arbnode/inbox_test.go
+++ b/arbnode/inbox_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/offchainlabs/nitro/arbos/arbostypes"
 	"github.com/offchainlabs/nitro/arbos/l2pricing"
 	"github.com/offchainlabs/nitro/arbutil"
+	"github.com/offchainlabs/nitro/execution"
 	"github.com/offchainlabs/nitro/execution/gethexec"
 	"github.com/offchainlabs/nitro/statetransfer"

@@ -37,6 +38,9 @@ type execClientWrapper struct {
 func (w *execClientWrapper) Pause()                     { w.t.Error("not supported") }
 func (w *execClientWrapper) Activate()                  { w.t.Error("not supported") }
 func (w *execClientWrapper) ForwardTo(url string) error { w.t.Error("not supported"); return nil }
+func (w *execClientWrapper) SetConfirmedNodeHelper(confirmedNodeHelper execution.ConfirmedNodeHelper) {
+	w.t.Error("not supported")
+}

 func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*gethexec.ExecutionEngine, *TransactionStreamer, ethdb.Database, *core.BlockChain) {
 	chainConfig := params.ArbitrumDevTestChainConfig()
@@ -54,7 +58,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*
 	arbDb := rawdb.NewMemoryDatabase()
 	initReader := statetransfer.NewMemoryInitDataReader(&initData)

-	bc, err := gethexec.WriteOrTestBlockChain(chainDb, nil, initReader, chainConfig, arbostypes.TestInitMessage, gethexec.ConfigDefaultTest().TxLookupLimit, 0)
+	bc, err := gethexec.WriteOrTestBlockChain(chainDb, nil, initReader, chainConfig, arbostypes.TestInitMessage, gethexec.ConfigDefaultTest().TxLookupLimit, 0, nil)

 	if err != nil {
 		Fail(t, err)
diff --git a/arbnode/message_pruner.go b/arbnode/message_pruner.go
index 31bf1a63ff..efaf7dbc2f 100644
--- a/arbnode/message_pruner.go
+++ b/arbnode/message_pruner.go
@@ -65,7 +65,7 @@ func (m *MessagePruner) Start(ctxIn context.Context) {
 	m.StopWaiter.Start(ctxIn, m)
 }

-func (m *MessagePruner) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState) {
+func (m *MessagePruner) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState, _ uint64) {
 	locked := m.pruningLock.TryLock()
 	if !locked {
 		return
diff --git a/arbnode/node.go b/arbnode/node.go
index f92dcefe7c..b707bc73c0 100644
--- a/arbnode/node.go
+++ b/arbnode/node.go
@@ -9,6 +9,7 @@ import (
 	"errors"
 	"fmt"
 	"math/big"
+	"reflect"
 	"strings"
 	"time"

@@ -246,6 +247,7 @@ type Node struct {
 	DASLifecycleManager    *das.LifecycleManager
 	ClassicOutboxRetriever *ClassicOutboxRetriever
 	SyncMonitor            *SyncMonitor
+	ConfirmedNodeHelper    *staker.ConfirmedNodeHelper
 	configFetcher          ConfigFetcher
 	ctx                    context.Context
 }
@@ -467,6 +469,7 @@ createNodeImpl(
 		DASLifecycleManager:    nil,
 		ClassicOutboxRetriever: classicOutbox,
 		SyncMonitor:            syncMonitor,
+		ConfirmedNodeHelper:    nil,
 		configFetcher:          configFetcher,
 		ctx:                    ctx,
 	}, nil
@@ -559,6 +562,12 @@ createNodeImpl(
 		}
 	}

+	var confirmedNodeHelper *staker.ConfirmedNodeHelper
+	if l1client != nil && !reflect.ValueOf(l1client).IsNil() {
+		confirmedNodeHelper = staker.NewConfirmedNodeHelper(deployInfo.Rollup, l1client)
+		exec.SetConfirmedNodeHelper(confirmedNodeHelper)
+	}
+
 	var stakerObj *staker.Staker
 	var messagePruner *MessagePruner

@@ -610,6 +619,9 @@ createNodeImpl(
 			messagePruner = NewMessagePruner(txStreamer, inboxTracker, func() *MessagePrunerConfig { return &configFetcher.Get().MessagePruner })
 			confirmedNotifiers = append(confirmedNotifiers, messagePruner)
 		}
+		if confirmedNodeHelper != nil {
+			confirmedNotifiers = append(confirmedNotifiers, confirmedNodeHelper)
+		}

 		stakerObj, err = staker.NewStaker(l1Reader, wallet, bind.CallOpts{}, config.Staker, blockValidator, statelessBlockValidator, nil, confirmedNotifiers, deployInfo.ValidatorUtils, fatalErrChan)
 		if err != nil {
@@ -682,6 +694,7 @@ createNodeImpl(
 		DASLifecycleManager:    dasLifecycleManager,
 		ClassicOutboxRetriever: classicOutbox,
 		SyncMonitor:            syncMonitor,
+		ConfirmedNodeHelper:    confirmedNodeHelper,
 		configFetcher:          configFetcher,
 		ctx:                    ctx,
 	}, nil
@@ -752,6 +765,9 @@ func (n *Node) Start(ctx context.Context) error {
 	if err != nil {
 		return fmt.Errorf("error starting geth stack: %w", err)
 	}
+	if n.ConfirmedNodeHelper != nil {
+		n.ConfirmedNodeHelper.Start(ctx)
+	}
 	err = n.Execution.Start(ctx)
 	if err != nil {
 		return fmt.Errorf("error starting exec client: %w", err)
@@ -923,6 +939,9 @@ func (n *Node) StopAndWait() {
 	if n.Execution != nil {
 		n.Execution.StopAndWait()
 	}
+	if n.ConfirmedNodeHelper != nil {
+		n.ConfirmedNodeHelper.StopAndWait()
+	}
 	if err := n.Stack.Close(); err != nil {
 		log.Error("error on stack close", "err", err)
 	}
diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go
index 4cf5dcda06..ab52513d52 100644
--- a/cmd/nitro/init.go
+++ b/cmd/nitro/init.go
@@ -159,7 +159,7 @@ func validateBlockChain(blockChain *core.BlockChain, chainConfig *params.ChainCo
 	return nil
 }

-func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) {
+func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses, forceTriedbCommitHook core.ForceTriedbCommitHook) (ethdb.Database, *core.BlockChain, error) {
 	if !config.Init.Force {
 		if readOnlyDb, err := stack.OpenDatabaseWithFreezer("l2chaindata", 0, 0, "", "", true); err == nil {
 			if chainConfig := gethexec.TryReadStoredChainConfig(readOnlyDb); chainConfig != nil {
@@ -175,7 +175,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
 		if err != nil {
 			return chainDb, nil, fmt.Errorf("error pruning: %w", err)
 		}
-		l2BlockChain, err := gethexec.GetBlockChain(chainDb, cacheConfig, chainConfig, config.Execution.TxLookupLimit)
+		l2BlockChain, err := gethexec.GetBlockChain(chainDb, cacheConfig, chainConfig, config.Execution.TxLookupLimit, nil)
 		if err != nil {
 			return chainDb, nil, err
 		}
@@ -258,7 +258,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
 	if chainConfig == nil {
 		return chainDb, nil, errors.New("no --init.* mode supplied and chain data not in expected directory")
 	}
-	l2BlockChain, err = gethexec.GetBlockChain(chainDb, cacheConfig, chainConfig, config.Execution.TxLookupLimit)
+	l2BlockChain, err = gethexec.GetBlockChain(chainDb, cacheConfig, chainConfig, config.Execution.TxLookupLimit, forceTriedbCommitHook)
 	if err != nil {
 		return chainDb, nil, err
 	}
@@ -356,7 +356,7 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
 		log.Warn("Created fake init message as L1Reader is disabled and serialized chain config from init message is not available", "json", string(serializedChainConfig))
 	}

-	l2BlockChain, err = gethexec.WriteOrTestBlockChain(chainDb, cacheConfig, initDataReader, chainConfig, parsedInitMessage, config.Execution.TxLookupLimit, config.Init.AccountsPerSync)
+	l2BlockChain, err = gethexec.WriteOrTestBlockChain(chainDb, cacheConfig, initDataReader, chainConfig, parsedInitMessage, config.Execution.TxLookupLimit, config.Init.AccountsPerSync, forceTriedbCommitHook)
 	if err != nil {
 		return chainDb, nil, err
 	}
diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go
index 45f539488d..0ccc769c03 100644
--- a/cmd/nitro/nitro.go
+++ b/cmd/nitro/nitro.go
@@ -467,7 +467,9 @@ func mainImpl() int {
 		}
 	}

-	chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.Chain.ID), gethexec.DefaultCacheConfigFor(stack, &nodeConfig.Execution.Caching), l1Client, rollupAddrs)
+	syncHelperConfigFetcher := func() *gethexec.NitroSyncHelperConfig { return &liveNodeConfig.Get().Execution.SyncHelper }
+	forceTriedbCommitHook := gethexec.GetForceTriedbCommitHookForConfig(syncHelperConfigFetcher)
+	chainDb, l2BlockChain, err := openInitializeChainDb(ctx, stack, nodeConfig, new(big.Int).SetUint64(nodeConfig.Chain.ID), gethexec.DefaultCacheConfigFor(stack, &nodeConfig.Execution.Caching), l1Client, rollupAddrs, forceTriedbCommitHook)
 	if l2BlockChain != nil {
 		deferFuncs = append(deferFuncs, func() { l2BlockChain.Stop() })
 	}
diff --git a/execution/gethexec/blockchain.go b/execution/gethexec/blockchain.go
index a85224b635..0fc5a8c743 100644
--- a/execution/gethexec/blockchain.go
+++ b/execution/gethexec/blockchain.go
@@ -165,7 +165,7 @@ func WriteOrTestChainConfig(chainDb ethdb.Database, config *params.ChainConfig)
 	return nil
 }

-func GetBlockChain(chainDb ethdb.Database, cacheConfig *core.CacheConfig, chainConfig *params.ChainConfig, txLookupLimit uint64) (*core.BlockChain, error) {
+func GetBlockChain(chainDb ethdb.Database, cacheConfig *core.CacheConfig, chainConfig *params.ChainConfig, txLookupLimit uint64, forceTriedbCommitHook core.ForceTriedbCommitHook) (*core.BlockChain, error) {
 	engine := arbos.Engine{
 		IsSequencer: true,
 	}
@@ -173,11 +173,10 @@ func GetBlockChain(chainDb ethdb.Database, cacheConfig *core.CacheConfig, chainC
 	vmConfig := vm.Config{
 		EnablePreimageRecording: false,
 	}
-
-	return core.NewBlockChain(chainDb, cacheConfig, chainConfig, nil, nil, engine, vmConfig, shouldPreserveFalse, &txLookupLimit)
+	return core.NewArbBlockChain(chainDb, cacheConfig, chainConfig, nil, nil, engine, vmConfig, shouldPreserveFalse, &txLookupLimit, forceTriedbCommitHook)
 }

-func WriteOrTestBlockChain(chainDb ethdb.Database, cacheConfig *core.CacheConfig, initData statetransfer.InitDataReader, chainConfig *params.ChainConfig, initMessage *arbostypes.ParsedInitMessage, txLookupLimit uint64, accountsPerSync uint) (*core.BlockChain, error) {
+func WriteOrTestBlockChain(chainDb ethdb.Database, cacheConfig *core.CacheConfig, initData statetransfer.InitDataReader, chainConfig *params.ChainConfig, initMessage *arbostypes.ParsedInitMessage, txLookupLimit uint64, accountsPerSync uint, forceTriedbCommitHook core.ForceTriedbCommitHook) (*core.BlockChain, error) {
 	err := WriteOrTestGenblock(chainDb, initData, chainConfig, initMessage, accountsPerSync)
 	if err != nil {
 		return nil, err
@@ -186,7 +185,7 @@ func WriteOrTestBlockChain(chainDb ethdb.Database, cacheConfig *core.CacheConfig
 	if err != nil {
 		return nil, err
 	}
-	return GetBlockChain(chainDb, cacheConfig, chainConfig, txLookupLimit)
+	return GetBlockChain(chainDb, cacheConfig, chainConfig, txLookupLimit, forceTriedbCommitHook)
 }

 // Don't preserve reorg'd out blocks
diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go
index 00337cc355..ae97f1327a 100644
--- a/execution/gethexec/node.go
+++ b/execution/gethexec/node.go
@@ -49,6 +49,7 @@ type Config struct {
 	Caching       CachingConfig         `koanf:"caching"`
 	RPC           arbitrum.Config       `koanf:"rpc"`
 	TxLookupLimit uint64                `koanf:"tx-lookup-limit"`
+	SyncHelper    NitroSyncHelperConfig `koanf:"sync-helper"`
 	Dangerous     DangerousConfig       `koanf:"dangerous"`

 	forwardingTarget string
@@ -136,6 +137,7 @@ type ExecutionNode struct {
 	TxPublisher       TransactionPublisher
 	ConfigFetcher     ConfigFetcher
 	ParentChainReader *headerreader.HeaderReader
+	SyncHelper        *NitroSyncHelper
 	started           atomic.Bool
 }

@@ -199,6 +201,12 @@ func CreateExecutionNode(
 		return nil, err
 	}

+	var syncHelper *NitroSyncHelper
+	if config.SyncHelper.Enabled {
+		syncHelperConfigFetcher := func() *NitroSyncHelperConfig { return &configFetcher().SyncHelper }
+		syncHelper = NewNitroSyncHelper(syncHelperConfigFetcher, l2BlockChain)
+	}
+
 	apis := []rpc.API{{
 		Namespace: "arb",
 		Version:   "1.0",
@@ -243,6 +251,7 @@ func CreateExecutionNode(
 		TxPublisher:       txPublisher,
 		ConfigFetcher:     configFetcher,
 		ParentChainReader: parentChainReader,
+		SyncHelper:        syncHelper,
 	}, nil
 }

@@ -282,6 +291,12 @@ func (n *ExecutionNode) Start(ctx context.Context) error {
 	if n.ParentChainReader != nil {
 		n.ParentChainReader.Start(ctx)
 	}
+	if n.SyncHelper != nil {
+		err := n.SyncHelper.Start(ctx)
+		if err != nil {
+			return fmt.Errorf("failed to start sync helper: %w", err)
+		}
+	}
 	return nil
 }

@@ -289,6 +304,9 @@ func (n *ExecutionNode) StopAndWait() {
 	if !n.started.Load() {
 		return
 	}
+	if n.SyncHelper != nil {
+		n.SyncHelper.StopAndWait()
+	}
 	// TODO after separation
 	// n.Stack.StopRPC() // does nothing if not running
 	if n.TxPublisher.Started() {
@@ -332,6 +350,11 @@ func (n *ExecutionNode) SequenceDelayedMessage(message *arbostypes.L1IncomingMes
 func (n *ExecutionNode) ResultAtPos(pos arbutil.MessageIndex) (*execution.MessageResult, error) {
 	return n.ExecEngine.ResultAtPos(pos)
 }
+func (n *ExecutionNode) SetConfirmedNodeHelper(confirmedNodeHelper execution.ConfirmedNodeHelper) {
+	if n.SyncHelper != nil {
+		n.SyncHelper.SetConfirmedNodeHelper(confirmedNodeHelper)
+	}
+}

 func (n *ExecutionNode) RecordBlockCreation(
 	ctx context.Context,
diff --git a/execution/gethexec/sync_helper.go b/execution/gethexec/sync_helper.go
new file mode 100644
index 0000000000..403ad1ef39
--- /dev/null
+++ b/execution/gethexec/sync_helper.go
@@ -0,0 +1,303 @@
+package gethexec
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/offchainlabs/nitro/arbutil"
+	"github.com/offchainlabs/nitro/execution"
+	"github.com/offchainlabs/nitro/util/stopwaiter"
+	"github.com/offchainlabs/nitro/validator"
+	flag "github.com/spf13/pflag"
+)
+
+type NitroSyncHelperConfig struct {
+	Enabled          bool   `koanf:"enabled"`
+	CheckpointPeriod uint64 `koanf:"checkpoint-period"`
+	CheckpointCache  uint   `koanf:"checkpoint-cache"`
+}
+
+func NitroSyncHelperConfigAddOptions(prefix string, f *flag.FlagSet) {
+	f.Bool(prefix+".enabled", NitroSyncHelperConfigDefault.Enabled, "enable nitro sync helper")
+	f.Uint64(prefix+".checkpoint-period", NitroSyncHelperConfigDefault.CheckpointPeriod, "number of blocks between sync checkpoints")
+	f.Uint(prefix+".checkpoint-cache", NitroSyncHelperConfigDefault.CheckpointCache, "number of recently confirmed checkpoints to keep in cache")
+}
+
+var NitroSyncHelperConfigDefault = NitroSyncHelperConfig{
+	Enabled:          true,      // TODO
+	CheckpointPeriod: 10 * 1000, // TODO
+	CheckpointCache:  16,        // TODO
+}
+
+// implements arbitrum.SyncHelper
+// implements staker.LatestConfirmedNotifier
+type NitroSyncHelper struct {
+	stopwaiter.StopWaiter
+	config          NitroSyncHelperConfigFetcher
+	bc              *core.BlockChain
+	checkpointCache *CheckpointCache
+	newConfirmed    chan Confirmed
+
+	lastConfirmedLock sync.RWMutex
+	lastConfirmed     *Confirmed
+
+	confirmedNodeHelper execution.ConfirmedNodeHelper
+}
+
+type NitroSyncHelperConfigFetcher func() *NitroSyncHelperConfig
+
+func NewNitroSyncHelper(config NitroSyncHelperConfigFetcher, bc *core.BlockChain) *NitroSyncHelper {
+	return &NitroSyncHelper{
+		config:          config,
+		bc:              bc,
+		checkpointCache: NewCheckpointCache(int(config().CheckpointCache)),
+		newConfirmed:    make(chan Confirmed), // TODO
+	}
+}
+
+func (h *NitroSyncHelper) SetConfirmedNodeHelper(confirmedHelper execution.ConfirmedNodeHelper) {
+	if h.Started() {
+		panic("trying to set confirmed node helper after nitro sync helper start")
+	}
+	if h.confirmedNodeHelper != nil {
+		panic("trying to set confirmed node helper when already set")
+	}
+	h.confirmedNodeHelper = confirmedHelper
+}
+
+func (h *NitroSyncHelper) Start(ctx context.Context) error {
+	if err := h.StopWaiter.StopWaiterSafe.Start(ctx, h); err != nil {
+		return err
+	}
+	if h.confirmedNodeHelper != nil {
+		err := h.confirmedNodeHelper.SubscribeLatest(h)
+		if err != nil {
+			return fmt.Errorf("failed to subscribe for latest confirmed notifications: %w", err)
+		}
+	}
+	return h.StopWaiterSafe.LaunchThreadSafe(func(ctx context.Context) {
+		for {
+			select {
+			// TODO refactor the newConfirmed channel (might not be needed as confirmedNodeHelper should handle non blocking update propagation)
+			case c := <-h.newConfirmed:
+				if err := h.updateLastConfirmed(ctx, &c); err != nil {
+					log.Error("Sync helper failed to update last confirmed", "err", err)
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	})
+}
+
+// validates newConfirmed against the local chain and, if consistent with the
+// previous value, stores it and scans for newly confirmed checkpoints
+func (h *NitroSyncHelper) updateLastConfirmed(ctx context.Context, newConfirmed *Confirmed) error {
+	// validate block hash
+	header := h.bc.GetHeaderByNumber(uint64(newConfirmed.BlockNumber))
+	if header == nil {
+		return fmt.Errorf("confirmed block #%d not found in local blockchain", newConfirmed.BlockNumber)
+	}
+	newConfirmed.Header = header
+	if hash := header.Hash(); hash.Cmp(newConfirmed.BlockHash) != 0 {
+		return fmt.Errorf("confirmed BlockHash doesn't match header hash from blockchain, block #%d %s, confirmed %s", newConfirmed.BlockNumber, hash, newConfirmed.BlockHash)
+	}
+
+	h.lastConfirmedLock.Lock()
+	defer h.lastConfirmedLock.Unlock()
+	previousConfirmed := h.lastConfirmed
+	if previousConfirmed != nil {
+		if newConfirmed.BlockNumber == previousConfirmed.BlockNumber {
+			if newConfirmed.BlockHash != previousConfirmed.BlockHash || newConfirmed.Node != previousConfirmed.Node {
+				return fmt.Errorf("new confirmed block number same as previous confirmed, but block hash and/or node number doesn't match, block #%d, previous %s node %d, new %s node %d", previousConfirmed.BlockNumber, previousConfirmed.BlockHash, previousConfirmed.Node, newConfirmed.BlockHash, newConfirmed.Node)
+			}
+			return nil
+		}
+		if newConfirmed.BlockNumber < previousConfirmed.BlockNumber {
+			// TODO do we want to continue either way?
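+			// a lower confirmed block number would indicate a reorg of confirmed nodes on the parent chain, which is not expected here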
+ return fmt.Errorf("New confirmed block number lower then previous confirmed, previous block #%d %s node %d, new block #%d %s node %d", previousConfirmed.BlockNumber, previousConfirmed.BlockHash, previousConfirmed.Node, newConfirmed.BlockNumber, newConfirmed.BlockHash, newConfirmed.Node) + } + } + h.lastConfirmed = newConfirmed + return h.scanNewConfirmedCheckpoints(ctx, newConfirmed, previousConfirmed) +} + +// scan for new confirmed and available checkpoints and add them to cache +func (h *NitroSyncHelper) scanNewConfirmedCheckpoints(ctx context.Context, newConfirmed *Confirmed, previousConfirmed *Confirmed) error { + period := int64(h.config().CheckpointPeriod) + var nextCheckpoint int64 + if previousConfirmed == nil { + genesis := int64(h.bc.Config().ArbitrumChainParams.GenesisBlockNum) + nextCheckpoint = (genesis/period + 1) * period // TODO add option to start the scan from n blocks before nextCheckpoint.BlockNumber + } else { + nextCheckpoint = (previousConfirmed.BlockNumber/period + 1) * period + } + for nextCheckpoint <= newConfirmed.BlockNumber && ctx.Err() == nil { + header := h.bc.GetHeaderByNumber(uint64(nextCheckpoint)) + if header == nil { + // TODO should we continue and just skip this checkpoint? + return fmt.Errorf("missing header for block #%d", nextCheckpoint) + } + // TODO can we just use h.bc.StateAt? + if _, err := state.New(header.Root, h.bc.StateCache(), nil); err == nil { + h.checkpointCache.Add(header) + } + nextCheckpoint += period + } + return nil +} + +func GetForceTriedbCommitHookForConfig(config NitroSyncHelperConfigFetcher) core.ForceTriedbCommitHook { + if !config().Enabled { + // TODO do we want to support hot-reloading of Enabled? + return nil + } + return func(block *types.Block, processing time.Duration, gas uint64) bool { + if block.NumberU64() == 0 { + return false + } + commit := block.NumberU64()%config().CheckpointPeriod == 0 + // TODO add condition for minimal processing since last checkpoint + // TODO add condition for minimal gas used since last checkpoint + _ = processing + _ = gas + return commit + } +} + +// implements staker.LatestConfirmedNotifier +func (h *NitroSyncHelper) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState, node uint64) { + genesis := h.bc.Config().ArbitrumChainParams.GenesisBlockNum + h.newConfirmed <- Confirmed{ + BlockNumber: arbutil.MessageCountToBlockNumber(count, genesis), + BlockHash: globalState.BlockHash, + // TODO do we want to also use SendRoot? 
+		Node:        node,
+		Header:      nil,
+	}
+}
+
+func (h *NitroSyncHelper) LastCheckpoint() (*types.Header, error) {
+	if last := h.checkpointCache.Last(); last != nil {
+		return last, nil
+	}
+	return nil, errors.New("unavailable")
+}
+
+func (h *NitroSyncHelper) CheckpointSupported(header *types.Header) (bool, error) {
+	if header == nil {
+		return false, errors.New("header is nil")
+	}
+	return h.checkpointCache.Has(header), nil
+}
+
+func (h *NitroSyncHelper) LastConfirmed() (*types.Header, uint64, error) {
+	h.lastConfirmedLock.RLock()
+	defer h.lastConfirmedLock.RUnlock()
+	if h.lastConfirmed == nil {
+		return nil, 0, errors.New("unavailable")
+	}
+	return h.lastConfirmed.Header, h.lastConfirmed.Node, nil
+}
+
+func (h *NitroSyncHelper) ValidateConfirmed(header *types.Header, node uint64) (bool, error) {
+	if !h.Started() {
+		return false, errors.New("not started")
+	}
+	if header == nil {
+		return false, errors.New("header is nil")
+	}
+	if h.confirmedNodeHelper == nil {
+		return false, errors.New("confirmed node helper unavailable")
+	}
+	hash := header.Hash()
+	return h.confirmedNodeHelper.Validate(node, hash)
+}
+
+type Confirmed struct {
+	BlockNumber int64
+	BlockHash   common.Hash
+	Node        uint64
+	Header      *types.Header // filled out later in updateLastConfirmed
+}
+
+type CheckpointCache struct {
+	capacity int
+
+	lock           sync.RWMutex
+	checkpointsMap map[uint64]*types.Header
+	checkpoints    []*types.Header
+}
+
+// capacity has to be greater than 0
+func NewCheckpointCache(capacity int) *CheckpointCache {
+	if capacity <= 0 {
+		capacity = 1
+	}
+	cache := &CheckpointCache{
+		capacity:       capacity,
+		checkpointsMap: make(map[uint64]*types.Header, capacity),
+		checkpoints:    make([]*types.Header, 0, capacity),
+	}
+	return cache
+}
+
+func (c *CheckpointCache) Add(header *types.Header) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	if len(c.checkpoints) >= c.capacity {
+		var dropped *types.Header
+		dropped, c.checkpoints = c.checkpoints[0], c.checkpoints[1:]
+		delete(c.checkpointsMap, dropped.Number.Uint64())
+	}
+	number := header.Number.Uint64()
+	if previous, has := c.checkpointsMap[number]; has {
+		// TODO do we expect this to happen in normal operations?
+		log.Warn("CheckpointCache: duplicate checkpoint header added, replacing previous", "number", number)
+		var i int
+		for i = 0; i < len(c.checkpoints); i++ {
+			if c.checkpoints[i] == previous {
+				break
+			}
+		}
+		if i == len(c.checkpoints) {
+			// shouldn't ever happen
+			log.Error("CheckpointCache: duplicate not found in checkpoints slice", "number", number)
+		} else {
+			c.checkpoints = append(c.checkpoints[:i], c.checkpoints[i+1:]...)
+		}
+	}
+	c.checkpoints = append(c.checkpoints, header)
+	c.checkpointsMap[number] = header
+}
+
+func (c *CheckpointCache) Has(header *types.Header) bool {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	cached, has := c.checkpointsMap[header.Number.Uint64()]
+	if !has {
+		return false
+	}
+	// TODO won't comparing fields be more efficient?
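+	// note: header.Hash() recomputes the RLP hash on each call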
+	return header.Hash().Cmp(cached.Hash()) == 0
+}
+
+func (c *CheckpointCache) Last() *types.Header {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	if len(c.checkpoints) > 0 {
+		return c.checkpoints[len(c.checkpoints)-1]
+	}
+	return nil
+}
diff --git a/execution/gethexec/sync_helper_test.go b/execution/gethexec/sync_helper_test.go
new file mode 100644
index 0000000000..5941c3cedb
--- /dev/null
+++ b/execution/gethexec/sync_helper_test.go
@@ -0,0 +1,243 @@
+package gethexec
+
+import (
+	"context"
+	"errors"
+	"math/big"
+	"math/rand"
+	"testing"
+
+	"github.com/ethereum/go-ethereum/consensus/ethash"
+	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/core/vm"
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/node"
+	"github.com/ethereum/go-ethereum/params"
+	"github.com/ethereum/go-ethereum/trie"
+)
+
+type testBlockchainOptions struct {
+	cachingConfig         *CachingConfig
+	blocksNum             int
+	forceTriedbCommitHook core.ForceTriedbCommitHook
+}
+
+func createTestBlockchain(t *testing.T, opts testBlockchainOptions) (*core.BlockChain, ethdb.Database) {
+	if opts.cachingConfig == nil {
+		opts.cachingConfig = &DefaultCachingConfig
+	}
+	stackConfig := node.DefaultConfig
+	stackConfig.DataDir = t.TempDir()
+	stackConfig.P2P.DiscoveryV4 = false
+	stackConfig.P2P.DiscoveryV5 = false
+	stackConfig.P2P.ListenAddr = "127.0.0.1:0"
+	stack, err := node.New(&stackConfig)
+	if err != nil {
+		t.Fatal(err)
+	}
+	db, err := stack.OpenDatabaseWithFreezer("l2chaindata", 2048, 512, "", "", false)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// create and populate chain
+	testUser, err := crypto.GenerateKey()
+	if err != nil {
+		t.Fatal("generate key err:", err)
+	}
+	testUserAddress := crypto.PubkeyToAddress(testUser.PublicKey)
+
+	gspec := &core.Genesis{
+		Config: params.TestChainConfig,
+		Alloc: core.GenesisAlloc{
+			testUserAddress: {Balance: new(big.Int).Lsh(big.NewInt(1), 250)},
+		},
+	}
+
+	coreCacheConfig := DefaultCacheConfigFor(stack, opts.cachingConfig)
+	bc, _ := core.NewArbBlockChain(db, coreCacheConfig, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil, opts.forceTriedbCommitHook)
+	signer := types.MakeSigner(bc.Config(), big.NewInt(1), 0)
+
+	_, blocks, allReceipts := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), opts.blocksNum, func(i int, gen *core.BlockGen) {
+		nonce := gen.TxNonce(testUserAddress)
+		tx, err := types.SignNewTx(testUser, signer, &types.LegacyTx{
+			Nonce:    nonce,
+			GasPrice: gen.BaseFee(),
+			Gas:      uint64(1000001),
+		})
+		if err != nil {
+			t.Fatalf("failed to create tx: %v", err)
+		}
+		gen.AddTx(tx)
+	})
+	for _, receipts := range allReceipts {
+		if len(receipts) < 1 {
+			t.Fatal("missing receipts")
+		}
+		for _, receipt := range receipts {
+			if receipt.Status == 0 {
+				t.Fatal("failed transaction")
+			}
+		}
+	}
+	if _, err := bc.InsertChain(blocks); err != nil {
+		t.Fatal(err)
+	}
+	return bc, db
+}
+
+type syncHelperScanTestOptions struct {
+	blocksNum               int
+	period                  int
+	committedCheckpointsNum int // 0 = all
+}
+
+func testSyncHelperScanNewConfirmedCheckpoints(t *testing.T, opts syncHelperScanTestOptions) {
+	cachingConfig := DefaultCachingConfig
+	cachingConfig.Archive = true
+	cachingConfig.SnapshotCache = 0  // disable snapshot to simplify removing states
+	cachingConfig.TrieCleanCache = 0 // disable trie/Database.cleans cache, so that states removed from the chain db won't be cached there
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	bc, db := createTestBlockchain(t, testBlockchainOptions{
+		cachingConfig: &cachingConfig,
+		blocksNum:     opts.blocksNum,
+	})
+	config := NitroSyncHelperConfig{
+		Enabled:          true,
+		CheckpointPeriod: uint64(opts.period),
+		CheckpointCache:  uint(opts.blocksNum * 2), // big enough to detect bugs
+	}
+	sh := NewNitroSyncHelper(func() *NitroSyncHelperConfig { return &config }, bc)
+
+	for number := 0; number < opts.blocksNum; number++ {
+		header := bc.GetHeaderByNumber(uint64(number))
+		if header == nil {
+			t.Fatal("can't get header, number:", number, "opts:", opts)
+		}
+		if sh.checkpointCache.Has(header) {
+			t.Fatal("unexpected error - checkpoint cache should be empty, but has header, number:", number, "opts:", opts)
+		}
+	}
+	statesKept := make(map[int]struct{})
+	if opts.committedCheckpointsNum > 0 {
+		toKeepCheckpoints := rand.Perm(opts.blocksNum / opts.period)[:opts.committedCheckpointsNum]
+		for _, checkpoint := range toKeepCheckpoints {
+			block := (checkpoint + 1) * opts.period
+			statesKept[block] = struct{}{}
+		}
+		for number := 1; number < opts.blocksNum; number++ {
+			if _, keep := statesKept[number]; keep {
+				continue
+			}
+			header := bc.GetHeaderByNumber(uint64(number))
+			if header == nil {
+				t.Fatal("can't get header, number:", number, "opts:", opts)
+			}
+			err := db.Delete(header.Root.Bytes())
+			if err != nil {
+				t.Fatal("failed to delete key from db, err:", err, "opts:", opts)
+			}
+			_, err = bc.StateAt(header.Root)
+			if err == nil {
+				t.Fatal("internal test error - failed to remove state from db", "number", number, "opts:", opts)
+			}
+			expectedErr := &trie.MissingNodeError{}
+			if !errors.As(err, &expectedErr) {
+				t.Fatal("internal test error - failed to remove state from db, err: ", err, "opts:", opts)
+			}
+		}
+	}
+	var previousConfirmed *Confirmed
+	for number := 1; number < opts.blocksNum; number++ {
+		block := bc.GetBlockByNumber(uint64(number))
+		if block == nil {
+			t.Fatal("can't get block, number:", number, "opts:", opts)
+		}
+		newConfirmed := Confirmed{
+			BlockNumber: int64(number),
+			BlockHash:   block.Hash(),
+			Node:        0, // doesn't matter here
+			Header:      block.Header(),
+		}
+		if err := sh.scanNewConfirmedCheckpoints(ctx, &newConfirmed, previousConfirmed); err != nil {
+			t.Fatal("failed to scan new confirmed checkpoints, err:", err, "opts:", opts)
+		}
+		previousConfirmed = &newConfirmed
+	}
+	for number := 0; number < opts.blocksNum; number++ {
+		header := bc.GetHeaderByNumber(uint64(number))
+		if header == nil {
+			t.Fatal("can't get header, number:", number, "opts:", opts)
+		}
+		_, kept := statesKept[number]
+		if number != 0 && number%opts.period == 0 && (opts.committedCheckpointsNum == 0 || kept) {
+			if !sh.checkpointCache.Has(header) {
+				t.Fatal("checkpoint cache doesn't have expected header, number:", number, "opts:", opts)
+			}
+		} else if sh.checkpointCache.Has(header) {
+			t.Fatal("checkpoint cache should not have the header, number:", number, "opts:", opts)
+		}
+	}
+}
+
+func TestSyncHelperScanNewConfirmedCheckpoints(t *testing.T) {
+	options := []syncHelperScanTestOptions{}
+	for i := 1; i < 7; i++ {
+		options = append(options, syncHelperScanTestOptions{
+			blocksNum:               51,
+			period:                  i,
+			committedCheckpointsNum: 0,
+		})
+	}
+	for i := 1; i < 7; i++ {
+		options = append(options, syncHelperScanTestOptions{
+			blocksNum:               51,
+			period:                  4,
+			committedCheckpointsNum: rand.Intn(51/4) + 1,
+		})
+	}
+	for _, o := range options {
+		testSyncHelperScanNewConfirmedCheckpoints(t, o)
+	}
+}
+
+func TestForceTriedbCommitHook(t *testing.T) {
+	hook := GetForceTriedbCommitHookForConfig(func() *NitroSyncHelperConfig {
+		return &NitroSyncHelperConfig{
&NitroSyncHelperConfig{ + Enabled: false, + CheckpointPeriod: 100, + CheckpointCache: 0, + } + }) + if hook != nil { + t.Fatal("Got non nil hook, but NitroSyncHelper was disabled in the config") + } + hook = GetForceTriedbCommitHookForConfig(func() *NitroSyncHelperConfig { + return &NitroSyncHelperConfig{ + Enabled: true, + CheckpointPeriod: 100, + CheckpointCache: 0, + } + }) + for _, i := range []int64{1, 10, 99, 101} { + header := &types.Header{ + Number: big.NewInt(i), + } + block := types.NewBlock(header, nil, nil, nil, nil) + if hook(block, 0, 0) { + t.Errorf("the hook returned true for block #%d", i) + } + } + for _, i := range []int64{100, 200, 300} { + header := &types.Header{ + Number: big.NewInt(i), + } + block := types.NewBlock(header, nil, nil, nil, nil) + if !hook(block, 0, 0) { + t.Errorf("the hook returned false for block #%d", i) + } + } +} diff --git a/execution/interface.go b/execution/interface.go index ef9409b9c1..fe76204932 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -33,6 +33,7 @@ type ExecutionClient interface { HeadMessageNumber() (arbutil.MessageIndex, error) HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error) ResultAtPos(pos arbutil.MessageIndex) (*MessageResult, error) + SetConfirmedNodeHelper(confirmedValidator ConfirmedNodeHelper) } // needed for validators / stakers @@ -81,3 +82,13 @@ type TransactionStreamer interface { WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata) error ExpectChosenSequencer() error } + +type ConfirmedNodeHelper interface { + Validate(node uint64, blockHash common.Hash) (bool, error) + SubscribeLatest(subscriber LatestConfirmedNotifier) error +} + +// TODO rename? +type LatestConfirmedNotifier interface { + UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState, node uint64) +} diff --git a/go-ethereum b/go-ethereum index 1e2855b24d..690ef23fc2 160000 --- a/go-ethereum +++ b/go-ethereum @@ -1 +1 @@ -Subproject commit 1e2855b24d6555c8cfaf471bd9e2c3d19ab5c32c +Subproject commit 690ef23fc2b4c8e95cdcc22e0af8ff0b7756aefd diff --git a/staker/confirmed_node_helper.go b/staker/confirmed_node_helper.go new file mode 100644 index 0000000000..74242e5fd4 --- /dev/null +++ b/staker/confirmed_node_helper.go @@ -0,0 +1,82 @@ +package staker + +import ( + "context" + "errors" + "sync" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/execution" + "github.com/offchainlabs/nitro/util/stopwaiter" + "github.com/offchainlabs/nitro/validator" +) + +// TODO rename to ConfirmedNodeHelper? 
+type ConfirmedNodeHelper struct {
+	stopwaiter.StopWaiter
+	rollupAddress common.Address
+	client        arbutil.L1Interface
+
+	// TODO refactor subscribers
+	subscribers     []LatestConfirmedNotifier
+	subscribersLock sync.Mutex
+}
+
+func NewConfirmedNodeHelper(rollupAddress common.Address, client arbutil.L1Interface) *ConfirmedNodeHelper {
+	return &ConfirmedNodeHelper{
+		rollupAddress: rollupAddress,
+		client:        client,
+		subscribers:   []LatestConfirmedNotifier{},
+	}
+}
+
+func (h *ConfirmedNodeHelper) Start(ctx context.Context) {
+	h.StopWaiter.Start(ctx, h)
+}
+
+func (h *ConfirmedNodeHelper) UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState, node uint64) {
+	// TODO propagate the update in a separate thread
+	h.subscribersLock.Lock()
+	defer h.subscribersLock.Unlock()
+	for _, subscriber := range h.subscribers {
+		subscriber.UpdateLatestConfirmed(count, globalState, node)
+	}
+}
+
+func (h *ConfirmedNodeHelper) SubscribeLatest(subscriber execution.LatestConfirmedNotifier) error {
+	h.subscribersLock.Lock()
+	defer h.subscribersLock.Unlock()
+	h.subscribers = append(h.subscribers, subscriber)
+	return nil
+}
+
+func (h *ConfirmedNodeHelper) Validate(node uint64, blockHash common.Hash) (bool, error) {
+	ctx, err := h.GetContextSafe()
+	if err != nil {
+		return false, err
+	}
+	// TODO do a binary search for block containing NodeConfirmed for validated node
+	var query = ethereum.FilterQuery{
+		FromBlock: nil,
+		ToBlock:   nil,
+		Addresses: []common.Address{h.rollupAddress},
+		Topics:    [][]common.Hash{{nodeConfirmedID}, {blockHash}, nil},
+	}
+	logs, err := h.client.FilterLogs(ctx, query)
+	if err != nil {
+		return false, err
+	}
+	if len(logs) == 0 {
+		return false, nil
+	}
+	if len(logs) > 1 {
+		// TODO verify if it can happen, and if we should handle it better
+		log.Error("Found more than one log when validating confirmed node", "node", node, "blockHash", blockHash, "logs", logs)
+		return false, errors.New("unexpected number of logs for node confirmation")
+	}
+	// TODO validate the log?
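+	// a single matching NodeConfirmed log is treated as sufficient proof of confirmation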
+	return true, nil
+}
diff --git a/staker/rollup_watcher.go b/staker/rollup_watcher.go
index c9fa3b132d..8577c8d977 100644
--- a/staker/rollup_watcher.go
+++ b/staker/rollup_watcher.go
@@ -24,6 +24,7 @@ import (

 var rollupInitializedID common.Hash
 var nodeCreatedID common.Hash
+var nodeConfirmedID common.Hash // used by ConfirmedNodeHelper
 var challengeCreatedID common.Hash

 func init() {
@@ -33,6 +34,7 @@ func init() {
 	}
 	rollupInitializedID = parsedRollup.Events["RollupInitialized"].ID
 	nodeCreatedID = parsedRollup.Events["NodeCreated"].ID
+	nodeConfirmedID = parsedRollup.Events["NodeConfirmed"].ID
 	challengeCreatedID = parsedRollup.Events["RollupChallengeStarted"].ID
 }

diff --git a/staker/staker.go b/staker/staker.go
index 2a95e9c9f7..f9959f4c13 100644
--- a/staker/staker.go
+++ b/staker/staker.go
@@ -231,7 +231,7 @@ type LatestStakedNotifier interface {
 }

 type LatestConfirmedNotifier interface {
-	UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState)
+	UpdateLatestConfirmed(count arbutil.MessageIndex, globalState validator.GoGlobalState, node uint64)
 }

 type Staker struct {
@@ -481,7 +481,7 @@ func (s *Staker) Start(ctxIn context.Context) {
 		stakerLatestConfirmedNodeGauge.Update(int64(confirmed))
 		if confirmedGlobalState != nil {
 			for _, notifier := range s.confirmedNotifiers {
-				notifier.UpdateLatestConfirmed(confirmedMsgCount, *confirmedGlobalState)
+				notifier.UpdateLatestConfirmed(confirmedMsgCount, *confirmedGlobalState, confirmed)
 			}
 		}
 		return s.config.StakerInterval
diff --git a/system_tests/common_test.go b/system_tests/common_test.go
index 2e17a50ede..5574f95299 100644
--- a/system_tests/common_test.go
+++ b/system_tests/common_test.go
@@ -716,7 +716,7 @@ func createL2BlockChainWithStackConfig(
 	if cacheConfig != nil {
 		coreCacheConfig = gethexec.DefaultCacheConfigFor(stack, cacheConfig)
 	}
-	blockchain, err := gethexec.WriteOrTestBlockChain(chainDb, coreCacheConfig, initReader, chainConfig, initMessage, gethexec.ConfigDefaultTest().TxLookupLimit, 0)
+	blockchain, err := gethexec.WriteOrTestBlockChain(chainDb, coreCacheConfig, initReader, chainConfig, initMessage, gethexec.ConfigDefaultTest().TxLookupLimit, 0, nil)
 	Require(t, err)

 	return l2info, stack, chainDb, arbDb, blockchain
@@ -914,7 +914,7 @@ func Create2ndNodeWithConfig(
 	initMessage := getInitMessage(ctx, t, l1client, first.DeployInfo)

 	coreCacheConfig := gethexec.DefaultCacheConfigFor(l2stack, &execConfig.Caching)
-	l2blockchain, err := gethexec.WriteOrTestBlockChain(l2chainDb, coreCacheConfig, initReader, chainConfig, initMessage, gethexec.ConfigDefaultTest().TxLookupLimit, 0)
+	l2blockchain, err := gethexec.WriteOrTestBlockChain(l2chainDb, coreCacheConfig, initReader, chainConfig, initMessage, gethexec.ConfigDefaultTest().TxLookupLimit, 0, nil)
 	Require(t, err)

 	AddDefaultValNode(t, ctx, nodeConfig, true)
diff --git a/system_tests/das_test.go b/system_tests/das_test.go
index 96de52e197..39668c9562 100644
--- a/system_tests/das_test.go
+++ b/system_tests/das_test.go
@@ -181,7 +181,7 @@ func TestDASRekey(t *testing.T) {
 		l2arbDb, err := l2stackA.OpenDatabase("arbitrumdata", 0, 0, "", false)
 		Require(t, err)

-		l2blockchain, err := gethexec.GetBlockChain(l2chainDb, nil, chainConfig, gethexec.ConfigDefaultTest().TxLookupLimit)
+		l2blockchain, err := gethexec.GetBlockChain(l2chainDb, nil, chainConfig, gethexec.ConfigDefaultTest().TxLookupLimit, nil)
 		Require(t, err)

 		execA, err := gethexec.CreateExecutionNode(ctx, l2stackA, l2chainDb, l2blockchain, l1client, gethexec.ConfigDefaultTest)
diff --git a/system_tests/sync_helper_test.go b/system_tests/sync_helper_test.go
new file mode 100644
index 0000000000..19d8dc54b0
--- /dev/null
+++ b/system_tests/sync_helper_test.go
@@ -0,0 +1,338 @@
+package arbtest
+
+import (
+	"context"
+	"encoding/hex"
+	"math/big"
+	"net"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/arbitrum"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/consensus/ethash"
+	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/core/vm"
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/node"
+	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/params"
+	"github.com/offchainlabs/nitro/arbutil"
+	"github.com/offchainlabs/nitro/execution"
+	"github.com/offchainlabs/nitro/execution/gethexec"
+	"github.com/offchainlabs/nitro/validator"
+)
+
+type dummyIterator struct {
+	lock  sync.Mutex
+	nodes []*enode.Node // first one is never used
+}
+
+func (i *dummyIterator) Next() bool { // moves to next node
+	i.lock.Lock()
+	defer i.lock.Unlock()
+
+	if len(i.nodes) == 0 {
+		log.Info("dummy iterator: done")
+		return false
+	}
+	i.nodes = i.nodes[1:]
+	return len(i.nodes) > 0
+}
+
+func (i *dummyIterator) Node() *enode.Node { // returns current node
+	i.lock.Lock()
+	defer i.lock.Unlock()
+	if len(i.nodes) == 0 {
+		return nil
+	}
+	if i.nodes[0] != nil {
+		log.Info("dummy iterator: emit", "id", i.nodes[0].ID(), "ip", i.nodes[0].IP(), "tcp", i.nodes[0].TCP(), "udp", i.nodes[0].UDP())
+	}
+	return i.nodes[0]
+}
+
+func (i *dummyIterator) Close() { // ends the iterator
+	i.nodes = nil
+}
+
+type dummyConfirmedNodeHelper struct {
+	confirmedHash common.Hash
+}
+
+func (h *dummyConfirmedNodeHelper) Validate(node uint64, blockHash common.Hash) (bool, error) {
+	return blockHash == h.confirmedHash, nil
+}
+
+func (h *dummyConfirmedNodeHelper) SubscribeLatest(subscriber execution.LatestConfirmedNotifier) error {
+	return nil
+}
+
+func testHasBlock(t *testing.T, chain *core.BlockChain, block *types.Block, shouldHaveState bool) {
+	t.Helper()
+	hasHeader := chain.GetHeaderByNumber(block.NumberU64())
+	if hasHeader == nil {
+		t.Fatal("block not found")
+	}
+	if hasHeader.Hash() != block.Hash() {
+		t.Fatal("wrong block in blockchain")
+	}
+	_, err := chain.StateAt(hasHeader.Root)
+	if err != nil && shouldHaveState {
+		t.Fatal("should have state, but doesn't")
+	}
+	if err == nil && !shouldHaveState {
+		t.Fatal("should not have state, but does")
+	}
+}
+
+func portFromAddress(address string) (int, error) {
+	splitAddr := strings.Split(address, ":")
+	return strconv.Atoi(splitAddr[len(splitAddr)-1])
+}
+
+func TestSyncHelperExecutionSync(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	const pivotBlockNum = 50
+	const syncBlockNum = 70
+	const extraBlocks = 200
+	syncHelperConfig := gethexec.NitroSyncHelperConfig{
+		Enabled:          true,
+		CheckpointPeriod: 10,
+		CheckpointCache:  10,
+	}
+
+	glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.TerminalFormat(false)))
+	glogger.Verbosity(log.LvlTrace)
+	log.Root().SetHandler(glogger)
+
+	// key for source node p2p
+	sourceKey, err := crypto.GenerateKey()
+	if err != nil {
+		t.Fatal("generate key err:", err)
+	}
+
+	// key for dest node p2p
+	destKey, err := crypto.GenerateKey()
+	if err != nil {
+		t.Fatal("generate key err:", err)
+	}
+
+	// source node
+	sourceStackConf := node.DefaultConfig
+	sourceStackConf.DataDir = t.TempDir()
+	sourceStackConf.P2P.DiscoveryV4 = false
+	sourceStackConf.P2P.DiscoveryV5 = false
+	sourceStackConf.P2P.ListenAddr = "127.0.0.1:0"
+	sourceStackConf.P2P.PrivateKey = sourceKey
+
+	sourceStack, err := node.New(&sourceStackConf)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sourceDb, err := sourceStack.OpenDatabaseWithFreezer("l2chaindata", 2048, 512, "", "", false)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// create and populate chain
+
+	// code for contractCodeHex below:
+	// pragma solidity ^0.8.20;
+	//
+	// contract Temmp {
+	//     uint256[0x10000] private store;
+	//
+	//     fallback(bytes calldata data) external payable returns (bytes memory) {
+	//         uint16 index = uint16(uint256(bytes32(data[0:32])));
+	//         store[index] += 1;
+	//         return "";
+	//     }
+	// }
+	contractCodeHex := "608060405234801561001057600080fd5b50610218806100206000396000f3fe608060405260003660606000838360009060209261001f9392919061008a565b9061002a91906100e7565b60001c9050600160008261ffff1662010000811061004b5761004a610146565b5b01600082825461005b91906101ae565b9250508190555060405180602001604052806000815250915050915050805190602001f35b600080fd5b600080fd5b6000808585111561009e5761009d610080565b5b838611156100af576100ae610085565b5b6001850283019150848603905094509492505050565b600082905092915050565b6000819050919050565b600082821b905092915050565b60006100f383836100c5565b826100fe81356100d0565b9250602082101561013e576101397fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff836020036008026100da565b831692505b505092915050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052603260045260246000fd5b6000819050919050565b7f4e487b7100000000000000000000000000000000000000000000000000000000600052601160045260246000fd5b60006101b982610175565b91506101c483610175565b92508282019050808211156101dc576101db61017f565b5b9291505056fea26469706673582212202777d6cb94519b9aa7026cf6dad162739731e124c6379b15c343ff1c6e84a5f264736f6c63430008150033"
+	contractCode, err := hex.DecodeString(contractCodeHex)
+	if err != nil {
+		t.Fatal("decode contract error:", err)
+	}
+	testUser, err := crypto.GenerateKey()
+	if err != nil {
+		t.Fatal("generate key err:", err)
+	}
+	testUserAddress := crypto.PubkeyToAddress(testUser.PublicKey)
+
+	testUser2, err := crypto.GenerateKey()
+	if err != nil {
+		t.Fatal("generate key err:", err)
+	}
+	testUser2Address := crypto.PubkeyToAddress(testUser2.PublicKey)
+
+	gspec := &core.Genesis{
+		Config: params.TestChainConfig,
+		Alloc: core.GenesisAlloc{
+			testUserAddress:  {Balance: new(big.Int).Lsh(big.NewInt(1), 250)},
+			testUser2Address: {Balance: new(big.Int).Lsh(big.NewInt(1), 250)},
+		},
+	}
+	sourceChain, _ := core.NewArbBlockChain(sourceDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil, nil)
+	signer := types.MakeSigner(sourceChain.Config(), big.NewInt(1), 0)
+
+	firstAddress := common.Address{}
+	_, blocks, allReceipts := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), syncBlockNum+extraBlocks, func(i int, gen *core.BlockGen) {
+		creationNonce := gen.TxNonce(testUser2Address)
+		tx, err := types.SignTx(types.NewContractCreation(creationNonce, new(big.Int), 1000000, gen.BaseFee(), contractCode), signer, testUser2)
+		if err != nil {
+			t.Fatalf("failed to create contract: %v", err)
+		}
+		gen.AddTx(tx)
+
+		contractAddress := crypto.CreateAddress(testUser2Address, creationNonce)
+
+		nonce := gen.TxNonce(testUserAddress)
+		tx, err = types.SignNewTx(testUser, signer, &types.LegacyTx{
+			Nonce:    nonce,
+			GasPrice: gen.BaseFee(),
+			Gas:      uint64(1000001),
+		})
+		if err != nil {
+			t.Fatalf("failed to create tx: %v", err)
+		}
+		gen.AddTx(tx)
+
+		iterHash := common.BigToHash(big.NewInt(int64(i)))
+		tx, err = types.SignNewTx(testUser, signer, &types.LegacyTx{
+			To:       &contractAddress,
+			Nonce:    nonce + 1,
+			GasPrice: gen.BaseFee(),
+			Gas:      uint64(1000001),
+			Data:     iterHash[:],
+		})
+		if err != nil {
+			t.Fatalf("failed to create tx: %v", err)
+		}
+		gen.AddTx(tx)
+
+		if firstAddress == (common.Address{}) {
+			firstAddress = contractAddress
+		}
+
+		tx, err = types.SignNewTx(testUser, signer, &types.LegacyTx{
+			To:       &firstAddress,
+			Nonce:    nonce + 2,
+			GasPrice: gen.BaseFee(),
+			Gas:      uint64(1000001),
+			Data:     iterHash[:],
+		})
+		if err != nil {
+			t.Fatalf("failed to create tx: %v", err)
+		}
+		gen.AddTx(tx)
+	})
+
+	for _, receipts := range allReceipts {
+		if len(receipts) < 3 {
+			t.Fatal("missing receipts")
+		}
+		for _, receipt := range receipts {
+			if receipt.Status == 0 {
+				t.Fatal("failed transaction")
+			}
+		}
+	}
+	pivotBlock := blocks[pivotBlockNum-1]
+	syncBlock := blocks[syncBlockNum-1]
+	if _, err := sourceChain.InsertChain(blocks[:pivotBlockNum]); err != nil {
+		t.Fatal(err)
+	}
+	sourceChain.TrieDB().Commit(blocks[pivotBlockNum-1].Root(), true)
+	if _, err := sourceChain.InsertChain(blocks[pivotBlockNum:]); err != nil {
+		t.Fatal(err)
+	}
+
+	// should have state of pivot but nothing around
+	testHasBlock(t, sourceChain, blocks[pivotBlockNum-2], false)
+	testHasBlock(t, sourceChain, blocks[pivotBlockNum-1], true)
+	testHasBlock(t, sourceChain, blocks[pivotBlockNum], false)
+
+	// source sync helper and protocol handler
+	sourceSyncHelper := gethexec.NewNitroSyncHelper(func() *gethexec.NitroSyncHelperConfig { return &syncHelperConfig }, sourceChain)
+	if err := sourceSyncHelper.Start(ctx); err != nil {
+		t.Fatal("failed to start source sync helper, err:", err)
+	}
+	sourceHandler := arbitrum.NewProtocolHandler(sourceDb, sourceChain, sourceSyncHelper, false)
+	sourceSyncHelper.UpdateLatestConfirmed(arbutil.BlockNumberToMessageCount(syncBlock.NumberU64(), 0), validator.GoGlobalState{BlockHash: syncBlock.Hash()}, 0)
+	_, _, err = sourceSyncHelper.LastConfirmed()
+	for err != nil {
+		// wait for the update
+		time.Sleep(10 * time.Millisecond)
+		_, _, err = sourceSyncHelper.LastConfirmed()
+	}
+	sourceStack.RegisterProtocols(sourceHandler.MakeProtocols(&dummyIterator{}))
+	if err := sourceStack.Start(); err != nil {
+		t.Fatal(err)
+	}
+
+	// figure out port of the source node and create dummy iter that points to it
+	sourcePort, err := portFromAddress(sourceStack.Server().Config.ListenAddr)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sourceEnode := enode.NewV4(&sourceKey.PublicKey, net.IPv4(127, 0, 0, 1), sourcePort, 0)
+	iter := &dummyIterator{
+		nodes: []*enode.Node{nil, sourceEnode},
+	}
+
+	// dest node
+	destStackConf := sourceStackConf
+	destStackConf.DataDir = t.TempDir()
+	destStackConf.P2P.PrivateKey = destKey
+	destStack, err := node.New(&destStackConf)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	destDb, err := destStack.OpenDatabaseWithFreezer("l2chaindata", 2048, 512, "", "", false)
+	if err != nil {
+		t.Fatal(err)
+	}
+	destChain, _ := core.NewArbBlockChain(destDb, nil, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil, nil)
+
+	destSyncHelper := gethexec.NewNitroSyncHelper(func() *gethexec.NitroSyncHelperConfig { return &syncHelperConfig }, destChain)
+	confirmedHelper := &dummyConfirmedNodeHelper{syncBlock.Hash()}
+	destSyncHelper.SetConfirmedNodeHelper(confirmedHelper)
+	if err := destSyncHelper.Start(ctx); err != nil {
+		t.Fatal("failed to start destination sync helper, err:", err)
start destination sync helper, err:", err) + } + destHandler := arbitrum.NewProtocolHandler(destDb, destChain, destSyncHelper, true) + destStack.RegisterProtocols(destHandler.MakeProtocols(iter)) + + // start sync + log.Info("dest listener", "address", destStack.Server().Config.ListenAddr) + log.Info("initial source", "head", sourceChain.CurrentBlock()) + log.Info("initial dest", "head", destChain.CurrentBlock()) + log.Info("pivot", "head", pivotBlock.Header()) + if err := destStack.Start(); err != nil { + t.Fatal(err) + } + + <-time.After(time.Second * 5) + + log.Info("final source", "head", sourceChain.CurrentBlock()) + log.Info("final dest", "head", destChain.CurrentBlock()) + log.Info("sync block", "header", syncBlock.Header()) + + // check sync + if destChain.CurrentBlock().Number.Cmp(syncBlock.Number()) != 0 { + t.Fatal("did not sync to sync block") + } + + testHasBlock(t, destChain, syncBlock, true) + testHasBlock(t, destChain, pivotBlock, true) + testHasBlock(t, destChain, blocks[pivotBlockNum-2], false) +}