forked from LIMXTEC/BitCore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_test.py
223 lines (178 loc) · 8.74 KB
/
example_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
#!/usr/bin/env python3
# Copyright (c) 2017 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""An example functional test
The module-level docstring should include a high-level description of
what the test is doing. It's the first thing people see when they open
the file and should give the reader information about *what* the test
is testing and *how* it's being tested.
"""
# Imports should be in PEP8 ordering (std library first, then third party
# libraries then local imports).
from collections import defaultdict
# Avoid wildcard * imports if possible
from test_framework.blocktools import (create_block, create_coinbase)
from test_framework.mininode import (
CInv,
NetworkThread,
NodeConn,
NodeConnCB,
mininode_lock,
msg_block,
msg_getdata,
wait_until,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
connect_nodes,
p2p_port,
)
# NodeConnCB is a class containing callbacks to be executed when a P2P
# message is received from the node-under-test. Subclass NodeConnCB and
# override the on_*() methods if you need custom behaviour.
class BaseNode(NodeConnCB):
    """P2P callback handler that counts how many times each block arrives."""

    def __init__(self):
        """Initialize the NodeConnCB

        Used to initialize custom properties for the Node that aren't
        included by default in the base class. Be aware that the NodeConnCB
        base class already stores a counter for each P2P message type and the
        last received message of each type, which should be sufficient for the
        needs of most tests.

        Call super().__init__() first for standard initialization and then
        initialize custom properties."""
        super().__init__()
        # Maps the hash of each received block to the number of times a
        # block message carrying it has been delivered to on_block().
        self.block_receive_map = defaultdict(int)

    def on_block(self, conn, message):
        """Override the standard on_block callback

        Compute the hash of the received block and bump its entry in
        block_receive_map."""
        received = message.block
        # calc_sha256() must run before the sha256 attribute is read.
        received.calc_sha256()
        self.block_receive_map[received.sha256] += 1

    def on_inv(self, conn, message):
        """Override the standard on_inv callback

        This handler does nothing with the message."""
def custom_function():
    """Do some custom behaviour

    This example helper takes no arguments, performs no work and returns
    None. If a function like this is more generally useful for other tests,
    consider moving it to a module in test_framework."""
    # self.log.info("running custom_function")  # Oops! Can't run self.log outside the BitcoinTestFramework
    return None
class ExampleTest(BitcoinTestFramework):
    """Example test: push hand-built blocks to node0 over P2P and check relay.

    Three nodes are started. node0 and node1 are connected during setup;
    node2 is only connected (to node1) part-way through run_test()."""
    # Each functional test is a subclass of the BitcoinTestFramework class.

    # Override the __init__(), add_options(), setup_chain(), setup_network()
    # and setup_nodes() methods to customize the test setup as required.

    def __init__(self):
        """Initialize the test

        Call super().__init__() first, and then override any test parameters
        for your individual test."""
        super().__init__()
        self.setup_clean_chain = True
        self.num_nodes = 3
        # Use self.extra_args to change command-line arguments for the nodes
        self.extra_args = [[], ["-logips"], []]

        # self.log.info("I've finished __init__")  # Oops! Can't run self.log before run_test()

    # Use add_options() to add specific command-line options for your test.
    # In practice this is not used very much, since the tests are mostly written
    # to be run in automated environments without command-line options.
    # def add_options()
    #     pass

    # Use setup_chain() to customize the node data directories. In practice
    # this is not used very much since the default behaviour is almost always
    # fine
    # def setup_chain():
    #     pass

    def setup_network(self):
        """Setup the test network topology

        Often you won't need to override this, since the standard network topology
        (linear: node0 <-> node1 <-> node2 <-> ...) is fine for most tests.

        If you do override this method, remember to start the nodes, assign
        them to self.nodes, connect them and then sync."""
        self.setup_nodes()

        # In this test, we're not connecting node2 to node0 or node1. Calls to
        # sync_all() should not include node2, since we're not expecting it to
        # sync.
        connect_nodes(self.nodes[0], 1)
        # The slice end is exclusive: nodes[0:2] is the group [node0, node1].
        # (nodes[0:1] would contain node0 only and sync nothing.)
        self.sync_all([self.nodes[0:2]])

    # Use setup_nodes() to customize the node start behaviour (for example if
    # you don't want to start all nodes at the start of the test).
    # def setup_nodes():
    #     pass

    def custom_method(self):
        """Do some custom behaviour for this test

        Define it in a method here because you're going to use it repeatedly.
        If you think it's useful in general, consider moving it to the base
        BitcoinTestFramework class so other tests can use it."""
        self.log.info("Running custom_method")

    def run_test(self):
        """Main test logic"""

        # Create a P2P connection to one of the nodes
        node0 = BaseNode()
        connections = []
        connections.append(NodeConn('127.0.0.1', p2p_port(0), self.nodes[0], node0))
        node0.add_connection(connections[0])

        # Start up network handling in another thread. This needs to be called
        # after the P2P connections have been created.
        NetworkThread().start()
        # wait_for_verack ensures that the P2P connection is fully up.
        node0.wait_for_verack()

        # Generating a block on one of the nodes will get us out of IBD
        blocks = [int(self.nodes[0].generate(nblocks=1)[0], 16)]
        # Sync node0 and node1 only; node2 is not connected yet.
        self.sync_all([self.nodes[0:2]])

        # Notice above how we called an RPC by calling a method with the same
        # name on the node object. Notice also how we used a keyword argument
        # to specify a named RPC argument. Neither of those are defined on the
        # node object. Instead there's some __getattr__() magic going on under
        # the covers to dispatch unrecognised attribute calls to the RPC
        # interface.

        # Logs are nice. Do plenty of them. They can be used in place of comments for
        # breaking the test into sub-sections.
        self.log.info("Starting test!")

        self.log.info("Calling a custom function")
        custom_function()

        self.log.info("Calling a custom method")
        self.custom_method()

        self.log.info("Create some blocks")
        best_hash = self.nodes[0].getbestblockhash()
        self.tip = int(best_hash, 16)
        self.block_time = self.nodes[0].getblock(best_hash)['time'] + 1

        # create_coinbase() is given the height of the block being built, so
        # start one above the current tip instead of at a hard-coded 1 (node0
        # has already mined a block above).
        height = self.nodes[0].getblockcount() + 1

        for _ in range(10):
            # Use the mininode and blocktools functionality to manually build a block
            # Calling the generate() rpc is easier, but this allows us to exactly
            # control the blocks and transactions.
            block = create_block(self.tip, create_coinbase(height), self.block_time)
            block.solve()
            block_message = msg_block(block)
            # Send message is used to send a P2P message to the node over our NodeConn connection
            node0.send_message(block_message)
            self.tip = block.sha256
            blocks.append(self.tip)
            self.block_time += 1
            height += 1

        self.log.info("Wait for node1 to reach current tip (height 11) using RPC")
        self.nodes[1].waitforblockheight(11)

        self.log.info("Connect node2 and node1")
        connect_nodes(self.nodes[1], 2)

        self.log.info("Add P2P connection to node2")
        node2 = BaseNode()
        connections.append(NodeConn('127.0.0.1', p2p_port(2), self.nodes[2], node2))
        node2.add_connection(connections[1])
        node2.wait_for_verack()

        self.log.info("Wait for node2 reach current tip. Test that it has propogated all the blocks to us")

        # Ask node2 for every block we know about in a single getdata message.
        getdata_request = msg_getdata()
        for block in blocks:
            getdata_request.inv.append(CInv(2, block))
        node2.send_message(getdata_request)

        # wait_until() will loop until a predicate condition is met. Use it to test properties of the
        # NodeConnCB objects.
        expected_hashes = sorted(blocks)
        assert wait_until(lambda: expected_hashes == sorted(node2.block_receive_map), timeout=5)

        self.log.info("Check that each block was received only once")
        # The network thread uses a global lock on data access to the NodeConn objects when sending and receiving
        # messages. The test thread should acquire the global lock before accessing any NodeConn data to avoid locking
        # and synchronization issues. Note wait_until() acquires this global lock when testing the predicate.
        with mininode_lock:
            for count in node2.block_receive_map.values():
                assert_equal(count, 1)
if __name__ == '__main__':
    # Script entry point: build the test object, then hand control to the
    # framework's main() method.
    example_test = ExampleTest()
    example_test.main()