From d1be96d59bba9852b08fa6fa3c5cc482f205196c Mon Sep 17 00:00:00 2001 From: Josh Klopfenstein Date: Wed, 25 Oct 2023 17:35:34 -0500 Subject: [PATCH] Move subscription to synchronizer In the words of one wise engineer: "We could make blockchain just a DB and move all the moving parts to sync." --- blockchain/blockchain.go | 23 +-------------- blockchain/blockchain_test.go | 28 ------------------ mocks/mock_blockchain.go | 14 --------- mocks/mock_synchronizer.go | 15 ++++++++++ rpc/handlers.go | 4 +-- rpc/handlers_test.go | 53 +++++++++++++++++++++++++---------- sync/sync.go | 16 +++++++++++ sync/sync_test.go | 24 ++++++++++++++++ 8 files changed, 96 insertions(+), 81 deletions(-) diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index ee635fe36c..5b721b4573 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -11,15 +11,9 @@ import ( "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/encoder" - "github.com/NethermindEth/juno/feed" "github.com/NethermindEth/juno/utils" ) -// This is a work-around. mockgen chokes when the instantiated generic type is in the interface. -type HeaderSubscription struct { - *feed.Subscription[*core.Header] -} - //go:generate mockgen -destination=../mocks/mock_blockchain.go -package=mocks github.com/NethermindEth/juno/blockchain Reader type Reader interface { Height() (height uint64, err error) @@ -49,8 +43,6 @@ type Reader interface { EventFilter(from *felt.Felt, keys [][]felt.Felt) (*EventFilter, error) Pending() (Pending, error) - - SubscribeNewHeads() HeaderSubscription } var ( @@ -79,8 +71,6 @@ type Blockchain struct { database db.DB log utils.SimpleLogger - - newHeads *feed.Feed[*core.Header] } func New(database db.DB, network utils.Network, log utils.SimpleLogger) *Blockchain { @@ -89,7 +79,6 @@ func New(database db.DB, network utils.Network, log utils.SimpleLogger) *Blockch database: database, network: network, log: log, - newHeads: feed.New[*core.Header](), } } @@ -322,7 +311,7 @@ func (b *Blockchain) SetL1Head(update *core.L1Head) error { func (b *Blockchain) Store(block *core.Block, blockCommitments *core.BlockCommitments, stateUpdate *core.StateUpdate, newClasses map[felt.Felt]core.Class, ) error { - err := b.database.Update(func(txn db.Transaction) error { + return b.database.Update(func(txn db.Transaction) error { if err := verifyBlock(txn, block); err != nil { return err } @@ -357,10 +346,6 @@ func (b *Blockchain) Store(block *core.Block, blockCommitments *core.BlockCommit heightBin := core.MarshalBlockNumber(block.Number) return txn.Set(db.ChainHeight.Key(), heightBin) }) - if err == nil { - b.newHeads.Send(block.Header) - } - return err } // VerifyBlock assumes the block has already been sanity-checked. @@ -990,9 +975,3 @@ func (b *Blockchain) PendingState() (core.StateReader, StateCloser, error) { core.NewState(txn), ), txn.Discard, nil } - -func (b *Blockchain) SubscribeNewHeads() HeaderSubscription { - return HeaderSubscription{ - Subscription: b.newHeads.Subscribe(), - } -} diff --git a/blockchain/blockchain_test.go b/blockchain/blockchain_test.go index ce3097902d..0da00c1b05 100644 --- a/blockchain/blockchain_test.go +++ b/blockchain/blockchain_test.go @@ -758,31 +758,3 @@ func TestPending(t *testing.T) { require.NoError(t, pErr) }) } - -func TestSubscribeNewHeads(t *testing.T) { - testDB := pebble.NewMemTest(t) - chain := blockchain.New(testDB, utils.MAINNET, utils.NewNopZapLogger()) - client := feeder.NewTestClient(t, utils.MAINNET) - gw := adaptfeeder.New(client) - - block0, err := gw.BlockByNumber(context.Background(), 0) - require.NoError(t, err) - su0, err := gw.StateUpdate(context.Background(), 0) - require.NoError(t, err) - require.NoError(t, chain.Store(block0, &core.BlockCommitments{}, su0, nil)) - - sub := chain.SubscribeNewHeads() - t.Cleanup(sub.Unsubscribe) - - t.Run("send on store", func(t *testing.T) { - block1, err := gw.BlockByNumber(context.Background(), 1) - require.NoError(t, err) - su1, err := gw.StateUpdate(context.Background(), 1) - require.NoError(t, err) - require.NoError(t, chain.Store(block1, &core.BlockCommitments{}, su1, nil)) - - got1, notClosed := <-sub.Recv() - require.True(t, notClosed) - assert.Equal(t, block1.Header, got1) - }) -} diff --git a/mocks/mock_blockchain.go b/mocks/mock_blockchain.go index 5ec5f5d16f..e48b914d25 100644 --- a/mocks/mock_blockchain.go +++ b/mocks/mock_blockchain.go @@ -316,20 +316,6 @@ func (mr *MockReaderMockRecorder) StateUpdateByNumber(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateUpdateByNumber", reflect.TypeOf((*MockReader)(nil).StateUpdateByNumber), arg0) } -// SubscribeNewHeads mocks base method. -func (m *MockReader) SubscribeNewHeads() blockchain.HeaderSubscription { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SubscribeNewHeads") - ret0, _ := ret[0].(blockchain.HeaderSubscription) - return ret0 -} - -// SubscribeNewHeads indicates an expected call of SubscribeNewHeads. -func (mr *MockReaderMockRecorder) SubscribeNewHeads() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeNewHeads", reflect.TypeOf((*MockReader)(nil).SubscribeNewHeads)) -} - // TransactionByBlockNumberAndIndex mocks base method. func (m *MockReader) TransactionByBlockNumberAndIndex(arg0, arg1 uint64) (core.Transaction, error) { m.ctrl.T.Helper() diff --git a/mocks/mock_synchronizer.go b/mocks/mock_synchronizer.go index d140aae4b1..fc7be424a3 100644 --- a/mocks/mock_synchronizer.go +++ b/mocks/mock_synchronizer.go @@ -12,6 +12,7 @@ import ( reflect "reflect" core "github.com/NethermindEth/juno/core" + sync "github.com/NethermindEth/juno/sync" gomock "go.uber.org/mock/gomock" ) @@ -66,3 +67,17 @@ func (mr *MockSyncReaderMockRecorder) StartingBlockNumber() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartingBlockNumber", reflect.TypeOf((*MockSyncReader)(nil).StartingBlockNumber)) } + +// SubscribeNewHeads mocks base method. +func (m *MockSyncReader) SubscribeNewHeads() sync.HeaderSubscription { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribeNewHeads") + ret0, _ := ret[0].(sync.HeaderSubscription) + return ret0 +} + +// SubscribeNewHeads indicates an expected call of SubscribeNewHeads. +func (mr *MockSyncReaderMockRecorder) SubscribeNewHeads() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeNewHeads", reflect.TypeOf((*MockSyncReader)(nil).SubscribeNewHeads)) +} diff --git a/rpc/handlers.go b/rpc/handlers.go index fd7b7c2550..277f1f3a48 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -132,7 +132,7 @@ func (h *Handler) WithIDGen(idgen func() uint64) *Handler { } func (h *Handler) Run(ctx context.Context) error { - newHeadsSub := h.bcReader.SubscribeNewHeads().Subscription + newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription defer newHeadsSub.Unsubscribe() feed.Tee[*core.Header](newHeadsSub, h.newHeads) <-ctx.Done() @@ -1521,8 +1521,8 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context) (uint64, *jsonrpc.Error h.mu.Lock() h.subscriptions[id] = sub h.mu.Unlock() + headerSub := h.newHeads.Subscribe() sub.wg.Go(func() { - headerSub := h.newHeads.Subscribe() defer func() { headerSub.Unsubscribe() h.unsubscribe(sub, id) diff --git a/rpc/handlers_test.go b/rpc/handlers_test.go index 3879577133..9df087cc47 100644 --- a/rpc/handlers_test.go +++ b/rpc/handlers_test.go @@ -22,6 +22,7 @@ import ( "github.com/NethermindEth/juno/mocks" "github.com/NethermindEth/juno/rpc" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" + "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" @@ -2327,10 +2328,9 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { gw := adaptfeeder.New(client) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - su, block, err := gw.StateUpdateWithBlock(ctx, 0) - require.NoError(t, err) chain := blockchain.New(pebble.NewMemTest(t), network, log) - handler := rpc.New(chain, nil, network, nil, nil, nil, "", log) + syncer := sync.New(chain, gw, log, 0) + handler := rpc.New(chain, syncer, network, nil, nil, nil, "", log) go func() { require.NoError(t, handler.Run(ctx)) }() @@ -2349,19 +2349,31 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { require.Zero(t, id) require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code) + // Sync blocks and then revert head. + // This is a super hacky way to deterministically receive a single block on the subscription. + // It would be nicer if we could tell the synchronizer to exit after a certain block height, but, alas, we can't do that. + syncCtx, syncCancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + require.NoError(t, syncer.Run(syncCtx)) + syncCancel() + // This is technically an unsafe thing to do. We're modifying the synchronizer's blockchain while it is owned by the synchronizer. + // But it works. + require.NoError(t, chain.RevertHead()) + // Subscribe. subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) id, rpcErr = handler.SubscribeNewHeads(subCtx) require.Nil(t, rpcErr) - // Sync block. - require.NoError(t, chain.Store(block, nil, su, nil)) + // Sync the block we reverted above. + syncCtx, syncCancel = context.WithTimeout(context.Background(), 250*time.Millisecond) + require.NoError(t, syncer.Run(syncCtx)) + syncCancel() // Receive a block header. - want := `{"jsonrpc":"2.0","method":"juno_subscribeNewHeads","params":{"result":{"block_hash":"0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943","parent_hash":"0x0","block_number":0,"new_root":"0x21870ba80540e7831fb21c591ee93481f5ae1bb71ff85a86ddd465be4eddee6","timestamp":1637069048,"sequencer_address":"0x0","l1_gas_price":{"price_in_wei":"0x0"}},"subscription":%d}}` + want := `{"jsonrpc":"2.0","method":"juno_subscribeNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_wei":"0x0"}},"subscription":%d}}` want = fmt.Sprintf(want, id) got := make([]byte, len(want)) - _, err = clientConn.Read(got) + _, err := clientConn.Read(got) require.NoError(t, err) require.Equal(t, want, string(got)) @@ -2395,10 +2407,9 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { gw := adaptfeeder.New(feederClient) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - su, block, err := gw.StateUpdateWithBlock(ctx, 0) - require.NoError(t, err) chain := blockchain.New(pebble.NewMemTest(t), network, log) - handler := rpc.New(chain, nil, network, nil, nil, nil, "", log) + syncer := sync.New(chain, gw, log, 0) + handler := rpc.New(chain, syncer, network, nil, nil, nil, "", log) go func() { require.NoError(t, handler.Run(ctx)) }() @@ -2406,6 +2417,16 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { // Sleep for a moment just in case. time.Sleep(50 * time.Millisecond) + // Sync blocks and then revert head. + // This is a super hacky way to deterministically receive a single block on the subscription. + // It would be nicer if we could tell the synchronizer to exit after a certain block height, but, alas, we can't do that. + syncCtx, syncCancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + require.NoError(t, syncer.Run(syncCtx)) + syncCancel() + // This is technically an unsafe thing to do. We're modifying the synchronizer's blockchain while it is owned by the synchronizer. + // But it works. + require.NoError(t, chain.RevertHead()) + server := jsonrpc.NewServer(1, log) require.NoError(t, server.RegisterMethods(jsonrpc.Method{ Name: "juno_subscribeNewHeads", @@ -2442,19 +2463,21 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { require.NoError(t, err) require.Equal(t, secondWant, string(secondGot)) - // Sync block. - require.NoError(t, chain.Store(block, nil, su, nil)) + // Now we're subscribed. Sync the block we reverted above. + syncCtx, syncCancel = context.WithTimeout(context.Background(), 250*time.Millisecond) + require.NoError(t, syncer.Run(syncCtx)) + syncCancel() // Receive a block header. - want = `{"jsonrpc":"2.0","method":"juno_subscribeNewHeads","params":{"result":{"block_hash":"0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943","parent_hash":"0x0","block_number":0,"new_root":"0x21870ba80540e7831fb21c591ee93481f5ae1bb71ff85a86ddd465be4eddee6","timestamp":1637069048,"sequencer_address":"0x0","l1_gas_price":{"price_in_wei":"0x0"}},"subscription":%d}}` + want = `{"jsonrpc":"2.0","method":"juno_subscribeNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_wei":"0x0"}},"subscription":%d}}` firstWant = fmt.Sprintf(want, firstID) _, firstGot, err = conn1.Read(ctx) require.NoError(t, err) - require.Equal(t, []byte(firstWant), firstGot) + require.Equal(t, firstWant, string(firstGot)) secondWant = fmt.Sprintf(want, secondID) _, secondGot, err = conn2.Read(ctx) require.NoError(t, err) - require.Equal(t, []byte(secondWant), secondGot) + require.Equal(t, secondWant, string(secondGot)) // Unsubscribe unsubMsg := `{"jsonrpc":"2.0","id":1,"method":"juno_unsubscribe","params":[%d]}` diff --git a/sync/sync.go b/sync/sync.go index 218464a7c8..654c12c04a 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -11,6 +11,7 @@ import ( "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/feed" "github.com/NethermindEth/juno/service" "github.com/NethermindEth/juno/starknetdata" "github.com/NethermindEth/juno/utils" @@ -28,10 +29,16 @@ const ( OpFetch = "fetch" ) +// This is a work-around. mockgen chokes when the instantiated generic type is in the interface. +type HeaderSubscription struct { + *feed.Subscription[*core.Header] +} + //go:generate mockgen -destination=../mocks/mock_synchronizer.go -package=mocks -mock_names Reader=MockSyncReader github.com/NethermindEth/juno/sync Reader type Reader interface { StartingBlockNumber() (uint64, error) HighestBlockHeader() *core.Header + SubscribeNewHeads() HeaderSubscription } // Synchronizer manages a list of StarknetData to fetch the latest blockchain updates @@ -40,6 +47,7 @@ type Synchronizer struct { starknetData starknetdata.StarknetData startingBlockNumber *uint64 highestBlockHeader atomic.Pointer[core.Header] + newHeads *feed.Feed[*core.Header] log utils.SimpleLogger listener EventListener @@ -55,6 +63,7 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, blockchain: bc, starknetData: starkNetData, log: log, + newHeads: feed.New[*core.Header](), pendingPollInterval: pendingPollInterval, listener: &SelectiveListener{}, } @@ -198,6 +207,7 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header) } + s.newHeads.Send(block.Header) s.log.Infow("Stored Block", "number", block.Number, "hash", block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString()) } @@ -401,3 +411,9 @@ func (s *Synchronizer) StartingBlockNumber() (uint64, error) { func (s *Synchronizer) HighestBlockHeader() *core.Header { return s.highestBlockHeader.Load() } + +func (s *Synchronizer) SubscribeNewHeads() HeaderSubscription { + return HeaderSubscription{ + Subscription: s.newHeads.Subscribe(), + } +} diff --git a/sync/sync_test.go b/sync/sync_test.go index 8081d93d57..bd3edc4e08 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -201,3 +201,27 @@ func TestPending(t *testing.T) { require.NoError(t, err) assert.Equal(t, head.Hash, pending.Block.ParentHash) } + +func TestSubscribeNewHeads(t *testing.T) { + t.Parallel() + testDB := pebble.NewMemTest(t) + log := utils.NewNopZapLogger() + integration := utils.INTEGRATION + chain := blockchain.New(testDB, integration, log) + integrationClient := feeder.NewTestClient(t, integration) + gw := adaptfeeder.New(integrationClient) + syncer := sync.New(chain, gw, log, 0) + + sub := syncer.SubscribeNewHeads() + + // Receive on new block. + ctx, cancel := context.WithTimeout(context.Background(), timeout) + require.NoError(t, syncer.Run(ctx)) + cancel() + got, ok := <-sub.Recv() + require.True(t, ok) + want, err := gw.BlockByNumber(context.Background(), 0) + require.NoError(t, err) + require.Equal(t, want.Header, got) + sub.Unsubscribe() +}