From a83b296e613cd3010ffa5844c038b72badba2023 Mon Sep 17 00:00:00 2001 From: eugene Date: Thu, 7 Sep 2023 09:49:37 -0400 Subject: [PATCH] chain: detect and use gettxspendingprevout RPC instead of mempool This change uses the gettxspendingprevout RPC if the bitcoind version is greater or equal to version 24.0.0. In this case, the mempool is no longer used for zmq clients. The original behavior must be kept for rpc polling clients since they notify on new transactions and can only learn about them from polling the bitcoind mempool and keeping track of its own btcwallet mempool. --- chain/bitcoind_events.go | 37 ++++++- chain/bitcoind_zmq_events.go | 133 ++++++++++++++++++++++---- chain/pruned_block_dispatcher.go | 16 +++- chain/pruned_block_dispatcher_test.go | 2 +- 4 files changed, 165 insertions(+), 23 deletions(-) diff --git a/chain/bitcoind_events.go b/chain/bitcoind_events.go index 004791e718..74d19e307d 100644 --- a/chain/bitcoind_events.go +++ b/chain/bitcoind_events.go @@ -1,6 +1,7 @@ package chain import ( + "encoding/json" "fmt" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -62,5 +63,39 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig, "rpcpolling is disabled") } - return newBitcoindZMQEvents(cfg.ZMQConfig, client) + // Check if the bitcoind node is on a version that has the + // gettxspendingprevout RPC. If it does, then we don't need to maintain + // a mempool for ZMQ clients. + hasRPC, err := hasSpendingPrevoutRPC(client) + if err != nil { + return nil, err + } + + return newBitcoindZMQEvents(cfg.ZMQConfig, client, hasRPC) +} + +// hasSpendingPrevoutRPC returns whether or not the bitcoind has the newer +// gettxspendingprevout RPC. +func hasSpendingPrevoutRPC(client *rpcclient.Client) (bool, error) { + // Fetch the bitcoind version. + resp, err := client.RawRequest("getnetworkinfo", nil) + if err != nil { + return false, err + } + + info := struct { + Version int64 `json:"version"` + }{} + + if err := json.Unmarshal(resp, &info); err != nil { + return false, err + } + + // Bitcoind returns a single value representing the semantic version: + // 10000 * CLIENT_VERSION_MAJOR + 100 * CLIENT_VERSION_MINOR + // + 1 * CLIENT_VERSION_BUILD + // + // The gettxspendingprevout call was added in version 24.0.0, so we + // return for versions >= 240000. + return info.Version >= 240000, nil } diff --git a/chain/bitcoind_zmq_events.go b/chain/bitcoind_zmq_events.go index 3f9ba4ca41..0091c1f5bf 100644 --- a/chain/bitcoind_zmq_events.go +++ b/chain/bitcoind_zmq_events.go @@ -2,6 +2,7 @@ package chain import ( "bytes" + "encoding/json" "fmt" "io" "math/rand" @@ -68,9 +69,17 @@ type bitcoindZMQEvents struct { // mempool holds all the transactions that we currently see as being in // the mempool. This is used so that we know which transactions we have - // already sent notifications for. + // already sent notifications for. This will be nil if we are using the + // gettxspendingprevout endpoint. mempool *mempool + // client is an rpc client to the bitcoind backend. + client *rpcclient.Client + + // hasPrevoutRPC is set when the bitcoind version is >= 24.0.0 and + // doesn't need to maintain its own mempool. + hasPrevoutRPC bool + wg sync.WaitGroup quit chan struct{} } @@ -80,8 +89,10 @@ type bitcoindZMQEvents struct { var _ BitcoindEvents = (*bitcoindZMQEvents)(nil) // newBitcoindZMQEvents initialises the necessary zmq connections to bitcoind. +// If bitcoind is on a version with the gettxspendingprevout RPC, we can omit +// the mempool. func newBitcoindZMQEvents(cfg *ZMQConfig, - client *rpcclient.Client) (*bitcoindZMQEvents, error) { + client *rpcclient.Client, hasRPC bool) (*bitcoindZMQEvents, error) { // Check polling config. if cfg.MempoolPollingInterval == 0 { @@ -122,22 +133,29 @@ func newBitcoindZMQEvents(cfg *ZMQConfig, "events: %v", err) } - return &bitcoindZMQEvents{ - cfg: cfg, - blockConn: zmqBlockConn, - txConn: zmqTxConn, - blockNtfns: make(chan *wire.MsgBlock), - txNtfns: make(chan *wire.MsgTx), - mempool: newMempool(client), - quit: make(chan struct{}), - }, nil + zmqEvents := &bitcoindZMQEvents{ + cfg: cfg, + client: client, + blockConn: zmqBlockConn, + txConn: zmqTxConn, + hasPrevoutRPC: hasRPC, + blockNtfns: make(chan *wire.MsgBlock), + txNtfns: make(chan *wire.MsgTx), + mempool: newMempool(client), + quit: make(chan struct{}), + } + + return zmqEvents, nil } // Start spins off the bitcoindZMQEvent goroutines. func (b *bitcoindZMQEvents) Start() error { - // Load the mempool so we don't miss transactions. - if err := b.mempool.LoadMempool(); err != nil { - return err + // Load the mempool so we don't miss transactions, but only if we need + // one. + if !b.hasPrevoutRPC { + if err := b.mempool.LoadMempool(); err != nil { + return err + } } b.wg.Add(3) @@ -174,16 +192,84 @@ func (b *bitcoindZMQEvents) BlockNotifications() <-chan *wire.MsgBlock { return b.blockNtfns } +// getTxSpendingPrevOutReq is the rpc request format for bitcoind's +// gettxspendingprevout call. +type getTxSpendingPrevOutReq struct { + Txid string `json:"txid"` + Vout uint32 `json:"vout"` +} + +// getTxSpendingPrevOutResp is the rpc response format for bitcoind's +// gettxspendingprevout call. It returns the "spendingtxid" if one exists in +// the mempool. +type getTxSpendingPrevOutResp struct { + Txid string `json:"txid"` + Vout float64 `json:"vout"` + SpendingTxid string `json:"spendingtxid"` +} + // LookupInputSpend returns the transaction that spends the given outpoint // found in the mempool. func (b *bitcoindZMQEvents) LookupInputSpend( op wire.OutPoint) (chainhash.Hash, bool) { - b.mempool.RLock() - defer b.mempool.RUnlock() + if !b.hasPrevoutRPC { + b.mempool.RLock() + defer b.mempool.RUnlock() - // Check whether the input is in mempool. - return b.mempool.containsInput(op) + // Check whether the input is in mempool. + return b.mempool.containsInput(op) + } + + // Otherwise, we aren't maintaining a mempool and can use the + // gettxspendingprevout RPC. Create an RPC-style prevout that will be + // the lone item in an array. + prevoutReq := &getTxSpendingPrevOutReq{ + Txid: op.Hash.String(), Vout: op.Index, + } + + // The RPC takes an array of prevouts so we have an array with a single + // item since we don't yet batch calls to LookupInputSpend. + prevoutArr := []*getTxSpendingPrevOutReq{prevoutReq} + + req, err := json.Marshal(prevoutArr) + if err != nil { + return chainhash.Hash{}, false + } + + resp, err := b.client.RawRequest( + "gettxspendingprevout", []json.RawMessage{req}, + ) + if err != nil { + return chainhash.Hash{}, false + } + + var prevoutResps []getTxSpendingPrevOutResp + err = json.Unmarshal(resp, &prevoutResps) + if err != nil { + return chainhash.Hash{}, false + } + + // We should only get a single item back since we only requested with a + // single item. + if len(prevoutResps) != 1 { + return chainhash.Hash{}, false + } + + result := prevoutResps[0] + + // If the "spendingtxid" field is empty, then the utxo has no spend in + // the mempool at the moment. + if result.SpendingTxid == "" { + return chainhash.Hash{}, false + } + + spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid) + if err != nil { + return chainhash.Hash{}, false + } + + return *spendHash, true } // blockEventHandler reads raw blocks events from the ZMQ block socket and @@ -358,8 +444,10 @@ func (b *bitcoindZMQEvents) txEventHandler() { continue } - // Add the tx to mempool. - b.mempool.Add(tx) + // Add the tx to mempool if we're using one. + if !b.hasPrevoutRPC { + b.mempool.Add(tx) + } select { case b.txNtfns <- tx: @@ -386,6 +474,11 @@ func (b *bitcoindZMQEvents) txEventHandler() { func (b *bitcoindZMQEvents) mempoolPoller() { defer b.wg.Done() + if b.hasPrevoutRPC { + // Exit if we're not using a mempool. + return + } + // We'll wait to start the main reconciliation loop until we're doing // the initial mempool load. b.mempool.WaitForInit() diff --git a/chain/pruned_block_dispatcher.go b/chain/pruned_block_dispatcher.go index 2709e9b319..9c2d5e6168 100644 --- a/chain/pruned_block_dispatcher.go +++ b/chain/pruned_block_dispatcher.go @@ -543,7 +543,7 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) ( if _, ok := d.blocksQueried[*block]; !ok { log.Debugf("Queuing new block %v for request", *block) - inv := wire.NewInvVect(wire.InvTypeBlock, block) + inv := wire.NewInvVect(wire.InvTypeWitnessBlock, block) if err := getData.AddInvVect(inv); err != nil { return nil, nil, err } @@ -632,6 +632,20 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message, } } + err = blockchain.ValidateWitnessCommitment(btcutil.NewBlock(block)) + if err != nil { + d.blockMtx.Unlock() + + log.Warnf("Received invalid block %v from peer %v: %v", + blockHash, peer, err) + d.banPeer(peer) + + return query.Progress{ + Progressed: false, + Finished: false, + } + } + // Once validated, we can safely remove it. delete(d.blocksQueried, blockHash) diff --git a/chain/pruned_block_dispatcher_test.go b/chain/pruned_block_dispatcher_test.go index b01deb89ca..b7f801d8b5 100644 --- a/chain/pruned_block_dispatcher_test.go +++ b/chain/pruned_block_dispatcher_test.go @@ -233,7 +233,7 @@ func (h *prunedBlockDispatcherHarness) newPeer() *peer.Peer { for _, inv := range msg.InvList { // Invs should always be for blocks. - require.Equal(h.t, wire.InvTypeBlock, inv.Type) + require.Equal(h.t, wire.InvTypeWitnessBlock, inv.Type) // Invs should always be for known blocks. block, ok := h.blocks[inv.Hash]