Skip to content

Commit

Permalink
Updated stats for V22.0DB1. Update vote_analysis.py to perform better…
Browse files Browse the repository at this point in the history
… and utilize election time rather than websocket time
  • Loading branch information
Srayman committed Aug 4, 2020
1 parent d72f873 commit 7c30946
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 22 deletions.
2 changes: 1 addition & 1 deletion node_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async def main():
# data['block_processor_generator'] = response3['node']['block_processor']['generator']['state_blocks']['count']
data['block_arrival_count'] = response3['node']['block_arrival']['arrival']['count']
data['online_reps_arrival_count'] = response3['node']['online_reps']['reps']['count']
data['votes_cache_count'] = response3['node']['votes_cache']['cache']['count']
# data['votes_cache_count'] = response3['node']['votes_cache']['cache']['count']
data['block_uniquer_count'] = response3['node']['block_uniquer']['blocks']['count']
data['vote_uniquer_count'] = response3['node']['vote_uniquer']['votes']['count']
data['confirmation_height_count'] = response3['node']['confirmation_height_processor']['awaiting_processing']['count']
Expand Down
103 changes: 82 additions & 21 deletions vote_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import math
import config
import copy
import ujson
#from aiohttp_requests import requests
from collections import defaultdict
from sys import exit
from time import sleep
from datetime import datetime
from aiofile import AIOFile

parser = argparse.ArgumentParser()
parser.add_argument('-host', '--node_url', type=str, help='Nano node url', default=config.host)
Expand All @@ -30,6 +32,7 @@
parser.add_argument('-delay', '--delay', type=int, help='Sending delay (in seconds)', default=config.delay)
parser.add_argument('-send', '--send', type=str, help='Send/Receive use true (default false)', default=config.send)
parser.add_argument('-beta', '--beta', type=str, help='Is this the beta network? (default true)', default='true')
parser.add_argument('-vbh', '--vbh', type=int, help='Time interval (in seconds) for vote by hash recording', default=60)
args = parser.parse_args()

if args.send == 'true':
Expand All @@ -49,6 +52,7 @@

VOTE_BY_HASH_MAX = 12
votes = defaultdict(lambda: [0]*VOTE_BY_HASH_MAX)

VOTE_COUNT_MAX = 200
counts = defaultdict(lambda: [0]*VOTE_COUNT_MAX)

Expand Down Expand Up @@ -141,26 +145,33 @@ async def getWork(self, hash, account):

class VoteAnalysis():
def __init__(self):
self.vote_data_copy = []
self.vote_data = []
self.conf_data = []
self.hashes = []
self.confirmedHashes = []
self.vote_by_hash = {}
self.vbh = defaultdict(lambda: [0]*VOTE_BY_HASH_MAX)
self.block_data = defaultdict(dict)
self.hash = ''
self.work = None
self.balance = None
self.pending = None
self.timestamp = None

def writeBkup(self):
def recordVBH(self):
self.vote_by_hash[str(time.time())] = self.vbh
self.vbh = defaultdict(lambda: [0]*VOTE_BY_HASH_MAX)

async def writeBkup(self):
print(str(time.time())+' - processing votes')
# Enable votes.json for debugging all votes compared to filtered votes
# print(str(time.time())+' - Writing to votes.json .....')
# with open('votes.json', 'a') as jsonfile:
# jsonfile.write(json.dumps(self.vote_data))
vote_data_copy = copy.deepcopy(self.vote_data)
self.vote_data = []
print(str(time.time())+' - {} votes received'.format(len(vote_data_copy)))
for x in vote_data_copy:
beforeTime = time.time()
print(str(time.time())+' - {} votes received'.format(len(self.vote_data_copy)))
for x in self.vote_data_copy:
for z in x['blocks']:
if z in self.hashes:
data = {}
Expand All @@ -175,13 +186,21 @@ def writeBkup(self):
self.block_data[z][x['account']] = data
print(str(time.time())+' - '+str(len(self.block_data[self.hash]))+" votes for Hash: "+self.hash)
print("")
afterTime = time.time()
print('For Loop Time: '+str(afterTime-beforeTime))
if len(self.conf_data)%args.save == 0:
print('\nWriting to vote_hashes.json .....')
with open('vote_hashes.json', 'w') as jsonfile:
jsonfile.write(json.dumps(analysis.conf_data))
async with AIOFile("vote_hashes.json", 'w+') as jsonfile:
await jsonfile.write(json.dumps(analysis.conf_data))
await jsonfile.fsync()
# with open('vote_hashes.json', 'w') as jsonfile:
# jsonfile.write(json.dumps(analysis.conf_data))
print('\nWriting to vote_data.json .....')
with open('vote_data.json', 'w') as jsonfile:
jsonfile.write(json.dumps(analysis.block_data))
async with AIOFile("vote_data.json", 'w+') as jsonfile:
await jsonfile.write(json.dumps(analysis.block_data))
await jsonfile.fsync()
# with open('vote_data.json', 'w') as jsonfile:
# jsonfile.write(json.dumps(analysis.block_data))

# Only monitor specified account for blocks and votes
async def monitor_send(self):
Expand All @@ -192,7 +211,15 @@ async def monitor_send(self):
block_count = len(self.hashes)
if len(self.hashes) >= 1:
self.hash = self.hashes[-1]
self.writeBkup()
copyTime = time.time()
self.vote_data_copy = ujson.loads(ujson.dumps(self.vote_data))
self.vote_data = []
copyTimeEnd = time.time()
writeTime = time.time()
asyncio.create_task(self.writeBkup())
writeTimeEnd = time.time()
print('Copy Time: '+str(copyTimeEnd-copyTime)+' - Write Bkup Time: '+str(writeTimeEnd-writeTime))
print('Total Time: '+str(writeTimeEnd-copyTime))
else:
print("Hash Not Confirmed")
await asyncio.sleep(args.delay+1)
Expand All @@ -215,7 +242,10 @@ async def periodic_send(self):
self.balance = res_js['balance']
while 1:
if self.hash in self.hashes or self.timestamp == None:
if self.hashes: self.writeBkup()
if self.hashes:
self.vote_data_copy = ujson.loads(ujson.dumps(self.vote_data))
self.vote_data = []
asyncio.create_task(self.writeBkup())
try:
await getWork(self, self.hash, account)
self.timestamp = math.floor(time.time()*1000)
Expand Down Expand Up @@ -286,15 +316,20 @@ async def periodic_send(self):
# res_js = await res.json()
# print(res_js)
await asyncio.sleep(args.delay)

#"all_local_accounts": "true",
async def analyse_votes(self):
async with websockets.connect(f"ws://{args.node_url}:{args.socket_port}") as websocket:
await websocket.send(json.dumps(subscription("confirmation", ack=True, options={"accounts":[
account
]})))
await websocket.send(json.dumps(subscription("confirmation", ack=True,
options={"accounts":[account],
"confirmation_type": "active_quorum",
"include_election_info": "true"})))

# Use the following instead of above to listen on all accounts instead of filtered account
# await websocket.send(json.dumps(subscription("confirmation", ack=True)))
# await websocket.send(json.dumps(subscription("confirmation", ack=True,
# options={"confirmation_type": "active_quorum",
# "include_election_info": "true",
# "include_block": "false"
# })))

print(await websocket.recv()) # ack

Expand All @@ -303,34 +338,56 @@ async def analyse_votes(self):

blocks = set()
block_count = 0
vbhInterval = time.time()
while 1:
if time.time() - vbhInterval > args.vbh:
print('recording vbh')
vbhInterval = time.time()
self.recordVBH()
if not websocket.open:
print ('Websocket NOT connected. Trying to reconnect.')
try:
websocket = await asyncio.wait_for(websockets.connect(f"ws://{args.node_url}:{args.socket_port}"), 2)
await websocket.send(json.dumps(subscription("confirmation", ack=True, options={"accounts":[
account
]})))
await websocket.send(json.dumps(subscription("vote", ack=True)))
await websocket.send(json.dumps(subscription("confirmation", ack=True,
options={"accounts":[account],
"confirmation_type": "active_quorum",
"include_election_info": "true"})))
# Use the following instead of above to listen on all accounts instead of filtered account
# await websocket.send(json.dumps(subscription("confirmation", ack=True,
# options={"confirmation_type": "active_quorum",
# "include_election_info": "true",
# "include_block": "false"
# })))
await websocket.send(json.dumps(subscription("vote", ack=True,
options={"include_replays": "true",
"include_indeterminate": "true"
})))
except Exception as e:
print('Error!: ', e)
try:
rec = json.loads(await websocket.recv())
# print(json.dumps(rec))
topic = rec.get("topic", None)
if topic:
message = rec["message"]
if topic == "vote":
timeMilli = int(time.time()*1000)
if timeMilli-int(rec['time']) > 500:
print('Time: '+str(timeMilli)+' - Vote Time: '+rec['time']+' - Diff: '+str(timeMilli-int(rec['time'])))
repaccount, vote_count = message["account"], len(message["blocks"])
votes[repaccount][vote_count-1] += 1
self.vbh[repaccount][vote_count-1] += 1
data = {}
data['time'] = rec['time']
data['account'] = message['account']
data['sequence'] = message['sequence']
data['blocks'] = message['blocks']
self.vote_data.append(data)
elif topic == "confirmation":
data = {}
data['timestamp'] = self.timestamp
data['time'] = rec['time']
data['conf_time'] = message['election_info']['time']
data['account'] = message['account']
data['hash'] = message['hash']
data['confirmation_type'] = message['confirmation_type']
Expand Down Expand Up @@ -362,13 +419,17 @@ async def analyse_votes(self):
except KeyboardInterrupt:
pass

analysis.writeBkup()
#analysis.writeBkup()
analysis.recordVBH()
print('\nWriting to vote_hashes.json .....')
with open('vote_hashes.json', 'w') as jsonfile:
jsonfile.write(json.dumps(analysis.conf_data))
print('\nWriting to vote_data.json .....')
with open('vote_data.json', 'w') as jsonfile:
jsonfile.write(json.dumps(analysis.block_data))
print('\nWriting to vbh_interval.json .....')
with open('vbh_interval.json', 'w') as jsonfile:
jsonfile.write(json.dumps(analysis.vote_by_hash))
vote_dict = dict(votes)
print('\nWriting to vote_batching.csv .....')
with open('vote_batching.csv', 'w') as csvfile:
Expand Down

0 comments on commit 7c30946

Please sign in to comment.