Skip to content

Commit

Permalink
tidy up tests code
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann authored and pnowosie committed Oct 23, 2024
1 parent e9aaefc commit 10ca74b
Showing 1 changed file with 78 additions and 120 deletions.
198 changes: 78 additions & 120 deletions rpc/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,31 +253,49 @@ func (fs *fakeSyncer) HighestBlockHeader() *core.Header {
return nil
}

func TestSubscribeNewHeads(t *testing.T) {
t.Parallel()
func setupSubscriptionTest(t *testing.T, ctx context.Context) (*rpc.Handler, *fakeSyncer, *jsonrpc.Server) {
t.Helper()

log := utils.NewNopZapLogger()
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := newFakeSyncer()
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
handler := rpc.New(chain, syncer, nil, "", log)

go func() {
require.NoError(t, handler.Run(ctx))
}()
// Technically, there's a race between goroutine above and the SubscribeNewHeads call down below.
// Sleep for a moment just in case.
time.Sleep(50 * time.Millisecond)

server := jsonrpc.NewServer(1, log)

return handler, syncer, server
}

func sendAndReceiveMessage(t *testing.T, ctx context.Context, conn *websocket.Conn, message string) string {
t.Helper()

require.NoError(t, conn.Write(ctx, websocket.MessageText, []byte(message)))

_, response, err := conn.Read(ctx)
require.NoError(t, err)
return string(response)
}

func TestSubscribeNewHeads(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

handler, syncer, server := setupSubscriptionTest(t, ctx)

require.NoError(t, server.RegisterMethods(jsonrpc.Method{
Name: "starknet_subscribeNewHeads",
Params: []jsonrpc.Parameter{{Name: "block", Optional: true}},
Handler: handler.SubscribeNewHeads,
}))
ws := jsonrpc.NewWebsocket(server, log)

ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger())
httpSrv := httptest.NewServer(ws)

conn, _, err := websocket.Dial(ctx, httpSrv.URL, nil)
Expand All @@ -286,13 +304,10 @@ func TestSubscribeNewHeads(t *testing.T) {
id := uint64(1)
handler.WithIDGen(func() uint64 { return id })

subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`)
require.NoError(t, conn.Write(ctx, websocket.MessageText, subscribeMsg))

subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`
got := sendAndReceiveMessage(t, ctx, conn, subscribeMsg)
want := fmt.Sprintf(subscribeResponse, id)
_, got, err := conn.Read(ctx)
require.NoError(t, err)
require.Equal(t, want, string(got))
require.Equal(t, want, got)

// Simulate a new block
syncer.newHeads.Send(testHeader(t))
Expand All @@ -307,22 +322,11 @@ func TestSubscribeNewHeads(t *testing.T) {
func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
t.Parallel()

log := utils.NewNopZapLogger()
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := newFakeSyncer()
handler := rpc.New(chain, syncer, nil, "", log)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

go func() {
require.NoError(t, handler.Run(ctx))
}()
// Technically, there's a race between goroutine above and the SubscribeNewHeads call down below.
// Sleep for a moment just in case.
time.Sleep(50 * time.Millisecond)
handler, syncer, server := setupSubscriptionTest(t, ctx)

server := jsonrpc.NewServer(1, log)
require.NoError(t, server.RegisterMethods(jsonrpc.Method{
Name: "starknet_subscribeNewHeads",
Params: []jsonrpc.Parameter{{Name: "block", Optional: true}},
Expand All @@ -332,44 +336,45 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
Params: []jsonrpc.Parameter{{Name: "id"}},
Handler: handler.Unsubscribe,
}))
ws := jsonrpc.NewWebsocket(server, log)

ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger())
httpSrv := httptest.NewServer(ws)

conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil)
require.NoError(t, err)
conn2, _, err := websocket.Dial(ctx, httpSrv.URL, nil)
require.NoError(t, err)

subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`)
subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`

firstID := uint64(1)
secondID := uint64(2)
handler.WithIDGen(func() uint64 { return firstID })
require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg))

handler.WithIDGen(func() uint64 { return firstID })
firstWant := fmt.Sprintf(subscribeResponse, firstID)
_, firstGot, err := conn1.Read(ctx)
firstGot := sendAndReceiveMessage(t, ctx, conn1, subscribeMsg)
require.NoError(t, err)
require.Equal(t, firstWant, string(firstGot))
require.Equal(t, firstWant, firstGot)

handler.WithIDGen(func() uint64 { return secondID })
require.NoError(t, conn2.Write(ctx, websocket.MessageText, subscribeMsg))
secondWant := fmt.Sprintf(subscribeResponse, secondID)
_, secondGot, err := conn2.Read(ctx)
secondGot := sendAndReceiveMessage(t, ctx, conn2, subscribeMsg)
require.NoError(t, err)
require.Equal(t, secondWant, string(secondGot))
require.Equal(t, secondWant, secondGot)

// Simulate a new block
syncer.newHeads.Send(testHeader(t))

// Receive a block header.
firstWant = fmt.Sprintf(newHeadsResponse, firstID)
_, firstGot, err = conn1.Read(ctx)
firstHeaderWant := fmt.Sprintf(newHeadsResponse, firstID)
_, firstHeaderGot, err := conn1.Read(ctx)
require.NoError(t, err)
require.Equal(t, firstWant, string(firstGot))
secondWant = fmt.Sprintf(newHeadsResponse, secondID)
_, secondGot, err = conn2.Read(ctx)
require.Equal(t, firstHeaderWant, string(firstHeaderGot))

secondHeaderWant := fmt.Sprintf(newHeadsResponse, secondID)
_, secondHeaderGot, err := conn2.Read(ctx)
require.NoError(t, err)
require.Equal(t, secondWant, string(secondGot))
require.Equal(t, secondHeaderWant, string(secondHeaderGot))

// Unsubscribe
unsubMsg := `{"jsonrpc":"2.0","id":1,"method":"juno_unsubscribe","params":[%d]}`
Expand All @@ -378,6 +383,8 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
}

func TestSubscribeNewHeadsHistorical(t *testing.T) {
t.Parallel()

log := utils.NewNopZapLogger()
client := feeder.NewTestClient(t, &utils.Mainnet)
gw := adaptfeeder.New(client)
Expand All @@ -402,16 +409,16 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) {
go func() {
require.NoError(t, handler.Run(ctx))
}()
// Technically, there's a race between goroutine above and the SubscribeNewHeads call down below.
// Sleep for a moment just in case.
time.Sleep(50 * time.Millisecond)

server := jsonrpc.NewServer(1, log)

require.NoError(t, server.RegisterMethods(jsonrpc.Method{
Name: "starknet_subscribeNewHeads",
Params: []jsonrpc.Parameter{{Name: "block", Optional: true}},
Handler: handler.SubscribeNewHeads,
}))

ws := jsonrpc.NewWebsocket(server, log)
httpSrv := httptest.NewServer(ws)

Expand All @@ -421,13 +428,11 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) {
id := uint64(1)
handler.WithIDGen(func() uint64 { return id })

subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads", "params":{"block":{"block_number":0}}}`)
require.NoError(t, conn.Write(ctx, websocket.MessageText, subscribeMsg))

subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads", "params":{"block":{"block_number":0}}}`
got := sendAndReceiveMessage(t, ctx, conn, subscribeMsg)
want := fmt.Sprintf(subscribeResponse, id)
_, got, err := conn.Read(ctx)
require.NoError(t, err)
require.Equal(t, want, string(got))
require.Equal(t, want, got)

// Check block 0 content
want = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943","parent_hash":"0x0","block_number":0,"new_root":"0x21870ba80540e7831fb21c591ee93481f5ae1bb71ff85a86ddd465be4eddee6","timestamp":1637069048,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`
Expand All @@ -449,26 +454,18 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) {
func TestSubscriptionReorg(t *testing.T) {
t.Parallel()

log := utils.NewNopZapLogger()
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := newFakeSyncer()
handler := rpc.New(chain, syncer, nil, "", log)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

go func() {
require.NoError(t, handler.Run(ctx))
}()
time.Sleep(50 * time.Millisecond)
handler, syncer, server := setupSubscriptionTest(t, ctx)

server := jsonrpc.NewServer(1, log)
require.NoError(t, server.RegisterMethods(jsonrpc.Method{
Name: "starknet_subscribeNewHeads",
Params: []jsonrpc.Parameter{{Name: "block", Optional: true}},
Handler: handler.SubscribeNewHeads,
}))
ws := jsonrpc.NewWebsocket(server, log)

ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger())
httpSrv := httptest.NewServer(ws)

conn, _, err := websocket.Dial(ctx, httpSrv.URL, nil)
Expand All @@ -477,13 +474,10 @@ func TestSubscriptionReorg(t *testing.T) {
id := uint64(1)
handler.WithIDGen(func() uint64 { return id })

subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`)
require.NoError(t, conn.Write(ctx, websocket.MessageText, subscribeMsg))

subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`
got := sendAndReceiveMessage(t, ctx, conn, subscribeMsg)
want := fmt.Sprintf(subscribeResponse, id)
_, got, err := conn.Read(ctx)
require.NoError(t, err)
require.Equal(t, want, string(got))
require.Equal(t, want, got)

// Simulate a reorg
syncer.reorgs.Send(&sync.ReorgData{
Expand All @@ -504,41 +498,29 @@ func TestSubscriptionReorg(t *testing.T) {
func TestSubscribePendingTxs(t *testing.T) {
t.Parallel()

log := utils.NewNopZapLogger()
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := newFakeSyncer()
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

go func() {
require.NoError(t, handler.Run(ctx))
}()
time.Sleep(50 * time.Millisecond)
handler, syncer, server := setupSubscriptionTest(t, ctx)

server := jsonrpc.NewServer(1, log)
require.NoError(t, server.RegisterMethods(jsonrpc.Method{
Name: "starknet_subscribePendingTransactions",
Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}},
Handler: handler.SubscribePendingTxs,
}))
ws := jsonrpc.NewWebsocket(server, log)

ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger())
httpSrv := httptest.NewServer(ws)

conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil)
require.NoError(t, err)

subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions"}`)

subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions"}`
id := uint64(1)
handler.WithIDGen(func() uint64 { return id })
require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg))

got := sendAndReceiveMessage(t, ctx, conn1, subscribeMsg)
want := fmt.Sprintf(subscribeResponse, id)
_, got, err := conn1.Read(ctx)
require.NoError(t, err)
require.Equal(t, want, string(got))
require.Equal(t, want, got)

hash1 := new(felt.Felt).SetUint64(1)
addr1 := new(felt.Felt).SetUint64(11)
Expand Down Expand Up @@ -568,41 +550,29 @@ func TestSubscribePendingTxs(t *testing.T) {
func TestSubscribePendingTxsFilter(t *testing.T) {
t.Parallel()

log := utils.NewNopZapLogger()
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := newFakeSyncer()
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

go func() {
require.NoError(t, handler.Run(ctx))
}()
time.Sleep(50 * time.Millisecond)
handler, syncer, server := setupSubscriptionTest(t, ctx)

server := jsonrpc.NewServer(1, log)
require.NoError(t, server.RegisterMethods(jsonrpc.Method{
Name: "starknet_subscribePendingTransactions",
Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}},
Handler: handler.SubscribePendingTxs,
}))
ws := jsonrpc.NewWebsocket(server, log)

ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger())
httpSrv := httptest.NewServer(ws)

conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil)
require.NoError(t, err)

subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"sender_address":["0xb", "0x16"]}}`)

subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"sender_address":["0xb", "0x16"]}}`
id := uint64(1)
handler.WithIDGen(func() uint64 { return id })
require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg))

got := sendAndReceiveMessage(t, ctx, conn1, subscribeMsg)
want := fmt.Sprintf(subscribeResponse, id)
_, got, err := conn1.Read(ctx)
require.NoError(t, err)
require.Equal(t, want, string(got))
require.Equal(t, want, got)

hash1 := new(felt.Felt).SetUint64(1)
addr1 := new(felt.Felt).SetUint64(11)
Expand Down Expand Up @@ -640,41 +610,29 @@ func TestSubscribePendingTxsFilter(t *testing.T) {
func TestSubscribePendingTxsFullDetails(t *testing.T) {
t.Parallel()

log := utils.NewNopZapLogger()
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
syncer := newFakeSyncer()
handler := rpc.New(chain, syncer, nil, "", log)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

go func() {
require.NoError(t, handler.Run(ctx))
}()
time.Sleep(50 * time.Millisecond)
handler, syncer, server := setupSubscriptionTest(t, ctx)

server := jsonrpc.NewServer(1, log)
require.NoError(t, server.RegisterMethods(jsonrpc.Method{
Name: "starknet_subscribePendingTransactions",
Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}},
Handler: handler.SubscribePendingTxs,
}))
ws := jsonrpc.NewWebsocket(server, log)

ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger())
httpSrv := httptest.NewServer(ws)

conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil)
require.NoError(t, err)

subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"transaction_details": true}}`)

subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"transaction_details": true}}`
id := uint64(1)
handler.WithIDGen(func() uint64 { return id })
require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg))

got := sendAndReceiveMessage(t, ctx, conn1, subscribeMsg)
want := fmt.Sprintf(subscribeResponse, id)
_, got, err := conn1.Read(ctx)
require.NoError(t, err)
require.Equal(t, want, string(got))
require.Equal(t, want, got)

syncer.pendingTxs.Send([]core.Transaction{
&core.InvokeTransaction{
Expand Down

0 comments on commit 10ca74b

Please sign in to comment.