Skip to content

Commit

Permalink
add 1 more test case
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann authored and pnowosie committed Oct 23, 2024
1 parent cceb9f7 commit a7c4c6e
Showing 1 changed file with 146 additions and 42 deletions.
188 changes: 146 additions & 42 deletions rpc/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db/pebble"
"github.com/NethermindEth/juno/feed"
"github.com/NethermindEth/juno/jsonrpc"
"github.com/NethermindEth/juno/rpc"
adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder"
Expand All @@ -24,6 +25,8 @@ import (
"github.com/stretchr/testify/require"
)

var emptyCommitments = core.BlockCommitments{}

func TestEvents(t *testing.T) {
testDB := pebble.NewMemTest(t)
n := utils.Ptr(utils.Sepolia)
Expand Down Expand Up @@ -229,17 +232,31 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool {
return fc.w == fc2.w
}

type fakeSyncer struct {
newHeads *feed.Feed[*core.Header]
}

func (fs *fakeSyncer) SubscribeNewHeads() sync.HeaderSubscription {
return sync.HeaderSubscription{Subscription: fs.newHeads.Subscribe()}
}

func (fs *fakeSyncer) StartingBlockNumber() (uint64, error) {
return 0, nil
}

func (fs *fakeSyncer) HighestBlockHeader() *core.Header {
return nil
}

func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
t.Parallel()
log := utils.NewNopZapLogger()
n := utils.Ptr(utils.Mainnet)
client := feeder.NewTestClient(t, n)
gw := adaptfeeder.New(client)

chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()}
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
chain := blockchain.New(pebble.NewMemTest(t), n)
syncer := sync.New(chain, gw, log, 0, false)
handler := rpc.New(chain, syncer, nil, "", log)

go func() {
require.NoError(t, handler.Run(ctx))
Expand All @@ -259,25 +276,28 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
require.Zero(t, id)
require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code)

// Sync blocks and then revert head.
// This is a super hacky way to deterministically receive a single block on the subscription.
// It would be nicer if we could tell the synchronizer to exit after a certain block height, but, alas, we can't do that.
syncCtx, syncCancel := context.WithTimeout(context.Background(), time.Second)
require.NoError(t, syncer.Run(syncCtx))
syncCancel()
// This is technically an unsafe thing to do. We're modifying the synchronizer's blockchain while it is owned by the synchronizer.
// But it works.
require.NoError(t, chain.RevertHead())

// Subscribe.
// Subscribe correctly.
subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn})
id, rpcErr = handler.SubscribeNewHeads(subCtx, nil)
require.Nil(t, rpcErr)

// Sync the block we reverted above.
syncCtx, syncCancel = context.WithTimeout(context.Background(), 250*time.Millisecond)
require.NoError(t, syncer.Run(syncCtx))
syncCancel()
// Simulate a new block
syncer.newHeads.Send(&core.Header{
Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"),
ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"),
Number: 2,
GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"),
Timestamp: 1637084470,
SequencerAddress: utils.HexToFelt(t, "0x0"),
L1DataGasPrice: &core.GasPrice{
PriceInFri: utils.HexToFelt(t, "0x0"),
PriceInWei: utils.HexToFelt(t, "0x0"),
},
GasPrice: utils.HexToFelt(t, "0x0"),
GasPriceSTRK: utils.HexToFelt(t, "0x0"),
L1DAMode: core.Calldata,
ProtocolVersion: "",
})

// Receive a block header.
want := `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`
Expand Down Expand Up @@ -312,32 +332,22 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) {

func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
t.Parallel()

log := utils.NewNopZapLogger()
n := utils.Ptr(utils.Mainnet)
feederClient := feeder.NewTestClient(t, n)
gw := adaptfeeder.New(feederClient)
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()}
handler := rpc.New(chain, syncer, nil, "", log)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
chain := blockchain.New(pebble.NewMemTest(t), n)
syncer := sync.New(chain, gw, log, 0, false)
handler := rpc.New(chain, syncer, nil, "", log)

go func() {
require.NoError(t, handler.Run(ctx))
}()
// Technically, there's a race between goroutine above and the SubscribeNewHeads call down below.
// Sleep for a moment just in case.
time.Sleep(50 * time.Millisecond)

// Sync blocks and then revert head.
// This is a super hacky way to deterministically receive a single block on the subscription.
// It would be nicer if we could tell the synchronizer to exit after a certain block height, but, alas, we can't do that.
syncCtx, syncCancel := context.WithTimeout(context.Background(), time.Second)
require.NoError(t, syncer.Run(syncCtx))
syncCancel()
// This is technically an unsafe thing to do. We're modifying the synchronizer's blockchain while it is owned by the synchronizer.
// But it works.
require.NoError(t, chain.RevertHead())

server := jsonrpc.NewServer(1, log)
require.NoError(t, server.RegisterMethods(jsonrpc.Method{
Name: "starknet_subscribeNewHeads",
Expand Down Expand Up @@ -375,10 +385,23 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
require.NoError(t, err)
require.Equal(t, secondWant, string(secondGot))

// Now we're subscribed. Sync the block we reverted above.
syncCtx, syncCancel = context.WithTimeout(context.Background(), 250*time.Millisecond)
require.NoError(t, syncer.Run(syncCtx))
syncCancel()
// Simulate a new block
syncer.newHeads.Send(&core.Header{
Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"),
ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"),
Number: 2,
GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"),
Timestamp: 1637084470,
SequencerAddress: utils.HexToFelt(t, "0x0"),
L1DataGasPrice: &core.GasPrice{
PriceInFri: utils.HexToFelt(t, "0x0"),
PriceInWei: utils.HexToFelt(t, "0x0"),
},
GasPrice: utils.HexToFelt(t, "0x0"),
GasPriceSTRK: utils.HexToFelt(t, "0x0"),
L1DAMode: core.Calldata,
ProtocolVersion: "",
})

// Receive a block header.
want = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`
Expand All @@ -396,3 +419,84 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
require.NoError(t, conn1.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubMsg, firstID))))
require.NoError(t, conn2.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubMsg, secondID))))
}

func TestSubscribeNewHeadsHistorical(t *testing.T) {
client := feeder.NewTestClient(t, &utils.Mainnet)
gw := adaptfeeder.New(client)

block0, err := gw.BlockByNumber(context.Background(), 0)
require.NoError(t, err)

stateUpdate0, err := gw.StateUpdate(context.Background(), 0)
require.NoError(t, err)

testDB := pebble.NewMemTest(t)
chain := blockchain.New(testDB, &utils.Mainnet)
assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil))

chain = blockchain.New(testDB, &utils.Mainnet)
syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()}
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

go func() {
require.NoError(t, handler.Run(ctx))
}()
// Technically, there's a race between goroutine above and the SubscribeNewHeads call down below.
// Sleep for a moment just in case.
time.Sleep(50 * time.Millisecond)

serverConn, clientConn := net.Pipe()
t.Cleanup(func() {
require.NoError(t, serverConn.Close())
require.NoError(t, clientConn.Close())
})

subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn})

// Subscribe to a block that doesn't exist.
id, rpcErr := handler.SubscribeNewHeads(subCtx, &rpc.BlockID{Number: 1025})
require.Equal(t, rpc.ErrBlockNotFound, rpcErr)
require.Zero(t, id)

// Subscribe to a block that exists.
id, rpcErr = handler.SubscribeNewHeads(subCtx, &rpc.BlockID{Number: 0})
require.Nil(t, rpcErr)
require.NotZero(t, id)

// Check block 0 content
want := `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943","parent_hash":"0x0","block_number":0,"new_root":"0x21870ba80540e7831fb21c591ee93481f5ae1bb71ff85a86ddd465be4eddee6","timestamp":1637069048,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`
want = fmt.Sprintf(want, id.ID)
got := make([]byte, len(want))
_, err = clientConn.Read(got)
require.NoError(t, err)
require.Equal(t, want, string(got))

// Simulate a new block
syncer.newHeads.Send(&core.Header{
Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"),
ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"),
Number: 2,
GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"),
Timestamp: 1637084470,
SequencerAddress: utils.HexToFelt(t, "0x0"),
L1DataGasPrice: &core.GasPrice{
PriceInFri: utils.HexToFelt(t, "0x0"),
PriceInWei: utils.HexToFelt(t, "0x0"),
},
GasPrice: utils.HexToFelt(t, "0x0"),
GasPriceSTRK: utils.HexToFelt(t, "0x0"),
L1DAMode: core.Calldata,
ProtocolVersion: "",
})

// Check new block content
want = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`
want = fmt.Sprintf(want, id.ID)
got = make([]byte, len(want))
_, err = clientConn.Read(got)
require.NoError(t, err)
require.Equal(t, want, string(got))
}

0 comments on commit a7c4c6e

Please sign in to comment.