Skip to content

Commit

Permalink
chain: detect and use gettxspendingprevout RPC instead of mempool
Browse files Browse the repository at this point in the history
This change uses the gettxspendingprevout RPC if the bitcoind version
is greater or equal to version 24.0.0. In this case, the mempool is
no longer used for zmq clients. The original behavior must be kept
for rpc polling clients since they notify on new transactions and can
only learn about them from polling the bitcoind mempool and keeping
track of its own btcwallet mempool.
  • Loading branch information
Crypt-iQ committed Sep 21, 2023
1 parent 07be54b commit a83b296
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 23 deletions.
37 changes: 36 additions & 1 deletion chain/bitcoind_events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chain

import (
"encoding/json"
"fmt"

"github.com/btcsuite/btcd/chaincfg/chainhash"
Expand Down Expand Up @@ -62,5 +63,39 @@ func NewBitcoindEventSubscriber(cfg *BitcoindConfig,
"rpcpolling is disabled")
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client)
// Check if the bitcoind node is on a version that has the
// gettxspendingprevout RPC. If it does, then we don't need to maintain
// a mempool for ZMQ clients.
hasRPC, err := hasSpendingPrevoutRPC(client)
if err != nil {
return nil, err
}

return newBitcoindZMQEvents(cfg.ZMQConfig, client, hasRPC)
}

// hasSpendingPrevoutRPC returns whether or not the bitcoind has the newer
// gettxspendingprevout RPC.
func hasSpendingPrevoutRPC(client *rpcclient.Client) (bool, error) {
// Fetch the bitcoind version.
resp, err := client.RawRequest("getnetworkinfo", nil)
if err != nil {
return false, err
}

info := struct {
Version int64 `json:"version"`
}{}

if err := json.Unmarshal(resp, &info); err != nil {
return false, err
}

// Bitcoind returns a single value representing the semantic version:
// 10000 * CLIENT_VERSION_MAJOR + 100 * CLIENT_VERSION_MINOR
// + 1 * CLIENT_VERSION_BUILD
//
// The gettxspendingprevout call was added in version 24.0.0, so we
// return for versions >= 240000.
return info.Version >= 240000, nil
}
133 changes: 113 additions & 20 deletions chain/bitcoind_zmq_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chain

import (
"bytes"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -68,9 +69,17 @@ type bitcoindZMQEvents struct {

// mempool holds all the transactions that we currently see as being in
// the mempool. This is used so that we know which transactions we have
// already sent notifications for.
// already sent notifications for. This will be nil if we are using the
// gettxspendingprevout endpoint.
mempool *mempool

// client is an rpc client to the bitcoind backend.
client *rpcclient.Client

// hasPrevoutRPC is set when the bitcoind version is >= 24.0.0 and
// doesn't need to maintain its own mempool.
hasPrevoutRPC bool

wg sync.WaitGroup
quit chan struct{}
}
Expand All @@ -80,8 +89,10 @@ type bitcoindZMQEvents struct {
var _ BitcoindEvents = (*bitcoindZMQEvents)(nil)

// newBitcoindZMQEvents initialises the necessary zmq connections to bitcoind.
// If bitcoind is on a version with the gettxspendingprevout RPC, we can omit
// the mempool.
func newBitcoindZMQEvents(cfg *ZMQConfig,
client *rpcclient.Client) (*bitcoindZMQEvents, error) {
client *rpcclient.Client, hasRPC bool) (*bitcoindZMQEvents, error) {

// Check polling config.
if cfg.MempoolPollingInterval == 0 {
Expand Down Expand Up @@ -122,22 +133,29 @@ func newBitcoindZMQEvents(cfg *ZMQConfig,
"events: %v", err)
}

return &bitcoindZMQEvents{
cfg: cfg,
blockConn: zmqBlockConn,
txConn: zmqTxConn,
blockNtfns: make(chan *wire.MsgBlock),
txNtfns: make(chan *wire.MsgTx),
mempool: newMempool(client),
quit: make(chan struct{}),
}, nil
zmqEvents := &bitcoindZMQEvents{
cfg: cfg,
client: client,
blockConn: zmqBlockConn,
txConn: zmqTxConn,
hasPrevoutRPC: hasRPC,
blockNtfns: make(chan *wire.MsgBlock),
txNtfns: make(chan *wire.MsgTx),
mempool: newMempool(client),
quit: make(chan struct{}),
}

return zmqEvents, nil
}

// Start spins off the bitcoindZMQEvent goroutines.
func (b *bitcoindZMQEvents) Start() error {
// Load the mempool so we don't miss transactions.
if err := b.mempool.LoadMempool(); err != nil {
return err
// Load the mempool so we don't miss transactions, but only if we need
// one.
if !b.hasPrevoutRPC {
if err := b.mempool.LoadMempool(); err != nil {
return err
}
}

b.wg.Add(3)
Expand Down Expand Up @@ -174,16 +192,84 @@ func (b *bitcoindZMQEvents) BlockNotifications() <-chan *wire.MsgBlock {
return b.blockNtfns
}

// getTxSpendingPrevOutReq is the rpc request format for bitcoind's
// gettxspendingprevout call.
type getTxSpendingPrevOutReq struct {
Txid string `json:"txid"`
Vout uint32 `json:"vout"`
}

// getTxSpendingPrevOutResp is the rpc response format for bitcoind's
// gettxspendingprevout call. It returns the "spendingtxid" if one exists in
// the mempool.
type getTxSpendingPrevOutResp struct {
Txid string `json:"txid"`
Vout float64 `json:"vout"`
SpendingTxid string `json:"spendingtxid"`
}

// LookupInputSpend returns the transaction that spends the given outpoint
// found in the mempool.
func (b *bitcoindZMQEvents) LookupInputSpend(
op wire.OutPoint) (chainhash.Hash, bool) {

b.mempool.RLock()
defer b.mempool.RUnlock()
if !b.hasPrevoutRPC {
b.mempool.RLock()
defer b.mempool.RUnlock()

// Check whether the input is in mempool.
return b.mempool.containsInput(op)
// Check whether the input is in mempool.
return b.mempool.containsInput(op)
}

// Otherwise, we aren't maintaining a mempool and can use the
// gettxspendingprevout RPC. Create an RPC-style prevout that will be
// the lone item in an array.
prevoutReq := &getTxSpendingPrevOutReq{
Txid: op.Hash.String(), Vout: op.Index,
}

// The RPC takes an array of prevouts so we have an array with a single
// item since we don't yet batch calls to LookupInputSpend.
prevoutArr := []*getTxSpendingPrevOutReq{prevoutReq}

req, err := json.Marshal(prevoutArr)
if err != nil {
return chainhash.Hash{}, false
}

resp, err := b.client.RawRequest(
"gettxspendingprevout", []json.RawMessage{req},
)
if err != nil {
return chainhash.Hash{}, false
}

var prevoutResps []getTxSpendingPrevOutResp
err = json.Unmarshal(resp, &prevoutResps)
if err != nil {
return chainhash.Hash{}, false
}

// We should only get a single item back since we only requested with a
// single item.
if len(prevoutResps) != 1 {
return chainhash.Hash{}, false
}

result := prevoutResps[0]

// If the "spendingtxid" field is empty, then the utxo has no spend in
// the mempool at the moment.
if result.SpendingTxid == "" {
return chainhash.Hash{}, false
}

spendHash, err := chainhash.NewHashFromStr(result.SpendingTxid)
if err != nil {
return chainhash.Hash{}, false
}

return *spendHash, true
}

// blockEventHandler reads raw blocks events from the ZMQ block socket and
Expand Down Expand Up @@ -358,8 +444,10 @@ func (b *bitcoindZMQEvents) txEventHandler() {
continue
}

// Add the tx to mempool.
b.mempool.Add(tx)
// Add the tx to mempool if we're using one.
if !b.hasPrevoutRPC {
b.mempool.Add(tx)
}

select {
case b.txNtfns <- tx:
Expand All @@ -386,6 +474,11 @@ func (b *bitcoindZMQEvents) txEventHandler() {
func (b *bitcoindZMQEvents) mempoolPoller() {
defer b.wg.Done()

if b.hasPrevoutRPC {
// Exit if we're not using a mempool.
return
}

// We'll wait to start the main reconciliation loop until we're doing
// the initial mempool load.
b.mempool.WaitForInit()
Expand Down
16 changes: 15 additions & 1 deletion chain/pruned_block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (

if _, ok := d.blocksQueried[*block]; !ok {
log.Debugf("Queuing new block %v for request", *block)
inv := wire.NewInvVect(wire.InvTypeBlock, block)
inv := wire.NewInvVect(wire.InvTypeWitnessBlock, block)
if err := getData.AddInvVect(inv); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -632,6 +632,20 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,
}
}

err = blockchain.ValidateWitnessCommitment(btcutil.NewBlock(block))
if err != nil {
d.blockMtx.Unlock()

log.Warnf("Received invalid block %v from peer %v: %v",
blockHash, peer, err)
d.banPeer(peer)

return query.Progress{
Progressed: false,
Finished: false,
}
}

// Once validated, we can safely remove it.
delete(d.blocksQueried, blockHash)

Expand Down
2 changes: 1 addition & 1 deletion chain/pruned_block_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (h *prunedBlockDispatcherHarness) newPeer() *peer.Peer {

for _, inv := range msg.InvList {
// Invs should always be for blocks.
require.Equal(h.t, wire.InvTypeBlock, inv.Type)
require.Equal(h.t, wire.InvTypeWitnessBlock, inv.Type)

// Invs should always be for known blocks.
block, ok := h.blocks[inv.Hash]
Expand Down

0 comments on commit a83b296

Please sign in to comment.