From 26ad1151b9421c293d4982cf5c18ff0637031f13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hern=C3=A1n=20Vanzetto?= <15466498+hvanz@users.noreply.github.com> Date: Thu, 10 Aug 2023 09:13:51 +0300 Subject: [PATCH] mempool: Store peer ids as p2p.ID instead of uint16 (remove mempoolIDs) (#1191) * Remove mempoolIDs; use p2p.ID * Change txSenders map of bools to struct{} * Revert "Change txSenders map of bools to struct{}" This reverts commit af834bbb54eef533c77882dfa1269a7877d28fb7. * Remove redundant methods * add changelog --- .../1146-mempool-remove-ids.md | 3 + mempool/ids.go | 71 ------------------- mempool/ids_test.go | 43 ----------- mempool/reactor.go | 33 +++------ mempool/reactor_test.go | 29 ++++---- 5 files changed, 25 insertions(+), 154 deletions(-) create mode 100644 .changelog/unreleased/breaking-changes/1146-mempool-remove-ids.md delete mode 100644 mempool/ids.go delete mode 100644 mempool/ids_test.go diff --git a/.changelog/unreleased/breaking-changes/1146-mempool-remove-ids.md b/.changelog/unreleased/breaking-changes/1146-mempool-remove-ids.md new file mode 100644 index 00000000000..647536c21d8 --- /dev/null +++ b/.changelog/unreleased/breaking-changes/1146-mempool-remove-ids.md @@ -0,0 +1,3 @@ +- `[mempool]` Remove `mempoolIDs` for internally storing peer ids as `p2p.ID` + instead of `uint16`. + ([\#1146](https://github.com/cometbft/cometbft/pull/1146)) diff --git a/mempool/ids.go b/mempool/ids.go deleted file mode 100644 index aad98b7a7d1..00000000000 --- a/mempool/ids.go +++ /dev/null @@ -1,71 +0,0 @@ -package mempool - -import ( - "fmt" - - cmtsync "github.com/cometbft/cometbft/libs/sync" - "github.com/cometbft/cometbft/p2p" -) - -type mempoolIDs struct { - mtx cmtsync.RWMutex - peerMap map[p2p.ID]uint16 - nextID uint16 // assumes that a node will never have over 65536 active peers - activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter -} - -// Reserve searches for the next unused ID and assigns it to the -// peer. -func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - curID := ids.nextPeerID() - ids.peerMap[peer.ID()] = curID - ids.activeIDs[curID] = struct{}{} -} - -// nextPeerID returns the next unused peer ID to use. -// This assumes that ids's mutex is already locked. -func (ids *mempoolIDs) nextPeerID() uint16 { - if len(ids.activeIDs) == MaxActiveIDs { - panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", MaxActiveIDs)) - } - - _, idExists := ids.activeIDs[ids.nextID] - for idExists { - ids.nextID++ - _, idExists = ids.activeIDs[ids.nextID] - } - curID := ids.nextID - ids.nextID++ - return curID -} - -// Reclaim returns the ID reserved for the peer back to unused pool. -func (ids *mempoolIDs) Reclaim(peer p2p.Peer) { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - removedID, ok := ids.peerMap[peer.ID()] - if ok { - delete(ids.activeIDs, removedID) - delete(ids.peerMap, peer.ID()) - } -} - -// GetForPeer returns an ID reserved for the peer. -func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 { - ids.mtx.RLock() - defer ids.mtx.RUnlock() - - return ids.peerMap[peer.ID()] -} - -func newMempoolIDs() *mempoolIDs { - return &mempoolIDs{ - peerMap: make(map[p2p.ID]uint16), - activeIDs: map[uint16]struct{}{0: {}}, - nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx - } -} diff --git a/mempool/ids_test.go b/mempool/ids_test.go deleted file mode 100644 index a01222cb75f..00000000000 --- a/mempool/ids_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package mempool - -import ( - "net" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/cometbft/cometbft/p2p/mock" -) - -func TestMempoolIDsBasic(t *testing.T) { - ids := newMempoolIDs() - - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - - ids.ReserveForPeer(peer) - assert.EqualValues(t, 1, ids.GetForPeer(peer)) - ids.Reclaim(peer) - - ids.ReserveForPeer(peer) - assert.EqualValues(t, 2, ids.GetForPeer(peer)) - ids.Reclaim(peer) -} - -func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { - if testing.Short() { - return - } - - // 0 is already reserved for UnknownPeerID - ids := newMempoolIDs() - - for i := 0; i < MaxActiveIDs-1; i++ { - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - ids.ReserveForPeer(peer) - } - - assert.Panics(t, func() { - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - ids.ReserveForPeer(peer) - }) -} diff --git a/mempool/reactor.go b/mempool/reactor.go index 698085a7cf7..491c9e32caa 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -23,15 +23,12 @@ type Reactor struct { p2p.BaseReactor config *cfg.MempoolConfig mempool *CListMempool - ids *mempoolIDs // `txSenders` maps every received transaction to the set of peer IDs that // have sent the transaction to this node. Sender IDs are used during // transaction propagation to avoid sending a transaction to a peer that - // already has it. A sender ID is the internal peer ID used in the mempool - // to identify the sender, storing two bytes with each transaction instead - // of 20 bytes for the types.NodeID. - txSenders map[types.TxKey]map[uint16]bool + // already has it. + txSenders map[types.TxKey]map[p2p.ID]bool txSendersMtx cmtsync.Mutex } @@ -40,20 +37,13 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor { memR := &Reactor{ config: config, mempool: mempool, - ids: newMempoolIDs(), - txSenders: make(map[types.TxKey]map[uint16]bool), + txSenders: make(map[types.TxKey]map[p2p.ID]bool), } memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) memR.mempool.SetTxRemovedCallback(func(txKey types.TxKey) { memR.removeSenders(txKey) }) return memR } -// InitPeer implements Reactor by creating a state for the peer. -func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer { - memR.ids.ReserveForPeer(peer) - return peer -} - // SetLogger sets the Logger on the reactor and the underlying mempool. func (memR *Reactor) SetLogger(l log.Logger) { memR.Logger = l @@ -97,12 +87,6 @@ func (memR *Reactor) AddPeer(peer p2p.Peer) { } } -// RemovePeer implements Reactor. -func (memR *Reactor) RemovePeer(peer p2p.Peer, _ interface{}) { - memR.ids.Reclaim(peer) - // broadcast routine checks if peer is gone and returns -} - // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) Receive(e p2p.Envelope) { @@ -131,7 +115,7 @@ func (memR *Reactor) Receive(e p2p.Envelope) { // removed from mempool but not the cache. reqRes.SetCallback(func(res *abci.Response) { if res.GetCheckTx().Code == abci.CodeTypeOK { - memR.addSender(tx.Key(), memR.ids.GetForPeer(e.Src)) + memR.addSender(tx.Key(), e.Src.ID()) } }) } @@ -152,7 +136,6 @@ type PeerState interface { // Send new mempool txs to peer. func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { - peerID := memR.ids.GetForPeer(peer) var next *clist.CElement for { @@ -203,7 +186,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // NOTE: Transaction batching was disabled due to // https://github.com/tendermint/tendermint/issues/5796 - if !memR.isSender(memTx.tx.Key(), peerID) { + if !memR.isSender(memTx.tx.Key(), peer.ID()) { success := peer.Send(p2p.Envelope{ ChannelID: MempoolChannel, Message: &protomem.Txs{Txs: [][]byte{memTx.tx}}, @@ -226,7 +209,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { } } -func (memR *Reactor) isSender(txKey types.TxKey, peerID uint16) bool { +func (memR *Reactor) isSender(txKey types.TxKey, peerID p2p.ID) bool { memR.txSendersMtx.Lock() defer memR.txSendersMtx.Unlock() @@ -234,7 +217,7 @@ func (memR *Reactor) isSender(txKey types.TxKey, peerID uint16) bool { return ok && sendersSet[peerID] } -func (memR *Reactor) addSender(txKey types.TxKey, senderID uint16) bool { +func (memR *Reactor) addSender(txKey types.TxKey, senderID p2p.ID) bool { memR.txSendersMtx.Lock() defer memR.txSendersMtx.Unlock() @@ -242,7 +225,7 @@ func (memR *Reactor) addSender(txKey types.TxKey, senderID uint16) bool { sendersSet[senderID] = true return false } - memR.txSenders[txKey] = map[uint16]bool{senderID: true} + memR.txSenders[txKey] = map[p2p.ID]bool{senderID: true} return true } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 85c1e1a8706..eded3edc574 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -143,17 +143,16 @@ func TestReactorNoBroadcastToSender(t *testing.T) { // create random transactions txs := NewRandomTxs(numTxs, 20) - const peerID0 = 0 - const peerID1 = 1 // the second peer sends all the transactions to the first peer + secondNodeID := reactors[1].Switch.NodeInfo().ID() for _, tx := range txs { - reactors[0].addSender(tx.Key(), peerID1) - _, err := reactors[peerID0].mempool.CheckTx(tx) + reactors[0].addSender(tx.Key(), secondNodeID) + _, err := reactors[0].mempool.CheckTx(tx) require.NoError(t, err) } // the second peer should not receive any transaction - ensureNoTxs(t, reactors[peerID1], 100*time.Millisecond) + ensureNoTxs(t, reactors[1], 100*time.Millisecond) } func TestReactor_MaxTxBytes(t *testing.T) { @@ -281,19 +280,19 @@ func TestReactorTxSendersLocal(t *testing.T) { tx1 := kvstore.NewTxFromID(1) tx2 := kvstore.NewTxFromID(2) - require.False(t, reactor.isSender(types.Tx(tx1).Key(), 1)) + require.False(t, reactor.isSender(types.Tx(tx1).Key(), "peer1")) - reactor.addSender(types.Tx(tx1).Key(), 1) - reactor.addSender(types.Tx(tx1).Key(), 2) - reactor.addSender(types.Tx(tx2).Key(), 1) - require.True(t, reactor.isSender(types.Tx(tx1).Key(), 1)) - require.True(t, reactor.isSender(types.Tx(tx1).Key(), 2)) - require.True(t, reactor.isSender(types.Tx(tx2).Key(), 1)) + reactor.addSender(types.Tx(tx1).Key(), "peer1") + reactor.addSender(types.Tx(tx1).Key(), "peer2") + reactor.addSender(types.Tx(tx2).Key(), "peer1") + require.True(t, reactor.isSender(types.Tx(tx1).Key(), "peer1")) + require.True(t, reactor.isSender(types.Tx(tx1).Key(), "peer2")) + require.True(t, reactor.isSender(types.Tx(tx2).Key(), "peer1")) reactor.removeSenders(types.Tx(tx1).Key()) - require.False(t, reactor.isSender(types.Tx(tx1).Key(), 1)) - require.False(t, reactor.isSender(types.Tx(tx1).Key(), 2)) - require.True(t, reactor.isSender(types.Tx(tx2).Key(), 1)) + require.False(t, reactor.isSender(types.Tx(tx1).Key(), "peer1")) + require.False(t, reactor.isSender(types.Tx(tx1).Key(), "peer2")) + require.True(t, reactor.isSender(types.Tx(tx2).Key(), "peer1")) } // Test that: