From 10ca74b04caad7873d1ed5898e596fe8c9f0d8b0 Mon Sep 17 00:00:00 2001 From: weiihann Date: Tue, 22 Oct 2024 00:50:20 +0800 Subject: [PATCH] tidy up tests code --- rpc/events_test.go | 198 ++++++++++++++++++--------------------------- 1 file changed, 78 insertions(+), 120 deletions(-) diff --git a/rpc/events_test.go b/rpc/events_test.go index 4e63918a4..38ccac268 100644 --- a/rpc/events_test.go +++ b/rpc/events_test.go @@ -253,31 +253,49 @@ func (fs *fakeSyncer) HighestBlockHeader() *core.Header { return nil } -func TestSubscribeNewHeads(t *testing.T) { - t.Parallel() +func setupSubscriptionTest(t *testing.T, ctx context.Context) (*rpc.Handler, *fakeSyncer, *jsonrpc.Server) { + t.Helper() log := utils.NewNopZapLogger() chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) syncer := newFakeSyncer() - handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger()) - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + handler := rpc.New(chain, syncer, nil, "", log) go func() { require.NoError(t, handler.Run(ctx)) }() - // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. - // Sleep for a moment just in case. time.Sleep(50 * time.Millisecond) server := jsonrpc.NewServer(1, log) + + return handler, syncer, server +} + +func sendAndReceiveMessage(t *testing.T, ctx context.Context, conn *websocket.Conn, message string) string { + t.Helper() + + require.NoError(t, conn.Write(ctx, websocket.MessageText, []byte(message))) + + _, response, err := conn.Read(ctx) + require.NoError(t, err) + return string(response) +} + +func TestSubscribeNewHeads(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + handler, syncer, server := setupSubscriptionTest(t, ctx) + require.NoError(t, server.RegisterMethods(jsonrpc.Method{ Name: "starknet_subscribeNewHeads", Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, Handler: handler.SubscribeNewHeads, })) - ws := jsonrpc.NewWebsocket(server, log) + + ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger()) httpSrv := httptest.NewServer(ws) conn, _, err := websocket.Dial(ctx, httpSrv.URL, nil) @@ -286,13 +304,10 @@ func TestSubscribeNewHeads(t *testing.T) { id := uint64(1) handler.WithIDGen(func() uint64 { return id }) - subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`) - require.NoError(t, conn.Write(ctx, websocket.MessageText, subscribeMsg)) - + subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}` + got := sendAndReceiveMessage(t, ctx, conn, subscribeMsg) want := fmt.Sprintf(subscribeResponse, id) - _, got, err := conn.Read(ctx) - require.NoError(t, err) - require.Equal(t, want, string(got)) + require.Equal(t, want, got) // Simulate a new block syncer.newHeads.Send(testHeader(t)) @@ -307,22 +322,11 @@ func TestSubscribeNewHeads(t *testing.T) { func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { t.Parallel() - log := utils.NewNopZapLogger() - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) - syncer := newFakeSyncer() - handler := rpc.New(chain, syncer, nil, "", log) - ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - go func() { - require.NoError(t, handler.Run(ctx)) - }() - // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. - // Sleep for a moment just in case. - time.Sleep(50 * time.Millisecond) + handler, syncer, server := setupSubscriptionTest(t, ctx) - server := jsonrpc.NewServer(1, log) require.NoError(t, server.RegisterMethods(jsonrpc.Method{ Name: "starknet_subscribeNewHeads", Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, @@ -332,44 +336,45 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { Params: []jsonrpc.Parameter{{Name: "id"}}, Handler: handler.Unsubscribe, })) - ws := jsonrpc.NewWebsocket(server, log) + + ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger()) httpSrv := httptest.NewServer(ws) + conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) require.NoError(t, err) conn2, _, err := websocket.Dial(ctx, httpSrv.URL, nil) require.NoError(t, err) - subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`) + subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}` firstID := uint64(1) secondID := uint64(2) - handler.WithIDGen(func() uint64 { return firstID }) - require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg)) + handler.WithIDGen(func() uint64 { return firstID }) firstWant := fmt.Sprintf(subscribeResponse, firstID) - _, firstGot, err := conn1.Read(ctx) + firstGot := sendAndReceiveMessage(t, ctx, conn1, subscribeMsg) require.NoError(t, err) - require.Equal(t, firstWant, string(firstGot)) + require.Equal(t, firstWant, firstGot) handler.WithIDGen(func() uint64 { return secondID }) - require.NoError(t, conn2.Write(ctx, websocket.MessageText, subscribeMsg)) secondWant := fmt.Sprintf(subscribeResponse, secondID) - _, secondGot, err := conn2.Read(ctx) + secondGot := sendAndReceiveMessage(t, ctx, conn2, subscribeMsg) require.NoError(t, err) - require.Equal(t, secondWant, string(secondGot)) + require.Equal(t, secondWant, secondGot) // Simulate a new block syncer.newHeads.Send(testHeader(t)) // Receive a block header. - firstWant = fmt.Sprintf(newHeadsResponse, firstID) - _, firstGot, err = conn1.Read(ctx) + firstHeaderWant := fmt.Sprintf(newHeadsResponse, firstID) + _, firstHeaderGot, err := conn1.Read(ctx) require.NoError(t, err) - require.Equal(t, firstWant, string(firstGot)) - secondWant = fmt.Sprintf(newHeadsResponse, secondID) - _, secondGot, err = conn2.Read(ctx) + require.Equal(t, firstHeaderWant, string(firstHeaderGot)) + + secondHeaderWant := fmt.Sprintf(newHeadsResponse, secondID) + _, secondHeaderGot, err := conn2.Read(ctx) require.NoError(t, err) - require.Equal(t, secondWant, string(secondGot)) + require.Equal(t, secondHeaderWant, string(secondHeaderGot)) // Unsubscribe unsubMsg := `{"jsonrpc":"2.0","id":1,"method":"juno_unsubscribe","params":[%d]}` @@ -378,6 +383,8 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { } func TestSubscribeNewHeadsHistorical(t *testing.T) { + t.Parallel() + log := utils.NewNopZapLogger() client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) @@ -402,16 +409,16 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) { go func() { require.NoError(t, handler.Run(ctx)) }() - // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. - // Sleep for a moment just in case. time.Sleep(50 * time.Millisecond) server := jsonrpc.NewServer(1, log) + require.NoError(t, server.RegisterMethods(jsonrpc.Method{ Name: "starknet_subscribeNewHeads", Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, Handler: handler.SubscribeNewHeads, })) + ws := jsonrpc.NewWebsocket(server, log) httpSrv := httptest.NewServer(ws) @@ -421,13 +428,11 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) { id := uint64(1) handler.WithIDGen(func() uint64 { return id }) - subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads", "params":{"block":{"block_number":0}}}`) - require.NoError(t, conn.Write(ctx, websocket.MessageText, subscribeMsg)) - + subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads", "params":{"block":{"block_number":0}}}` + got := sendAndReceiveMessage(t, ctx, conn, subscribeMsg) want := fmt.Sprintf(subscribeResponse, id) - _, got, err := conn.Read(ctx) require.NoError(t, err) - require.Equal(t, want, string(got)) + require.Equal(t, want, got) // Check block 0 content want = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943","parent_hash":"0x0","block_number":0,"new_root":"0x21870ba80540e7831fb21c591ee93481f5ae1bb71ff85a86ddd465be4eddee6","timestamp":1637069048,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}` @@ -449,26 +454,18 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) { func TestSubscriptionReorg(t *testing.T) { t.Parallel() - log := utils.NewNopZapLogger() - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) - syncer := newFakeSyncer() - handler := rpc.New(chain, syncer, nil, "", log) - ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - go func() { - require.NoError(t, handler.Run(ctx)) - }() - time.Sleep(50 * time.Millisecond) + handler, syncer, server := setupSubscriptionTest(t, ctx) - server := jsonrpc.NewServer(1, log) require.NoError(t, server.RegisterMethods(jsonrpc.Method{ Name: "starknet_subscribeNewHeads", Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, Handler: handler.SubscribeNewHeads, })) - ws := jsonrpc.NewWebsocket(server, log) + + ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger()) httpSrv := httptest.NewServer(ws) conn, _, err := websocket.Dial(ctx, httpSrv.URL, nil) @@ -477,13 +474,10 @@ func TestSubscriptionReorg(t *testing.T) { id := uint64(1) handler.WithIDGen(func() uint64 { return id }) - subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`) - require.NoError(t, conn.Write(ctx, websocket.MessageText, subscribeMsg)) - + subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}` + got := sendAndReceiveMessage(t, ctx, conn, subscribeMsg) want := fmt.Sprintf(subscribeResponse, id) - _, got, err := conn.Read(ctx) - require.NoError(t, err) - require.Equal(t, want, string(got)) + require.Equal(t, want, got) // Simulate a reorg syncer.reorgs.Send(&sync.ReorgData{ @@ -504,41 +498,29 @@ func TestSubscriptionReorg(t *testing.T) { func TestSubscribePendingTxs(t *testing.T) { t.Parallel() - log := utils.NewNopZapLogger() - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) - syncer := newFakeSyncer() - handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger()) - ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - go func() { - require.NoError(t, handler.Run(ctx)) - }() - time.Sleep(50 * time.Millisecond) + handler, syncer, server := setupSubscriptionTest(t, ctx) - server := jsonrpc.NewServer(1, log) require.NoError(t, server.RegisterMethods(jsonrpc.Method{ Name: "starknet_subscribePendingTransactions", Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}}, Handler: handler.SubscribePendingTxs, })) - ws := jsonrpc.NewWebsocket(server, log) + + ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger()) httpSrv := httptest.NewServer(ws) conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) require.NoError(t, err) - subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions"}`) - + subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions"}` id := uint64(1) handler.WithIDGen(func() uint64 { return id }) - require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg)) - + got := sendAndReceiveMessage(t, ctx, conn1, subscribeMsg) want := fmt.Sprintf(subscribeResponse, id) - _, got, err := conn1.Read(ctx) - require.NoError(t, err) - require.Equal(t, want, string(got)) + require.Equal(t, want, got) hash1 := new(felt.Felt).SetUint64(1) addr1 := new(felt.Felt).SetUint64(11) @@ -568,41 +550,29 @@ func TestSubscribePendingTxs(t *testing.T) { func TestSubscribePendingTxsFilter(t *testing.T) { t.Parallel() - log := utils.NewNopZapLogger() - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) - syncer := newFakeSyncer() - handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger()) - ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - go func() { - require.NoError(t, handler.Run(ctx)) - }() - time.Sleep(50 * time.Millisecond) + handler, syncer, server := setupSubscriptionTest(t, ctx) - server := jsonrpc.NewServer(1, log) require.NoError(t, server.RegisterMethods(jsonrpc.Method{ Name: "starknet_subscribePendingTransactions", Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}}, Handler: handler.SubscribePendingTxs, })) - ws := jsonrpc.NewWebsocket(server, log) + + ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger()) httpSrv := httptest.NewServer(ws) conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) require.NoError(t, err) - subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"sender_address":["0xb", "0x16"]}}`) - + subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"sender_address":["0xb", "0x16"]}}` id := uint64(1) handler.WithIDGen(func() uint64 { return id }) - require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg)) - + got := sendAndReceiveMessage(t, ctx, conn1, subscribeMsg) want := fmt.Sprintf(subscribeResponse, id) - _, got, err := conn1.Read(ctx) - require.NoError(t, err) - require.Equal(t, want, string(got)) + require.Equal(t, want, got) hash1 := new(felt.Felt).SetUint64(1) addr1 := new(felt.Felt).SetUint64(11) @@ -640,41 +610,29 @@ func TestSubscribePendingTxsFilter(t *testing.T) { func TestSubscribePendingTxsFullDetails(t *testing.T) { t.Parallel() - log := utils.NewNopZapLogger() - chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) - syncer := newFakeSyncer() - handler := rpc.New(chain, syncer, nil, "", log) - ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - go func() { - require.NoError(t, handler.Run(ctx)) - }() - time.Sleep(50 * time.Millisecond) + handler, syncer, server := setupSubscriptionTest(t, ctx) - server := jsonrpc.NewServer(1, log) require.NoError(t, server.RegisterMethods(jsonrpc.Method{ Name: "starknet_subscribePendingTransactions", Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}}, Handler: handler.SubscribePendingTxs, })) - ws := jsonrpc.NewWebsocket(server, log) + + ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger()) httpSrv := httptest.NewServer(ws) conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) require.NoError(t, err) - subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"transaction_details": true}}`) - + subscribeMsg := `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"transaction_details": true}}` id := uint64(1) handler.WithIDGen(func() uint64 { return id }) - require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg)) - + got := sendAndReceiveMessage(t, ctx, conn1, subscribeMsg) want := fmt.Sprintf(subscribeResponse, id) - _, got, err := conn1.Read(ctx) - require.NoError(t, err) - require.Equal(t, want, string(got)) + require.Equal(t, want, got) syncer.pendingTxs.Send([]core.Transaction{ &core.InvokeTransaction{