Skip to content

Commit

Permalink
Really really disable mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
marcopeereboom committed Dec 20, 2024
1 parent c5671e0 commit 17f2333
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 224 deletions.
20 changes: 10 additions & 10 deletions service/tbc/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,11 +616,11 @@ func (s *Server) unindexUtxosInBlocks(ctx context.Context, endHash *chainhash.Ha
}

// Add tx's back to the mempool.
if s.cfg.MempoolEnabled {
// XXX this may not be the right spot.
txHashes, _ := b.MsgBlock().TxHashes()
_ = s.mempool.txsRemove(ctx, txHashes)
}
//if s.cfg.MempoolEnabled {
// // XXX this may not be the right spot.
// txHashes, _ := b.MsgBlock().TxHashes()
// _ = s.mempool.txsRemove(ctx, txHashes)
//}

blocksProcessed++

Expand Down Expand Up @@ -1001,11 +1001,11 @@ func (s *Server) unindexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash
// This is probably not needed here since we alreayd dealt with
// it via the utxo unindexer but since it will be mostly a
// no-op just go ahead.
if s.cfg.MempoolEnabled {
// XXX this may not be the right spot.
txHashes, _ := b.MsgBlock().TxHashes()
_ = s.mempool.txsRemove(ctx, txHashes)
}
//if s.cfg.MempoolEnabled {
// // XXX this may not be the right spot.
// txHashes, _ := b.MsgBlock().TxHashes()
// _ = s.mempool.txsRemove(ctx, txHashes)
//}

blocksProcessed++

Expand Down
266 changes: 135 additions & 131 deletions service/tbc/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,134 +4,138 @@

package tbc

import (
"context"
"errors"
"fmt"
"sync"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
)

type mempool struct {
mtx sync.RWMutex

txs map[chainhash.Hash][]byte // when nil, tx has not been downloaded
size int // total memory used by mempool
}

func (m *mempool) getDataConstruct(ctx context.Context) (*wire.MsgGetData, error) {
log.Tracef("getDataConstruct")
defer log.Tracef("getDataConstruct exit")

getData := wire.NewMsgGetData()

m.mtx.RLock()
defer m.mtx.RUnlock()

for k, v := range m.txs {
if v != nil {
continue
}
if err := getData.AddInvVect(&wire.InvVect{
Type: wire.InvTypeTx,
Hash: k,
}); err != nil {
// only happens when asking max inventory
return nil, fmt.Errorf("construct get data: %w", err)
}
}
return getData, nil
}

func (m *mempool) txsInsert(ctx context.Context, msg *wire.MsgTx, raw []byte) error {
log.Tracef("txsInsert")
defer log.Tracef("txsInsert exit")

m.mtx.Lock()
defer m.mtx.Unlock()

if tx := m.txs[msg.TxHash()]; tx == nil {
m.txs[msg.TxHash()] = raw
m.size += len(raw)
}

return nil
}

func (m *mempool) invTxsInsert(ctx context.Context, inv *wire.MsgInv) error {
log.Tracef("invTxsInsert")
defer log.Tracef("invTxsInsert exit")

if len(inv.InvList) == 0 {
return errors.New("empty inventory")
}

m.mtx.Lock()
defer m.mtx.Unlock()

l := len(m.txs)
for _, v := range inv.InvList {
switch v.Type {
case wire.InvTypeTx:
if _, ok := m.txs[v.Hash]; !ok {
m.txs[v.Hash] = nil
}
}
}

// if the map length does not change, nothing was inserted.
if len(m.txs) != l {
return errors.New("insert inventory tx: already exists")
}
return nil
}

func (m *mempool) txsRemove(ctx context.Context, txs []chainhash.Hash) error {
log.Tracef("txsRemove")
defer log.Tracef("txsRemove exit")

if len(txs) == 0 {
return errors.New("no transactions provided")
}

m.mtx.Lock()
defer m.mtx.Unlock()

l := len(m.txs)
for k := range txs {
if tx, ok := m.txs[txs[k]]; ok {
m.size -= len(tx)
delete(m.txs, txs[k])
}
}

// if the map length does not change, nothing was deleted.
if len(m.txs) != l {
return errors.New("remove txs: nothing removed")
}
return nil
}

func (m *mempool) stats(ctx context.Context) (int, int) {
m.mtx.RLock()
defer m.mtx.RUnlock()

// Approximate size of mempool; map and cap overhead is missing.
return len(m.txs), m.size + (len(m.txs) * chainhash.HashSize)
}

func (m *mempool) Dump(ctx context.Context) string {
m.mtx.RLock()
defer m.mtx.RUnlock()

return spew.Sdump(m.txs)
}

func mempoolNew() (*mempool, error) {
return &mempool{
txs: make(map[chainhash.Hash][]byte, wire.MaxInvPerMsg),
}, nil
}
//type mempool struct {
// mtx sync.RWMutex
//
// txs map[chainhash.Hash][]byte // when nil, tx has not been downloaded
// size int // total memory used by mempool
//}
//
//func (m *mempool) getDataConstruct(ctx context.Context) (*wire.MsgGetData, error) {
// log.Tracef("getDataConstruct")
// defer log.Tracef("getDataConstruct exit")
//
// getData := wire.NewMsgGetData()
//
// m.mtx.RLock()
// defer m.mtx.RUnlock()
//
// for k, v := range m.txs {
// if v != nil {
// continue
// }
// if err := getData.AddInvVect(&wire.InvVect{
// Type: wire.InvTypeTx,
// Hash: k,
// }); err != nil {
// // only happens when asking max inventory
// return nil, fmt.Errorf("construct get data: %w", err)
// }
// }
// return getData, nil
//}
//
//func (m *mempool) txsInsert(ctx context.Context, msg *wire.MsgTx, raw []byte) error {
// log.Tracef("txsInsert")
// defer log.Tracef("txsInsert exit")
//
// if true {
// return fmt.Errorf("txsInsert: disabled")
// }
//
// m.mtx.Lock()
// defer m.mtx.Unlock()
//
// if tx := m.txs[msg.TxHash()]; tx == nil {
// m.txs[msg.TxHash()] = raw
// m.size += len(raw)
// }
//
// return nil
//}
//
//func (m *mempool) invTxsInsert(ctx context.Context, inv *wire.MsgInv) error {
// log.Tracef("invTxsInsert")
// defer log.Tracef("invTxsInsert exit")
//
// if true {
// return fmt.Errorf("invTxsInsert: disabled")
// }
//
// if len(inv.InvList) == 0 {
// return errors.New("empty inventory")
// }
//
// m.mtx.Lock()
// defer m.mtx.Unlock()
//
// l := len(m.txs)
// for _, v := range inv.InvList {
// switch v.Type {
// case wire.InvTypeTx:
// if _, ok := m.txs[v.Hash]; !ok {
// m.txs[v.Hash] = nil
// }
// }
// }
//
// // if the map length does not change, nothing was inserted.
// if len(m.txs) != l {
// return errors.New("insert inventory tx: already exists")
// }
// return nil
//}
//
//func (m *mempool) txsRemove(ctx context.Context, txs []chainhash.Hash) error {
// log.Tracef("txsRemove")
// defer log.Tracef("txsRemove exit")
//
// if true {
// return fmt.Errorf("txsRemove: disabled")
// }
//
// if len(txs) == 0 {
// return errors.New("no transactions provided")
// }
//
// m.mtx.Lock()
// defer m.mtx.Unlock()
//
// l := len(m.txs)
// for k := range txs {
// if tx, ok := m.txs[txs[k]]; ok {
// m.size -= len(tx)
// delete(m.txs, txs[k])
// }
// }
//
// // if the map length does not change, nothing was deleted.
// if len(m.txs) != l {
// return errors.New("remove txs: nothing removed")
// }
// return nil
//}
//
//func (m *mempool) stats(ctx context.Context) (int, int) {
// m.mtx.RLock()
// defer m.mtx.RUnlock()
//
// // Approximate size of mempool; map and cap overhead is missing.
// return len(m.txs), m.size + (len(m.txs) * chainhash.HashSize)
//}
//
//func (m *mempool) Dump(ctx context.Context) string {
// m.mtx.RLock()
// defer m.mtx.RUnlock()
//
// return spew.Sdump(m.txs)
//}
//
//func mempoolNew() (*mempool, error) {
// if true {
// return nil, fmt.Errorf("mempoolNew: disabled")
// }
// return &mempool{
// txs: make(map[chainhash.Hash][]byte, wire.MaxInvPerMsg),
// }, nil
//}
Loading

0 comments on commit 17f2333

Please sign in to comment.