-
Notifications
You must be signed in to change notification settings - Fork 0
/
das.nim
227 lines (190 loc) · 7.28 KB
/
das.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import
std/[random, math, strformat],
chronicles,
chronos,
libp2pdht/discv5/crypto as dhtcrypto,
libp2pdht/discv5/protocol as discv5_protocol,
tests/dht/test_helper
# Log scope for this module: attaches a fixed topic and a `simTime` property
# (time elapsed since `simStartTime`) to the log statements below.
# NOTE(review): `simTime` refers to `simStartTime`, which is declared only
# after this scope. Presumably chronicles expands scope properties at each
# log statement rather than here, which is what makes this order valid -
# confirm before reordering.
logScope:
  topics = "DAS emulator"
  simTime = Moment.now() - simStartTime

let
  # Reference instant for `simTime`, captured when this module is initialized.
  simStartTime = Moment.now()
proc bootstrapNodes(
    nodecount: int,
    bootnodes: seq[SignedPeerRecord],
    rng = newRng(),
    delay: int = 0
  ) : Future[seq[(discv5_protocol.Protocol, PrivateKey)]] {.async.} =
  ## Creates and starts up to `nodecount` discovery nodes, one after another.
  ##
  ## Node `i` gets a fresh example private key drawn from `rng`, the local
  ## address with port `20302 + i`, and `bootnodes` as its bootstrap records.
  ## When `delay` is positive, the proc sleeps `delay` milliseconds after each
  ## successfully started node.
  ##
  ## Returns the started nodes paired with their private keys. A node whose
  ## setup raises `TransportOsError` is skipped, so the result may hold fewer
  ## than `nodecount` entries.
  debug "---- STARTING BOOSTRAPS ---"

  for i in 0..<nodecount:
    try:
      let privKey = PrivateKey.example(rng)
      let node = initDiscoveryNode(rng, privKey, localAddress(20302 + i), bootnodes)
      await node.start()
      result.add((node, privKey))
      if delay > 0:
        await sleepAsync(chronos.milliseconds(delay))
    except TransportOsError as e:
      # Report through the module's logger (not a bare `echo`) so the skip
      # carries the same topic/scope properties as every other event here.
      warn "skipping node", i, reason = e.msg

  #await allFutures(result.mapIt(it.bootstrap())) # this waits for bootstrap based on bootENode, which includes bonding with all its ping pongs
proc bootstrapNetwork(
    nodecount: int,
    rng = newRng(),
    delay: int = 0
  ) : Future[seq[(discv5_protocol.Protocol, PrivateKey)]] {.async.} =
  ## Brings up a local network: one boot node with a fixed private key on
  ## local port 20301, followed by `nodecount - 1` further nodes created via
  ## `bootstrapNodes` that use the boot node's record as their only bootstrap
  ## entry. `rng` and `delay` are forwarded to `bootstrapNodes` unchanged.
  ##
  ## Returns all (node, private key) pairs with the boot node at index 0.
  const bootNodeKeyHex =
    "a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617"

  let
    bootNodeKey = PrivateKey.fromHex(bootNodeKeyHex)
      .expect("Valid private key hex")
    # just a shortcut for new and open
    bootNode = initDiscoveryNode(rng, bootNodeKey, localAddress(20301), @[])

  #waitFor bootNode.bootstrap()  # immediate, since no bootnodes are defined above

  var network = await bootstrapNodes(
    nodecount - 1, @[bootNode.localNode.record], rng, delay)
  # the boot node always comes first in the returned sequence
  network.insert((bootNode, bootNodeKey), 0)
  return network
proc toNodeId(data: openArray[byte]): NodeId =
  ## Maps `data` to a `NodeId`: computes `keccak256.digest` of the bytes and
  ## reads the digest data as a big-endian 256-bit unsigned integer.
  let digest = keccak256.digest(data)
  readUintBE[256](digest.data)
proc segmentData(s: int, segmentsize: int) : seq[byte] =
  ## Encodes the number `s` into a sequence of exactly `segmentsize` bytes,
  ## least significant byte first; unused high bytes stay zero.
  ##
  ## A non-positive `s` yields an all-zero sequence. If `s` needs more than
  ## `segmentsize` bytes the assertion below fails.
  result = newSeq[byte](segmentsize)
  var
    remaining = s
    pos = 0
  while remaining > 0:
    assert(pos < segmentsize)
    # `remaining` is strictly positive here, so masking and shifting are
    # equivalent to `mod 256` / `div 256`.
    result[pos] = byte(remaining and 0xff)
    remaining = remaining shr 8
    inc pos
proc sample(s: Slice[int], len: int): seq[int] =
  ## Random sample without replacement: returns `len` distinct integers drawn
  ## with `rand` from the inclusive range `s`, in the order they were drawn.
  ##
  ## Requires a non-empty range and `0 <= len <= s.b - s.a + 1`; violating
  ## either raises `AssertionDefect`. Without the upper bound the rejection
  ## loops below could never collect enough distinct values and would spin
  ## forever.
  # TODO: not the best for small len
  doAssert s.a <= s.b, "sample: range must not be empty"
  let all = s.b - s.a + 1
  doAssert len >= 0 and len <= all,
    "sample: len must be within 0 .. size of the range"

  result = newSeqOfCap[int](len)
  var count = len
  if len >= all div 10: # add better algo selector
    # Dense sample: remember drawn values in a bitmap for O(1) rejection.
    var generated = newSeq[bool](all) # Initialized to false.
    while count != 0:
      let n = rand(s)
      if not generated[n - s.a]:
        generated[n - s.a] = true
        result.add n
        dec count
  else:
    # Sparse sample: the result is short, so a linear membership scan is used.
    while count != 0:
      let n = rand(s)
      if n notin result:
        result.add n
        dec count
when isMainModule:
  proc main() {.async.} =
    ## Runs one sampling experiment end to end:
    ##
    ## 1. boots a local network via `bootstrapNetwork` and waits `delay_init`,
    ## 2. has node 0 store `blocksize` segments with `addValue`, each keyed by
    ##    `toNodeId` of its encoded index,
    ## 3. after `sampling_delay`, lets a random subset of the other nodes each
    ##    look up `samplesize` random segments with `findValue`,
    ## 4. logs per-node results and writes one line per sampling node to
    ##    `<filename>.csv`.
    let
      nodecount = 1000 # up to 12000 (500+sec)
      samplingnodes = 200 # nodecount-1
      delay_pernode = 10 # in millisec
      blocksize = 32*32 # up to 128x128
      segmentsize = 2
      samplesize = 70
      samplethreshold = samplesize
      sampling_jitter = 50000 # in microseconds
      delay_init = 60.minutes
      upload_timeout = 4.seconds
      sampling_delay = 4.seconds
      sampling_timeout = 8.seconds

    # Base name of the CSV result file; encodes the parameters above.
    let filename = fmt"n{nodecount},sn{samplingnodes},dpn{delay_pernode},dinit{delay_init},bs{blocksize},ss{samplesize},sthr{samplethreshold}"

    # every segment index must be representable in `segmentsize` bytes
    assert(log2(blocksize.float).ceil.int <= segmentsize * 8 )
    assert(samplesize <= blocksize)

    var
      segmentIDs = newSeq[NodeId](blocksize)

    # start network
    let
      rng = newRng()
      nodes = await bootstrapNetwork(
        nodecount=nodecount, rng=rng, delay=delay_pernode)

    # wait for network to settle
    info "waiting for DHT to settle"
    await sleepAsync(delay_init)

    let uploadStartTime = Moment.now()

    # generate block and push data
    info "starting upload to DHT"
    var uploads = newSeq[Future[seq[Node]]]()
    for s in 0 ..< blocksize:
      let
        segment = segmentData(s, segmentsize)
        key = toNodeId(segment)

      segmentIDs[s] = key

      # start measuring time
      let upload = nodes[0][0].addValue(key, segment)
      upload.addCallback proc(udata: pointer) =
        info "uploaded to DHT", by = 0, time = Moment.now() - uploadStartTime
      uploads.add(upload)

    let
      uploadFinishedByTimeout = allFutures(uploads).withTimeout(upload_timeout)
      uploadAllFinished = allFutures(uploads)
    # info "uploaded to DHT", by = 0, pass, time = allFinished.duration
    uploadFinishedByTimeout.addCallback proc(udata: pointer) =
      info "uploaded to DHT by timeout", by = 0, time = uploadFinishedByTimeout.duration
    uploadAllFinished.addCallback proc(udata: pointer) =
      info "uploaded to DHT all", by = 0, time = uploadAllFinished.duration

    # The uploads are not awaited; sampling starts after a fixed delay.
    await sleepAsync(sampling_delay)

    # sample
    info "starting sampling"

    proc sampleOne(
        sampler: discv5_protocol.Protocol,
        cid: NodeId,
        startdelay: Duration = 0.milliseconds
      ) : Future[DiscResult[seq[byte]]] {.async.} =
      ## Looks up `cid` from `sampler` after waiting `startdelay`.
      await sleepAsync(startdelay)
      return await sampler.findValue(cid)

    proc startSamplingDA(n: discv5_protocol.Protocol): (seq[int], seq[Future[DiscResult[seq[byte]]]]) =
      ## Generate random sample and start the sampling process
      var futs = newSeq[Future[DiscResult[seq[byte]]]]()

      let sample = sample(0 ..< blocksize, samplesize)
      debug "starting sampling", by = n, sample
      for s in sample:
        # each lookup starts after a random jitter of up to `sampling_jitter`
        let fut = n.sampleOne(segmentIDs[s], rand(0 .. sampling_jitter).microseconds)
        futs.add(fut)
      return (sample, futs)

    proc sampleDA(n: discv5_protocol.Protocol): Future[(bool, int, Duration)] {.async.} =
      ## Sample and return detailed results of sampling
      let startTime = Moment.now()
      var (sample, futs) = startSamplingDA(n)

      # test is passed if all segments are retrieved in time
      discard await allFutures(futs).withTimeout(sampling_timeout)
      var passcount: int
      for i in 0 ..< futs.len:
        # `completed()` is true only for futures that finished without an
        # error or cancellation, so the `await` neither blocks nor re-raises
        # a failed lookup; such a lookup is counted as a failed sample.
        if futs[i].completed() and isOk(await futs[i]):
          passcount += 1
        else:
          info "sample failed", by = n.localNode, s = sample[i], key = segmentIDs[sample[i]]

      let
        time = Moment.now() - startTime
        pass = (passcount >= samplethreshold)
      info "sample", by = n.localNode, pass, cnt = passcount, time
      return (pass, passcount, time)

    proc sampleDAMany() {.async.} =
      ## Runs `sampleDA` on a random subset of the non-boot nodes and records
      ## the outcome in the log and in the CSV file.
      # all nodes start sampling in parallel
      var samplings = newSeq[Future[(bool, int, Duration)]]()
      # Pick samplers from the nodes that actually exist: `bootstrapNodes`
      # may have skipped some, so `nodes.len` can be smaller than `nodecount`.
      # The sampler count is capped so that `sample` can always be satisfied.
      if nodes.len > 1:
        for n in sample(1 ..< nodes.len, min(samplingnodes, nodes.len - 1)):
          samplings.add(sampleDA(nodes[n][0]))
      await allFutures(samplings)

      # print statistics
      let csvFile = open(fmt"{filename}.csv", fmWrite)
      defer: csvFile.close()

      var
        passed = 0
      for f in samplings:
        if f.finished():
          let (pass, passcount, time) = await f
          passed += pass.int
          debug "sampleStats", pass, cnt = passcount, time
          if pass:
            csvFile.writeLine(time.milliseconds)
          else:
            csvFile.writeLine("100000") # using large value here as Gnuplot has issues with NaN
        else:
          error "This should not happen!"
      info "sampleStats", passed, total = samplings.len, ratio = passed/samplings.len

    await sampleDAMany()

  waitFor main()
# proc teardownAll() =
# for (n, _) in nodes: # if last test is enabled, we need nodes[1..^1] here
# await n.closeWait()