mempool: Store peer ids as p2p.ID instead of uint16 (remove mempoolIDs) (cometbft#1191)

* Remove mempoolIDs; use p2p.ID

* Change txSenders map of bools to struct{}

* Revert "Change txSenders map of bools to struct{}"

This reverts commit af834bb.

* Remove redundant methods

* Add changelog
hvanz authored Aug 10, 2023
1 parent e3471f8 commit 26ad115
Showing 5 changed files with 25 additions and 154 deletions.
New changelog entry:

```diff
@@ -0,0 +1,3 @@
+- `[mempool]` Remove `mempoolIDs` for internally storing peer ids as `p2p.ID`
+  instead of `uint16`.
+  ([\#1146](https://github.com/cometbft/cometbft/pull/1146))
```
71 changes: 0 additions & 71 deletions mempool/ids.go

This file was deleted.
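
For context, `mempool/ids.go` held the `mempoolIDs` bookkeeping that this commit deletes: a lock-guarded table that reserved a compact `uint16` for each connected peer (`ReserveForPeer`), released it on disconnect (`Reclaim`), and resolved a peer to its short id (`GetForPeer`). The deleted source is not shown on this page; the sketch below reconstructs the pattern from the call sites removed in `reactor.go`, and is not the verbatim file.

```go
package mempool

import (
	"sync"

	"github.com/cometbft/cometbft/p2p"
)

// mempoolIDs reserved a compact uint16 per connected peer so the mempool
// could store two bytes per sender instead of a full node ID.
// Reconstruction sketch, not the verbatim deleted file.
type mempoolIDs struct {
	mtx       sync.RWMutex
	peerMap   map[p2p.ID]uint16   // peer ID -> reserved short id
	activeIDs map[uint16]struct{} // short ids currently in use
	nextID    uint16
}

func newMempoolIDs() *mempoolIDs {
	return &mempoolIDs{
		peerMap:   make(map[p2p.ID]uint16),
		activeIDs: map[uint16]struct{}{0: {}}, // 0 is reserved for unknown peers
		nextID:    1,
	}
}

// ReserveForPeer hands the peer the next unused short id.
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
	ids.mtx.Lock()
	defer ids.mtx.Unlock()

	for {
		if _, taken := ids.activeIDs[ids.nextID]; !taken {
			break
		}
		ids.nextID++
	}
	ids.peerMap[peer.ID()] = ids.nextID
	ids.activeIDs[ids.nextID] = struct{}{}
	ids.nextID++
}

// Reclaim frees the peer's short id when the peer disconnects.
func (ids *mempoolIDs) Reclaim(peer p2p.Peer) {
	ids.mtx.Lock()
	defer ids.mtx.Unlock()

	if id, ok := ids.peerMap[peer.ID()]; ok {
		delete(ids.activeIDs, id)
		delete(ids.peerMap, peer.ID())
	}
}

// GetForPeer returns the short id reserved for the peer (0 if none).
func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 {
	ids.mtx.RLock()
	defer ids.mtx.RUnlock()

	return ids.peerMap[peer.ID()]
}
```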

43 changes: 0 additions & 43 deletions mempool/ids_test.go

This file was deleted.

33 changes: 8 additions & 25 deletions mempool/reactor.go
```diff
@@ -23,15 +23,12 @@ type Reactor struct {
 	p2p.BaseReactor
 	config  *cfg.MempoolConfig
 	mempool *CListMempool
-	ids     *mempoolIDs
 
 	// `txSenders` maps every received transaction to the set of peer IDs that
 	// have sent the transaction to this node. Sender IDs are used during
 	// transaction propagation to avoid sending a transaction to a peer that
-	// already has it. A sender ID is the internal peer ID used in the mempool
-	// to identify the sender, storing two bytes with each transaction instead
-	// of 20 bytes for the types.NodeID.
-	txSenders map[types.TxKey]map[uint16]bool
+	// already has it.
+	txSenders    map[types.TxKey]map[p2p.ID]bool
 	txSendersMtx cmtsync.Mutex
 }
 
```
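The inner `map[p2p.ID]bool` is Go's bool-valued set idiom. Note from the commit history above that `txSenders` was briefly switched to `map[...]struct{}` values and then reverted; both idioms are valid, and the trade-off is minor, as this standalone comparison (illustrative only, not code from this diff) shows:

```go
package main

import "fmt"

func main() {
	// bool-valued set: membership tests read naturally, because a
	// missing key yields the zero value false.
	boolSet := map[string]bool{"peer1": true}
	fmt.Println(boolSet["peer1"], boolSet["peer2"]) // true false

	// struct{}-valued set: the empty struct occupies zero bytes, but
	// membership needs the comma-ok form.
	structSet := map[string]struct{}{"peer1": {}}
	_, ok := structSet["peer2"]
	fmt.Println(ok) // false
}
```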

```diff
@@ -40,20 +37,13 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
 	memR := &Reactor{
 		config:    config,
 		mempool:   mempool,
-		ids:       newMempoolIDs(),
-		txSenders: make(map[types.TxKey]map[uint16]bool),
+		txSenders: make(map[types.TxKey]map[p2p.ID]bool),
 	}
 	memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
 	memR.mempool.SetTxRemovedCallback(func(txKey types.TxKey) { memR.removeSenders(txKey) })
 	return memR
 }
 
-// InitPeer implements Reactor by creating a state for the peer.
-func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
-	memR.ids.ReserveForPeer(peer)
-	return peer
-}
-
 // SetLogger sets the Logger on the reactor and the underlying mempool.
 func (memR *Reactor) SetLogger(l log.Logger) {
 	memR.Logger = l
@@ -97,12 +87,6 @@ func (memR *Reactor) AddPeer(peer p2p.Peer) {
 	}
 }
 
-// RemovePeer implements Reactor.
-func (memR *Reactor) RemovePeer(peer p2p.Peer, _ interface{}) {
-	memR.ids.Reclaim(peer)
-	// broadcast routine checks if peer is gone and returns
-}
-
 // Receive implements Reactor.
 // It adds any received transactions to the mempool.
 func (memR *Reactor) Receive(e p2p.Envelope) {
@@ -131,7 +115,7 @@ func (memR *Reactor) Receive(e p2p.Envelope) {
 		// removed from mempool but not the cache.
 		reqRes.SetCallback(func(res *abci.Response) {
 			if res.GetCheckTx().Code == abci.CodeTypeOK {
-				memR.addSender(tx.Key(), memR.ids.GetForPeer(e.Src))
+				memR.addSender(tx.Key(), e.Src.ID())
 			}
 		})
 	}
```
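
`Receive` records a sender only inside the `CheckTx` callback, i.e. once the application has accepted the transaction, so invalid transactions never enter `txSenders`. A standalone sketch of this record-on-success callback shape, with stand-in types in place of the ABCI ones:

```go
package main

import "fmt"

type response struct{ code uint32 }

const codeTypeOK uint32 = 0 // stand-in for abci.CodeTypeOK

// checkTx mimics the reqRes.SetCallback flow: validation runs, then the
// caller-supplied callback fires with the result.
func checkTx(tx []byte, cb func(response)) {
	res := response{code: codeTypeOK} // validation stub: accept everything
	cb(res)
}

func main() {
	senders := make(map[string]map[string]bool)
	tx, src := []byte("tx1"), "peerA"

	checkTx(tx, func(res response) {
		// Record the sender only on success, mirroring the
		// res.GetCheckTx().Code == abci.CodeTypeOK guard above.
		if res.code == codeTypeOK {
			if senders[string(tx)] == nil {
				senders[string(tx)] = make(map[string]bool)
			}
			senders[string(tx)][src] = true
		}
	})

	fmt.Println(senders["tx1"]["peerA"]) // true
}
```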
```diff
@@ -152,7 +136,6 @@ type PeerState interface {
 
 // Send new mempool txs to peer.
 func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
-	peerID := memR.ids.GetForPeer(peer)
 	var next *clist.CElement
 
 	for {
@@ -203,7 +186,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
 		// NOTE: Transaction batching was disabled due to
 		// https://github.com/tendermint/tendermint/issues/5796
 
-		if !memR.isSender(memTx.tx.Key(), peerID) {
+		if !memR.isSender(memTx.tx.Key(), peer.ID()) {
 			success := peer.Send(p2p.Envelope{
 				ChannelID: MempoolChannel,
 				Message:   &protomem.Txs{Txs: [][]byte{memTx.tx}},
@@ -226,23 +209,23 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
 	}
 }
 
-func (memR *Reactor) isSender(txKey types.TxKey, peerID uint16) bool {
+func (memR *Reactor) isSender(txKey types.TxKey, peerID p2p.ID) bool {
 	memR.txSendersMtx.Lock()
 	defer memR.txSendersMtx.Unlock()
 
 	sendersSet, ok := memR.txSenders[txKey]
 	return ok && sendersSet[peerID]
 }
 
-func (memR *Reactor) addSender(txKey types.TxKey, senderID uint16) bool {
+func (memR *Reactor) addSender(txKey types.TxKey, senderID p2p.ID) bool {
 	memR.txSendersMtx.Lock()
 	defer memR.txSendersMtx.Unlock()
 
 	if sendersSet, ok := memR.txSenders[txKey]; ok {
 		sendersSet[senderID] = true
 		return false
 	}
-	memR.txSenders[txKey] = map[uint16]bool{senderID: true}
+	memR.txSenders[txKey] = map[p2p.ID]bool{senderID: true}
 	return true
 }
 
```
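Together, `isSender` and `addSender` implement a mutex-guarded map of per-transaction sender sets, and `addSender` reports whether it recorded the transaction's first sender. A self-contained sketch of the same pattern, with hypothetical `txKey` and `peerID` types standing in for `types.TxKey` and `p2p.ID`:

```go
package main

import (
	"fmt"
	"sync"
)

type txKey [32]byte // stand-in for types.TxKey
type peerID string  // stand-in for p2p.ID

// senderSet mirrors the reactor's txSenders bookkeeping: a mutex-guarded
// map from transaction key to the set of peers that sent that transaction.
type senderSet struct {
	mtx     sync.Mutex
	senders map[txKey]map[peerID]bool
}

func newSenderSet() *senderSet {
	return &senderSet{senders: make(map[txKey]map[peerID]bool)}
}

// add records sender for key and reports whether it was the first sender.
func (s *senderSet) add(key txKey, sender peerID) bool {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if set, ok := s.senders[key]; ok {
		set[sender] = true
		return false
	}
	s.senders[key] = map[peerID]bool{sender: true}
	return true
}

// isSender reports whether sender already sent the transaction with key.
func (s *senderSet) isSender(key txKey, sender peerID) bool {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	set, ok := s.senders[key]
	return ok && set[sender]
}

func main() {
	s := newSenderSet()
	var k txKey
	fmt.Println(s.add(k, "peer1"))      // true: first sender for this tx
	fmt.Println(s.add(k, "peer2"))      // false: tx already known
	fmt.Println(s.isSender(k, "peer1")) // true
	fmt.Println(s.isSender(k, "peer3")) // false
}
```

The broadcast routine above performs exactly this membership check, `isSender(memTx.tx.Key(), peer.ID())`, to avoid echoing a transaction back to a peer that already sent it.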
29 changes: 14 additions & 15 deletions mempool/reactor_test.go
```diff
@@ -143,17 +143,16 @@ func TestReactorNoBroadcastToSender(t *testing.T) {
 	// create random transactions
 	txs := NewRandomTxs(numTxs, 20)
 
-	const peerID0 = 0
-	const peerID1 = 1
 	// the second peer sends all the transactions to the first peer
+	secondNodeID := reactors[1].Switch.NodeInfo().ID()
 	for _, tx := range txs {
-		reactors[0].addSender(tx.Key(), peerID1)
-		_, err := reactors[peerID0].mempool.CheckTx(tx)
+		reactors[0].addSender(tx.Key(), secondNodeID)
+		_, err := reactors[0].mempool.CheckTx(tx)
 		require.NoError(t, err)
 	}
 
 	// the second peer should not receive any transaction
-	ensureNoTxs(t, reactors[peerID1], 100*time.Millisecond)
+	ensureNoTxs(t, reactors[1], 100*time.Millisecond)
 }
 
 func TestReactor_MaxTxBytes(t *testing.T) {
@@ -281,19 +280,19 @@ func TestReactorTxSendersLocal(t *testing.T) {
 
 	tx1 := kvstore.NewTxFromID(1)
 	tx2 := kvstore.NewTxFromID(2)
-	require.False(t, reactor.isSender(types.Tx(tx1).Key(), 1))
+	require.False(t, reactor.isSender(types.Tx(tx1).Key(), "peer1"))
 
-	reactor.addSender(types.Tx(tx1).Key(), 1)
-	reactor.addSender(types.Tx(tx1).Key(), 2)
-	reactor.addSender(types.Tx(tx2).Key(), 1)
-	require.True(t, reactor.isSender(types.Tx(tx1).Key(), 1))
-	require.True(t, reactor.isSender(types.Tx(tx1).Key(), 2))
-	require.True(t, reactor.isSender(types.Tx(tx2).Key(), 1))
+	reactor.addSender(types.Tx(tx1).Key(), "peer1")
+	reactor.addSender(types.Tx(tx1).Key(), "peer2")
+	reactor.addSender(types.Tx(tx2).Key(), "peer1")
+	require.True(t, reactor.isSender(types.Tx(tx1).Key(), "peer1"))
+	require.True(t, reactor.isSender(types.Tx(tx1).Key(), "peer2"))
+	require.True(t, reactor.isSender(types.Tx(tx2).Key(), "peer1"))
 
 	reactor.removeSenders(types.Tx(tx1).Key())
-	require.False(t, reactor.isSender(types.Tx(tx1).Key(), 1))
-	require.False(t, reactor.isSender(types.Tx(tx1).Key(), 2))
-	require.True(t, reactor.isSender(types.Tx(tx2).Key(), 1))
+	require.False(t, reactor.isSender(types.Tx(tx1).Key(), "peer1"))
+	require.False(t, reactor.isSender(types.Tx(tx1).Key(), "peer2"))
+	require.True(t, reactor.isSender(types.Tx(tx2).Key(), "peer1"))
 }
 
 // Test that:
```
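The literals `"peer1"` and `"peer2"` in this test compile because `p2p.ID` is a defined string type (roughly `type ID string`), so untyped string constants convert to it implicitly. A minimal illustration with a stand-in type:

```go
package main

import "fmt"

// ID mirrors p2p.ID, which is a defined string type.
type ID string

func isSender(senders map[ID]bool, peer ID) bool { return senders[peer] }

func main() {
	senders := map[ID]bool{"peer1": true} // untyped literals convert to ID
	fmt.Println(isSender(senders, "peer1"), isSender(senders, "peer2")) // true false
}
```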
