Add high priority broadcast routine #1071

Closed
wants to merge 2 commits into from
9 changes: 9 additions & 0 deletions mempool/v1/mempool.go
@@ -769,3 +769,12 @@ func (txmp *TxMempool) notifyTxsAvailable() {
}
}
}

// HasTx returns true if the mempool contains the given transaction.
func (txmp *TxMempool) HasTx(tx types.Tx) bool {
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()

_, ok := txmp.txByKey[tx.Key()]
return ok
}
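For reference, a minimal, self-contained sketch of the lookup pattern HasTx introduces: a read-locked membership check against the key index. The miniMempool and txKey names below are illustrative stand-ins, not CometBFT types.

package main

import (
	"fmt"
	"sync"
)

// txKey stands in for types.TxKey (a fixed-size hash of the raw tx bytes).
type txKey [32]byte

// miniMempool models just enough of TxMempool to show the HasTx pattern.
type miniMempool struct {
	mtx     sync.RWMutex
	txByKey map[txKey]struct{}
}

// hasTx mirrors the new TxMempool.HasTx: take the read lock and probe the key index.
func (m *miniMempool) hasTx(k txKey) bool {
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	_, ok := m.txByKey[k]
	return ok
}

func main() {
	m := &miniMempool{txByKey: map[txKey]struct{}{{0x01}: {}}}
	fmt.Println(m.hasTx(txKey{0x01})) // true
	fmt.Println(m.hasTx(txKey{0x02})) // false
}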
114 changes: 114 additions & 0 deletions mempool/v1/reactor.go
@@ -17,6 +17,13 @@ import (
"github.com/cometbft/cometbft/types"
)

const (
MempoolPriorityChannel = byte(0x80)

mempoolPriorityInterval = 10 * time.Second
mempoolPriorityBroadcastMaxBytes = 2 * 1024 * 1024 // 2MB
)

// Reactor handles mempool tx broadcasting amongst peers.
// It maintains a map from peer ID to counter, to prevent gossiping txs to the
// peers you received it from.
@@ -25,6 +32,9 @@ type Reactor struct {
config *cfg.MempoolConfig
mempool *TxMempool
ids *mempoolIDs

sortedTxs []*WrappedTx // sorted by priority
mempoolPriorityIntervalChan chan struct{}
}

type mempoolIDs struct {
@@ -116,6 +126,8 @@ func (memR *Reactor) SetLogger(l log.Logger) {
func (memR *Reactor) OnStart() error {
if !memR.config.Broadcast {
memR.Logger.Info("Tx broadcasting is disabled")
} else {
go memR.priorityIntervalRoutine()
}
return nil
}
@@ -137,6 +149,12 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
RecvMessageCapacity: batchMsg.Size(),
MessageType: &protomem.Message{},
},
{
ID: MempoolPriorityChannel,
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
MessageType: &protomem.Message{},
},
}
}

@@ -145,6 +163,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
func (memR *Reactor) AddPeer(peer p2p.Peer) {
if memR.config.Broadcast {
go memR.broadcastTxRoutine(peer)
go memR.broadcastPriorityTxRoutine(peer)
}
}

@@ -211,6 +230,101 @@ type PeerState interface {
GetHeight() int64
}

// Sort txs by priority at a regular interval and signal the priority broadcast routines.
func (memR *Reactor) priorityIntervalRoutine() {
memR.mempoolPriorityIntervalChan = make(chan struct{}, 1)
lastRoutine := time.Now()
for {
// Sleep until the next interval.
select {
case <-memR.Quit():
return
case <-time.After(mempoolPriorityInterval - time.Since(lastRoutine)):
lastRoutine = time.Now()
}

// Sort txs by priority.
sortedTxs := memR.mempool.allEntriesSorted()

// Keep only enough txs to fit within mempoolPriorityBroadcastMaxBytes.
var totalSize int64
for i, tx := range sortedTxs {
totalSize += tx.Size()
if totalSize > mempoolPriorityBroadcastMaxBytes {
sortedTxs = sortedTxs[:i]
break
}
}

memR.sortedTxs = sortedTxs

// Signal the priority broadcast routine.
close(memR.mempoolPriorityIntervalChan)
memR.mempoolPriorityIntervalChan = make(chan struct{}, 1)
}
}

// Send new high priority mempool txs to peer.
func (memR *Reactor) broadcastPriorityTxRoutine(peer p2p.Peer) {
peerID := memR.ids.GetForPeer(peer)

for {
select {
case <-memR.mempoolPriorityIntervalChan:
// We have new high priority txs to broadcast.
case <-peer.Quit():
return

case <-memR.Quit():
return
}

// In case both memR.mempoolPriorityIntervalChan and peer.Quit() fired at the same time, make sure the reactor and the peer are still running.
if !memR.IsRunning() || !peer.IsRunning() {
return
}

// Make sure the peer is up to date.
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
if !ok {
// Peer does not have a state yet. We set it in the consensus reactor, but
// when we add peer in Switch, the order we call reactors#AddPeer is
// different every time due to us using a map. Sometimes other reactors
// will be initialized before the consensus reactor. We should wait a few
// milliseconds and retry.
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
}

// Loop through all the high priority txs.
for _, memTx := range memR.sortedTxs {
// Check that tx is still in mempool.
if !memR.mempool.HasTx(memTx.tx) {
continue
}

// Allow for a lag of 1 block.
if peerState.GetHeight() < memTx.height-1 {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
}

// NOTE: Transaction batching was disabled due to
// https://github.com/cometbft/cometbft/issues/5796
if !memTx.HasPeer(peerID) {
success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: mempool.MempoolChannel,
Message: &protomem.Txs{Txs: [][]byte{memTx.tx}},
}, memR.Logger)
if !success {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
}
}
}
}
}

// Send new mempool txs to peer.
func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
peerID := memR.ids.GetForPeer(peer)
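One detail worth calling out: priorityIntervalRoutine wakes every per-peer broadcastPriorityTxRoutine by closing mempoolPriorityIntervalChan and immediately allocating a replacement, so a single close fans out to all waiters. Below is a self-contained sketch of that close-to-broadcast pattern; the signaller type and names are illustrative, and unlike the PR it guards the channel swap with a mutex.

package main

import (
	"fmt"
	"sync"
)

// signaller illustrates the close-to-broadcast pattern: one close of the shared
// channel wakes every goroutine currently waiting on it, and a fresh channel is
// installed for the next round.
type signaller struct {
	mtx sync.Mutex
	ch  chan struct{}
}

func newSignaller() *signaller { return &signaller{ch: make(chan struct{})} }

// wait returns the channel to block on for the next signal.
func (s *signaller) wait() <-chan struct{} {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	return s.ch
}

// signal wakes all current waiters by closing the channel, then re-arms it.
func (s *signaller) signal() {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	close(s.ch)
	s.ch = make(chan struct{})
}

func main() {
	s := newSignaller()
	var ready, done sync.WaitGroup
	for i := 0; i < 3; i++ {
		ready.Add(1)
		done.Add(1)
		go func(id int) {
			defer done.Done()
			ch := s.wait() // grab this round's channel
			ready.Done()
			<-ch // blocks until signal() closes it
			fmt.Printf("consumer %d woke up\n", id)
		}(i)
	}
	ready.Wait() // every consumer now holds the current channel
	s.signal()   // a single close wakes all three
	done.Wait()
}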