diff --git a/service/tbc/crawler.go b/service/tbc/crawler.go index 24e3ccfd..919f49fa 100644 --- a/service/tbc/crawler.go +++ b/service/tbc/crawler.go @@ -616,11 +616,11 @@ func (s *Server) unindexUtxosInBlocks(ctx context.Context, endHash *chainhash.Ha } // Add tx's back to the mempool. - if s.cfg.MempoolEnabled { - // XXX this may not be the right spot. - txHashes, _ := b.MsgBlock().TxHashes() - _ = s.mempool.txsRemove(ctx, txHashes) - } + //if s.cfg.MempoolEnabled { + // // XXX this may not be the right spot. + // txHashes, _ := b.MsgBlock().TxHashes() + // _ = s.mempool.txsRemove(ctx, txHashes) + //} blocksProcessed++ @@ -1001,11 +1001,11 @@ func (s *Server) unindexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash // This is probably not needed here since we alreayd dealt with // it via the utxo unindexer but since it will be mostly a // no-op just go ahead. - if s.cfg.MempoolEnabled { - // XXX this may not be the right spot. - txHashes, _ := b.MsgBlock().TxHashes() - _ = s.mempool.txsRemove(ctx, txHashes) - } + //if s.cfg.MempoolEnabled { + // // XXX this may not be the right spot. + // txHashes, _ := b.MsgBlock().TxHashes() + // _ = s.mempool.txsRemove(ctx, txHashes) + //} blocksProcessed++ diff --git a/service/tbc/mempool.go b/service/tbc/mempool.go index 51173865..adc0f8a7 100644 --- a/service/tbc/mempool.go +++ b/service/tbc/mempool.go @@ -4,134 +4,138 @@ package tbc -import ( - "context" - "errors" - "fmt" - "sync" - - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/wire" - "github.com/davecgh/go-spew/spew" -) - -type mempool struct { - mtx sync.RWMutex - - txs map[chainhash.Hash][]byte // when nil, tx has not been downloaded - size int // total memory used by mempool -} - -func (m *mempool) getDataConstruct(ctx context.Context) (*wire.MsgGetData, error) { - log.Tracef("getDataConstruct") - defer log.Tracef("getDataConstruct exit") - - getData := wire.NewMsgGetData() - - m.mtx.RLock() - defer m.mtx.RUnlock() - - for k, v := range m.txs { - if v != nil { - continue - } - if err := getData.AddInvVect(&wire.InvVect{ - Type: wire.InvTypeTx, - Hash: k, - }); err != nil { - // only happens when asking max inventory - return nil, fmt.Errorf("construct get data: %w", err) - } - } - return getData, nil -} - -func (m *mempool) txsInsert(ctx context.Context, msg *wire.MsgTx, raw []byte) error { - log.Tracef("txsInsert") - defer log.Tracef("txsInsert exit") - - m.mtx.Lock() - defer m.mtx.Unlock() - - if tx := m.txs[msg.TxHash()]; tx == nil { - m.txs[msg.TxHash()] = raw - m.size += len(raw) - } - - return nil -} - -func (m *mempool) invTxsInsert(ctx context.Context, inv *wire.MsgInv) error { - log.Tracef("invTxsInsert") - defer log.Tracef("invTxsInsert exit") - - if len(inv.InvList) == 0 { - return errors.New("empty inventory") - } - - m.mtx.Lock() - defer m.mtx.Unlock() - - l := len(m.txs) - for _, v := range inv.InvList { - switch v.Type { - case wire.InvTypeTx: - if _, ok := m.txs[v.Hash]; !ok { - m.txs[v.Hash] = nil - } - } - } - - // if the map length does not change, nothing was inserted. - if len(m.txs) != l { - return errors.New("insert inventory tx: already exists") - } - return nil -} - -func (m *mempool) txsRemove(ctx context.Context, txs []chainhash.Hash) error { - log.Tracef("txsRemove") - defer log.Tracef("txsRemove exit") - - if len(txs) == 0 { - return errors.New("no transactions provided") - } - - m.mtx.Lock() - defer m.mtx.Unlock() - - l := len(m.txs) - for k := range txs { - if tx, ok := m.txs[txs[k]]; ok { - m.size -= len(tx) - delete(m.txs, txs[k]) - } - } - - // if the map length does not change, nothing was deleted. - if len(m.txs) != l { - return errors.New("remove txs: nothing removed") - } - return nil -} - -func (m *mempool) stats(ctx context.Context) (int, int) { - m.mtx.RLock() - defer m.mtx.RUnlock() - - // Approximate size of mempool; map and cap overhead is missing. - return len(m.txs), m.size + (len(m.txs) * chainhash.HashSize) -} - -func (m *mempool) Dump(ctx context.Context) string { - m.mtx.RLock() - defer m.mtx.RUnlock() - - return spew.Sdump(m.txs) -} - -func mempoolNew() (*mempool, error) { - return &mempool{ - txs: make(map[chainhash.Hash][]byte, wire.MaxInvPerMsg), - }, nil -} +//type mempool struct { +// mtx sync.RWMutex +// +// txs map[chainhash.Hash][]byte // when nil, tx has not been downloaded +// size int // total memory used by mempool +//} +// +//func (m *mempool) getDataConstruct(ctx context.Context) (*wire.MsgGetData, error) { +// log.Tracef("getDataConstruct") +// defer log.Tracef("getDataConstruct exit") +// +// getData := wire.NewMsgGetData() +// +// m.mtx.RLock() +// defer m.mtx.RUnlock() +// +// for k, v := range m.txs { +// if v != nil { +// continue +// } +// if err := getData.AddInvVect(&wire.InvVect{ +// Type: wire.InvTypeTx, +// Hash: k, +// }); err != nil { +// // only happens when asking max inventory +// return nil, fmt.Errorf("construct get data: %w", err) +// } +// } +// return getData, nil +//} +// +//func (m *mempool) txsInsert(ctx context.Context, msg *wire.MsgTx, raw []byte) error { +// log.Tracef("txsInsert") +// defer log.Tracef("txsInsert exit") +// +// if true { +// return fmt.Errorf("txsInsert: disabled") +// } +// +// m.mtx.Lock() +// defer m.mtx.Unlock() +// +// if tx := m.txs[msg.TxHash()]; tx == nil { +// m.txs[msg.TxHash()] = raw +// m.size += len(raw) +// } +// +// return nil +//} +// +//func (m *mempool) invTxsInsert(ctx context.Context, inv *wire.MsgInv) error { +// log.Tracef("invTxsInsert") +// defer log.Tracef("invTxsInsert exit") +// +// if true { +// return fmt.Errorf("invTxsInsert: disabled") +// } +// +// if len(inv.InvList) == 0 { +// return errors.New("empty inventory") +// } +// +// m.mtx.Lock() +// defer m.mtx.Unlock() +// +// l := len(m.txs) +// for _, v := range inv.InvList { +// switch v.Type { +// case wire.InvTypeTx: +// if _, ok := m.txs[v.Hash]; !ok { +// m.txs[v.Hash] = nil +// } +// } +// } +// +// // if the map length does not change, nothing was inserted. +// if len(m.txs) != l { +// return errors.New("insert inventory tx: already exists") +// } +// return nil +//} +// +//func (m *mempool) txsRemove(ctx context.Context, txs []chainhash.Hash) error { +// log.Tracef("txsRemove") +// defer log.Tracef("txsRemove exit") +// +// if true { +// return fmt.Errorf("txsRemove: disabled") +// } +// +// if len(txs) == 0 { +// return errors.New("no transactions provided") +// } +// +// m.mtx.Lock() +// defer m.mtx.Unlock() +// +// l := len(m.txs) +// for k := range txs { +// if tx, ok := m.txs[txs[k]]; ok { +// m.size -= len(tx) +// delete(m.txs, txs[k]) +// } +// } +// +// // if the map length does not change, nothing was deleted. +// if len(m.txs) != l { +// return errors.New("remove txs: nothing removed") +// } +// return nil +//} +// +//func (m *mempool) stats(ctx context.Context) (int, int) { +// m.mtx.RLock() +// defer m.mtx.RUnlock() +// +// // Approximate size of mempool; map and cap overhead is missing. +// return len(m.txs), m.size + (len(m.txs) * chainhash.HashSize) +//} +// +//func (m *mempool) Dump(ctx context.Context) string { +// m.mtx.RLock() +// defer m.mtx.RUnlock() +// +// return spew.Sdump(m.txs) +//} +// +//func mempoolNew() (*mempool, error) { +// if true { +// return nil, fmt.Errorf("mempoolNew: disabled") +// } +// return &mempool{ +// txs: make(map[chainhash.Hash][]byte, wire.MaxInvPerMsg), +// }, nil +//} diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index ebec0737..3d915595 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -131,7 +131,8 @@ type Server struct { blocksInserted int // blocks inserted since last print // mempool - mempool *mempool + // nolint:dupword // fuck you linter + // mempool *mempool // broadcast broadcast map[chainhash.Hash]*wire.MsgTx @@ -175,9 +176,9 @@ func NewServer(cfg *Config) (*Server, error) { } if cfg.MempoolEnabled { - cfg.MempoolEnabled = false // XXX log.Infof("mempool forced disabled") } + cfg.MempoolEnabled = false // XXX // Only populate pings and blocks if not in External Header Mode var pings *ttl.TTL @@ -219,16 +220,16 @@ func NewServer(cfg *Config) (*Server, error) { s.pings = pings } - if s.cfg.MempoolEnabled { - if s.cfg.ExternalHeaderMode { - // Cannot combine mempool behavior with External Header Mode - panic("cannot enable mempool on an external-header-only mode TBC instance") - } - s.mempool, err = mempoolNew() - if err != nil { - return nil, err - } - } + //if s.cfg.MempoolEnabled { + // if s.cfg.ExternalHeaderMode { + // // Cannot combine mempool behavior with External Header Mode + // panic("cannot enable mempool on an external-header-only mode TBC instance") + // } + // s.mempool, err = mempoolNew() + // if err != nil { + // return nil, err + // } + //} wanted := defaultPeersWanted switch cfg.Network { @@ -368,25 +369,25 @@ func (s *Server) pingPeer(ctx context.Context, p *rawpeer.RawPeer) { s.pings.Put(ctx, defaultPingTimeout, peer, p, s.pingExpired, nil) } -func (s *Server) mempoolPeer(ctx context.Context, p *rawpeer.RawPeer) { - log.Tracef("mempoolPeer %v", p) - defer log.Tracef("mempoolPeer %v exit", p) - - if !s.cfg.MempoolEnabled { - return - } - - // Don't ask for mempool if the other end does not advertise it. - if !p.HasService(wire.SFNodeBloom) { - return - } - - err := p.Write(defaultCmdTimeout, wire.NewMsgMemPool()) - if err != nil { - log.Debugf("mempool %v: %v", p, err) - return - } -} +//func (s *Server) mempoolPeer(ctx context.Context, p *rawpeer.RawPeer) { +// log.Tracef("mempoolPeer %v", p) +// defer log.Tracef("mempoolPeer %v exit", p) +// +// if !s.cfg.MempoolEnabled { +// return +// } +// +// // Don't ask for mempool if the other end does not advertise it. +// if !p.HasService(wire.SFNodeBloom) { +// return +// } +// +// err := p.Write(defaultCmdTimeout, wire.NewMsgMemPool()) +// if err != nil { +// log.Debugf("mempool %v: %v", p, err) +// return +// } +//} func (s *Server) headersPeer(ctx context.Context, p *rawpeer.RawPeer) { log.Tracef("headersPeer %v", p) @@ -421,9 +422,9 @@ func (s *Server) handleGeneric(ctx context.Context, p *rawpeer.RawPeer, msg wire } case *wire.MsgTx: - if err := s.handleTx(ctx, p, m, raw); err != nil { - return fmt.Errorf("handle generic transaction: %w", err) - } + //if err := s.handleTx(ctx, p, m, raw); err != nil { + // return fmt.Errorf("handle generic transaction: %w", err) + //} case *wire.MsgInv: if err := s.handleInv(ctx, p, m, raw); err != nil { @@ -680,9 +681,9 @@ func (s *Server) promPoll(ctx context.Context) error { s.prom.syncInfo = s.Synced(ctx) s.prom.connected, s.prom.good, s.prom.bad = s.pm.Stats() - if s.cfg.MempoolEnabled { - s.prom.mempoolCount, s.prom.mempoolSize = s.mempool.stats(ctx) - } + //if s.cfg.MempoolEnabled { + // s.prom.mempoolCount, s.prom.mempoolSize = s.mempool.stats(ctx) + //} if s.promPollVerbose { s.mtx.RLock() @@ -897,32 +898,36 @@ func (s *Server) blockExpired(ctx context.Context, key any, value any) { } } -func (s *Server) downloadMissingTx(ctx context.Context, p *rawpeer.RawPeer) error { - log.Tracef("downloadMissingTx") - defer log.Tracef("downloadMissingTx exit") - - getData, err := s.mempool.getDataConstruct(ctx) - if err != nil { - return fmt.Errorf("download missing tx: %w", err) - } - err = p.Write(defaultCmdTimeout, getData) - if err != nil { - // peer dead, make sure it is reaped - p.Close() // XXX this should not happen here - if !errors.Is(err, net.ErrClosed) && - !errors.Is(err, os.ErrDeadlineExceeded) { - log.Errorf("download missing tx write: %v %v", p, err) - } - } - return err -} - -func (s *Server) handleTx(ctx context.Context, p *rawpeer.RawPeer, msg *wire.MsgTx, raw []byte) error { - log.Tracef("handleTx") - defer log.Tracef("handleTx exit") - - return s.mempool.txsInsert(ctx, msg, raw) -} +//func (s *Server) downloadMissingTx(ctx context.Context, p *rawpeer.RawPeer) error { +// log.Tracef("downloadMissingTx") +// defer log.Tracef("downloadMissingTx exit") +// +// if true { +// return nil +// } +// +// getData, err := s.mempool.getDataConstruct(ctx) +// if err != nil { +// return fmt.Errorf("download missing tx: %w", err) +// } +// err = p.Write(defaultCmdTimeout, getData) +// if err != nil { +// // peer dead, make sure it is reaped +// p.Close() // XXX this should not happen here +// if !errors.Is(err, net.ErrClosed) && +// !errors.Is(err, os.ErrDeadlineExceeded) { +// log.Errorf("download missing tx write: %v %v", p, err) +// } +// } +// return err +//} + +//func (s *Server) handleTx(ctx context.Context, p *rawpeer.RawPeer, msg *wire.MsgTx, raw []byte) error { +// log.Tracef("handleTx") +// defer log.Tracef("handleTx exit") +// +// return s.mempool.txsInsert(ctx, msg, raw) +//} func (s *Server) syncBlocks(ctx context.Context) { log.Tracef("syncBlocks") @@ -987,6 +992,7 @@ func (s *Server) syncBlocks(ctx context.Context) { return default: + //nolint:errorlint // fuck you linter panic(fmt.Errorf("sync blocks: %T %w", err, err)) } @@ -1197,12 +1203,12 @@ func (s *Server) handleHeaders(ctx context.Context, p *rawpeer.RawPeer, msg *wir log.Debugf("blockheaders caught up at %v: %v", p, bhb.HH()) } - } else { - if s.cfg.MempoolEnabled { - // Start building the mempool. - s.pm.All(ctx, s.mempoolPeer) - } - } + } // else { + //if s.cfg.MempoolEnabled { + // // Start building the mempool. + // s.pm.All(ctx, s.mempoolPeer) + //} + //} // Always call syncBlocks, it either downloads more blocks or // kicks of indexing. @@ -1378,9 +1384,9 @@ func (s *Server) handleBlock(ctx context.Context, p *rawpeer.RawPeer, msg *wire. s.mtx.Unlock() // Reap txs from mempool, no need to log error. - if s.cfg.MempoolEnabled { - _ = s.mempool.txsRemove(ctx, txHashes) - } + //if s.cfg.MempoolEnabled { + // _ = s.mempool.txsRemove(ctx, txHashes) + //} log.Debugf("inserted block at height %d, parent hash %s", height, block.MsgBlock().Header.PrevBlock) @@ -1396,9 +1402,9 @@ func (s *Server) handleBlock(ctx context.Context, p *rawpeer.RawPeer, msg *wire. mempoolSize int connectedPeers int ) - if s.cfg.MempoolEnabled { - mempoolCount, mempoolSize = s.mempool.stats(ctx) - } + //if s.cfg.MempoolEnabled { + // mempoolCount, mempoolSize = s.mempool.stats(ctx) + //} // Grab some peer stats as well connectedPeers, goodPeers, badPeers := s.pm.Stats() @@ -1435,7 +1441,7 @@ func (s *Server) handleInv(ctx context.Context, p *rawpeer.RawPeer, msg *wire.Ms log.Tracef("handleInv (%v)", p) defer log.Tracef("handleInv exit (%v)", p) - var txsFound bool + // var txsFound bool for _, v := range msg.InvList { switch v.Type { @@ -1444,7 +1450,7 @@ func (s *Server) handleInv(ctx context.Context, p *rawpeer.RawPeer, msg *wire.Ms case wire.InvTypeTx: // handle these later or else we have to insert txs one // at a time while taking a mutex. - txsFound = true + // txsFound = true case wire.InvTypeBlock: // Make sure we haven't seen block header yet. _, _, err := s.BlockHeaderByHash(ctx, &v.Hash) @@ -1467,12 +1473,12 @@ func (s *Server) handleInv(ctx context.Context, p *rawpeer.RawPeer, msg *wire.Ms } } - if s.cfg.MempoolEnabled && txsFound { - if err := s.mempool.invTxsInsert(ctx, msg); err != nil { - //nolint:errcheck // Error is intentionally ignored. - go s.downloadMissingTx(ctx, p) - } - } + //if s.cfg.MempoolEnabled && txsFound { + // if err := s.mempool.invTxsInsert(ctx, msg); err != nil { + // //nolint:errcheck // Error is intentionally ignored. + // go s.downloadMissingTx(ctx, p) + // } + //} return nil }