Skip to content

Commit

Permalink
Move subscription to synchronizer
Browse files Browse the repository at this point in the history
In the words of one wise engineer:

"We could make blockchain just a DB and move all the moving parts to sync."
  • Loading branch information
joshklop authored and omerfirmak committed Oct 30, 2023
1 parent e5f6dd5 commit d1be96d
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 81 deletions.
23 changes: 1 addition & 22 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,9 @@ import (
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/encoder"
"github.com/NethermindEth/juno/feed"
"github.com/NethermindEth/juno/utils"
)

// This is a work-around. mockgen chokes when the instantiated generic type is in the interface.
type HeaderSubscription struct {
*feed.Subscription[*core.Header]
}

//go:generate mockgen -destination=../mocks/mock_blockchain.go -package=mocks github.com/NethermindEth/juno/blockchain Reader
type Reader interface {
Height() (height uint64, err error)
Expand Down Expand Up @@ -49,8 +43,6 @@ type Reader interface {
EventFilter(from *felt.Felt, keys [][]felt.Felt) (*EventFilter, error)

Pending() (Pending, error)

SubscribeNewHeads() HeaderSubscription
}

var (
Expand Down Expand Up @@ -79,8 +71,6 @@ type Blockchain struct {
database db.DB

log utils.SimpleLogger

newHeads *feed.Feed[*core.Header]
}

func New(database db.DB, network utils.Network, log utils.SimpleLogger) *Blockchain {
Expand All @@ -89,7 +79,6 @@ func New(database db.DB, network utils.Network, log utils.SimpleLogger) *Blockch
database: database,
network: network,
log: log,
newHeads: feed.New[*core.Header](),
}
}

Expand Down Expand Up @@ -322,7 +311,7 @@ func (b *Blockchain) SetL1Head(update *core.L1Head) error {
func (b *Blockchain) Store(block *core.Block, blockCommitments *core.BlockCommitments,
stateUpdate *core.StateUpdate, newClasses map[felt.Felt]core.Class,
) error {
err := b.database.Update(func(txn db.Transaction) error {
return b.database.Update(func(txn db.Transaction) error {
if err := verifyBlock(txn, block); err != nil {
return err
}
Expand Down Expand Up @@ -357,10 +346,6 @@ func (b *Blockchain) Store(block *core.Block, blockCommitments *core.BlockCommit
heightBin := core.MarshalBlockNumber(block.Number)
return txn.Set(db.ChainHeight.Key(), heightBin)
})
if err == nil {
b.newHeads.Send(block.Header)
}
return err
}

// VerifyBlock assumes the block has already been sanity-checked.
Expand Down Expand Up @@ -990,9 +975,3 @@ func (b *Blockchain) PendingState() (core.StateReader, StateCloser, error) {
core.NewState(txn),
), txn.Discard, nil
}

func (b *Blockchain) SubscribeNewHeads() HeaderSubscription {
return HeaderSubscription{
Subscription: b.newHeads.Subscribe(),
}
}
28 changes: 0 additions & 28 deletions blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,31 +758,3 @@ func TestPending(t *testing.T) {
require.NoError(t, pErr)
})
}

func TestSubscribeNewHeads(t *testing.T) {
testDB := pebble.NewMemTest(t)
chain := blockchain.New(testDB, utils.MAINNET, utils.NewNopZapLogger())
client := feeder.NewTestClient(t, utils.MAINNET)
gw := adaptfeeder.New(client)

block0, err := gw.BlockByNumber(context.Background(), 0)
require.NoError(t, err)
su0, err := gw.StateUpdate(context.Background(), 0)
require.NoError(t, err)
require.NoError(t, chain.Store(block0, &core.BlockCommitments{}, su0, nil))

sub := chain.SubscribeNewHeads()
t.Cleanup(sub.Unsubscribe)

t.Run("send on store", func(t *testing.T) {
block1, err := gw.BlockByNumber(context.Background(), 1)
require.NoError(t, err)
su1, err := gw.StateUpdate(context.Background(), 1)
require.NoError(t, err)
require.NoError(t, chain.Store(block1, &core.BlockCommitments{}, su1, nil))

got1, notClosed := <-sub.Recv()
require.True(t, notClosed)
assert.Equal(t, block1.Header, got1)
})
}
14 changes: 0 additions & 14 deletions mocks/mock_blockchain.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions mocks/mock_synchronizer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (h *Handler) WithIDGen(idgen func() uint64) *Handler {
}

func (h *Handler) Run(ctx context.Context) error {
newHeadsSub := h.bcReader.SubscribeNewHeads().Subscription
newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription
defer newHeadsSub.Unsubscribe()
feed.Tee[*core.Header](newHeadsSub, h.newHeads)
<-ctx.Done()
Expand Down Expand Up @@ -1521,8 +1521,8 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context) (uint64, *jsonrpc.Error
h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()
headerSub := h.newHeads.Subscribe()
sub.wg.Go(func() {
headerSub := h.newHeads.Subscribe()
defer func() {
headerSub.Unsubscribe()
h.unsubscribe(sub, id)
Expand Down
53 changes: 38 additions & 15 deletions rpc/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/NethermindEth/juno/mocks"
"github.com/NethermindEth/juno/rpc"
adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder"
"github.com/NethermindEth/juno/sync"
"github.com/NethermindEth/juno/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -2327,10 +2328,9 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
gw := adaptfeeder.New(client)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
su, block, err := gw.StateUpdateWithBlock(ctx, 0)
require.NoError(t, err)
chain := blockchain.New(pebble.NewMemTest(t), network, log)
handler := rpc.New(chain, nil, network, nil, nil, nil, "", log)
syncer := sync.New(chain, gw, log, 0)
handler := rpc.New(chain, syncer, network, nil, nil, nil, "", log)
go func() {
require.NoError(t, handler.Run(ctx))
}()
Expand All @@ -2349,19 +2349,31 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
require.Zero(t, id)
require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code)

// Sync blocks and then revert head.
// This is a super hacky way to deterministically receive a single block on the subscription.
// It would be nicer if we could tell the synchronizer to exit after a certain block height, but, alas, we can't do that.
syncCtx, syncCancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
require.NoError(t, syncer.Run(syncCtx))
syncCancel()
// This is technically an unsafe thing to do. We're modifying the synchronizer's blockchain while it is owned by the synchronizer.
// But it works.
require.NoError(t, chain.RevertHead())

// Subscribe.
subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn})
id, rpcErr = handler.SubscribeNewHeads(subCtx)
require.Nil(t, rpcErr)

// Sync block.
require.NoError(t, chain.Store(block, nil, su, nil))
// Sync the block we reverted above.
syncCtx, syncCancel = context.WithTimeout(context.Background(), 250*time.Millisecond)
require.NoError(t, syncer.Run(syncCtx))
syncCancel()

// Receive a block header.
want := `{"jsonrpc":"2.0","method":"juno_subscribeNewHeads","params":{"result":{"block_hash":"0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943","parent_hash":"0x0","block_number":0,"new_root":"0x21870ba80540e7831fb21c591ee93481f5ae1bb71ff85a86ddd465be4eddee6","timestamp":1637069048,"sequencer_address":"0x0","l1_gas_price":{"price_in_wei":"0x0"}},"subscription":%d}}`
want := `{"jsonrpc":"2.0","method":"juno_subscribeNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_wei":"0x0"}},"subscription":%d}}`
want = fmt.Sprintf(want, id)
got := make([]byte, len(want))
_, err = clientConn.Read(got)
_, err := clientConn.Read(got)
require.NoError(t, err)
require.Equal(t, want, string(got))

Expand Down Expand Up @@ -2395,17 +2407,26 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
gw := adaptfeeder.New(feederClient)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
su, block, err := gw.StateUpdateWithBlock(ctx, 0)
require.NoError(t, err)
chain := blockchain.New(pebble.NewMemTest(t), network, log)
handler := rpc.New(chain, nil, network, nil, nil, nil, "", log)
syncer := sync.New(chain, gw, log, 0)
handler := rpc.New(chain, syncer, network, nil, nil, nil, "", log)
go func() {
require.NoError(t, handler.Run(ctx))
}()
// Technically, there's a race between goroutine above and the SubscribeNewHeads call down below.
// Sleep for a moment just in case.
time.Sleep(50 * time.Millisecond)

// Sync blocks and then revert head.
// This is a super hacky way to deterministically receive a single block on the subscription.
// It would be nicer if we could tell the synchronizer to exit after a certain block height, but, alas, we can't do that.
syncCtx, syncCancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
require.NoError(t, syncer.Run(syncCtx))
syncCancel()
// This is technically an unsafe thing to do. We're modifying the synchronizer's blockchain while it is owned by the synchronizer.
// But it works.
require.NoError(t, chain.RevertHead())

server := jsonrpc.NewServer(1, log)
require.NoError(t, server.RegisterMethods(jsonrpc.Method{
Name: "juno_subscribeNewHeads",
Expand Down Expand Up @@ -2442,19 +2463,21 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
require.NoError(t, err)
require.Equal(t, secondWant, string(secondGot))

// Sync block.
require.NoError(t, chain.Store(block, nil, su, nil))
// Now we're subscribed. Sync the block we reverted above.
syncCtx, syncCancel = context.WithTimeout(context.Background(), 250*time.Millisecond)
require.NoError(t, syncer.Run(syncCtx))
syncCancel()

// Receive a block header.
want = `{"jsonrpc":"2.0","method":"juno_subscribeNewHeads","params":{"result":{"block_hash":"0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943","parent_hash":"0x0","block_number":0,"new_root":"0x21870ba80540e7831fb21c591ee93481f5ae1bb71ff85a86ddd465be4eddee6","timestamp":1637069048,"sequencer_address":"0x0","l1_gas_price":{"price_in_wei":"0x0"}},"subscription":%d}}`
want = `{"jsonrpc":"2.0","method":"juno_subscribeNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_wei":"0x0"}},"subscription":%d}}`
firstWant = fmt.Sprintf(want, firstID)
_, firstGot, err = conn1.Read(ctx)
require.NoError(t, err)
require.Equal(t, []byte(firstWant), firstGot)
require.Equal(t, firstWant, string(firstGot))
secondWant = fmt.Sprintf(want, secondID)
_, secondGot, err = conn2.Read(ctx)
require.NoError(t, err)
require.Equal(t, []byte(secondWant), secondGot)
require.Equal(t, secondWant, string(secondGot))

// Unsubscribe
unsubMsg := `{"jsonrpc":"2.0","id":1,"method":"juno_unsubscribe","params":[%d]}`
Expand Down
16 changes: 16 additions & 0 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/feed"
"github.com/NethermindEth/juno/service"
"github.com/NethermindEth/juno/starknetdata"
"github.com/NethermindEth/juno/utils"
Expand All @@ -28,10 +29,16 @@ const (
OpFetch = "fetch"
)

// This is a work-around. mockgen chokes when the instantiated generic type is in the interface.
type HeaderSubscription struct {
*feed.Subscription[*core.Header]
}

//go:generate mockgen -destination=../mocks/mock_synchronizer.go -package=mocks -mock_names Reader=MockSyncReader github.com/NethermindEth/juno/sync Reader
type Reader interface {
StartingBlockNumber() (uint64, error)
HighestBlockHeader() *core.Header
SubscribeNewHeads() HeaderSubscription
}

// Synchronizer manages a list of StarknetData to fetch the latest blockchain updates
Expand All @@ -40,6 +47,7 @@ type Synchronizer struct {
starknetData starknetdata.StarknetData
startingBlockNumber *uint64
highestBlockHeader atomic.Pointer[core.Header]
newHeads *feed.Feed[*core.Header]

log utils.SimpleLogger
listener EventListener
Expand All @@ -55,6 +63,7 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData,
blockchain: bc,
starknetData: starkNetData,
log: log,
newHeads: feed.New[*core.Header](),
pendingPollInterval: pendingPollInterval,
listener: &SelectiveListener{},
}
Expand Down Expand Up @@ -198,6 +207,7 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat
s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header)
}

s.newHeads.Send(block.Header)
s.log.Infow("Stored Block", "number", block.Number, "hash",
block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString())
}
Expand Down Expand Up @@ -401,3 +411,9 @@ func (s *Synchronizer) StartingBlockNumber() (uint64, error) {
func (s *Synchronizer) HighestBlockHeader() *core.Header {
return s.highestBlockHeader.Load()
}

func (s *Synchronizer) SubscribeNewHeads() HeaderSubscription {
return HeaderSubscription{
Subscription: s.newHeads.Subscribe(),
}
}
24 changes: 24 additions & 0 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,27 @@ func TestPending(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, head.Hash, pending.Block.ParentHash)
}

func TestSubscribeNewHeads(t *testing.T) {
t.Parallel()
testDB := pebble.NewMemTest(t)
log := utils.NewNopZapLogger()
integration := utils.INTEGRATION
chain := blockchain.New(testDB, integration, log)
integrationClient := feeder.NewTestClient(t, integration)
gw := adaptfeeder.New(integrationClient)
syncer := sync.New(chain, gw, log, 0)

sub := syncer.SubscribeNewHeads()

// Receive on new block.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
require.NoError(t, syncer.Run(ctx))
cancel()
got, ok := <-sub.Recv()
require.True(t, ok)
want, err := gw.BlockByNumber(context.Background(), 0)
require.NoError(t, err)
require.Equal(t, want.Header, got)
sub.Unsubscribe()
}

0 comments on commit d1be96d

Please sign in to comment.