diff --git a/.github/codecov.yml b/.github/codecov.yml index 6ddfae7aa6..1c0b7e1b08 100644 --- a/.github/codecov.yml +++ b/.github/codecov.yml @@ -10,8 +10,7 @@ coverage: only_pulls: true ignore: - mocks - - l1/internal/contract/starknet.go - grpc/gen/* - vm - p2p/starknet/spec - - docs \ No newline at end of file + - docs diff --git a/ethereum/ethereum_test.go b/ethereum/ethereum_test.go index 3432b892d9..1a4c6240a7 100644 --- a/ethereum/ethereum_test.go +++ b/ethereum/ethereum_test.go @@ -391,7 +391,7 @@ type unreliableEthHandler struct { h *ethHandler } -func (e *unreliableEthHandler) ChainId() *hexutil.Big { //nolint:stylecheck +func (e *unreliableEthHandler) ChainId() *hexutil.Big { return e.h.ChainId() } diff --git a/mocks/mock_l1.go b/mocks/mock_l1.go new file mode 100644 index 0000000000..c7804eb826 --- /dev/null +++ b/mocks/mock_l1.go @@ -0,0 +1,52 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/NethermindEth/juno/sync (interfaces: L1) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + core "github.com/NethermindEth/juno/core" + event "github.com/ethereum/go-ethereum/event" + gomock "github.com/golang/mock/gomock" +) + +// MockL1 is a mock of L1 interface. +type MockL1 struct { + ctrl *gomock.Controller + recorder *MockL1MockRecorder +} + +// MockL1MockRecorder is the mock recorder for MockL1. +type MockL1MockRecorder struct { + mock *MockL1 +} + +// NewMockL1 creates a new mock instance. +func NewMockL1(ctrl *gomock.Controller) *MockL1 { + mock := &MockL1{ctrl: ctrl} + mock.recorder = &MockL1MockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockL1) EXPECT() *MockL1MockRecorder { + return m.recorder +} + +// WatchL1Heads mocks base method. +func (m *MockL1) WatchL1Heads(arg0 context.Context, arg1 chan<- *core.L1Head) (event.Subscription, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WatchL1Heads", arg0, arg1) + ret0, _ := ret[0].(event.Subscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WatchL1Heads indicates an expected call of WatchL1Heads. +func (mr *MockL1MockRecorder) WatchL1Heads(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchL1Heads", reflect.TypeOf((*MockL1)(nil).WatchL1Heads), arg0, arg1) +} diff --git a/node/node.go b/node/node.go index b778db510d..07557c3ad0 100644 --- a/node/node.go +++ b/node/node.go @@ -18,7 +18,6 @@ import ( "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/db/pebble" "github.com/NethermindEth/juno/ethereum" - "github.com/NethermindEth/juno/l1" "github.com/NethermindEth/juno/metrics" "github.com/NethermindEth/juno/migration" "github.com/NethermindEth/juno/node/http" @@ -98,6 +97,27 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo chain := blockchain.New(database, cfg.Network, log) client := feeder.NewClient(cfg.Network.FeederURL()).WithUserAgent(ua) + + if cfg.EthNode == "" { + log.Warnw("Ethereum node address not found; will not verify against L1") + } else { + var ethNodeURL *url.URL + ethNodeURL, err = url.Parse(cfg.EthNode) + if err != nil { + return nil, fmt.Errorf("parse Ethereum node URL: %w", err) + } + if ethNodeURL.Scheme != "wss" && ethNodeURL.Scheme != "ws" { + return nil, errors.New("non-websocket Ethereum node URL (need wss://... or ws://...): " + cfg.EthNode) + } + var l1Client *l1.Client + l1Client, err = newL1Client(n.cfg.EthNode, n.blockchain, n.log) + if err != nil { + return nil, fmt.Errorf("create L1 client: %w", err) + } + + n.services = append(n.services, l1Client) + } + synchronizer := sync.New(chain, adaptfeeder.New(client), log, cfg.PendingPollInterval) gatewayClient := gateway.NewClient(cfg.Network.GatewayURL(), log).WithUserAgent(ua) @@ -120,26 +140,6 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo services: []service.Service{rpcSrv, synchronizer}, } - if n.cfg.EthNode == "" { - n.log.Warnw("Ethereum node address not found; will not verify against L1") - } else { - var ethNodeURL *url.URL - ethNodeURL, err = url.Parse(n.cfg.EthNode) - if err != nil { - return nil, fmt.Errorf("parse Ethereum node URL: %w", err) - } - if ethNodeURL.Scheme != "wss" && ethNodeURL.Scheme != "ws" { - return nil, errors.New("non-websocket Ethereum node URL (need wss://... or ws://...): " + n.cfg.EthNode) - } - var l1Client *l1.Client - l1Client, err = newL1Client(n.cfg.EthNode, n.blockchain, n.log) - if err != nil { - return nil, fmt.Errorf("create L1 client: %w", err) - } - - n.services = append(n.services, l1Client) - } - if cfg.P2P { var privKeyStr string privKeyStr, _ = os.LookupEnv("P2P_PRIVATE_KEY") diff --git a/sync/sync.go b/sync/sync.go index e1173b8ea1..bf36cc60cc 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -3,6 +3,7 @@ package sync import ( "context" "errors" + "fmt" "runtime" "time" @@ -11,14 +12,20 @@ import ( "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/metrics" - "github.com/NethermindEth/juno/service" "github.com/NethermindEth/juno/starknetdata" "github.com/NethermindEth/juno/utils" + "github.com/ethereum/go-ethereum/event" "github.com/prometheus/client_golang/prometheus" "github.com/sourcegraph/conc/stream" ) -var _ service.Service = (*Synchronizer)(nil) +//go:generate mockgen -destination=../mocks/mock_l1.go -package=mocks github.com/NethermindEth/juno/sync L1 +type L1 interface { + // WatchL1Heads sends on sink whenever the Starknet block number + // in the core contract is incremented in a finalized Ethereum block. + // All errors received on the subscription's error channel are treated as fatal. + WatchL1Heads(context.Context, chan<- *core.L1Head) (event.Subscription, error) +} const ( opVerifyLabel = "verify" @@ -39,12 +46,14 @@ type Synchronizer struct { catchUpMode bool + l1 L1 + // metrics opTimers *prometheus.HistogramVec totalBlocks prometheus.Counter } -func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, +func New(bc *blockchain.Blockchain, l1 L1, starkNetData starknetdata.StarknetData, log utils.SimpleLogger, pendingPollInterval time.Duration, ) *Synchronizer { s := &Synchronizer{ @@ -61,6 +70,7 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, Namespace: "sync", Name: "blocks", }), + l1: l1, } metrics.MustRegister(s.opTimers, s.totalBlocks) return s @@ -68,8 +78,7 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, // Run starts the Synchronizer, returns an error if the loop is already running func (s *Synchronizer) Run(ctx context.Context) error { - s.syncBlocks(ctx) - return nil + return s.syncBlocks(ctx) } func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers *stream.Stream, @@ -219,12 +228,19 @@ func (s *Synchronizer) nextHeight() uint64 { return nextHeight } -func (s *Synchronizer) syncBlocks(syncCtx context.Context) { +func (s *Synchronizer) syncBlocks(syncCtx context.Context) error { defer func() { s.StartingBlockNumber = nil s.HighestBlockHeader = nil }() + l1Heads := make(chan *core.L1Head, 2048) + sub, err := s.l1.WatchL1Heads(syncCtx, l1Heads) + if err != nil { + return fmt.Errorf("watch L1 heads: %w", err) + } + defer sub.Unsubscribe() + fetchers, verifiers := s.setupWorkers() streamCtx, streamCancel := context.WithCancel(syncCtx) @@ -245,13 +261,28 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) { select { case <-syncCtx.Done(): pendingSem <- struct{}{} - return + return nil default: streamCtx, streamCancel = context.WithCancel(syncCtx) nextHeight = s.nextHeight() fetchers, verifiers = s.setupWorkers() s.log.Warnw("Restarting sync process", "height", nextHeight, "catchUpMode", s.catchUpMode) } + case head := <-l1Heads: + if err = s.Blockchain.SetL1Head(head); err == nil { + s.log.Infow("Updated l1 head", + "blockNumber", head.BlockNumber, + "blockHash", head.BlockHash.ShortString(), + "stateRoot", head.StateRoot.ShortString()) + } else { + err = fmt.Errorf("l1 head for block %d and state root %s: %w", head.BlockNumber, head.StateRoot.String(), err) + s.log.Errorw("Failed to set L1 head", "err", err) + } + case err = <-sub.Err(): + streamCancel() + fetchers.Wait() + verifiers.Wait() + return fmt.Errorf("sync: L1 heads subscription: %w", err) default: curHeight, curStreamCtx, curCancel := nextHeight, streamCtx, streamCancel fetchers.Go(func() stream.Callback { diff --git a/sync/sync_test.go b/sync/sync_test.go index a1711317ea..406f8e2482 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -23,12 +23,22 @@ import ( const timeout = time.Second +type fakeSubscription struct { + c <-chan error +} + +func (e *fakeSubscription) Err() <-chan error { return e.c } +func (e *fakeSubscription) Unsubscribe() {} + func TestSyncBlocks(t *testing.T) { t.Parallel() mockCtrl := gomock.NewController(t) t.Cleanup(mockCtrl.Finish) + l1 := mocks.NewMockL1(mockCtrl) + l1.EXPECT().WatchL1Heads(gomock.Any(), gomock.Any()).Return(&fakeSubscription{c: make(<-chan error)}, nil).Times(3) + client := feeder.NewTestClient(t, utils.MAINNET) gw := adaptfeeder.New(client) testBlockchain := func(t *testing.T, bc *blockchain.Blockchain) { @@ -59,7 +69,7 @@ func TestSyncBlocks(t *testing.T) { t.Parallel() testDB := pebble.NewMemTest() bc := blockchain.New(testDB, utils.MAINNET, log) - synchronizer := sync.New(bc, gw, log, time.Duration(0)) + synchronizer := sync.New(bc, l1, gw, log, time.Duration(0)) ctx, cancel := context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) @@ -78,7 +88,7 @@ func TestSyncBlocks(t *testing.T) { require.NoError(t, err) require.NoError(t, bc.Store(b0, &core.BlockCommitments{}, s0, nil)) - synchronizer := sync.New(bc, gw, log, time.Duration(0)) + synchronizer := sync.New(bc, l1, gw, log, time.Duration(0)) ctx, cancel := context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) @@ -139,7 +149,7 @@ func TestSyncBlocks(t *testing.T) { return gw.BlockLatest(context.Background()) }).AnyTimes() - synchronizer := sync.New(bc, mockSNData, log, time.Duration(0)) + synchronizer := sync.New(bc, l1, mockSNData, log, time.Duration(0)) ctx, cancel := context.WithTimeout(context.Background(), 2*timeout) require.NoError(t, synchronizer.Run(ctx)) @@ -157,11 +167,16 @@ func TestReorg(t *testing.T) { integClient := feeder.NewTestClient(t, utils.INTEGRATION) integGw := adaptfeeder.New(integClient) + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + l1 := mocks.NewMockL1(mockCtrl) + l1.EXPECT().WatchL1Heads(gomock.Any(), gomock.Any()).Return(&fakeSubscription{c: make(<-chan error)}, nil).Times(2) + testDB := pebble.NewMemTest() // sync to integration for 2 blocks bc := blockchain.New(testDB, utils.INTEGRATION, utils.NewNopZapLogger()) - synchronizer := sync.New(bc, integGw, utils.NewNopZapLogger(), time.Duration(0)) + synchronizer := sync.New(bc, l1, integGw, utils.NewNopZapLogger(), time.Duration(0)) ctx, cancel := context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) @@ -176,7 +191,7 @@ func TestReorg(t *testing.T) { require.NoError(t, err) require.Equal(t, utils.HexToFelt(t, "0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86"), head.Hash) - synchronizer = sync.New(bc, mainGw, utils.NewNopZapLogger(), time.Duration(0)) + synchronizer = sync.New(bc, l1, mainGw, utils.NewNopZapLogger(), time.Duration(0)) ctx, cancel = context.WithTimeout(context.Background(), timeout) require.NoError(t, synchronizer.Run(ctx)) cancel() @@ -191,13 +206,18 @@ func TestReorg(t *testing.T) { func TestPending(t *testing.T) { t.Parallel() + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + l1 := mocks.NewMockL1(mockCtrl) + l1.EXPECT().WatchL1Heads(gomock.Any(), gomock.Any()).Return(&fakeSubscription{c: make(<-chan error)}, nil).Times(1) + client := feeder.NewTestClient(t, utils.MAINNET) gw := adaptfeeder.New(client) testDB := pebble.NewMemTest() log := utils.NewNopZapLogger() bc := blockchain.New(testDB, utils.MAINNET, log) - synchronizer := sync.New(bc, gw, log, time.Millisecond*100) + synchronizer := sync.New(bc, l1, gw, log, time.Millisecond*100) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) require.NoError(t, synchronizer.Run(ctx)) @@ -209,3 +229,76 @@ func TestPending(t *testing.T) { require.NoError(t, err) assert.Equal(t, head.Hash, pending.Block.ParentHash) } + +func TestL1(t *testing.T) { + t.Parallel() + + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + testErr := errors.New("test err") + + t.Run("failed subscription", func(t *testing.T) { + t.Parallel() + client := feeder.NewTestClient(t, utils.INTEGRATION) + gw := adaptfeeder.New(client) + testDB := pebble.NewMemTest() + log := utils.NewNopZapLogger() + bc := blockchain.New(testDB, utils.MAINNET, log) + + l1 := mocks.NewMockL1(mockCtrl) + errChan := make(chan error, 1) + t.Cleanup(func() { close(errChan) }) + errChan <- testErr + l1.EXPECT().WatchL1Heads(gomock.Any(), gomock.Any()).Return(&fakeSubscription{c: errChan}, nil).Times(1) + + synchronizer := sync.New(bc, l1, gw, log, time.Millisecond*100) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + require.ErrorIs(t, synchronizer.Run(ctx), testErr) + cancel() + }) + + t.Run("failed subscription attempt", func(t *testing.T) { + t.Parallel() + client := feeder.NewTestClient(t, utils.INTEGRATION) + gw := adaptfeeder.New(client) + testDB := pebble.NewMemTest() + log := utils.NewNopZapLogger() + bc := blockchain.New(testDB, utils.MAINNET, log) + + l1 := mocks.NewMockL1(mockCtrl) + l1.EXPECT().WatchL1Heads(gomock.Any(), gomock.Any()).Return(nil, testErr).Times(1) + + synchronizer := sync.New(bc, l1, gw, log, time.Millisecond*100) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + t.Cleanup(cancel) + require.ErrorIs(t, synchronizer.Run(ctx), testErr) + }) + + t.Run("successful subscription", func(t *testing.T) { + t.Parallel() + client := feeder.NewTestClient(t, utils.INTEGRATION) + gw := adaptfeeder.New(client) + testDB := pebble.NewMemTest() + log := utils.NewNopZapLogger() + bc := blockchain.New(testDB, utils.MAINNET, log) + + l1 := mocks.NewMockL1(mockCtrl) + l1Head := &core.L1Head{ + BlockNumber: 0, + BlockHash: new(felt.Felt), + StateRoot: new(felt.Felt), + } + + l1.EXPECT().WatchL1Heads(gomock.Any(), gomock.Any()).Do(func(_ context.Context, l1HeadsChan chan<- *core.L1Head) { + l1HeadsChan <- l1Head + }).Return(&fakeSubscription{c: make(<-chan error)}, nil).Times(1) + + synchronizer := sync.New(bc, l1, gw, log, time.Millisecond*100) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + t.Cleanup(cancel) + require.NoError(t, synchronizer.Run(ctx)) + got, err := bc.L1Head() + require.NoError(t, err) + assert.Equal(t, l1Head, got) + }) +}