diff --git a/qa/rpc-tests/p2p-tx-download.py b/qa/rpc-tests/p2p-tx-download.py index 381a143c261..34333ef2f11 100644 --- a/qa/rpc-tests/p2p-tx-download.py +++ b/qa/rpc-tests/p2p-tx-download.py @@ -16,8 +16,11 @@ GETDATA_TX_INTERVAL = 30 # seconds TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL # 5 minutes INBOUND_PEER_TX_DELAY = 2 # seconds -TXID_RELAY_DELAY = 2 # seconds +OVERLOADED_PEER_DELAY = 2 # seconds MAX_GETDATA_IN_FLIGHT = 100 +MAX_PEER_TX_ANNOUNCEMENTS = 5000 + +MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY class TxDownloadTestNode(SingleNodeConnCB): def __init__(self): @@ -51,6 +54,11 @@ def is_closed(): return self.connection.state == "closed" return wait_until(is_closed, timeout=30) + def wait_until_numgetdata(self, num): + def has_num(): + return len(self.tx_getdata_received) == num + return wait_until(has_num, timeout=60) + def disconnect(self): self.connection.disconnect_node() return self.wait_for_disconnect() @@ -78,9 +86,11 @@ def run_test(self): self.test_tx_request() self.test_invblock_resolution() - self.test_max_inflight() self.test_disconnect_fallback() self.test_notfound_fallback() + self.test_max_announcements() + self.test_inflight_throttling() + self.test_expiry_fallback() def setup_network(self): # set up full nodes @@ -94,14 +104,10 @@ def setup_network(self): connect_nodes(self.nodes[0], 1) self.sync_all() - # set up incoming (non-honest) peers - self.incoming_peers = [] - for i in range(8): - self.incoming_peers.append(self.create_testnode()) - + # create a single control peer that is only used for ping sync + self.control_peer = self.create_testnode() NetworkThread().start() - for peer in self.incoming_peers: - peer.wait_for_verack() + self.control_peer.wait_for_verack() def create_testnode(self, node_idx=0): node = TxDownloadTestNode() @@ -109,6 +115,20 @@ def create_testnode(self, node_idx=0): node.add_connection(conn) return node + def connect_incoming_peers(self, num): + peers = [] + for _ in range(num): + peer = self.create_testnode() + peer.wait_for_verack() + peers.append(peer) + return peers + + def disconnect_incoming_peers(self, peers): + for peer in peers: + if not peer.disconnect(): + return False + return True + def any_received_getdata(self, hash, peers): for peer in peers: if hash in peer.tx_getdata_received: @@ -129,6 +149,11 @@ def getdata_received(): return True return wait_until(getdata_received, timeout=10) + def wait_for_mocktime(self, node): + def mocktime_is_good(): + return node.getmocktime() >= self.mocktime + return wait_until(mocktime_is_good, timeout=10) + def find_winning_peer(self, peers, hash): # detect which peer won a race for getting a getdata hash selected = None @@ -148,15 +173,20 @@ def forward_mocktime(self, delta_time): self.mocktime += delta_time for node in self.nodes: node.setmocktime(self.mocktime) - # give the nodes some time to process the new mocktime - # can be removed when we have getmocktime - time.sleep(0.1) + if not self.wait_for_mocktime(node): + return False + # sync the control peer with ping so that we're 100% sure we have + # entered a new message handling loop + self.control_peer.sync_with_ping() + return True def forward_mocktime_step2(self, iterations): # forward mocktime in steps of 2 seconds to allow the nodes # time to recognize they have to do something for i in range(iterations): - self.forward_mocktime(2) + if not self.forward_mocktime(2): + return False + return True def next_fake_txid(self): self.fake_txid += 1 @@ -164,24 +194,32 @@ def next_fake_txid(self): def test_tx_request(self): txid = self.next_fake_txid() - self.forward_mocktime(0) + assert self.forward_mocktime(1) - # use incoming peers 0 and 1 - peerset = self.incoming_peers[0:4] - for peer in peerset: + # use 4 peers + peers = self.connect_incoming_peers(4) + for peer in peers: peer.send_tx_inv([txid]) - # To make sure we eventually ask the tx from all 4 nodes that announced + # use 2 more peers that do not send invs + otherpeers = self.connect_incoming_peers(2) + + # To make sure we eventually ask the tx from all 4 peers that announced # to us, we're now jumping 4 * (2 + 2 + 30) = 136 seconds to the future - warp = 4 * (INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + GETDATA_TX_INTERVAL) + warp = 4 * MAX_GETDATA_INBOUND_WAIT self.forward_mocktime_step2(warp//2) # All peers that sent the inv should now have received a getdata request - assert self.wait_for_getdata([txid], peerset) + assert self.wait_for_getdata([txid], peers) # Make sure the other peers did not receive the getdata because they # didn't indicate they have the tx - assert not self.any_received_getdata(txid, self.incoming_peers[4:8]) + assert not self.any_received_getdata(txid, otherpeers) + + # cleanup + assert self.disconnect_incoming_peers(peers) + assert self.disconnect_incoming_peers(otherpeers) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) def test_invblock_resolution(self): inputs = [self.nodes[1].listunspent()[0]] @@ -192,127 +230,187 @@ def test_invblock_resolution(self): tx.rehash() txid = int(tx.hash, 16) - self.forward_mocktime(0) + assert self.forward_mocktime(1) # make sure that node 1 is outbound for node 0 assert self.nodes[0].getpeerinfo()[0]['inbound'] == False - # use all peers that only inv but never respond to getdata - for peer in self.incoming_peers: + # use 8 peers that only inv but never respond to getdata + peers = self.connect_incoming_peers(8) + for peer in peers: peer.send_tx_inv([txid]) # send from our honest node last self.nodes[1].sendrawtransaction(tx_hex) - # We jump forward 2x (2 + 2) + 30 + 2 (margin) = 40 seconds to make sure + # We jump forward 2x max inbound wait time to make sure # that we get to the point where we re-evaluate the transaction in 2 # second steps - warp = 2 * (INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY) + GETDATA_TX_INTERVAL + 2 - self.forward_mocktime_step2(warp//2) + warp = 2 * MAX_GETDATA_INBOUND_WAIT + assert self.forward_mocktime_step2(warp//2) assert tx.hash in self.nodes[0].getrawmempool() - def test_max_inflight(self): + assert self.disconnect_incoming_peers(peers) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) + + def test_inflight_throttling(self): # First, forward time by 2x inflight timeout, so that we have clean # registers for each peer self.forward_mocktime(2 * TX_EXPIRY_INTERVAL) - # now send MAX_GETDATA_IN_FLIGHT (=100) invs with peer 0 - peer = self.incoming_peers[0] - invd = [] + # now send MAX_GETDATA_IN_FLIGHT (=100) invs with 1 peer + peer = self.connect_incoming_peers(1)[0] + invs = [] for i in range(MAX_GETDATA_IN_FLIGHT): txid = self.next_fake_txid() - peer.send_tx_inv([txid]) - invd.append(txid) + invs.append(txid) - # warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2 - warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + peer.send_tx_inv(invs) - # test that we got all the getdatas - assert self.wait_for_getdata(invd, [peer]) + # warp forward 3 seconds in steps of 1 second + warp = INBOUND_PEER_TX_DELAY + 1 + for _ in range(warp): + assert self.forward_mocktime(1) + peer.sync_with_ping() - # send one more inv with our now maxed out peer - txid_failed = self.next_fake_txid() - peer.send_tx_inv([txid_failed]) - # and send one inv with another peer - txid_success = self.next_fake_txid() - self.incoming_peers[1].send_tx_inv([txid_success]) + # test that we got all the getdata + assert peer.wait_until_numgetdata(MAX_GETDATA_IN_FLIGHT) - # warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2 - warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + peer.send_tx_inv([self.next_fake_txid()]) - # test that we got a getdata for the successful tx with peer 1 - assert self.wait_for_getdata([txid_success], [self.incoming_peers[1]]) - # test that we did not get a getdata for the failed txid with peer 0 - assert not self.any_received_getdata(txid_failed, [peer]) + # warp forward 3 seconds again + warp = INBOUND_PEER_TX_DELAY + 1 + for _ in range(warp): + assert self.forward_mocktime(1) + peer.sync_with_ping() - # clear out the inflight register by expiring all requests - self.forward_mocktime(TX_EXPIRY_INTERVAL) + # test that we haven't received the getdata request yet + assert len(peer.tx_getdata_received) == MAX_GETDATA_IN_FLIGHT - # send one inv with 4 txs - txids = [] - for i in range(4): - txids.append(self.next_fake_txid()) - peer.send_tx_inv(txids) + # additionally warp the overloaded peer delay time second margin + warp = OVERLOADED_PEER_DELAY + for _ in range(warp): + assert self.forward_mocktime(1) + peer.sync_with_ping() - # warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2 - warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + # test that we now received the getdata + assert peer.wait_until_numgetdata(MAX_GETDATA_IN_FLIGHT + 1) + + # cleanup + assert self.disconnect_incoming_peers([peer]) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) - # test that we got a getdata for the final inv with peer 0 - assert self.wait_for_getdata(txids, [peer]) + def test_expiry_fallback(self): + # create 2 new peers + peers = self.connect_incoming_peers(2) + + txid = self.next_fake_txid() + assert self.forward_mocktime(1) + + for peer in peers: + peer.send_tx_inv([txid]) + + # warp forward 2 + 2 (margin) = 4 seconds in steps of 2 + warp = INBOUND_PEER_TX_DELAY + 2 + assert self.forward_mocktime_step2(warp//2) + + winner, loser = self.find_winning_peer(peers, txid) + + # expire the request from the winning peer by doing nothing + assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2) + + # the losing peer is now the fallback and received a getdata message + assert self.wait_for_getdata([txid], [loser]) + + #cleanup + assert self.disconnect_incoming_peers(peers) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) def test_notfound_fallback(self): - # use peer 4 and 5 to concurrently send 2 invs - peers = self.incoming_peers[4:6] + # use 2 peers to concurrently send 2 invs + peers = self.connect_incoming_peers(2) txid = self.next_fake_txid() - self.forward_mocktime(1) + assert self.forward_mocktime(1) for peer in peers: peer.send_tx_inv([txid]) - # warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2 - warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + # warp forward 2 + 2 (margin) = 4 seconds in steps of 2 + warp = INBOUND_PEER_TX_DELAY + 2 + assert self.forward_mocktime_step2(warp//2) winner, loser = self.find_winning_peer(peers, txid) # send a reject message from the peer that won the race winner.send_tx_notfound([txid]) - # warp forward 30 + 2 + 2 + 2 (margin) = 36 seconds in steps of 2 - warp = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + # warp forward the max wait time in steps of 2 + assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2) # the losing peer is now the fallback and received a getdata message assert self.wait_for_getdata([txid], [loser]) + #cleanup + assert self.disconnect_incoming_peers(peers) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) + def test_disconnect_fallback(self): - # use peer 6 and 7 to concurrently send 2 invs - peers = self.incoming_peers[6:8] + # use 2 peers to concurrently send 2 invs + peers = self.connect_incoming_peers(2) txid = self.next_fake_txid() - self.forward_mocktime(1) + assert self.forward_mocktime(1) for peer in peers: peer.send_tx_inv([txid]) - # warp forward 2 + 2 + 2 (margin) = 6 seconds in steps of 2 - warp = INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + # warp forward 2 + 2 (margin) = 4 seconds in steps of 2 + warp = INBOUND_PEER_TX_DELAY + 2 + assert self.forward_mocktime_step2(warp//2) winner, loser = self.find_winning_peer(peers, txid) # drop connection from the peer that won the race assert winner.disconnect() - # warp forward 30 + 2 + 2 + 2 (margin) = 36 seconds in steps of 2 - warp = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY + TXID_RELAY_DELAY + 2 - self.forward_mocktime_step2(warp//2) + # warp forward the max wait time in steps of 2 + assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2) # the losing peer is now the fallback and received a getdata message assert self.wait_for_getdata([txid], [loser]) + #cleanup + assert self.disconnect_incoming_peers(peers) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) + + def test_max_announcements(self): + # create a test node + peer = self.connect_incoming_peers(1)[0] + + assert self.forward_mocktime(1) + + hashes = [] + for _ in range(MAX_PEER_TX_ANNOUNCEMENTS): + hashes.append(self.next_fake_txid()) + + peer.send_tx_inv(hashes) + + # wait the maximum time before expiry minus 2 seconds to receive all + # getdata requests with this peer + warp = MAX_GETDATA_INBOUND_WAIT - 2 + assert self.forward_mocktime_step2(warp//2) + assert peer.wait_until_numgetdata(MAX_PEER_TX_ANNOUNCEMENTS) + peer.sync_with_ping() + + # send one more and wait the maximum time - this should never come back. + extratx = self.next_fake_txid() + peer.send_tx_inv([extratx]) + assert self.forward_mocktime_step2(MAX_GETDATA_INBOUND_WAIT//2) + assert not self.any_received_getdata(extratx, [peer]) + + #cleanup + assert self.disconnect_incoming_peers([peer]) + assert self.forward_mocktime(TX_EXPIRY_INTERVAL) + if __name__ == '__main__': TxDownloadTest().main() diff --git a/src/Makefile.am b/src/Makefile.am index a6496ffe676..006a5aa1894 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -111,7 +111,6 @@ BITCOIN_CORE_H = \ key.h \ keystore.h \ dbwrapper.h \ - limitedmap.h \ memusage.h \ merkleblock.h \ miner.h \ @@ -154,9 +153,11 @@ BITCOIN_CORE_H = \ torcontrol.h \ txdb.h \ txmempool.h \ + txrequest.h \ ui_interface.h \ undo.h \ util.h \ + utilmemory.h \ utilmoneystr.h \ utiltime.h \ utilstring.h \ @@ -218,6 +219,7 @@ libdogecoin_server_a_SOURCES = \ torcontrol.cpp \ txdb.cpp \ txmempool.cpp \ + txrequest.cpp \ ui_interface.cpp \ validation.cpp \ validationinterface.cpp \ diff --git a/src/Makefile.test.include b/src/Makefile.test.include index c285889c8d9..9ba0e989e54 100644 --- a/src/Makefile.test.include +++ b/src/Makefile.test.include @@ -100,7 +100,6 @@ BITCOIN_TESTS =\ test/getarg_tests.cpp \ test/hash_tests.cpp \ test/key_tests.cpp \ - test/limitedmap_tests.cpp \ test/dbwrapper_tests.cpp \ test/main_tests.cpp \ test/mempool_tests.cpp \ @@ -136,6 +135,7 @@ BITCOIN_TESTS =\ test/testutil.h \ test/timedata_tests.cpp \ test/transaction_tests.cpp \ + test/txrequest_tests.cpp \ test/txvalidationcache_tests.cpp \ test/versionbits_tests.cpp \ test/uint256_tests.cpp \ diff --git a/src/limitedmap.h b/src/limitedmap.h deleted file mode 100644 index e9dcb6defdb..00000000000 --- a/src/limitedmap.h +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright (c) 2012-2016 The Bitcoin Core developers -// Distributed under the MIT software license, see the accompanying -// file COPYING or http://www.opensource.org/licenses/mit-license.php. - -#ifndef BITCOIN_LIMITEDMAP_H -#define BITCOIN_LIMITEDMAP_H - -#include -#include - -/** STL-like map container that only keeps the N elements with the highest value. */ -template -class limitedmap -{ -public: - typedef K key_type; - typedef V mapped_type; - typedef std::pair value_type; - typedef typename std::map::const_iterator const_iterator; - typedef typename std::map::size_type size_type; - -protected: - std::map map; - typedef typename std::map::iterator iterator; - std::multimap rmap; - typedef typename std::multimap::iterator rmap_iterator; - size_type nMaxSize; - -public: - limitedmap(size_type nMaxSizeIn) - { - assert(nMaxSizeIn > 0); - nMaxSize = nMaxSizeIn; - } - const_iterator begin() const { return map.begin(); } - const_iterator end() const { return map.end(); } - size_type size() const { return map.size(); } - bool empty() const { return map.empty(); } - const_iterator find(const key_type& k) const { return map.find(k); } - size_type count(const key_type& k) const { return map.count(k); } - void insert(const value_type& x) - { - std::pair ret = map.insert(x); - if (ret.second) { - if (map.size() > nMaxSize) { - map.erase(rmap.begin()->second); - rmap.erase(rmap.begin()); - } - rmap.insert(make_pair(x.second, ret.first)); - } - } - void erase(const key_type& k) - { - iterator itTarget = map.find(k); - if (itTarget == map.end()) - return; - std::pair itPair = rmap.equal_range(itTarget->second); - for (rmap_iterator it = itPair.first; it != itPair.second; ++it) - if (it->second == itTarget) { - rmap.erase(it); - map.erase(itTarget); - return; - } - // Shouldn't ever get here - assert(0); - } - void update(const_iterator itIn, const mapped_type& v) - { - // Using map::erase() with empty range instead of map::find() to get a non-const iterator, - // since it is a constant time operation in C++11. For more details, see - // https://stackoverflow.com/questions/765148/how-to-remove-constness-of-const-iterator - iterator itTarget = map.erase(itIn, itIn); - - if (itTarget == map.end()) - return; - std::pair itPair = rmap.equal_range(itTarget->second); - for (rmap_iterator it = itPair.first; it != itPair.second; ++it) - if (it->second == itTarget) { - rmap.erase(it); - itTarget->second = v; - rmap.insert(make_pair(v, itTarget)); - return; - } - // Shouldn't ever get here - assert(0); - } - size_type max_size() const { return nMaxSize; } - size_type max_size(size_type s) - { - assert(s > 0); - while (map.size() > s) { - map.erase(rmap.begin()->second); - rmap.erase(rmap.begin()); - } - nMaxSize = s; - return nMaxSize; - } -}; - -#endif // BITCOIN_LIMITEDMAP_H diff --git a/src/net.h b/src/net.h index 466ae5384c6..7d78c0eac84 100644 --- a/src/net.h +++ b/src/net.h @@ -13,7 +13,6 @@ #include "bloom.h" #include "compat.h" #include "hash.h" -#include "limitedmap.h" #include "netaddress.h" #include "protocol.h" #include "random.h" diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 74cd842f3be..54a95193fc7 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -25,6 +25,7 @@ #include "random.h" #include "tinyformat.h" #include "txmempool.h" +#include "txrequest.h" #include "ui_interface.h" #include "util.h" #include "utilmoneystr.h" @@ -51,18 +52,21 @@ struct IteratorComparator /** Maximum number of in-flight transactions from a peer */ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; -/** Maximum number of announced transactions from a peer */ -static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; +/** + * Maximum number of transactions to consider for requesting, per peer. + * + * It provides a reasonable DoS limit to per-peer memory usage spent on + * announcements, while covering peers continuously sending INVs at the maximum + * rate for several minutes (see INVENTORY_BROADCAST_MAX), while not receiving + * the actual transaction (from any peer) in response to requests for them. + */ +static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 5000; /** How many microseconds to delay requesting transactions from inbound peers */ -static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; // 2 seconds +static constexpr int64_t NONPREF_PEER_TX_DELAY = 2 * 1000000; // 2 seconds +/** How many microseconds to delay requesting transactions from overloaded peers */ +static constexpr int64_t OVERLOADED_PEER_TX_DELAY = 2 * 1000000; /** How long to wait (in microseconds) before downloading a transaction from an additional peer */ static constexpr int64_t GETDATA_TX_INTERVAL = 30 * 1000000; // 30 seconds -/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ -static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; // 2 seconds -/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */ -static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL; -static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, -"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); /** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ static const unsigned int MAX_GETDATA_SZ = 1000; @@ -79,6 +83,8 @@ void EraseOrphansFor(NodeId peer) EXCLUSIVE_LOCKS_REQUIRED(cs_main); static size_t vExtraTxnForCompactIt = 0; static std::vector> vExtraTxnForCompact GUARDED_BY(cs_main); +static TxRequestTracker g_txrequest GUARDED_BY(cs_main); + static const uint64_t RANDOMIZER_ID_ADDRESS_RELAY = 0x3cac0035b5866b90ULL; // SHA256("main address relay")[0:8] // Internal stuff @@ -218,70 +224,6 @@ struct CNodeState { */ bool fSupportsDesiredCmpctVersion; - /* - * State associated with transaction download. - * - * Tx download algorithm: - * - * When inv comes in, queue up (process_time, txid) inside the peer's - * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer - * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). - * - * The process_time for a transaction is set to current_time for outbound - * peers, current_time + 2 seconds for inbound peers. This is the time at - * which we'll consider trying to request the transaction from the peer in - * SendMessages(). The delay for inbound peers is to allow outbound peers - * a chance to announce before we request from inbound peers, to prevent - * an adversary from using inbound connections to blind us to a - * transaction (InvBlock). - * - * When we call SendMessages() for a given peer, - * we will loop over the transactions in m_tx_process_time, looking - * at the transactions whose process_time <= current_time. We'll request - * each such transaction that we don't have already and that hasn't been - * requested from another peer recently, up until we hit the - * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update - * g_already_asked_for for each requested txid, storing the time of the - * GETDATA request. We use g_already_asked_for to coordinate transaction - * requests amongst our peers. - * - * For transactions that we still need but we have already recently - * requested from some other peer, we'll reinsert (process_time, txid) - * back into the peer's m_tx_process_time at the point in the future at - * which the most recent GETDATA request would time out (ie - * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). - * We add an additional delay for inbound peers, again to prefer - * attempting download from outbound peers first. - * We also add an extra small random delay up to 2 seconds - * to avoid biasing some peers over others. (e.g., due to fixed ordering - * of peer processing in ThreadMessageHandler). - * - * When we receive a transaction from a peer, we remove the txid from the - * peer's m_tx_in_flight set and from their recently announced set - * (m_tx_announced). We also clear g_already_asked_for for that entry, so - * that if somehow the transaction is not accepted but also not added to - * the reject filter, then we will eventually redownload from other - * peers. - */ - - struct TxDownloadState { - /* Track when to attempt download of announced transactions (process - * time in micros -> txid) - */ - std::multimap m_tx_process_time; - - //! Store all the transactions a peer has recently announced - std::set m_tx_announced; - - //! Store transactions which were requested by us, with timestamp - std::map m_tx_in_flight; - - //! Periodically check for stuck getdata requests - int64_t m_check_expiry_timer{0}; - }; - - TxDownloadState m_tx_download; - CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { fCurrentlyConnected = false; nMisbehavior = 0; @@ -305,10 +247,8 @@ struct CNodeState { fWantsCmpctWitness = false; fSupportsDesiredCmpctVersion = false; } -}; -// Keeps track of the time (in microseconds) when transactions were requested last time -limitedmap g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); +}; /** Map maintaining per-node state. Requires cs_main. */ std::map mapNodeState; @@ -358,6 +298,7 @@ void InitializeNode(CNode *pnode, CConnman& connman) { { LOCK(cs_main); mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName))); + assert(g_txrequest.Count(nodeid) == 0); } if(!pnode->fInbound) PushNodeVersion(pnode, connman, GetTime()); @@ -379,6 +320,8 @@ void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { mapBlocksInFlight.erase(entry.hash); } EraseOrphansFor(nodeid); + g_txrequest.DisconnectedPeer(nodeid); + nPreferredDownload -= state->fPreferredDownload; nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); assert(nPeersWithValidatedDownloads >= 0); @@ -390,6 +333,7 @@ void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { assert(mapBlocksInFlight.empty()); assert(nPreferredDownload == 0); assert(nPeersWithValidatedDownloads == 0); + assert(g_txrequest.Size() == 0); } } @@ -643,70 +587,34 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vectorsecond; - } - return 0; -} - -void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - auto it = g_already_asked_for.find(txid); - - if (it == g_already_asked_for.end()) { - g_already_asked_for.insert(std::make_pair(txid, request_time)); - } else { - g_already_asked_for.update(it, request_time); - } -} +} // anon namespace -int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void AddTxAnnouncement(CNode* node, const uint256& txhash, int64_t current_time) { - int64_t process_time; - int64_t last_request_time = GetTxRequestTime(txid); - // First time requesting this tx - if (last_request_time == 0) { - process_time = current_time; - } else { - // Randomize the delay to avoid biasing some peers over others (such as due to - // fixed ordering of peer processing in ThreadMessageHandler) - process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY); - } + AssertLockHeld(cs_main); // For g_txrequest + NodeId nodeid = node->GetId(); - // We delay processing announcements from inbound peers - if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; - - return process_time; -} - -void RequestTx(CNodeState* state, const uint256& txid, int64_t current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; - if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_announced.count(txid)) { - // Too many queued announcements from this peer, or we already have - // this announcement + if (!node->fWhitelisted && g_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) { + // Too many queued announcements from this peer return; } - peer_download_state.m_tx_announced.insert(txid); - // Calculate the time to try requesting this transaction. Use - // fPreferredDownload as a proxy for outbound peers. - int64_t process_time = CalculateTxGetDataTime(txid, current_time, !state->fPreferredDownload); + // Decide the TxRequestTracker parameters for this announcement: + // - "preferred": if fPreferredDownload is set (= outbound, or whitelisted) + // - "reqtime": current time plus delays for: + // - NONPREF_PEER_TX_DELAY for announcements from non-preferred connections + // - OVERLOADED_PEER_TX_DELAY for announcements from peers which have at least + // MAX_PEER_TX_IN_FLIGHT requests in flight and aren't whitelisted. + const CNodeState* state = State(nodeid); + const bool preferred = state->fPreferredDownload; + const bool overloaded = (!node->fWhitelisted && g_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_IN_FLIGHT); - peer_download_state.m_tx_process_time.emplace(process_time, txid); -} + int64_t delay = 0; + if (!preferred) delay += NONPREF_PEER_TX_DELAY; + if (overloaded) delay += OVERLOADED_PEER_TX_DELAY; -} // anon namespace + g_txrequest.ReceivedInv(nodeid, txhash, preferred, current_time + delay); +} bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { LOCK(cs_main); @@ -941,6 +849,9 @@ void PeerLogicValidation::SyncTransaction(const CTransaction& tx, const CBlockIn } LogPrint("mempool", "Erased %d orphan tx included or conflicted by block\n", nErased); } + + // Forget tracked announcements for transactions included in a block. + g_txrequest.ForgetTxHash(tx.GetHash()); } static CCriticalSection cs_most_recent_block; @@ -1848,7 +1759,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (fBlocksOnly) LogPrint("net", "transaction (%s) inv sent in violation of protocol peer=%d\n", inv.hash.ToString(), pfrom->id); else if (!fAlreadyHave && !fImporting && !fReindex && !IsInitialBlockDownload()) - RequestTx(State(pfrom->GetId()), inv.hash, current_time); + AddTxAnnouncement(pfrom, inv.hash, current_time); } } @@ -2089,15 +2000,18 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr bool fMissingInputs = false; CValidationState state; - CNodeState* nodestate = State(pfrom->GetId()); - nodestate->m_tx_download.m_tx_announced.erase(inv.hash); - nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash); - EraseTxRequest(inv.hash); + // Mark the tx as received + g_txrequest.ReceivedResponse(pfrom->GetId(), inv.hash); std::list lRemovedTxn; if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, true, &fMissingInputs, &lRemovedTxn)) { mempool.check(pcoinsTip); + + // As this version of the transaction was acceptable, we can forget + // about any requests for it. + g_txrequest.ForgetTxHash(tx.GetHash()); + RelayTransaction(tx, connman); for (unsigned int i = 0; i < tx.vout.size(); i++) { auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i)); @@ -2134,10 +2048,16 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr BOOST_FOREACH(const CTxIn& txin, tx.vin) { CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, current_time); + if (!AlreadyHave(_inv)) { + AddTxAnnouncement(pfrom, _inv.hash, current_time); + } } AddOrphanTx(ptx, pfrom->GetId()); + // Once added to the orphan pool, a tx is considered + // AlreadyHave, and we shouldn't request it anymore. + g_txrequest.ForgetTxHash(tx.GetHash()); + // DoS prevention: do not allow mapOrphanTransactions to grow unbounded unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS)); unsigned int nEvicted = LimitOrphanTxSize(nMaxOrphanTx); @@ -2148,6 +2068,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // We will continue to reject this tx since it has rejected // parents so avoid re-requesting it from other peers. recentRejects->insert(tx.GetHash()); + g_txrequest.ForgetTxHash(tx.GetHash()); } } else { if (!tx.HasWitness() && !state.CorruptionPossible()) { @@ -2156,6 +2077,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // See https://github.com/bitcoin/bitcoin/issues/8279 for details. assert(recentRejects); recentRejects->insert(tx.GetHash()); + g_txrequest.ForgetTxHash(tx.GetHash()); if (RecursiveDynamicUsage(*ptx) < 100000) { AddToCompactExtraTransactions(ptx); } @@ -2851,23 +2773,14 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr else if (strCommand == NetMsgType::NOTFOUND) { // Remove the NOTFOUND transactions from the peer LOCK(cs_main); - CNodeState *state = State(pfrom->GetId()); std::vector vInv; vRecv >> vInv; if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { for (CInv &inv : vInv) { if (inv.type == MSG_TX || inv.type == MSG_WITNESS_TX) { - // If we receive a NOTFOUND message for a txid we requested, erase - // it from our data structures for this peer. - auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash); - if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) { - // Skip any further work if this is a spurious NOTFOUND - // message. - continue; - } - state->m_tx_download.m_tx_in_flight.erase(in_flight_it); - state->m_tx_download.m_tx_announced.erase(inv.hash); - LogPrint("net", "received: notfound tx %s from peer=%d\n", inv.hash.ToString(), pfrom->id); + // If we receive a NOTFOUND message for a txid we requested, we + // mark the announcement as completed in TxRequestTracker. + g_txrequest.ReceivedResponse(pfrom->GetId(), inv.hash); } } } @@ -3543,59 +3456,25 @@ bool SendMessages(CNode* pto, CConnman& connman, const std::atomic& interr // // Message: getdata (non-blocks) // - - // For robustness, expire old requests after a long timeout, so that - // we can resume downloading transactions from a peer even if they - // were unresponsive in the past. - // Eventually we should consider disconnecting peers, but this is - // conservative. - if (state.m_tx_download.m_check_expiry_timer <= current_time) { - for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { - if (it->second <= current_time - TX_EXPIRY_INTERVAL) { - LogPrint("net", "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); - state.m_tx_download.m_tx_announced.erase(it->first); - state.m_tx_download.m_tx_in_flight.erase(it++); - } else { - ++it; - } - } - // On average, we do this check every TX_EXPIRY_INTERVAL/3.75. Randomize - // so that we're not doing this for all peers at the same time. - state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL/5 + GetRand(TX_EXPIRY_INTERVAL/5); + std::vector> expired; + auto requestable = g_txrequest.GetRequestable(pto->GetId(), current_time, &expired); + for (const auto& entry : expired) { + LogPrint("net", "timeout of inflight tx %s from peer=%d\n", entry.second.ToString(), entry.first); } - auto& tx_process_time = state.m_tx_download.m_tx_process_time; - while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { - const uint256 txid = tx_process_time.begin()->second; - // Erase this entry from tx_process_time (it may be added back for - // processing at a later time, see below) - tx_process_time.erase(tx_process_time.begin()); - CInv inv(MSG_TX | GetFetchFlags(pto, chainActive.Tip(), consensusParams), txid); + for (const uint256& txhash : requestable) { + CInv inv(MSG_TX | GetFetchFlags(pto, chainActive.Tip(), consensusParams), txhash); if (!AlreadyHave(inv)) { - // If this transaction was last requested more than 1 minute ago, - // then request. - int64_t last_request_time = GetTxRequestTime(inv.hash); - if (last_request_time <= current_time - GETDATA_TX_INTERVAL) { - LogPrint("net", "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= MAX_GETDATA_SZ) { - connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - vGetData.clear(); - } - UpdateTxRequestTime(inv.hash, current_time); - state.m_tx_download.m_tx_in_flight.emplace(inv.hash, current_time); - } else { - // This transaction is in flight from someone else; queue - // up processing to happen after the download times out - // (with a slight delay for inbound peers, to prefer - // requests to outbound peers). - int64_t next_process_time = CalculateTxGetDataTime(txid, current_time, !state.fPreferredDownload); - tx_process_time.emplace(next_process_time, txid); + LogPrint("net", "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); + vGetData.emplace_back(inv); + if (vGetData.size() >= MAX_GETDATA_SZ) { + connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); } + g_txrequest.RequestedTx(pto->GetId(), txhash, current_time + GETDATA_TX_INTERVAL); } else { // We have already seen this transaction, no need to download. - state.m_tx_download.m_tx_announced.erase(inv.hash); - state.m_tx_download.m_tx_in_flight.erase(inv.hash); + g_txrequest.ForgetTxHash(txhash); } } diff --git a/src/test/limitedmap_tests.cpp b/src/test/limitedmap_tests.cpp deleted file mode 100644 index b071ab117b9..00000000000 --- a/src/test/limitedmap_tests.cpp +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (c) 2012-2016 The Bitcoin Core developers -// Distributed under the MIT software license, see the accompanying -// file COPYING or http://www.opensource.org/licenses/mit-license.php. - -#include "limitedmap.h" - -#include "test/test_bitcoin.h" - -#include - -BOOST_FIXTURE_TEST_SUITE(limitedmap_tests, BasicTestingSetup) - -BOOST_AUTO_TEST_CASE(limitedmap_test) -{ - // create a limitedmap capped at 10 items - limitedmap map(10); - - // check that the max size is 10 - BOOST_CHECK(map.max_size() == 10); - - // check that it's empty - BOOST_CHECK(map.size() == 0); - - // insert (-1, -1) - map.insert(std::pair(-1, -1)); - - // make sure that the size is updated - BOOST_CHECK(map.size() == 1); - - // make sure that the new item is in the map - BOOST_CHECK(map.count(-1) == 1); - - // insert 10 new items - for (int i = 0; i < 10; i++) { - map.insert(std::pair(i, i + 1)); - } - - // make sure that the map now contains 10 items... - BOOST_CHECK(map.size() == 10); - - // ...and that the first item has been discarded - BOOST_CHECK(map.count(-1) == 0); - - // iterate over the map, both with an index and an iterator - limitedmap::const_iterator it = map.begin(); - for (int i = 0; i < 10; i++) { - // make sure the item is present - BOOST_CHECK(map.count(i) == 1); - - // use the iterator to check for the expected key and value - BOOST_CHECK(it->first == i); - BOOST_CHECK(it->second == i + 1); - - // use find to check for the value - BOOST_CHECK(map.find(i)->second == i + 1); - - // update and recheck - map.update(it, i + 2); - BOOST_CHECK(map.find(i)->second == i + 2); - - it++; - } - - // check that we've exhausted the iterator - BOOST_CHECK(it == map.end()); - - // resize the map to 5 items - map.max_size(5); - - // check that the max size and size are now 5 - BOOST_CHECK(map.max_size() == 5); - BOOST_CHECK(map.size() == 5); - - // check that items less than 5 have been discarded - // and items greater than 5 are retained - for (int i = 0; i < 10; i++) { - if (i < 5) { - BOOST_CHECK(map.count(i) == 0); - } else { - BOOST_CHECK(map.count(i) == 1); - } - } - - // erase some items not in the map - for (int i = 100; i < 1000; i += 100) { - map.erase(i); - } - - // check that the size is unaffected - BOOST_CHECK(map.size() == 5); - - // erase the remaining elements - for (int i = 5; i < 10; i++) { - map.erase(i); - } - - // check that the map is now empty - BOOST_CHECK(map.empty()); -} - -BOOST_AUTO_TEST_SUITE_END() diff --git a/src/test/txrequest_tests.cpp b/src/test/txrequest_tests.cpp new file mode 100644 index 00000000000..74f87f15997 --- /dev/null +++ b/src/test/txrequest_tests.cpp @@ -0,0 +1,735 @@ +// Copyright (c) 2020 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + + +#include "txrequest.h" +#include "uint256.h" + +#include "test/test_bitcoin.h" +#include "test/test_random.h" + +#include +#include +#include + +#include + +BOOST_FIXTURE_TEST_SUITE(txrequest_tests, TestingSetup) + +namespace { + +OpenSSLRandomContext g_insecure_rand_ctx; + +constexpr int64_t MIN_TIME = 0; // per GetTimeMicros() in utiltime.cpp +constexpr int64_t MAX_TIME = std::numeric_limits::max(); +constexpr int64_t MICROSECOND = 1; +constexpr int64_t NO_TIME = 0; + +/** An Action is a function to call at a particular (simulated) timestamp. */ +using Action = std::pair>; + +/** Object that stores actions from multiple interleaved scenarios, and data shared across them. + * + * The Scenario below is used to fill this. + */ +struct Runner +{ + /** The TxRequestTracker being tested. */ + TxRequestTracker txrequest; + + /** List of actions to be executed (in order of increasing timestamp). */ + std::vector actions; + + /** Which node ids have been assigned already (to prevent reuse). */ + std::set peerset; + + /** Which txhashes have been assigned already (to prevent reuse). */ + std::set txhashset; + + /** + * Which (peer, txhash) combinations are known to be expired. These need + * to be accumulated here instead of checked directly in the GetRequestable + * return value to avoid introducing a dependency between the various + * parallel tests. + */ + std::multiset> expired; +}; + +int64_t RandomTime8s() { return int64_t(1 + (insecure_rand() % (1 << 9))); } +int64_t RandomTime1y() { return int64_t(1 + GetRand((uint64_t(1) << 46) - 1)); } + +/** A proxy for a Runner that helps build a sequence of consecutive test actions on a TxRequestTracker. + * + * Each Scenario is a proxy through which actions for the (sequential) execution of various tests are added to a + * Runner. The actions from multiple scenarios are then run concurrently, resulting in these tests being performed + * against a TxRequestTracker in parallel. Every test has its own unique txhashes and NodeIds which are not + * reused in other tests, and thus they should be independent from each other. Running them in parallel however + * means that we verify the behavior (w.r.t. one test's txhashes and NodeIds) even when the state of the data + * structure is more complicated due to the presence of other tests. + */ +class Scenario +{ + Runner& m_runner; + int64_t m_now; + std::string m_testname; + +public: + Scenario(Runner& runner, int64_t starttime) : m_runner(runner), m_now(starttime) {} + + /** Set a name for the current test, to give more clear error messages. */ + void SetTestName(std::string testname) + { + m_testname = std::move(testname); + } + + /** Advance this Scenario's time; this affects the timestamps newly scheduled events get. */ + void AdvanceTime(int64_t amount) + { + assert(amount >= 0); + m_now += amount; + } + + /** Schedule a ForgetTxHash call at the Scheduler's current time. */ + void ForgetTxHash(const uint256& txhash) + { + auto& runner = m_runner; + runner.actions.emplace_back(m_now, [=,&runner]() { + runner.txrequest.ForgetTxHash(txhash); + runner.txrequest.SanityCheck(); + }); + } + + /** Schedule a ReceivedInv call at the Scheduler's current time. */ + void ReceivedInv(NodeId peer, uint256& txhash, bool pref, int64_t reqtime) + { + auto& runner = m_runner; + runner.actions.emplace_back(m_now, [=,&runner]() { + runner.txrequest.ReceivedInv(peer, txhash, pref, reqtime); + runner.txrequest.SanityCheck(); + }); + } + + /** Schedule a DisconnectedPeer call at the Scheduler's current time. */ + void DisconnectedPeer(NodeId peer) + { + auto& runner = m_runner; + runner.actions.emplace_back(m_now, [=,&runner]() { + runner.txrequest.DisconnectedPeer(peer); + runner.txrequest.SanityCheck(); + }); + } + + /** Schedule a RequestedTx call at the Scheduler's current time. */ + void RequestedTx(NodeId peer, const uint256& txhash, int64_t exptime) + { + auto& runner = m_runner; + runner.actions.emplace_back(m_now, [=,&runner]() { + runner.txrequest.RequestedTx(peer, txhash, exptime); + runner.txrequest.SanityCheck(); + }); + } + + /** Schedule a ReceivedResponse call at the Scheduler's current time. */ + void ReceivedResponse(NodeId peer, const uint256& txhash) + { + auto& runner = m_runner; + runner.actions.emplace_back(m_now, [=,&runner]() { + runner.txrequest.ReceivedResponse(peer, txhash); + runner.txrequest.SanityCheck(); + }); + } + + /** Schedule calls to verify the TxRequestTracker's state at the Scheduler's current time. + * + * @param peer The peer whose state will be inspected. + * @param expected The expected return value for GetRequestable(peer) + * @param candidates The expected return value CountCandidates(peer) + * @param inflight The expected return value CountInFlight(peer) + * @param completed The expected return value of Count(peer), minus candidates and inflight. + * @param checkname An arbitrary string to include in error messages, for test identificatrion. + * @param offset Offset with the current time to use (must be <= 0). This allows simulations of time going + * backwards (but note that the ordering of this event only follows the scenario's m_now. + */ + void Check(NodeId peer, const std::vector& expected, size_t candidates, size_t inflight, + size_t completed, const std::string& checkname, + int64_t offset = int64_t(0)) + { + const auto comment = m_testname + " " + checkname; + auto& runner = m_runner; + const auto now = m_now; + assert(offset <= 0); + runner.actions.emplace_back(m_now, [=,&runner]() { + std::vector> expired_now; + auto ret = runner.txrequest.GetRequestable(peer, now + offset, &expired_now); + for (const auto& entry : expired_now) runner.expired.insert(entry); + runner.txrequest.SanityCheck(); + runner.txrequest.PostGetRequestableSanityCheck(now + offset); + size_t total = candidates + inflight + completed; + size_t real_total = runner.txrequest.Count(peer); + size_t real_candidates = runner.txrequest.CountCandidates(peer); + size_t real_inflight = runner.txrequest.CountInFlight(peer); + BOOST_CHECK_MESSAGE(real_total == total, strprintf("[" + comment + "] total %i (%i expected)", real_total, total)); + BOOST_CHECK_MESSAGE(real_inflight == inflight, strprintf("[" + comment + "] inflight %i (%i expected)", real_inflight, inflight)); + BOOST_CHECK_MESSAGE(real_candidates == candidates, strprintf("[" + comment + "] candidates %i (%i expected)", real_candidates, candidates)); + BOOST_CHECK_MESSAGE(ret == expected, "[" + comment + "] mismatching requestables"); + }); + } + + /** Verify that an announcement for txhash by peer has expired some time before this check is scheduled. + * + * Every expected expiration should be accounted for through exactly one call to this function. + */ + void CheckExpired(NodeId peer, uint256 txhash) + { + const auto& testname = m_testname; + auto& runner = m_runner; + runner.actions.emplace_back(m_now, [=,&runner]() { + auto it = runner.expired.find(std::pair{peer, txhash}); + BOOST_CHECK_MESSAGE(it != runner.expired.end(), "[" + testname + "] missing expiration"); + if (it != runner.expired.end()) runner.expired.erase(it); + }); + } + + /** Generate a random txhash, whose priorities for certain peers are constrained. + * + * For example, NewTxHash({{p1,p2,p3},{p2,p4,p5}}) will generate a txhash T such that both: + * - priority(p1,T) > priority(p2,T) > priority(p3,T) + * - priority(p2,T) > priority(p4,T) > priority(p5,T) + * where priority is the predicted internal TxRequestTracker's priority, assuming all announcements + * are within the same preferredness class. + */ + uint256 NewTxHash(const std::vector>& orders = {}) + { + uint256 ret; + bool ok; + do { + ret = GetRandHash(); + ok = true; + for (const auto& order : orders) { + for (size_t pos = 1; pos < order.size(); ++pos) { + uint64_t prio_prev = m_runner.txrequest.ComputePriority(ret, order[pos - 1], true); + uint64_t prio_cur = m_runner.txrequest.ComputePriority(ret, order[pos], true); + if (prio_prev <= prio_cur) { + ok = false; + break; + } + } + if (!ok) break; + } + if (ok) { + ok = m_runner.txhashset.insert(ret).second; + } + } while(!ok); + return ret; + } + + /** Generate a new random NodeId to use as peer. The same NodeId is never returned twice + * (across all Scenarios combined). */ + NodeId NewPeer() + { + bool ok; + NodeId ret; + do { + ret = GetRand(std::numeric_limits::max()); + ok = m_runner.peerset.insert(ret).second; + } while(!ok); + return ret; + } + + int64_t Now() const { return m_now; } +}; + +/** Add to scenario a test with a single tx announced by a single peer. + * + * config is an integer in [0, 32), which controls which variant of the test is used. + */ +void BuildSingleTest(Scenario& scenario, int config) +{ + auto peer = scenario.NewPeer(); + uint256 txhash = scenario.NewTxHash(); + bool immediate = config & 1; + bool preferred = config & 2; + auto delay = immediate ? NO_TIME : RandomTime8s(); + + scenario.SetTestName(strprintf("Single(config=%i)", config)); + + // Receive an announcement, either immediately requestable or delayed. + scenario.ReceivedInv(peer, txhash, preferred, immediate ? MIN_TIME : scenario.Now() + delay); + if (immediate) { + scenario.Check(peer, {txhash}, 1, 0, 0, "s1"); + } else { + scenario.Check(peer, {}, 1, 0, 0, "s2"); + scenario.AdvanceTime(delay - MICROSECOND); + scenario.Check(peer, {}, 1, 0, 0, "s3"); + scenario.AdvanceTime(MICROSECOND); + scenario.Check(peer, {txhash}, 1, 0, 0, "s4"); + } + + if (config >> 3) { // We'll request the transaction + scenario.AdvanceTime(RandomTime8s()); + auto expiry = RandomTime8s(); + scenario.Check(peer, {txhash}, 1, 0, 0, "s5"); + scenario.RequestedTx(peer, txhash, scenario.Now() + expiry); + scenario.Check(peer, {}, 0, 1, 0, "s6"); + + if ((config >> 3) == 1) { // The request will time out + scenario.AdvanceTime(expiry - MICROSECOND); + scenario.Check(peer, {}, 0, 1, 0, "s7"); + scenario.AdvanceTime(MICROSECOND); + scenario.Check(peer, {}, 0, 0, 0, "s8"); + scenario.CheckExpired(peer, txhash); + return; + } else { + scenario.AdvanceTime(int64_t(GetRand(expiry))); + scenario.Check(peer, {}, 0, 1, 0, "s9"); + if ((config >> 3) == 3) { // A response will arrive for the transaction + scenario.ReceivedResponse(peer, txhash); + scenario.Check(peer, {}, 0, 0, 0, "s10"); + return; + } + } + } + + if (config & 4) { // The peer will go offline + scenario.DisconnectedPeer(peer); + } else { // The transaction is no longer needed + scenario.ForgetTxHash(txhash); + } + scenario.Check(peer, {}, 0, 0, 0, "s11"); +} + +/** Add to scenario a test with a single tx announced by two peers, to verify the + * right peer is selected for requests. + * + * config is an integer in [0, 32), which controls which variant of the test is used. + */ +void BuildPriorityTest(Scenario& scenario, int config) +{ + scenario.SetTestName(strprintf("Priority(config=%i)", config)); + + // Two peers. They will announce in order {peer1, peer2}. + auto peer1 = scenario.NewPeer(), peer2 = scenario.NewPeer(); + // Construct a transaction that under random rules would be preferred by peer2 or peer1, + // depending on configuration. + bool prio1 = config & 1; + auto txhash = prio1 ? scenario.NewTxHash({{peer1, peer2}}) : scenario.NewTxHash({{peer2, peer1}}); + bool pref1 = config & 2, pref2 = config & 4; + + scenario.ReceivedInv(peer1, txhash, pref1, MIN_TIME); + scenario.Check(peer1, {txhash}, 1, 0, 0, "p1"); + if (insecure_rand() % 2) { + scenario.AdvanceTime(RandomTime8s()); + scenario.Check(peer1, {txhash}, 1, 0, 0, "p2"); + } + + scenario.ReceivedInv(peer2, txhash, pref2, MIN_TIME); + bool stage2_prio = + // At this point, peer2 will be given priority if: + // - It is preferred and peer1 is not + (pref2 && !pref1) || + // - They're in the same preference class, + // and the randomized priority favors peer2 over peer1. + (pref1 == pref2 && !prio1); + NodeId priopeer = stage2_prio ? peer2 : peer1, otherpeer = stage2_prio ? peer1 : peer2; + scenario.Check(otherpeer, {}, 1, 0, 0, "p3"); + scenario.Check(priopeer, {txhash}, 1, 0, 0, "p4"); + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.Check(otherpeer, {}, 1, 0, 0, "p5"); + scenario.Check(priopeer, {txhash}, 1, 0, 0, "p6"); + + // We possibly request from the selected peer. + if (config & 8) { + scenario.RequestedTx(priopeer, txhash, MAX_TIME); + scenario.Check(priopeer, {}, 0, 1, 0, "p7"); + scenario.Check(otherpeer, {}, 1, 0, 0, "p8"); + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + } + + // The peer which was selected (or requested from) now goes offline, or a NOTFOUND is received from them. + if (config & 16) { + scenario.DisconnectedPeer(priopeer); + } else { + scenario.ReceivedResponse(priopeer, txhash); + } + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.Check(priopeer, {}, 0, 0, !(config & 16), "p8"); + scenario.Check(otherpeer, {txhash}, 1, 0, 0, "p9"); + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + + // Now the other peer goes offline. + scenario.DisconnectedPeer(otherpeer); + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.Check(peer1, {}, 0, 0, 0, "p10"); + scenario.Check(peer2, {}, 0, 0, 0, "p11"); +} + +/** Add to scenario a randomized test in which N peers announce the same transaction, to verify + * the order in which they are requested. */ +void BuildBigPriorityTest(Scenario& scenario, int peers) +{ + scenario.SetTestName(strprintf("BigPriority(peers=%i)", peers)); + + // We will have N peers announce the same transaction. + std::map preferred; + std::vector pref_peers, npref_peers; + int num_pref = GetRand(peers + 1) ; // Some preferred, ... + int num_npref = peers - num_pref; // some not preferred. + for (int i = 0; i < num_pref; ++i) { + pref_peers.push_back(scenario.NewPeer()); + preferred[pref_peers.back()] = true; + } + for (int i = 0; i < num_npref; ++i) { + npref_peers.push_back(scenario.NewPeer()); + preferred[npref_peers.back()] = false; + } + // Make a list of all peers, in order of intended request order (concatenation of pref_peers and npref_peers). + std::vector request_order; + for (int i = 0; i < num_pref; ++i) request_order.push_back(pref_peers[i]); + for (int i = 0; i < num_npref; ++i) request_order.push_back(npref_peers[i]); + + // Determine the announcement order randomly. + std::vector announce_order = request_order; + std::shuffle(announce_order.begin(), announce_order.end(), g_insecure_rand_ctx); + + // Find a gtxid whose txhash prioritization is consistent with the required ordering within pref_peers and + // within npref_peers. + auto txhash = scenario.NewTxHash({pref_peers, npref_peers}); + + // Decide reqtimes in opposite order of the expected request order. This means that as time passes we expect the + // to-be-requested-from-peer will change every time a subsequent reqtime is passed. + std::map reqtimes; + auto reqtime = scenario.Now(); + for (int i = peers - 1; i >= 0; --i) { + reqtime += RandomTime8s(); + reqtimes[request_order[i]] = reqtime; + } + + // Actually announce from all peers simultaneously (but in announce_order). + for (const auto peer : announce_order) { + scenario.ReceivedInv(peer, txhash, preferred[peer], reqtimes[peer]); + } + for (const auto peer : announce_order) { + scenario.Check(peer, {}, 1, 0, 0, "b1"); + } + + // Let time pass and observe the to-be-requested-from peer change, from nonpreferred to preferred, and from + // high priority to low priority within each class. + for (int i = peers - 1; i >= 0; --i) { + scenario.AdvanceTime(reqtimes[request_order[i]] - scenario.Now() - MICROSECOND); + scenario.Check(request_order[i], {}, 1, 0, 0, "b2"); + scenario.AdvanceTime(MICROSECOND); + scenario.Check(request_order[i], {txhash}, 1, 0, 0, "b3"); + } + + // Peers now in random order go offline, or send NOTFOUNDs. At every point in time the new to-be-requested-from + // peer should be the best remaining one, so verify this after every response. + for (int i = 0; i < peers; ++i) { + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + const int pos = GetRand(request_order.size()); + const auto peer = request_order[pos]; + request_order.erase(request_order.begin() + pos); + if (insecure_rand() % 2) { + scenario.DisconnectedPeer(peer); + scenario.Check(peer, {}, 0, 0, 0, "b4"); + } else { + scenario.ReceivedResponse(peer, txhash); + scenario.Check(peer, {}, 0, 0, request_order.size() > 0, "b5"); + } + if (request_order.size()) { + scenario.Check(request_order[0], {txhash}, 1, 0, 0, "b6"); + } + } + + // Everything is gone in the end. + for (const auto peer : announce_order) { + scenario.Check(peer, {}, 0, 0, 0, "b7"); + } +} + +/** Add to scenario a test with one peer announcing two transactions, to verify they are + * fetched in announcement order. + * + * config is an integer in [0, 4) inclusive, and selects the variant of the test. + */ +void BuildRequestOrderTest(Scenario& scenario, int config) +{ + scenario.SetTestName(strprintf("RequestOrder(config=%i)", config)); + + auto peer = scenario.NewPeer(); + auto txhash1 = scenario.NewTxHash(); + auto txhash2 = scenario.NewTxHash(); + + auto reqtime2 = scenario.Now() + RandomTime8s(); + auto reqtime1 = reqtime2 + RandomTime8s(); + + scenario.ReceivedInv(peer, txhash1, config & 1, reqtime1); + // Simulate time going backwards by giving the second announcement an earlier reqtime. + scenario.ReceivedInv(peer, txhash2, config & 2, reqtime2); + + scenario.AdvanceTime(reqtime2 - MICROSECOND - scenario.Now()); + scenario.Check(peer, {}, 2, 0, 0, "o1"); + scenario.AdvanceTime(MICROSECOND); + scenario.Check(peer, {txhash2}, 2, 0, 0, "o2"); + scenario.AdvanceTime(reqtime1 - MICROSECOND - scenario.Now()); + scenario.Check(peer, {txhash2}, 2, 0, 0, "o3"); + scenario.AdvanceTime(MICROSECOND); + // Even with time going backwards in between announcements, the return value of GetRequestable is in + // announcement order. + scenario.Check(peer, {txhash1, txhash2}, 2, 0, 0, "o4"); + + scenario.DisconnectedPeer(peer); + scenario.Check(peer, {}, 0, 0, 0, "o5"); +} + +// /** Add to scenario a test that verifies behavior related to both txid and wtxid with the same +// * hash being announced. +// * +// * config is an integer in [0, 4) inclusive, and selects the variant of the test used. +// */ +// void BuildWtxidTest(Scenario& scenario, int config) +// { +// scenario.SetTestName(strprintf("Wtxid(config=%i)", config)); +// +// auto peerT = scenario.NewPeer(); +// auto peerW = scenario.NewPeer(); +// auto txhash = scenario.NewTxHash(); +// GenTxid txid{false, txhash}; +// GenTxid wtxid{true, txhash}; +// +// auto reqtimeT = InsecureRandBool() ? MIN_TIME : scenario.Now() + RandomTime8s(); +// auto reqtimeW = InsecureRandBool() ? MIN_TIME : scenario.Now() + RandomTime8s(); +// +// // Announce txid first or wtxid first. +// if (config & 1) { +// scenario.ReceivedInv(peerT, txid, config & 2, reqtimeT); +// if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s()); +// scenario.ReceivedInv(peerW, wtxid, !(config & 2), reqtimeW); +// } else { +// scenario.ReceivedInv(peerW, wtxid, !(config & 2), reqtimeW); +// if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s()); +// scenario.ReceivedInv(peerT, txid, config & 2, reqtimeT); +// } +// +// // Let time pass if needed, and check that the preferred announcement (txid or wtxid) +// // is correctly to-be-requested (and with the correct wtxidness). +// auto max_reqtime = std::max(reqtimeT, reqtimeW); +// if (max_reqtime > scenario.Now()) scenario.AdvanceTime(max_reqtime - scenario.Now()); +// if (config & 2) { +// scenario.Check(peerT, {txid}, 1, 0, 0, "w1"); +// scenario.Check(peerW, {}, 1, 0, 0, "w2"); +// } else { +// scenario.Check(peerT, {}, 1, 0, 0, "w3"); +// scenario.Check(peerW, {wtxid}, 1, 0, 0, "w4"); +// } +// +// // Let the preferred announcement be requested. It's not going to be delivered. +// auto expiry = RandomTime8s(); +// if (config & 2) { +// scenario.RequestedTx(peerT, txid.GetHash(), scenario.Now() + expiry); +// scenario.Check(peerT, {}, 0, 1, 0, "w5"); +// scenario.Check(peerW, {}, 1, 0, 0, "w6"); +// } else { +// scenario.RequestedTx(peerW, wtxid.GetHash(), scenario.Now() + expiry); +// scenario.Check(peerT, {}, 1, 0, 0, "w7"); +// scenario.Check(peerW, {}, 0, 1, 0, "w8"); +// } +// +// // After reaching expiration time of the preferred announcement, verify that the +// // remaining one is requestable +// scenario.AdvanceTime(expiry); +// if (config & 2) { +// scenario.Check(peerT, {}, 0, 0, 1, "w9"); +// scenario.Check(peerW, {wtxid}, 1, 0, 0, "w10"); +// } else { +// scenario.Check(peerT, {txid}, 1, 0, 0, "w11"); +// scenario.Check(peerW, {}, 0, 0, 1, "w12"); +// } +// +// // If a good transaction with either that hash as wtxid or txid arrives, both +// // announcements are gone. +// if (InsecureRandBool()) scenario.AdvanceTime(RandomTime8s()); +// scenario.ForgetTxHash(txhash); +// scenario.Check(peerT, {}, 0, 0, 0, "w13"); +// scenario.Check(peerW, {}, 0, 0, 0, "w14"); +// } + +/** Add to scenario a test that exercises clocks that go backwards. */ +void BuildTimeBackwardsTest(Scenario& scenario) +{ + auto peer1 = scenario.NewPeer(); + auto peer2 = scenario.NewPeer(); + auto txhash = scenario.NewTxHash({{peer1, peer2}}); + + // Announce from peer2. + auto reqtime = scenario.Now() + RandomTime8s(); + scenario.ReceivedInv(peer2, txhash, true, reqtime); + scenario.Check(peer2, {}, 1, 0, 0, "r1"); + scenario.AdvanceTime(reqtime - scenario.Now()); + scenario.Check(peer2, {txhash}, 1, 0, 0, "r2"); + // Check that if the clock goes backwards by 1us, the transaction would stop being requested. + scenario.Check(peer2, {}, 1, 0, 0, "r3", -MICROSECOND); + // But it reverts to being requested if time goes forward again. + scenario.Check(peer2, {txhash}, 1, 0, 0, "r4"); + + // Announce from peer1. + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.ReceivedInv(peer1, txhash, true, MAX_TIME); + scenario.Check(peer2, {txhash}, 1, 0, 0, "r5"); + scenario.Check(peer1, {}, 1, 0, 0, "r6"); + + // Request from peer1. + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + auto expiry = scenario.Now() + RandomTime8s(); + scenario.RequestedTx(peer1, txhash, expiry); + scenario.Check(peer1, {}, 0, 1, 0, "r7"); + scenario.Check(peer2, {}, 1, 0, 0, "r8"); + + // Expiration passes. + scenario.AdvanceTime(expiry - scenario.Now()); + scenario.Check(peer1, {}, 0, 0, 1, "r9"); + scenario.Check(peer2, {txhash}, 1, 0, 0, "r10"); // Request goes back to peer2. + scenario.CheckExpired(peer1, txhash); + scenario.Check(peer1, {}, 0, 0, 1, "r11", -MICROSECOND); // Going back does not unexpire. + scenario.Check(peer2, {txhash}, 1, 0, 0, "r12", -MICROSECOND); + + // Peer2 goes offline, meaning no viable announcements remain. + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.DisconnectedPeer(peer2); + scenario.Check(peer1, {}, 0, 0, 0, "r13"); + scenario.Check(peer2, {}, 0, 0, 0, "r14"); +} + +/** Add to scenario a test that involves RequestedTx() calls for txhashes not returned by GetRequestable. */ +void BuildWeirdRequestsTest(Scenario& scenario) +{ + auto peer1 = scenario.NewPeer(); + auto peer2 = scenario.NewPeer(); + auto txhash1 = scenario.NewTxHash({{peer1, peer2}}); + auto txhash2 = scenario.NewTxHash({{peer2, peer1}}); + + // Announce gtxid1 by peer1. + scenario.ReceivedInv(peer1, txhash1, true, MIN_TIME); + scenario.Check(peer1, {txhash1}, 1, 0, 0, "q1"); + + // Announce gtxid2 by peer2. + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.ReceivedInv(peer2, txhash2, true, MIN_TIME); + scenario.Check(peer1, {txhash1}, 1, 0, 0, "q2"); + scenario.Check(peer2, {txhash2}, 1, 0, 0, "q3"); + + // We request gtxid2 from *peer1* - no effect. + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.RequestedTx(peer1, txhash2, MAX_TIME); + scenario.Check(peer1, {txhash1}, 1, 0, 0, "q4"); + scenario.Check(peer2, {txhash2}, 1, 0, 0, "q5"); + + // Now request gtxid1 from peer1 - marks it as REQUESTED. + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + auto expiryA = scenario.Now() + RandomTime8s(); + scenario.RequestedTx(peer1, txhash1, expiryA); + scenario.Check(peer1, {}, 0, 1, 0, "q6"); + scenario.Check(peer2, {txhash2}, 1, 0, 0, "q7"); + + // Request it a second time - nothing happens, as it's already REQUESTED. + auto expiryB = expiryA + RandomTime8s(); + scenario.RequestedTx(peer1, txhash1, expiryB); + scenario.Check(peer1, {}, 0, 1, 0, "q8"); + scenario.Check(peer2, {txhash2}, 1, 0, 0, "q9"); + + // Also announce gtxid1 from peer2 now, so that the txhash isn't forgotten when the peer1 request expires. + scenario.ReceivedInv(peer2, txhash1, true, MIN_TIME); + scenario.Check(peer1, {}, 0, 1, 0, "q10"); + scenario.Check(peer2, {txhash2}, 2, 0, 0, "q11"); + + // When reaching expiryA, it expires (not expiryB, which is later). + scenario.AdvanceTime(expiryA - scenario.Now()); + scenario.Check(peer1, {}, 0, 0, 1, "q12"); + scenario.Check(peer2, {txhash2, txhash1}, 2, 0, 0, "q13"); + scenario.CheckExpired(peer1, txhash1); + + // Requesting it yet again from peer1 doesn't do anything, as it's already COMPLETED. + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.RequestedTx(peer1, txhash1, MAX_TIME); + scenario.Check(peer1, {}, 0, 0, 1, "q14"); + scenario.Check(peer2, {txhash2, txhash1}, 2, 0, 0, "q15"); + + // Now announce gtxid2 from peer1. + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.ReceivedInv(peer1, txhash2, true, MIN_TIME); + scenario.Check(peer1, {}, 1, 0, 1, "q16"); + scenario.Check(peer2, {txhash2, txhash1}, 2, 0, 0, "q17"); + + // And request it from peer1 (weird as peer2 has the preference). + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.RequestedTx(peer1, txhash2, MAX_TIME); + scenario.Check(peer1, {}, 0, 1, 1, "q18"); + scenario.Check(peer2, {txhash1}, 2, 0, 0, "q19"); + + // If peer2 now (normally) requests gtxid2, the existing request by peer1 becomes COMPLETED. + if (insecure_rand() % 2) scenario.AdvanceTime(RandomTime8s()); + scenario.RequestedTx(peer2, txhash2, MAX_TIME); + scenario.Check(peer1, {}, 0, 0, 2, "q20"); + scenario.Check(peer2, {txhash1}, 1, 1, 0, "q21"); + + // If peer2 goes offline, no viable announcements remain. + scenario.DisconnectedPeer(peer2); + scenario.Check(peer1, {}, 0, 0, 0, "q22"); + scenario.Check(peer2, {}, 0, 0, 0, "q23"); +} + +void TestInterleavedScenarios() +{ + // Create a list of functions which add tests to scenarios. + std::vector> builders; + // Add instances of every test, for every configuration. + for (int n = 0; n < 64; ++n) { + builders.emplace_back([n](Scenario& scenario){ BuildRequestOrderTest(scenario, n & 3); }); + builders.emplace_back([n](Scenario& scenario){ BuildSingleTest(scenario, n & 31); }); + builders.emplace_back([n](Scenario& scenario){ BuildPriorityTest(scenario, n & 31); }); + builders.emplace_back([n](Scenario& scenario){ BuildBigPriorityTest(scenario, (n & 7) + 1); }); + builders.emplace_back([](Scenario& scenario){ BuildTimeBackwardsTest(scenario); }); + builders.emplace_back([](Scenario& scenario){ BuildWeirdRequestsTest(scenario); }); + } + // Randomly shuffle all those functions. + std::shuffle(builders.begin(), builders.end(), g_insecure_rand_ctx); + + Runner runner; + auto starttime = RandomTime1y(); + // Construct many scenarios, and run (up to) 10 randomly-chosen tests consecutively in each. + while (builders.size()) { + // Introduce some variation in the start time of each scenario, so they don't all start off + // concurrently, but get a more random interleaving. + auto scenario_start = starttime + RandomTime8s() + RandomTime8s() + RandomTime8s(); + Scenario scenario(runner, scenario_start); + for (int j = 0; builders.size() && j < 10; ++j) { + builders.back()(scenario); + builders.pop_back(); + } + } + // Sort all the actions from all those scenarios chronologically, resulting in the actions from + // distinct scenarios to become interleaved. Use stable_sort so that actions from one scenario + // aren't reordered w.r.t. each other. + std::stable_sort(runner.actions.begin(), runner.actions.end(), [](const Action& a1, const Action& a2) { + return a1.first < a2.first; + }); + + // Run all actions from all scenarios, in order. + for (auto& action : runner.actions) { + action.second(); + } + + BOOST_CHECK_EQUAL(runner.txrequest.Size(), 0U); + BOOST_CHECK(runner.expired.empty()); +} + +} // namespace + +BOOST_AUTO_TEST_CASE(TxRequestTest) +{ + for (int i = 0; i < 5; ++i) { + TestInterleavedScenarios(); + } +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/src/txrequest.cpp b/src/txrequest.cpp new file mode 100644 index 00000000000..9eda4187503 --- /dev/null +++ b/src/txrequest.cpp @@ -0,0 +1,738 @@ +// Copyright (c) 2020 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include "hash.h" +#include "net.h" +#include "random.h" +#include "uint256.h" +#include "utilmemory.h" + +#include +#include + +#include +#include +#include + +#include + +namespace { + +/** The various states a (txhash,peer) pair can be in. + * + * Note that CANDIDATE is split up into 3 substates (DELAYED, BEST, READY), allowing more efficient implementation. + * Also note that the sorting order of ByTxHashView relies on the specific order of values in this enum. + * + * Expected behaviour is: + * - When first announced by a peer, the state is CANDIDATE_DELAYED until reqtime is reached. + * - Announcements that have reached their reqtime but not been requested will be either CANDIDATE_READY or + * CANDIDATE_BEST. Neither of those has an expiration time; they remain in that state until they're requested or + * no longer needed. CANDIDATE_READY announcements are promoted to CANDIDATE_BEST when they're the best one left. + * - When requested, an announcement will be in state REQUESTED until expiry is reached. + * - If expiry is reached, or the peer replies to the request (either with NOTFOUND or the tx), the state becomes + * COMPLETED. + */ +enum class State : uint8_t { + /** A CANDIDATE announcement whose reqtime is in the future. */ + CANDIDATE_DELAYED, + /** A CANDIDATE announcement that's not CANDIDATE_DELAYED or CANDIDATE_BEST. */ + CANDIDATE_READY, + /** The best CANDIDATE for a given txhash; only if there is no REQUESTED announcement already for that txhash. + * The CANDIDATE_BEST is the highest-priority announcement among all CANDIDATE_READY (and _BEST) ones for that + * txhash. */ + CANDIDATE_BEST, + /** A REQUESTED announcement. */ + REQUESTED, + /** A COMPLETED announcement. */ + COMPLETED, +}; + +//! Type alias for sequence numbers. +using SequenceNumber = uint64_t; + +/** An announcement. This is the data we track for each txid or wtxid that is announced to us by each peer. */ +struct Announcement { + /** Txid or wtxid that was announced. */ + const uint256 m_txhash; + /** For CANDIDATE_{DELAYED,BEST,READY} the reqtime; for REQUESTED the expiry. */ + int64_t m_time; + /** What peer the request was from. */ + const NodeId m_peer; + /** What sequence number this announcement has. */ + const SequenceNumber m_sequence : 59; + /** Whether the request is preferred. */ + const bool m_preferred : 1; + + /** What state this announcement is in. */ + State m_state : 3; + + /** Whether this announcement is selected. There can be at most 1 selected peer per txhash. */ + bool IsSelected() const + { + return m_state == State::CANDIDATE_BEST || m_state == State::REQUESTED; + } + + /** Whether this announcement is waiting for a certain time to pass. */ + bool IsWaiting() const + { + return m_state == State::REQUESTED || m_state == State::CANDIDATE_DELAYED; + } + + /** Whether this announcement can feasibly be selected if the current IsSelected() one disappears. */ + bool IsSelectable() const + { + return m_state == State::CANDIDATE_READY || m_state == State::CANDIDATE_BEST; + } + + /** Construct a new announcement from scratch, initially in CANDIDATE_DELAYED state. */ + Announcement(const uint256& txhash, NodeId peer, bool preferred, int64_t reqtime, + SequenceNumber sequence) : + m_txhash(txhash), m_time(reqtime), m_peer(peer), m_sequence(sequence), m_preferred(preferred), + m_state(State::CANDIDATE_DELAYED) {} +}; + +//! Type alias for priorities. +using Priority = uint64_t; + +/** A functor with embedded salt that computes priority of an announcement. + * + * Higher priorities are selected first. + */ +class PriorityComputer { + const uint64_t m_k0, m_k1; +public: + explicit PriorityComputer(bool deterministic) : + m_k0{deterministic ? 0 : GetRand(0xFFFFFFFFFFFFFFFF)}, + m_k1{deterministic ? 0 : GetRand(0xFFFFFFFFFFFFFFFF)} {} + + Priority operator()(const uint256& txhash, NodeId peer, bool preferred) const + { + uint64_t low_bits = CSipHasher(m_k0, m_k1).Write(txhash.begin(), txhash.size()).Write(peer).Finalize() >> 1; + return low_bits | uint64_t{preferred} << 63; + } + + Priority operator()(const Announcement& ann) const + { + return operator()(ann.m_txhash, ann.m_peer, ann.m_preferred); + } +}; + +// Definitions for the 3 indexes used in the main data structure. +// +// Each index has a By* type to identify it, a By*View data type to represent the view of announcement it is sorted +// by, and an By*ViewExtractor type to convert an announcement into the By*View type. +// See https://www.boost.org/doc/libs/1_58_0/libs/multi_index/doc/reference/key_extraction.html#key_extractors +// for more information about the key extraction concept. + +// The ByPeer index is sorted by (peer, state == CANDIDATE_BEST, txhash) +// +// Uses: +// * Looking up existing announcements by peer/txhash, by checking both (peer, false, txhash) and +// (peer, true, txhash). +// * Finding all CANDIDATE_BEST announcements for a given peer in GetRequestable. +struct ByPeer {}; +using ByPeerView = std::tuple; +struct ByPeerViewExtractor +{ + using result_type = ByPeerView; + result_type operator()(const Announcement& ann) const + { + return ByPeerView{ann.m_peer, ann.m_state == State::CANDIDATE_BEST, ann.m_txhash}; + } +}; + +// The ByTxHash index is sorted by (txhash, state, priority). +// +// Note: priority == 0 whenever state != CANDIDATE_READY. +// +// Uses: +// * Deleting all announcements with a given txhash in ForgetTxHash. +// * Finding the best CANDIDATE_READY to convert to CANDIDATE_BEST, when no other CANDIDATE_READY or REQUESTED +// announcement exists for that txhash. +// * Determining when no more non-COMPLETED announcements for a given txhash exist, so the COMPLETED ones can be +// deleted. +struct ByTxHash {}; +using ByTxHashView = std::tuple; +class ByTxHashViewExtractor { + const PriorityComputer& m_computer; +public: + ByTxHashViewExtractor(const PriorityComputer& computer) : m_computer(computer) {} + using result_type = ByTxHashView; + result_type operator()(const Announcement& ann) const + { + const Priority prio = (ann.m_state == State::CANDIDATE_READY) ? m_computer(ann) : 0; + return ByTxHashView{ann.m_txhash, ann.m_state, prio}; + } +}; + +enum class WaitState { + //! Used for announcements that need efficient testing of "is their timestamp in the future?". + FUTURE_EVENT, + //! Used for announcements whose timestamp is not relevant. + NO_EVENT, + //! Used for announcements that need efficient testing of "is their timestamp in the past?". + PAST_EVENT, +}; + +WaitState GetWaitState(const Announcement& ann) +{ + if (ann.IsWaiting()) return WaitState::FUTURE_EVENT; + if (ann.IsSelectable()) return WaitState::PAST_EVENT; + return WaitState::NO_EVENT; +} + +// The ByTime index is sorted by (wait_state, time). +// +// All announcements with a timestamp in the future can be found by iterating the index forward from the beginning. +// All announcements with a timestamp in the past can be found by iterating the index backwards from the end. +// +// Uses: +// * Finding CANDIDATE_DELAYED announcements whose reqtime has passed, and REQUESTED announcements whose expiry has +// passed. +// * Finding CANDIDATE_READY/BEST announcements whose reqtime is in the future (when the clock time went backwards). +struct ByTime {}; +using ByTimeView = std::pair; +struct ByTimeViewExtractor +{ + using result_type = ByTimeView; + result_type operator()(const Announcement& ann) const + { + return ByTimeView{GetWaitState(ann), ann.m_time}; + } +}; + +/** Data type for the main data structure (Announcement objects with ByPeer/ByTxHash/ByTime indexes). */ +using Index = boost::multi_index_container< + Announcement, + boost::multi_index::indexed_by< + boost::multi_index::ordered_unique, ByPeerViewExtractor>, + boost::multi_index::ordered_non_unique, ByTxHashViewExtractor>, + boost::multi_index::ordered_non_unique, ByTimeViewExtractor> + > +>; + +/** Helper type to simplify syntax of iterator types. */ +template +using Iter = typename Index::index::type::iterator; + +/** Per-peer statistics object. */ +struct PeerInfo { + size_t m_total = 0; //!< Total number of announcements for this peer. + size_t m_completed = 0; //!< Number of COMPLETED announcements for this peer. + size_t m_requested = 0; //!< Number of REQUESTED announcements for this peer. +}; + +/** Per-txhash statistics object. Only used for sanity checking. */ +struct TxHashInfo +{ + //! Number of CANDIDATE_DELAYED announcements for this txhash. + size_t m_candidate_delayed = 0; + //! Number of CANDIDATE_READY announcements for this txhash. + size_t m_candidate_ready = 0; + //! Number of CANDIDATE_BEST announcements for this txhash (at most one). + size_t m_candidate_best = 0; + //! Number of REQUESTED announcements for this txhash (at most one; mutually exclusive with CANDIDATE_BEST). + size_t m_requested = 0; + //! The priority of the CANDIDATE_BEST announcement if one exists, or max() otherwise. + Priority m_priority_candidate_best = std::numeric_limits::max(); + //! The highest priority of all CANDIDATE_READY announcements (or min() if none exist). + Priority m_priority_best_candidate_ready = std::numeric_limits::min(); + //! All peers we have an announcement for this txhash for. + std::vector m_peers; +}; + +/** Compare two PeerInfo objects. Only used for sanity checking. */ +bool operator==(const PeerInfo& a, const PeerInfo& b) +{ + return std::tie(a.m_total, a.m_completed, a.m_requested) == + std::tie(b.m_total, b.m_completed, b.m_requested); +}; + +/** (Re)compute the PeerInfo map from the index. Only used for sanity checking. */ +std::unordered_map RecomputePeerInfo(const Index& index) +{ + std::unordered_map ret; + for (const Announcement& ann : index) { + PeerInfo& info = ret[ann.m_peer]; + ++info.m_total; + info.m_requested += (ann.m_state == State::REQUESTED); + info.m_completed += (ann.m_state == State::COMPLETED); + } + return ret; +} + +/** Compute the TxHashInfo map. Only used for sanity checking. */ +std::map ComputeTxHashInfo(const Index& index, const PriorityComputer& computer) +{ + std::map ret; + for (const Announcement& ann : index) { + TxHashInfo& info = ret[ann.m_txhash]; + // Classify how many announcements of each state we have for this txhash. + info.m_candidate_delayed += (ann.m_state == State::CANDIDATE_DELAYED); + info.m_candidate_ready += (ann.m_state == State::CANDIDATE_READY); + info.m_candidate_best += (ann.m_state == State::CANDIDATE_BEST); + info.m_requested += (ann.m_state == State::REQUESTED); + // And track the priority of the best CANDIDATE_READY/CANDIDATE_BEST announcements. + if (ann.m_state == State::CANDIDATE_BEST) { + info.m_priority_candidate_best = computer(ann); + } + if (ann.m_state == State::CANDIDATE_READY) { + info.m_priority_best_candidate_ready = std::max(info.m_priority_best_candidate_ready, computer(ann)); + } + // Also keep track of which peers this txhash has an announcement for (so we can detect duplicates). + info.m_peers.push_back(ann.m_peer); + } + return ret; +} + +} // namespace + +/** Actual implementation for TxRequestTracker's data structure. */ +class TxRequestTracker::Impl { + //! The current sequence number. Increases for every announcement. This is used to sort txhashes returned by + //! GetRequestable in announcement order. + SequenceNumber m_current_sequence{0}; + + //! This tracker's priority computer. + const PriorityComputer m_computer; + + //! This tracker's main data structure. See SanityCheck() for the invariants that apply to it. + Index m_index; + + //! Map with this tracker's per-peer statistics. + std::unordered_map m_peerinfo; + +public: + void SanityCheck() const + { + // Recompute m_peerdata from m_index. This verifies the data in it as it should just be caching statistics + // on m_index. It also verifies the invariant that no PeerInfo announcements with m_total==0 exist. + assert(m_peerinfo == RecomputePeerInfo(m_index)); + + // Calculate per-txhash statistics from m_index, and validate invariants. + for (auto& item : ComputeTxHashInfo(m_index, m_computer)) { + TxHashInfo& info = item.second; + + // Cannot have only COMPLETED peer (txhash should have been forgotten already) + assert(info.m_candidate_delayed + info.m_candidate_ready + info.m_candidate_best + info.m_requested > 0); + + // Can have at most 1 CANDIDATE_BEST/REQUESTED peer + assert(info.m_candidate_best + info.m_requested <= 1); + + // If there are any CANDIDATE_READY announcements, there must be exactly one CANDIDATE_BEST or REQUESTED + // announcement. + if (info.m_candidate_ready > 0) { + assert(info.m_candidate_best + info.m_requested == 1); + } + + // If there is both a CANDIDATE_READY and a CANDIDATE_BEST announcement, the CANDIDATE_BEST one must be + // at least as good (equal or higher priority) as the best CANDIDATE_READY. + if (info.m_candidate_ready && info.m_candidate_best) { + assert(info.m_priority_candidate_best >= info.m_priority_best_candidate_ready); + } + + // No txhash can have been announced by the same peer twice. + std::sort(info.m_peers.begin(), info.m_peers.end()); + assert(std::adjacent_find(info.m_peers.begin(), info.m_peers.end()) == info.m_peers.end()); + } + } + + void PostGetRequestableSanityCheck(int64_t now) const + { + for (const Announcement& ann : m_index) { + if (ann.IsWaiting()) { + // REQUESTED and CANDIDATE_DELAYED must have a time in the future (they should have been converted + // to COMPLETED/CANDIDATE_READY respectively). + assert(ann.m_time > now); + } else if (ann.IsSelectable()) { + // CANDIDATE_READY and CANDIDATE_BEST cannot have a time in the future (they should have remained + // CANDIDATE_DELAYED, or should have been converted back to it if time went backwards). + assert(ann.m_time <= now); + } + } + } + +private: + //! Wrapper around Index::...::erase that keeps m_peerinfo up to date. + template + Iter Erase(Iter it) + { + auto peerit = m_peerinfo.find(it->m_peer); + peerit->second.m_completed -= it->m_state == State::COMPLETED; + peerit->second.m_requested -= it->m_state == State::REQUESTED; + if (--peerit->second.m_total == 0) m_peerinfo.erase(peerit); + return m_index.get().erase(it); + } + + //! Wrapper around Index::...::modify that keeps m_peerinfo up to date. + template + void Modify(Iter it, Modifier modifier) + { + auto peerit = m_peerinfo.find(it->m_peer); + peerit->second.m_completed -= it->m_state == State::COMPLETED; + peerit->second.m_requested -= it->m_state == State::REQUESTED; + m_index.get().modify(it, std::move(modifier)); + peerit->second.m_completed += it->m_state == State::COMPLETED; + peerit->second.m_requested += it->m_state == State::REQUESTED; + } + + //! Convert a CANDIDATE_DELAYED announcement into a CANDIDATE_READY. If this makes it the new best + //! CANDIDATE_READY (and no REQUESTED exists) and better than the CANDIDATE_BEST (if any), it becomes the new + //! CANDIDATE_BEST. + void PromoteCandidateReady(Iter it) + { + assert(it != m_index.get().end()); + assert(it->m_state == State::CANDIDATE_DELAYED); + // Convert CANDIDATE_DELAYED to CANDIDATE_READY first. + Modify(it, [](Announcement& ann){ ann.m_state = State::CANDIDATE_READY; }); + // The following code relies on the fact that the ByTxHash is sorted by txhash, and then by state (first + // _DELAYED, then _READY, then _BEST/REQUESTED). Within the _READY announcements, the best one (highest + // priority) comes last. Thus, if an existing _BEST exists for the same txhash that this announcement may + // be preferred over, it must immediately follow the newly created _READY. + auto it_next = std::next(it); + if (it_next == m_index.get().end() || it_next->m_txhash != it->m_txhash || + it_next->m_state == State::COMPLETED) { + // This is the new best CANDIDATE_READY, and there is no IsSelected() announcement for this txhash + // already. + Modify(it, [](Announcement& ann){ ann.m_state = State::CANDIDATE_BEST; }); + } else if (it_next->m_state == State::CANDIDATE_BEST) { + Priority priority_old = m_computer(*it_next); + Priority priority_new = m_computer(*it); + if (priority_new > priority_old) { + // There is a CANDIDATE_BEST announcement already, but this one is better. + Modify(it_next, [](Announcement& ann){ ann.m_state = State::CANDIDATE_READY; }); + Modify(it, [](Announcement& ann){ ann.m_state = State::CANDIDATE_BEST; }); + } + } + } + + //! Change the state of an announcement to something non-IsSelected(). If it was IsSelected(), the next best + //! announcement will be marked CANDIDATE_BEST. + void ChangeAndReselect(Iter it, State new_state) + { + assert(new_state == State::COMPLETED || new_state == State::CANDIDATE_DELAYED); + assert(it != m_index.get().end()); + if (it->IsSelected() && it != m_index.get().begin()) { + auto it_prev = std::prev(it); + // The next best CANDIDATE_READY, if any, immediately precedes the REQUESTED or CANDIDATE_BEST + // announcement in the ByTxHash index. + if (it_prev->m_txhash == it->m_txhash && it_prev->m_state == State::CANDIDATE_READY) { + // If one such CANDIDATE_READY exists (for this txhash), convert it to CANDIDATE_BEST. + Modify(it_prev, [](Announcement& ann){ ann.m_state = State::CANDIDATE_BEST; }); + } + } + Modify(it, [new_state](Announcement& ann){ ann.m_state = new_state; }); + } + + //! Check if 'it' is the only announcement for a given txhash that isn't COMPLETED. + bool IsOnlyNonCompleted(Iter it) + { + assert(it != m_index.get().end()); + assert(it->m_state != State::COMPLETED); // Not allowed to call this on COMPLETED announcements. + + // This announcement has a predecessor that belongs to the same txhash. Due to ordering, and the + // fact that 'it' is not COMPLETED, its predecessor cannot be COMPLETED here. + if (it != m_index.get().begin() && std::prev(it)->m_txhash == it->m_txhash) return false; + + // This announcement has a successor that belongs to the same txhash, and is not COMPLETED. + if (std::next(it) != m_index.get().end() && std::next(it)->m_txhash == it->m_txhash && + std::next(it)->m_state != State::COMPLETED) return false; + + return true; + } + + /** Convert any announcement to a COMPLETED one. If there are no non-COMPLETED announcements left for this + * txhash, they are deleted. If this was a REQUESTED announcement, and there are other CANDIDATEs left, the + * best one is made CANDIDATE_BEST. Returns whether the announcement still exists. */ + bool MakeCompleted(Iter it) + { + assert(it != m_index.get().end()); + + // Nothing to be done if it's already COMPLETED. + if (it->m_state == State::COMPLETED) return true; + + if (IsOnlyNonCompleted(it)) { + // This is the last non-COMPLETED announcement for this txhash. Delete all. + uint256 txhash = it->m_txhash; + do { + it = Erase(it); + } while (it != m_index.get().end() && it->m_txhash == txhash); + return false; + } + + // Mark the announcement COMPLETED, and select the next best announcement (the first CANDIDATE_READY) if + // needed. + ChangeAndReselect(it, State::COMPLETED); + + return true; + } + + //! Make the data structure consistent with a given point in time: + //! - REQUESTED annoucements with expiry <= now are turned into COMPLETED. + //! - CANDIDATE_DELAYED announcements with reqtime <= now are turned into CANDIDATE_{READY,BEST}. + //! - CANDIDATE_{READY,BEST} announcements with reqtime > now are turned into CANDIDATE_DELAYED. + void SetTimePoint(int64_t now, std::vector>* expired) + { + if (expired) expired->clear(); + + // Iterate over all CANDIDATE_DELAYED and REQUESTED from old to new, as long as they're in the past, + // and convert them to CANDIDATE_READY and COMPLETED respectively. + while (!m_index.empty()) { + auto it = m_index.get().begin(); + if (it->m_state == State::CANDIDATE_DELAYED && it->m_time <= now) { + PromoteCandidateReady(m_index.project(it)); + } else if (it->m_state == State::REQUESTED && it->m_time <= now) { + if (expired) expired->emplace_back(it->m_peer, it->m_txhash); + MakeCompleted(m_index.project(it)); + } else { + break; + } + } + + while (!m_index.empty()) { + // If time went backwards, we may need to demote CANDIDATE_BEST and CANDIDATE_READY announcements back + // to CANDIDATE_DELAYED. This is an unusual edge case, and unlikely to matter in production. However, + // it makes it much easier to specify and test TxRequestTracker::Impl's behaviour. + auto it = std::prev(m_index.get().end()); + if (it->IsSelectable() && it->m_time > now) { + ChangeAndReselect(m_index.project(it), State::CANDIDATE_DELAYED); + } else { + break; + } + } + } + +public: + Impl(bool deterministic) : + m_computer(deterministic), + // Explicitly initialize m_index as we need to pass a reference to m_computer to ByTxHashViewExtractor. + m_index(boost::make_tuple( + boost::make_tuple(ByPeerViewExtractor(), std::less()), + boost::make_tuple(ByTxHashViewExtractor(m_computer), std::less()), + boost::make_tuple(ByTimeViewExtractor(), std::less()) + )) {} + + // Disable copying and assigning (a default copy won't work due the stateful ByTxHashViewExtractor). + Impl(const Impl&) = delete; + Impl& operator=(const Impl&) = delete; + + void DisconnectedPeer(NodeId peer) + { + auto& index = m_index.get(); + auto it = index.lower_bound(ByPeerView{peer, false, uint256::ZERO}); + while (it != index.end() && it->m_peer == peer) { + // Check what to continue with after this iteration. 'it' will be deleted in what follows, so we need to + // decide what to continue with afterwards. There are a number of cases to consider: + // - std::next(it) is end() or belongs to a different peer. In that case, this is the last iteration + // of the loop (denote this by setting it_next to end()). + // - 'it' is not the only non-COMPLETED announcement for its txhash. This means it will be deleted, but + // no other Announcement objects will be modified. Continue with std::next(it) if it belongs to the + // same peer, but decide this ahead of time (as 'it' may change position in what follows). + // - 'it' is the only non-COMPLETED announcement for its txhash. This means it will be deleted along + // with all other announcements for the same txhash - which may include std::next(it). However, other + // than 'it', no announcements for the same peer can be affected (due to (peer, txhash) uniqueness). + // In other words, the situation where std::next(it) is deleted can only occur if std::next(it) + // belongs to a different peer but the same txhash as 'it'. This is covered by the first bulletpoint + // already, and we'll have set it_next to end(). + auto it_next = (std::next(it) == index.end() || std::next(it)->m_peer != peer) ? index.end() : + std::next(it); + // If the announcement isn't already COMPLETED, first make it COMPLETED (which will mark other + // CANDIDATEs as CANDIDATE_BEST, or delete all of a txhash's announcements if no non-COMPLETED ones are + // left). + if (MakeCompleted(m_index.project(it))) { + // Then actually delete the announcement (unless it was already deleted by MakeCompleted). + Erase(it); + } + it = it_next; + } + } + + void ForgetTxHash(const uint256& txhash) + { + auto it = m_index.get().lower_bound(ByTxHashView{txhash, State::CANDIDATE_DELAYED, 0}); + while (it != m_index.get().end() && it->m_txhash == txhash) { + it = Erase(it); + } + } + + void ReceivedInv(NodeId peer, const uint256& txhash, bool preferred, int64_t reqtime) + { + // Bail out if we already have a CANDIDATE_BEST announcement for this (txhash, peer) combination. The case + // where there is a non-CANDIDATE_BEST announcement already will be caught by the uniqueness property of the + // ByPeer index when we try to emplace the new object below. + if (m_index.get().count(ByPeerView{peer, true, txhash})) return; + + // Try creating the announcement with CANDIDATE_DELAYED state (which will fail due to the uniqueness + // of the ByPeer index if a non-CANDIDATE_BEST announcement already exists with the same txhash and peer). + // Bail out in that case. + auto ret = m_index.get().emplace(txhash, peer, preferred, reqtime, m_current_sequence); + if (!ret.second) return; + + // Update accounting metadata. + ++m_peerinfo[peer].m_total; + ++m_current_sequence; + } + + //! Find the txhashes to request now from peer. + std::vector GetRequestable(NodeId peer, int64_t now, std::vector>* expired) + { + // Move time. + SetTimePoint(now, expired); + + // Find all CANDIDATE_BEST announcements for this peer. + std::vector selected; + auto it_peer = m_index.get().lower_bound(ByPeerView{peer, true, uint256::ZERO}); + while (it_peer != m_index.get().end() && it_peer->m_peer == peer && + it_peer->m_state == State::CANDIDATE_BEST) { + selected.emplace_back(&*it_peer); + ++it_peer; + } + + // Sort by sequence number. + std::sort(selected.begin(), selected.end(), [](const Announcement* a, const Announcement* b) { + return a->m_sequence < b->m_sequence; + }); + + // Convert to vector of txhashes + std::vector ret; + ret.reserve(selected.size()); + std::transform(selected.begin(), selected.end(), std::back_inserter(ret), [](const Announcement* ann) { + return ann->m_txhash; + }); + return ret; + } + + void RequestedTx(NodeId peer, const uint256& txhash, int64_t expiry) + { + auto it = m_index.get().find(ByPeerView{peer, true, txhash}); + if (it == m_index.get().end()) { + // There is no CANDIDATE_BEST announcement, look for a _READY or _DELAYED instead. If the caller only + // ever invokes RequestedTx with the values returned by GetRequestable, and no other non-const functions + // other than ForgetTxHash and GetRequestable in between, this branch will never execute (as txhashes + // returned by GetRequestable always correspond to CANDIDATE_BEST announcements). + + it = m_index.get().find(ByPeerView{peer, false, txhash}); + if (it == m_index.get().end() || (it->m_state != State::CANDIDATE_DELAYED && + it->m_state != State::CANDIDATE_READY)) { + // There is no CANDIDATE announcement tracked for this peer, so we have nothing to do. Either this + // txhash wasn't tracked at all (and the caller should have called ReceivedInv), or it was already + // requested and/or completed for other reasons and this is just a superfluous RequestedTx call. + return; + } + + // Look for an existing CANDIDATE_BEST or REQUESTED with the same txhash. We only need to do this if the + // found announcement had a different state than CANDIDATE_BEST. If it did, invariants guarantee that no + // other CANDIDATE_BEST or REQUESTED can exist. + auto it_old = m_index.get().lower_bound(ByTxHashView{txhash, State::CANDIDATE_BEST, 0}); + if (it_old != m_index.get().end() && it_old->m_txhash == txhash) { + if (it_old->m_state == State::CANDIDATE_BEST) { + // The data structure's invariants require that there can be at most one CANDIDATE_BEST or one + // REQUESTED announcement per txhash (but not both simultaneously), so we have to convert any + // existing CANDIDATE_BEST to another CANDIDATE_* when constructing another REQUESTED. + // It doesn't matter whether we pick CANDIDATE_READY or _DELAYED here, as SetTimePoint() + // will correct it at GetRequestable() time. If time only goes forward, it will always be + // _READY, so pick that to avoid extra work in SetTimePoint(). + Modify(it_old, [](Announcement& ann) { ann.m_state = State::CANDIDATE_READY; }); + } else if (it_old->m_state == State::REQUESTED) { + // As we're no longer waiting for a response to the previous REQUESTED announcement, convert it + // to COMPLETED. This also helps guaranteeing progress. + Modify(it_old, [](Announcement& ann) { ann.m_state = State::COMPLETED; }); + } + } + } + + Modify(it, [expiry](Announcement& ann) { + ann.m_state = State::REQUESTED; + ann.m_time = expiry; + }); + } + + void ReceivedResponse(NodeId peer, const uint256& txhash) + { + // We need to search the ByPeer index for both (peer, false, txhash) and (peer, true, txhash). + auto it = m_index.get().find(ByPeerView{peer, false, txhash}); + if (it == m_index.get().end()) { + it = m_index.get().find(ByPeerView{peer, true, txhash}); + } + if (it != m_index.get().end()) MakeCompleted(m_index.project(it)); + } + + size_t CountInFlight(NodeId peer) const + { + auto it = m_peerinfo.find(peer); + if (it != m_peerinfo.end()) return it->second.m_requested; + return 0; + } + + size_t CountCandidates(NodeId peer) const + { + auto it = m_peerinfo.find(peer); + if (it != m_peerinfo.end()) return it->second.m_total - it->second.m_requested - it->second.m_completed; + return 0; + } + + size_t Count(NodeId peer) const + { + auto it = m_peerinfo.find(peer); + if (it != m_peerinfo.end()) return it->second.m_total; + return 0; + } + + //! Count how many announcements are being tracked in total across all peers and transactions. + size_t Size() const { return m_index.size(); } + + uint64_t ComputePriority(const uint256& txhash, NodeId peer, bool preferred) const + { + // Return Priority as a uint64_t as Priority is internal. + return uint64_t{m_computer(txhash, peer, preferred)}; + } + +}; + +TxRequestTracker::TxRequestTracker(bool deterministic) : + m_impl{MakeUnique(deterministic)} {} + +TxRequestTracker::~TxRequestTracker() = default; + +void TxRequestTracker::ForgetTxHash(const uint256& txhash) { m_impl->ForgetTxHash(txhash); } +void TxRequestTracker::DisconnectedPeer(NodeId peer) { m_impl->DisconnectedPeer(peer); } +size_t TxRequestTracker::CountInFlight(NodeId peer) const { return m_impl->CountInFlight(peer); } +size_t TxRequestTracker::CountCandidates(NodeId peer) const { return m_impl->CountCandidates(peer); } +size_t TxRequestTracker::Count(NodeId peer) const { return m_impl->Count(peer); } +size_t TxRequestTracker::Size() const { return m_impl->Size(); } +void TxRequestTracker::SanityCheck() const { m_impl->SanityCheck(); } + +void TxRequestTracker::PostGetRequestableSanityCheck(int64_t now) const +{ + m_impl->PostGetRequestableSanityCheck(now); +} + +void TxRequestTracker::ReceivedInv(NodeId peer, const uint256& txhash, bool preferred, + int64_t reqtime) +{ + m_impl->ReceivedInv(peer, txhash, preferred, reqtime); +} + +void TxRequestTracker::RequestedTx(NodeId peer, const uint256& txhash, int64_t expiry) +{ + m_impl->RequestedTx(peer, txhash, expiry); +} + +void TxRequestTracker::ReceivedResponse(NodeId peer, const uint256& txhash) +{ + m_impl->ReceivedResponse(peer, txhash); +} + +std::vector TxRequestTracker::GetRequestable(NodeId peer, int64_t now, + std::vector>* expired) +{ + return m_impl->GetRequestable(peer, now, expired); +} + +uint64_t TxRequestTracker::ComputePriority(const uint256& txhash, NodeId peer, bool preferred) const +{ + return m_impl->ComputePriority(txhash, peer, preferred); +} diff --git a/src/txrequest.h b/src/txrequest.h new file mode 100644 index 00000000000..c5f8daf55ab --- /dev/null +++ b/src/txrequest.h @@ -0,0 +1,202 @@ +// Copyright (c) 2020 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_TXREQUEST_H +#define BITCOIN_TXREQUEST_H + +#include "net.h" // For NodeId +#include "uint256.h" + +#include + +#include + +/** Data structure to keep track of, and schedule, transaction downloads from peers. + * + * === Specification === + * + * We keep track of which peers have announced which transactions, and use that to determine which requests + * should go to which peer, when, and in what order. + * + * The following information is tracked per peer/tx combination ("announcement"): + * - Which peer announced it (through their NodeId) + * - The txhash of the transaction + * - What the earliest permitted time is that that transaction can be requested from that peer (called "reqtime"). + * - Whether it's from a "preferred" peer or not. Which announcements get this flag is determined by the caller, but + * this is designed for outbound peers, or other peers that we have a higher level of trust in. Even when the + * peers' preferredness changes, the preferred flag of existing announcements from that peer won't change. + * - Whether or not the transaction was requested already, and if so, when it times out (called "expiry"). + * - Whether or not the transaction request failed already (timed out, or invalid transaction or NOTFOUND was + * received). + * + * Transaction requests are then assigned to peers, following these rules: + * + * - No transaction is requested as long as another request for the same txhash is outstanding (it needs to fail + * first by passing expiry, or a NOTFOUND or invalid transaction has to be received for it). + * + * Rationale: to avoid wasting bandwidth on multiple copies of the same transaction. Note that this only works + * per txhash, so if the same transaction is announced both through txid and wtxid, we have no means + * to prevent fetching both (the caller can however mitigate this by delaying one, see further). + * + * - The same transaction is never requested twice from the same peer, unless the announcement was forgotten in + * between, and re-announced. Announcements are forgotten only: + * - If a peer goes offline, all its announcements are forgotten. + * - If a transaction has been successfully received, or is otherwise no longer needed, the caller can call + * ForgetTxHash, which removes all announcements across all peers with the specified txhash. + * - If for a given txhash only already-failed announcements remain, they are all forgotten. + * + * Rationale: giving a peer multiple chances to announce a transaction would allow them to bias requests in their + * favor, worsening transaction censoring attacks. The flip side is that as long as an attacker manages + * to prevent us from receiving a transaction, failed announcements (including those from honest peers) + * will linger longer, increasing memory usage somewhat. The impact of this is limited by imposing a + * cap on the number of tracked announcements per peer. As failed requests in response to announcements + * from honest peers should be rare, this almost solely hinders attackers. + * Transaction censoring attacks can be done by announcing transactions quickly while not answering + * requests for them. See https://allquantor.at/blockchainbib/pdf/miller2015topology.pdf for more + * information. + * + * - Transactions are not requested from a peer until its reqtime has passed. + * + * Rationale: enable the calling code to define a delay for less-than-ideal peers, so that (presumed) better + * peers have a chance to give their announcement first. + * + * - If multiple viable candidate peers exist according to the above rules, pick a peer as follows: + * + * - If any preferred peers are available, non-preferred peers are not considered for what follows. + * + * Rationale: preferred peers are more trusted by us, so are less likely to be under attacker control. + * + * - Pick a uniformly random peer among the candidates. + * + * Rationale: random assignments are hard to influence for attackers. + * + * Together these rules strike a balance between being fast in non-adverserial conditions and minimizing + * susceptibility to censorship attacks. An attacker that races the network: + * - Will be unsuccessful if all preferred connections are honest (and there is at least one preferred connection). + * - If there are P preferred connections of which Ph>=1 are honest, the attacker can delay us from learning + * about a transaction by k expiration periods, where k ~ 1 + NHG(N=P-1,K=P-Ph-1,r=1), which has mean + * P/(Ph+1) (where NHG stands for Negative Hypergeometric distribution). The "1 +" is due to the fact that the + * attacker can be the first to announce through a preferred connection in this scenario, which very likely means + * they get the first request. + * - If all P preferred connections are to the attacker, and there are NP non-preferred connections of which NPh>=1 + * are honest, where we assume that the attacker can disconnect and reconnect those connections, the distribution + * becomes k ~ P + NB(p=1-NPh/NP,r=1) (where NB stands for Negative Binomial distribution), which has mean + * P-1+NP/NPh. + * + * Complexity: + * - Memory usage is proportional to the total number of tracked announcements (Size()) plus the number of + * peers with a nonzero number of tracked announcements. + * - CPU usage is generally logarithmic in the total number of tracked announcements, plus the number of + * announcements affected by an operation (amortized O(1) per announcement). + */ +class TxRequestTracker { + // Avoid littering this header file with implementation details. + class Impl; + const std::unique_ptr m_impl; + +public: + //! Construct a TxRequestTracker. + explicit TxRequestTracker(bool deterministic = false); + ~TxRequestTracker(); + + // Conceptually, the data structure consists of a collection of "announcements", one for each peer/txhash + // combination: + // + // - CANDIDATE announcements represent transactions that were announced by a peer, and that become available for + // download after their reqtime has passed. + // + // - REQUESTED announcements represent transactions that have been requested, and which we're awaiting a + // response for from that peer. Their expiry value determines when the request times out. + // + // - COMPLETED announcements represent transactions that have been requested from a peer, and a NOTFOUND or a + // transaction was received in response (valid or not), or they timed out. They're only kept around to + // prevent requesting them again. If only COMPLETED announcements for a given txhash remain (so no CANDIDATE + // or REQUESTED ones), all of them are deleted (this is an invariant, and maintained by all operations below). + // + // The operations below manipulate the data structure. + + /** Adds a new CANDIDATE announcement. + * + * Does nothing if one already exists for that (txhash, peer) combination (whether it's CANDIDATE, REQUESTED, or + * COMPLETED). + */ + void ReceivedInv(NodeId peer, const uint256& txhash, bool preferred, int64_t reqtime); + + /** Deletes all announcements for a given peer. + * + * It should be called when a peer goes offline. + */ + void DisconnectedPeer(NodeId peer); + + /** Deletes all announcements for a given txhash (both txid and wtxid ones). + * + * This should be called when a transaction is no longer needed. The caller should ensure that new announcements + * for the same txhash will not trigger new ReceivedInv calls, at least in the short term after this call. + */ + void ForgetTxHash(const uint256& txhash); + + /** Find the txids to request now from peer. + * + * It does the following: + * - Convert all REQUESTED announcements (for all txhashes/peers) with (expiry <= now) to COMPLETED ones. + * - Requestable announcements are selected: CANDIDATE announcements from the specified peer with + * (reqtime <= now) for which no existing REQUESTED announcement with the same txhash from a different peer + * exists, and for which the specified peer is the best choice among all (reqtime <= now) CANDIDATE + * announcements with the same txhash (subject to preferredness rules, and tiebreaking using a deterministic + * salted hash of peer and txhash). + * - The selected announcements are returned in announcement order (even if multiple were added at the same + * time, or when the clock went backwards while they were being added). This is done to minimize + * disruption from dependent transactions being requested out of order: if multiple dependent transactions + * are announced simultaneously by one peer, and end up being requested from them, the requests will happen + * in announcement order. + */ + std::vector GetRequestable(NodeId peer, int64_t now, + std::vector>* expired = nullptr); + + /** Marks a transaction as requested, with a specified expiry. + * + * If no CANDIDATE announcement for the provided peer and txhash exists, this call has no effect. Otherwise: + * - That announcement is converted to REQUESTED. + * - If any other REQUESTED announcement for the same txhash already existed, it means an unexpected request + * was made (GetRequestable will never advise doing so). In this case it is converted to COMPLETED, as we're + * no longer waiting for a response to it. + */ + void RequestedTx(NodeId peer, const uint256& txhash, int64_t expiry); + + /** Converts a CANDIDATE or REQUESTED announcement to a COMPLETED one. If no such announcement exists for the + * provided peer and txhash, nothing happens. + * + * It should be called whenever a transaction or NOTFOUND was received from a peer. When the transaction is + * not needed entirely anymore, ForgetTxhash should be called instead of, or in addition to, this call. + */ + void ReceivedResponse(NodeId peer, const uint256& txhash); + + // The operations below inspect the data structure. + + /** Count how many REQUESTED announcements a peer has. */ + size_t CountInFlight(NodeId peer) const; + + /** Count how many CANDIDATE announcements a peer has. */ + size_t CountCandidates(NodeId peer) const; + + /** Count how many announcements a peer has (REQUESTED, CANDIDATE, and COMPLETED combined). */ + size_t Count(NodeId peer) const; + + /** Count how many announcements are being tracked in total across all peers and transaction hashes. */ + size_t Size() const; + + /** Access to the internal priority computation (testing only) */ + uint64_t ComputePriority(const uint256& txhash, NodeId peer, bool preferred) const; + + /** Run internal consistency check (testing only). */ + void SanityCheck() const; + + /** Run a time-dependent internal consistency check (testing only). + * + * This can only be called immediately after GetRequestable, with the same 'now' parameter. + */ + void PostGetRequestableSanityCheck(int64_t now) const; +}; + +#endif // BITCOIN_TXREQUEST_H diff --git a/src/uint256.cpp b/src/uint256.cpp index bd3d0170853..314a2bb00d1 100644 --- a/src/uint256.cpp +++ b/src/uint256.cpp @@ -80,3 +80,6 @@ template std::string base_blob<256>::GetHex() const; template std::string base_blob<256>::ToString() const; template void base_blob<256>::SetHex(const char*); template void base_blob<256>::SetHex(const std::string&); + +const uint256 uint256::ZERO(0); +const uint256 uint256::ONE(1); diff --git a/src/uint256.h b/src/uint256.h index a92ce07f111..4ebeffd082d 100644 --- a/src/uint256.h +++ b/src/uint256.h @@ -27,6 +27,9 @@ class base_blob memset(data, 0, sizeof(data)); } + /* constructor for constants between 1 and 255 */ + constexpr explicit base_blob(uint8_t v) : data{v} {} + explicit base_blob(const std::vector& vch); bool IsNull() const @@ -124,6 +127,7 @@ class uint256 : public base_blob<256> { public: uint256() {} uint256(const base_blob<256>& b) : base_blob<256>(b) {} + constexpr explicit uint256(uint8_t v) : base_blob<256>(v) {} explicit uint256(const std::vector& vch) : base_blob<256>(vch) {} /** A cheap hash function that just returns 64 bits from the result, it can be @@ -135,6 +139,9 @@ class uint256 : public base_blob<256> { { return ReadLE64(data); } + + static const uint256 ZERO; + static const uint256 ONE; }; /* uint256 from const char *. diff --git a/src/utilmemory.h b/src/utilmemory.h new file mode 100644 index 00000000000..15ecf8f80d4 --- /dev/null +++ b/src/utilmemory.h @@ -0,0 +1,19 @@ +// Copyright (c) 2009-2010 Satoshi Nakamoto +// Copyright (c) 2009-2018 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_UTIL_MEMORY_H +#define BITCOIN_UTIL_MEMORY_H + +#include +#include + +//! Substitute for C++14 std::make_unique. +template +std::unique_ptr MakeUnique(Args&&... args) +{ + return std::unique_ptr(new T(std::forward(args)...)); +} + +#endif