From 8bbc9d6c3fe0b2db25fed346f0e3ea6035e9fcf3 Mon Sep 17 00:00:00 2001 From: Chris Ziogas Date: Thu, 25 Apr 2024 17:00:33 +0300 Subject: [PATCH] cmd/devp2p/internal/ethtest,eth/downloader,eth,eth/protocols/eth,eth: Revert "cmd/devp2p, eth: drop support for eth/67 (#28956)" This reverts commit 8a76a814a2b9e5b4c1a4c6de44cd702536104507. --- cmd/devp2p/internal/ethtest/conn.go | 2 +- cmd/devp2p/internal/ethtest/suite.go | 10 ++-- cmd/devp2p/internal/ethtest/transaction.go | 4 +- eth/downloader/downloader_test.go | 68 +++++++++++++++++++++- eth/downloader/skeleton_test.go | 4 +- eth/handler_eth.go | 5 +- eth/handler_eth_test.go | 17 ++++-- eth/protocols/eth/broadcast.go | 13 ++++- eth/protocols/eth/handler.go | 27 +++++++-- eth/protocols/eth/handler_test.go | 3 + eth/protocols/eth/handlers.go | 21 ++++++- eth/protocols/eth/handshake_test.go | 1 + eth/protocols/eth/peer.go | 18 +++++- eth/protocols/eth/protocol.go | 17 ++++-- eth/sync_test.go | 1 + 15 files changed, 176 insertions(+), 35 deletions(-) diff --git a/cmd/devp2p/internal/ethtest/conn.go b/cmd/devp2p/internal/ethtest/conn.go index 4a74857c97..0081a9ebf7 100644 --- a/cmd/devp2p/internal/ethtest/conn.go +++ b/cmd/devp2p/internal/ethtest/conn.go @@ -166,7 +166,7 @@ func (c *Conn) ReadEth() (any, error) { case eth.TransactionsMsg: msg = new(eth.TransactionsPacket) case eth.NewPooledTransactionHashesMsg: - msg = new(eth.NewPooledTransactionHashesPacket) + msg = new(eth.NewPooledTransactionHashesPacket68) case eth.GetPooledTransactionsMsg: msg = new(eth.GetPooledTransactionsPacket) case eth.PooledTransactionsMsg: diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index f7bd16a5c7..b05fb1c2b9 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -718,7 +718,7 @@ the transactions using a GetPooledTransactions request.`) } // Send announcement. - ann := eth.NewPooledTransactionHashesPacket{Types: txTypes, Sizes: sizes, Hashes: hashes} + ann := eth.NewPooledTransactionHashesPacket68{Types: txTypes, Sizes: sizes, Hashes: hashes} err = conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann) if err != nil { t.Fatalf("failed to write to connection: %v", err) @@ -736,7 +736,7 @@ the transactions using a GetPooledTransactions request.`) t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg.GetPooledTransactionsRequest)) } return - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket68: continue case *eth.TransactionsPacket: continue @@ -806,12 +806,12 @@ func (s *Suite) TestBlobViolations(t *utesting.T) { t2 = s.makeBlobTxs(2, 3, 0x2) ) for _, test := range []struct { - ann eth.NewPooledTransactionHashesPacket + ann eth.NewPooledTransactionHashesPacket68 resp eth.PooledTransactionsResponse }{ // Invalid tx size. { - ann: eth.NewPooledTransactionHashesPacket{ + ann: eth.NewPooledTransactionHashesPacket68{ Types: []byte{types.BlobTxType, types.BlobTxType}, Sizes: []uint32{uint32(t1[0].Size()), uint32(t1[1].Size() + 10)}, Hashes: []common.Hash{t1[0].Hash(), t1[1].Hash()}, @@ -820,7 +820,7 @@ func (s *Suite) TestBlobViolations(t *utesting.T) { }, // Wrong tx type. { - ann: eth.NewPooledTransactionHashesPacket{ + ann: eth.NewPooledTransactionHashesPacket68{ Types: []byte{types.DynamicFeeTxType, types.BlobTxType}, Sizes: []uint32{uint32(t2[0].Size()), uint32(t2[1].Size())}, Hashes: []common.Hash{t2[0].Hash(), t2[1].Hash()}, diff --git a/cmd/devp2p/internal/ethtest/transaction.go b/cmd/devp2p/internal/ethtest/transaction.go index 80b5d80745..e00913a7ac 100644 --- a/cmd/devp2p/internal/ethtest/transaction.go +++ b/cmd/devp2p/internal/ethtest/transaction.go @@ -71,7 +71,7 @@ func (s *Suite) sendTxs(t *utesting.T, txs []*types.Transaction) error { for _, tx := range *msg { got[tx.Hash()] = true } - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket68: for _, hash := range msg.Hashes { got[hash] = true } @@ -156,7 +156,7 @@ func (s *Suite) sendInvalidTxs(t *utesting.T, txs []*types.Transaction) error { return fmt.Errorf("received bad tx: %s", tx.Hash()) } } - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket68: for _, hash := range msg.Hashes { if _, ok := invalids[hash]; ok { return fmt.Errorf("received bad tx: %s", hash) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 96b69c547e..5bfb039360 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -454,6 +454,9 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) { func TestCanonicalSynchronisation68Full(t *testing.T) { testCanonSync(t, eth.ETH68, FullSync) } func TestCanonicalSynchronisation68Snap(t *testing.T) { testCanonSync(t, eth.ETH68, SnapSync) } func TestCanonicalSynchronisation68Light(t *testing.T) { testCanonSync(t, eth.ETH68, LightSync) } +func TestCanonicalSynchronisation67Full(t *testing.T) { testCanonSync(t, eth.ETH67, FullSync) } +func TestCanonicalSynchronisation67Snap(t *testing.T) { testCanonSync(t, eth.ETH67, SnapSync) } +func TestCanonicalSynchronisation67Light(t *testing.T) { testCanonSync(t, eth.ETH67, LightSync) } func testCanonSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -474,6 +477,8 @@ func testCanonSync(t *testing.T, protocol uint, mode SyncMode) { // until the cached blocks are retrieved. func TestThrottling68Full(t *testing.T) { testThrottling(t, eth.ETH68, FullSync) } func TestThrottling68Snap(t *testing.T) { testThrottling(t, eth.ETH68, SnapSync) } +func TestThrottling67Full(t *testing.T) { testThrottling(t, eth.ETH67, FullSync) } +func TestThrottling67Snap(t *testing.T) { testThrottling(t, eth.ETH67, SnapSync) } func testThrottling(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -555,6 +560,9 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { func TestForkedSync68Full(t *testing.T) { testForkedSync(t, eth.ETH68, FullSync) } func TestForkedSync68Snap(t *testing.T) { testForkedSync(t, eth.ETH68, SnapSync) } func TestForkedSync68Light(t *testing.T) { testForkedSync(t, eth.ETH68, LightSync) } +func TestForkedSync67Full(t *testing.T) { testForkedSync(t, eth.ETH67, FullSync) } +func TestForkedSync67Snap(t *testing.T) { testForkedSync(t, eth.ETH67, SnapSync) } +func TestForkedSync67Light(t *testing.T) { testForkedSync(t, eth.ETH67, LightSync) } func testForkedSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -582,6 +590,9 @@ func testForkedSync(t *testing.T, protocol uint, mode SyncMode) { func TestHeavyForkedSync68Full(t *testing.T) { testHeavyForkedSync(t, eth.ETH68, FullSync) } func TestHeavyForkedSync68Snap(t *testing.T) { testHeavyForkedSync(t, eth.ETH68, SnapSync) } func TestHeavyForkedSync68Light(t *testing.T) { testHeavyForkedSync(t, eth.ETH68, LightSync) } +func TestHeavyForkedSync67Full(t *testing.T) { testHeavyForkedSync(t, eth.ETH67, FullSync) } +func TestHeavyForkedSync67Snap(t *testing.T) { testHeavyForkedSync(t, eth.ETH67, SnapSync) } +func TestHeavyForkedSync67Light(t *testing.T) { testHeavyForkedSync(t, eth.ETH67, LightSync) } func testHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -611,6 +622,9 @@ func testHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { func TestBoundedForkedSync68Full(t *testing.T) { testBoundedForkedSync(t, eth.ETH68, FullSync) } func TestBoundedForkedSync68Snap(t *testing.T) { testBoundedForkedSync(t, eth.ETH68, SnapSync) } func TestBoundedForkedSync68Light(t *testing.T) { testBoundedForkedSync(t, eth.ETH68, LightSync) } +func TestBoundedForkedSync67Full(t *testing.T) { testBoundedForkedSync(t, eth.ETH67, FullSync) } +func TestBoundedForkedSync67Snap(t *testing.T) { testBoundedForkedSync(t, eth.ETH67, SnapSync) } +func TestBoundedForkedSync67Light(t *testing.T) { testBoundedForkedSync(t, eth.ETH67, LightSync) } func testBoundedForkedSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -645,6 +659,15 @@ func TestBoundedHeavyForkedSync68Snap(t *testing.T) { func TestBoundedHeavyForkedSync68Light(t *testing.T) { testBoundedHeavyForkedSync(t, eth.ETH68, LightSync) } +func TestBoundedHeavyForkedSync67Full(t *testing.T) { + testBoundedHeavyForkedSync(t, eth.ETH67, FullSync) +} +func TestBoundedHeavyForkedSync67Snap(t *testing.T) { + testBoundedHeavyForkedSync(t, eth.ETH67, SnapSync) +} +func TestBoundedHeavyForkedSync67Light(t *testing.T) { + testBoundedHeavyForkedSync(t, eth.ETH67, LightSync) +} func testBoundedHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -672,6 +695,9 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { func TestCancel68Full(t *testing.T) { testCancel(t, eth.ETH68, FullSync) } func TestCancel68Snap(t *testing.T) { testCancel(t, eth.ETH68, SnapSync) } func TestCancel68Light(t *testing.T) { testCancel(t, eth.ETH68, LightSync) } +func TestCancel67Full(t *testing.T) { testCancel(t, eth.ETH67, FullSync) } +func TestCancel67Snap(t *testing.T) { testCancel(t, eth.ETH67, SnapSync) } +func TestCancel67Light(t *testing.T) { testCancel(t, eth.ETH67, LightSync) } func testCancel(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -699,6 +725,9 @@ func testCancel(t *testing.T, protocol uint, mode SyncMode) { func TestMultiSynchronisation68Full(t *testing.T) { testMultiSynchronisation(t, eth.ETH68, FullSync) } func TestMultiSynchronisation68Snap(t *testing.T) { testMultiSynchronisation(t, eth.ETH68, SnapSync) } func TestMultiSynchronisation68Light(t *testing.T) { testMultiSynchronisation(t, eth.ETH68, LightSync) } +func TestMultiSynchronisation67Full(t *testing.T) { testMultiSynchronisation(t, eth.ETH67, FullSync) } +func TestMultiSynchronisation67Snap(t *testing.T) { testMultiSynchronisation(t, eth.ETH67, SnapSync) } +func TestMultiSynchronisation67Light(t *testing.T) { testMultiSynchronisation(t, eth.ETH67, LightSync) } func testMultiSynchronisation(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -723,6 +752,9 @@ func testMultiSynchronisation(t *testing.T, protocol uint, mode SyncMode) { func TestMultiProtoSynchronisation68Full(t *testing.T) { testMultiProtoSync(t, eth.ETH68, FullSync) } func TestMultiProtoSynchronisation68Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH68, SnapSync) } func TestMultiProtoSynchronisation68Light(t *testing.T) { testMultiProtoSync(t, eth.ETH68, LightSync) } +func TestMultiProtoSynchronisation67Full(t *testing.T) { testMultiProtoSync(t, eth.ETH67, FullSync) } +func TestMultiProtoSynchronisation67Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH67, SnapSync) } +func TestMultiProtoSynchronisation67Light(t *testing.T) { testMultiProtoSync(t, eth.ETH67, LightSync) } func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -733,6 +765,7 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { // Create peers of every type tester.newPeer("peer 68", eth.ETH68, chain.blocks[1:]) + tester.newPeer("peer 67", eth.ETH67, chain.blocks[1:]) // Synchronise with the requested peer and make sure all blocks were retrieved if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil { @@ -741,7 +774,7 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { assertOwnChain(t, tester, len(chain.blocks)) // Check that no peers have been dropped off - for _, version := range []int{68} { + for _, version := range []int{68, 67} { peer := fmt.Sprintf("peer %d", version) if _, ok := tester.peers[peer]; !ok { t.Errorf("%s dropped", peer) @@ -754,6 +787,9 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { func TestEmptyShortCircuit68Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, FullSync) } func TestEmptyShortCircuit68Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, SnapSync) } func TestEmptyShortCircuit68Light(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, LightSync) } +func TestEmptyShortCircuit67Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH67, FullSync) } +func TestEmptyShortCircuit67Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH67, SnapSync) } +func TestEmptyShortCircuit67Light(t *testing.T) { testEmptyShortCircuit(t, eth.ETH67, LightSync) } func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -802,6 +838,9 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { func TestMissingHeaderAttack68Full(t *testing.T) { testMissingHeaderAttack(t, eth.ETH68, FullSync) } func TestMissingHeaderAttack68Snap(t *testing.T) { testMissingHeaderAttack(t, eth.ETH68, SnapSync) } func TestMissingHeaderAttack68Light(t *testing.T) { testMissingHeaderAttack(t, eth.ETH68, LightSync) } +func TestMissingHeaderAttack67Full(t *testing.T) { testMissingHeaderAttack(t, eth.ETH67, FullSync) } +func TestMissingHeaderAttack67Snap(t *testing.T) { testMissingHeaderAttack(t, eth.ETH67, SnapSync) } +func TestMissingHeaderAttack67Light(t *testing.T) { testMissingHeaderAttack(t, eth.ETH67, LightSync) } func testMissingHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -828,6 +867,9 @@ func testMissingHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { func TestShiftedHeaderAttack68Full(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH68, FullSync) } func TestShiftedHeaderAttack68Snap(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH68, SnapSync) } func TestShiftedHeaderAttack68Light(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH68, LightSync) } +func TestShiftedHeaderAttack67Full(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH67, FullSync) } +func TestShiftedHeaderAttack67Snap(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH67, SnapSync) } +func TestShiftedHeaderAttack67Light(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH67, LightSync) } func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -940,6 +982,15 @@ func TestHighTDStarvationAttack68Snap(t *testing.T) { func TestHighTDStarvationAttack68Light(t *testing.T) { testHighTDStarvationAttack(t, eth.ETH68, LightSync) } +func TestHighTDStarvationAttack67Full(t *testing.T) { + testHighTDStarvationAttack(t, eth.ETH67, FullSync) +} +func TestHighTDStarvationAttack67Snap(t *testing.T) { + testHighTDStarvationAttack(t, eth.ETH67, SnapSync) +} +func TestHighTDStarvationAttack67Light(t *testing.T) { + testHighTDStarvationAttack(t, eth.ETH67, LightSync) +} func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -956,6 +1007,7 @@ func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) { // Tests that misbehaving peers are disconnected, whilst behaving ones are not. func TestBlockHeaderAttackerDropping68(t *testing.T) { testBlockHeaderAttackerDropping(t, eth.ETH68) } +func TestBlockHeaderAttackerDropping67(t *testing.T) { testBlockHeaderAttackerDropping(t, eth.ETH67) } func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { // Define the disconnection requirement for individual hash fetch errors @@ -1006,6 +1058,9 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { func TestSyncProgress68Full(t *testing.T) { testSyncProgress(t, eth.ETH68, FullSync) } func TestSyncProgress68Snap(t *testing.T) { testSyncProgress(t, eth.ETH68, SnapSync) } func TestSyncProgress68Light(t *testing.T) { testSyncProgress(t, eth.ETH68, LightSync) } +func TestSyncProgress67Full(t *testing.T) { testSyncProgress(t, eth.ETH67, FullSync) } +func TestSyncProgress67Snap(t *testing.T) { testSyncProgress(t, eth.ETH67, SnapSync) } +func TestSyncProgress67Light(t *testing.T) { testSyncProgress(t, eth.ETH67, LightSync) } func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -1083,6 +1138,9 @@ func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.Sync func TestForkedSyncProgress68Full(t *testing.T) { testForkedSyncProgress(t, eth.ETH68, FullSync) } func TestForkedSyncProgress68Snap(t *testing.T) { testForkedSyncProgress(t, eth.ETH68, SnapSync) } func TestForkedSyncProgress68Light(t *testing.T) { testForkedSyncProgress(t, eth.ETH68, LightSync) } +func TestForkedSyncProgress67Full(t *testing.T) { testForkedSyncProgress(t, eth.ETH67, FullSync) } +func TestForkedSyncProgress67Snap(t *testing.T) { testForkedSyncProgress(t, eth.ETH67, SnapSync) } +func TestForkedSyncProgress67Light(t *testing.T) { testForkedSyncProgress(t, eth.ETH67, LightSync) } func testForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -1154,6 +1212,9 @@ func testForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { func TestFailedSyncProgress68Full(t *testing.T) { testFailedSyncProgress(t, eth.ETH68, FullSync) } func TestFailedSyncProgress68Snap(t *testing.T) { testFailedSyncProgress(t, eth.ETH68, SnapSync) } func TestFailedSyncProgress68Light(t *testing.T) { testFailedSyncProgress(t, eth.ETH68, LightSync) } +func TestFailedSyncProgress67Full(t *testing.T) { testFailedSyncProgress(t, eth.ETH67, FullSync) } +func TestFailedSyncProgress67Snap(t *testing.T) { testFailedSyncProgress(t, eth.ETH67, SnapSync) } +func TestFailedSyncProgress67Light(t *testing.T) { testFailedSyncProgress(t, eth.ETH67, LightSync) } func testFailedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -1220,6 +1281,9 @@ func testFailedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { func TestFakedSyncProgress68Full(t *testing.T) { testFakedSyncProgress(t, eth.ETH68, FullSync) } func TestFakedSyncProgress68Snap(t *testing.T) { testFakedSyncProgress(t, eth.ETH68, SnapSync) } func TestFakedSyncProgress68Light(t *testing.T) { testFakedSyncProgress(t, eth.ETH68, LightSync) } +func TestFakedSyncProgress67Full(t *testing.T) { testFakedSyncProgress(t, eth.ETH67, FullSync) } +func TestFakedSyncProgress67Snap(t *testing.T) { testFakedSyncProgress(t, eth.ETH67, SnapSync) } +func TestFakedSyncProgress67Light(t *testing.T) { testFakedSyncProgress(t, eth.ETH67, LightSync) } func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) @@ -1402,6 +1466,8 @@ func testCheckpointEnforcement(t *testing.T, protocol uint, mode SyncMode) { // being fast-synced from, avoiding potential cheap eclipse attacks. func TestBeaconSync68Full(t *testing.T) { testBeaconSync(t, eth.ETH68, FullSync) } func TestBeaconSync68Snap(t *testing.T) { testBeaconSync(t, eth.ETH68, SnapSync) } +func TestBeaconSync67Full(t *testing.T) { testBeaconSync(t, eth.ETH67, FullSync) } +func TestBeaconSync67Snap(t *testing.T) { testBeaconSync(t, eth.ETH67, SnapSync) } func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) { // log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 92eace21f1..c60c3c5800 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -815,7 +815,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) { // Create a peer set to feed headers through peerset := newPeerSet() for _, peer := range tt.peers { - peerset.Register(newPeerConnection(peer.id, eth.ETH68, peer, log.New("id", peer.id))) + peerset.Register(newPeerConnection(peer.id, eth.ETH67, peer, log.New("id", peer.id))) } // Create a peer dropper to track malicious peers dropped := make(map[string]int) @@ -917,7 +917,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) { skeleton.Sync(tt.newHead, nil, true) } if tt.newPeer != nil { - if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH68, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil { + if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH67, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil { t.Errorf("test %d: failed to register new peer: %v", i, err) } } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index a5244951a3..fa27fddf97 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -67,7 +67,10 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.NewBlockPacket: return h.handleBlockBroadcast(peer, packet.Block, packet.TD) - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket67: + return h.txFetcher.Notify(peer.ID(), nil, nil, *packet) + + case *eth.NewPooledTransactionHashesPacket68: return h.txFetcher.Notify(peer.ID(), packet.Types, packet.Sizes, packet.Hashes) case *eth.TransactionsPacket: diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index d5f16dff8b..136f8b294a 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -62,7 +62,11 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { h.blockBroadcasts.Send(packet.Block) return nil - case *eth.NewPooledTransactionHashesPacket: + case *eth.NewPooledTransactionHashesPacket67: + h.txAnnounces.Send(([]common.Hash)(*packet)) + return nil + + case *eth.NewPooledTransactionHashesPacket68: h.txAnnounces.Send(packet.Hashes) return nil @@ -81,6 +85,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { // Tests that peers are correctly accepted (or rejected) based on the advertised // fork IDs in the protocol handshake. +func TestForkIDSplit67(t *testing.T) { testForkIDSplit(t, eth.ETH67) } func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) } func testForkIDSplit(t *testing.T, protocol uint) { @@ -235,6 +240,7 @@ func testForkIDSplit(t *testing.T, protocol uint) { } // Tests that received transactions are added to the local pool. +func TestRecvTransactions67(t *testing.T) { testRecvTransactions(t, eth.ETH67) } func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) } func testRecvTransactions(t *testing.T, protocol uint) { @@ -292,6 +298,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { } // This test checks that pending transactions are sent. +func TestSendTransactions67(t *testing.T) { testSendTransactions(t, eth.ETH67) } func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) } func testSendTransactions(t *testing.T, protocol uint) { @@ -350,7 +357,7 @@ func testSendTransactions(t *testing.T, protocol uint) { seen := make(map[common.Hash]struct{}) for len(seen) < len(insert) { switch protocol { - case 68: + case 67, 68: select { case hashes := <-anns: for _, hash := range hashes { @@ -376,6 +383,7 @@ func testSendTransactions(t *testing.T, protocol uint) { // Tests that transactions get propagated to all attached peers, either via direct // broadcasts or via announcements/retrievals. +func TestTransactionPropagation67(t *testing.T) { testTransactionPropagation(t, eth.ETH67) } func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) } func testTransactionPropagation(t *testing.T, protocol uint) { @@ -625,8 +633,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { defer sourcePipe.Close() defer sinkPipe.Close() - sourcePeer := eth.NewPeer(eth.ETH68, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil) - sinkPeer := eth.NewPeer(eth.ETH68, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil) + sourcePeer := eth.NewPeer(eth.ETH67, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil) + sinkPeer := eth.NewPeer(eth.ETH67, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil) defer sourcePeer.Close() defer sinkPeer.Close() @@ -678,6 +686,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { // Tests that a propagated malformed block (uncles or transactions don't match // with the hashes in the header) gets discarded and not broadcast forward. +func TestBroadcastMalformedBlock67(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH67) } func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) } func testBroadcastMalformedBlock(t *testing.T, protocol uint) { diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index ad5395cb8d..3045303f22 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -163,9 +163,16 @@ func (p *Peer) announceTransactions() { if len(pending) > 0 { done = make(chan struct{}) go func() { - if err := p.sendPooledTransactionHashes(pending, pendingTypes, pendingSizes); err != nil { - fail <- err - return + if p.version >= ETH68 { + if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil { + fail <- err + return + } + } else { + if err := p.sendPooledTransactionHashes66(pending); err != nil { + fail <- err + return + } } close(done) p.Log().Trace("Sent transaction announcements", "count", len(pending)) diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 5a4f810080..326929bb95 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -93,8 +93,6 @@ type TxPool interface { func MakeProtocols(backend Backend, network uint64, protocolVersions []uint, dnsdisc enode.Iterator) []p2p.Protocol { protocols := make([]p2p.Protocol, 0, len(protocolVersions)) for _, version := range protocolVersions { - version := version // Closure - // TODO(meowsbits): FIXME re: Cancun config/time/enabled check // Blob transactions require eth/68 announcements, disable everything else eip4844Enabled := backend.Chain().Config().GetEIP4844TransitionTime() != nil || backend.Chain().Config().GetEIP4844Transition() != nil @@ -102,6 +100,8 @@ func MakeProtocols(backend Backend, network uint64, protocolVersions []uint, dns continue } + version := version // Closure + protocols = append(protocols, p2p.Protocol{ Name: ProtocolName, Version: version, @@ -169,11 +169,26 @@ type Decoder interface { Time() time.Time } +var eth67 = map[uint64]msgHandler{ + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes67, + GetBlockHeadersMsg: handleGetBlockHeaders, + BlockHeadersMsg: handleBlockHeaders, + GetBlockBodiesMsg: handleGetBlockBodies, + BlockBodiesMsg: handleBlockBodies, + GetReceiptsMsg: handleGetReceipts, + ReceiptsMsg: handleReceipts, + GetPooledTransactionsMsg: handleGetPooledTransactions, + PooledTransactionsMsg: handlePooledTransactions, +} + var eth68 = map[uint64]msgHandler{ NewBlockHashesMsg: handleNewBlockhashes, NewBlockMsg: handleNewBlock, TransactionsMsg: handleTransactions, - NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68, GetBlockHeadersMsg: handleGetBlockHeaders, BlockHeadersMsg: handleBlockHeaders, GetBlockBodiesMsg: handleGetBlockBodies, @@ -197,8 +212,10 @@ func handleMessage(backend Backend, peer *Peer) error { } defer msg.Discard() - var handlers = eth68 - + var handlers = eth67 + if peer.Version() >= ETH68 { + handlers = eth68 + } // Track the amount of time it takes to serve the request and run the handler if metrics.Enabled { h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code) diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index 0a48f67b5d..aeabf0e3ee 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -154,6 +154,7 @@ func (b *testBackend) Handle(*Peer, Packet) error { } // Tests that block headers can be retrieved from a remote chain based on user queries. +func TestGetBlockHeaders67(t *testing.T) { testGetBlockHeaders(t, ETH67) } func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) } func testGetBlockHeaders(t *testing.T, protocol uint) { @@ -339,6 +340,7 @@ func testGetBlockHeaders(t *testing.T, protocol uint) { } // Tests that block contents can be retrieved from a remote chain based on their hashes. +func TestGetBlockBodies67(t *testing.T) { testGetBlockBodies(t, ETH67) } func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) } func testGetBlockBodies(t *testing.T, protocol uint) { @@ -433,6 +435,7 @@ func testGetBlockBodies(t *testing.T, protocol uint) { } // Tests that the transaction receipts can be retrieved based on hashes. +func TestGetBlockReceipts67(t *testing.T) { testGetBlockReceipts(t, ETH67) } func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) } func testGetBlockReceipts(t *testing.T, protocol uint) { diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 0275708a6c..069e92dadf 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -383,13 +383,30 @@ func handleReceipts(backend Backend, msg Decoder, peer *Peer) error { }, metadata) } -func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error { +func handleNewPooledTransactionHashes67(backend Backend, msg Decoder, peer *Peer) error { // New transaction announcement arrived, make sure we have // a valid and fresh chain to handle them if !backend.AcceptTxs() { return nil } - ann := new(NewPooledTransactionHashesPacket) + ann := new(NewPooledTransactionHashesPacket67) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + // Schedule all the unknown hashes for retrieval + for _, hash := range *ann { + peer.markTransaction(hash) + } + return backend.Handle(peer, ann) +} + +func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error { + // New transaction announcement arrived, make sure we have + // a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + ann := new(NewPooledTransactionHashesPacket68) if err := msg.Decode(ann); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } diff --git a/eth/protocols/eth/handshake_test.go b/eth/protocols/eth/handshake_test.go index b9fd13d863..d96cfc8165 100644 --- a/eth/protocols/eth/handshake_test.go +++ b/eth/protocols/eth/handshake_test.go @@ -27,6 +27,7 @@ import ( ) // Tests that handshake failures are detected and reported correctly. +func TestHandshake67(t *testing.T) { testHandshake(t, ETH67) } func TestHandshake68(t *testing.T) { testHandshake(t, ETH68) } func testHandshake(t *testing.T, protocol uint) { diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index cf269ee5a4..a1eaa7d24a 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -220,17 +220,29 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) { } } -// sendPooledTransactionHashes sends transaction hashes (tagged with their type +// sendPooledTransactionHashes66 sends transaction hashes to the peer and includes +// them in its transaction hash set for future reference. +// +// This method is a helper used by the async transaction announcer. Don't call it +// directly as the queueing (memory) and transmission (bandwidth) costs should +// not be managed directly. +func (p *Peer) sendPooledTransactionHashes66(hashes []common.Hash) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + p.knownTxs.Add(hashes...) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket67(hashes)) +} + +// sendPooledTransactionHashes68 sends transaction hashes (tagged with their type // and size) to the peer and includes them in its transaction hash set for future // reference. // // This method is a helper used by the async transaction announcer. Don't call it // directly as the queueing (memory) and transmission (bandwidth) costs should // not be managed directly. -func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash, types []byte, sizes []uint32) error { +func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error { // Mark all the transactions as known, but ensure we don't overflow our limits p.knownTxs.Add(hashes...) - return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket{Types: types, Sizes: sizes, Hashes: hashes}) + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes}) } // AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index f3b6faff8e..36559ca448 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -40,11 +40,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH68} +var ProtocolVersions = []uint{ETH68, ETH67} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH68: 17} +var protocolLengths = map[uint]uint64{ETH68: 17, ETH67: 17} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -285,8 +285,11 @@ type ReceiptsRLPPacket struct { ReceiptsRLPResponse } -// NewPooledTransactionHashesPacket represents a transaction announcement packet on eth/68 and newer. -type NewPooledTransactionHashesPacket struct { +// NewPooledTransactionHashesPacket67 represents a transaction announcement packet on eth/67. +type NewPooledTransactionHashesPacket67 []common.Hash + +// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer. +type NewPooledTransactionHashesPacket68 struct { Types []byte Sizes []uint32 Hashes []common.Hash @@ -345,8 +348,10 @@ func (*BlockBodiesResponse) Kind() byte { return BlockBodiesMsg } func (*NewBlockPacket) Name() string { return "NewBlock" } func (*NewBlockPacket) Kind() byte { return NewBlockMsg } -func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" } -func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg } +func (*NewPooledTransactionHashesPacket67) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket67) Kind() byte { return NewPooledTransactionHashesMsg } +func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" } +func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg } func (*GetPooledTransactionsRequest) Name() string { return "GetPooledTransactions" } func (*GetPooledTransactionsRequest) Kind() byte { return GetPooledTransactionsMsg } diff --git a/eth/sync_test.go b/eth/sync_test.go index b73a2b075f..950d13e2a9 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -91,6 +91,7 @@ func newTestHandlerWithBlocksWithOpts(blocks int, mode downloader.SyncMode, gen } // Tests that snap sync is disabled after a successful sync cycle. +func TestSnapSyncDisabling67(t *testing.T) { testSnapSyncDisabling(t, eth.ETH67, snap.SNAP1) } func TestSnapSyncDisabling68(t *testing.T) { testSnapSyncDisabling(t, eth.ETH68, snap.SNAP1) } // Tests that snap sync gets disabled as soon as a real block is successfully