diff --git a/node_stats.py b/node_stats.py index d12b140..fd09e28 100644 --- a/node_stats.py +++ b/node_stats.py @@ -142,7 +142,7 @@ async def main(): # data['block_processor_generator'] = response3['node']['block_processor']['generator']['state_blocks']['count'] data['block_arrival_count'] = response3['node']['block_arrival']['arrival']['count'] data['online_reps_arrival_count'] = response3['node']['online_reps']['reps']['count'] - data['votes_cache_count'] = response3['node']['votes_cache']['cache']['count'] +# data['votes_cache_count'] = response3['node']['votes_cache']['cache']['count'] data['block_uniquer_count'] = response3['node']['block_uniquer']['blocks']['count'] data['vote_uniquer_count'] = response3['node']['vote_uniquer']['votes']['count'] data['confirmation_height_count'] = response3['node']['confirmation_height_processor']['awaiting_processing']['count'] diff --git a/vote_analysis.py b/vote_analysis.py index b8d55c7..401fb52 100644 --- a/vote_analysis.py +++ b/vote_analysis.py @@ -16,11 +16,13 @@ import math import config import copy +import ujson #from aiohttp_requests import requests from collections import defaultdict from sys import exit from time import sleep from datetime import datetime +from aiofile import AIOFile parser = argparse.ArgumentParser() parser.add_argument('-host', '--node_url', type=str, help='Nano node url', default=config.host) @@ -30,6 +32,7 @@ parser.add_argument('-delay', '--delay', type=int, help='Sending delay (in seconds)', default=config.delay) parser.add_argument('-send', '--send', type=str, help='Send/Receive use true (default false)', default=config.send) parser.add_argument('-beta', '--beta', type=str, help='Is this the beta network? (default true)', default='true') +parser.add_argument('-vbh', '--vbh', type=int, help='Time interval (in seconds) for vote by hash recording', default=60) args = parser.parse_args() if args.send == 'true': @@ -49,6 +52,7 @@ VOTE_BY_HASH_MAX = 12 votes = defaultdict(lambda: [0]*VOTE_BY_HASH_MAX) + VOTE_COUNT_MAX = 200 counts = defaultdict(lambda: [0]*VOTE_COUNT_MAX) @@ -141,10 +145,13 @@ async def getWork(self, hash, account): class VoteAnalysis(): def __init__(self): + self.vote_data_copy = [] self.vote_data = [] self.conf_data = [] self.hashes = [] self.confirmedHashes = [] + self.vote_by_hash = {} + self.vbh = defaultdict(lambda: [0]*VOTE_BY_HASH_MAX) self.block_data = defaultdict(dict) self.hash = '' self.work = None @@ -152,15 +159,19 @@ def __init__(self): self.pending = None self.timestamp = None - def writeBkup(self): + def recordVBH(self): + self.vote_by_hash[str(time.time())] = self.vbh + self.vbh = defaultdict(lambda: [0]*VOTE_BY_HASH_MAX) + + async def writeBkup(self): + print(str(time.time())+' - processing votes') # Enable votes.json for debugging all votes compared to filtered votes # print(str(time.time())+' - Writing to votes.json .....') # with open('votes.json', 'a') as jsonfile: # jsonfile.write(json.dumps(self.vote_data)) - vote_data_copy = copy.deepcopy(self.vote_data) - self.vote_data = [] - print(str(time.time())+' - {} votes received'.format(len(vote_data_copy))) - for x in vote_data_copy: + beforeTime = time.time() + print(str(time.time())+' - {} votes received'.format(len(self.vote_data_copy))) + for x in self.vote_data_copy: for z in x['blocks']: if z in self.hashes: data = {} @@ -175,13 +186,21 @@ def writeBkup(self): self.block_data[z][x['account']] = data print(str(time.time())+' - '+str(len(self.block_data[self.hash]))+" votes for Hash: "+self.hash) print("") + afterTime = time.time() + print('For Loop Time: '+str(afterTime-beforeTime)) if len(self.conf_data)%args.save == 0: print('\nWriting to vote_hashes.json .....') - with open('vote_hashes.json', 'w') as jsonfile: - jsonfile.write(json.dumps(analysis.conf_data)) + async with AIOFile("vote_hashes.json", 'w+') as jsonfile: + await jsonfile.write(json.dumps(analysis.conf_data)) + await jsonfile.fsync() +# with open('vote_hashes.json', 'w') as jsonfile: +# jsonfile.write(json.dumps(analysis.conf_data)) print('\nWriting to vote_data.json .....') - with open('vote_data.json', 'w') as jsonfile: - jsonfile.write(json.dumps(analysis.block_data)) + async with AIOFile("vote_data.json", 'w+') as jsonfile: + await jsonfile.write(json.dumps(analysis.block_data)) + await jsonfile.fsync() +# with open('vote_data.json', 'w') as jsonfile: +# jsonfile.write(json.dumps(analysis.block_data)) # Only monitor specified account for blocks and votes async def monitor_send(self): @@ -192,7 +211,15 @@ async def monitor_send(self): block_count = len(self.hashes) if len(self.hashes) >= 1: self.hash = self.hashes[-1] - self.writeBkup() + copyTime = time.time() + self.vote_data_copy = ujson.loads(ujson.dumps(self.vote_data)) + self.vote_data = [] + copyTimeEnd = time.time() + writeTime = time.time() + asyncio.create_task(self.writeBkup()) + writeTimeEnd = time.time() + print('Copy Time: '+str(copyTimeEnd-copyTime)+' - Write Bkup Time: '+str(writeTimeEnd-writeTime)) + print('Total Time: '+str(writeTimeEnd-copyTime)) else: print("Hash Not Confirmed") await asyncio.sleep(args.delay+1) @@ -215,7 +242,10 @@ async def periodic_send(self): self.balance = res_js['balance'] while 1: if self.hash in self.hashes or self.timestamp == None: - if self.hashes: self.writeBkup() + if self.hashes: + self.vote_data_copy = ujson.loads(ujson.dumps(self.vote_data)) + self.vote_data = [] + asyncio.create_task(self.writeBkup()) try: await getWork(self, self.hash, account) self.timestamp = math.floor(time.time()*1000) @@ -286,15 +316,20 @@ async def periodic_send(self): # res_js = await res.json() # print(res_js) await asyncio.sleep(args.delay) - +#"all_local_accounts": "true", async def analyse_votes(self): async with websockets.connect(f"ws://{args.node_url}:{args.socket_port}") as websocket: - await websocket.send(json.dumps(subscription("confirmation", ack=True, options={"accounts":[ - account - ]}))) + await websocket.send(json.dumps(subscription("confirmation", ack=True, + options={"accounts":[account], + "confirmation_type": "active_quorum", + "include_election_info": "true"}))) # Use the following instead of above to listen on all accounts instead of filtered account -# await websocket.send(json.dumps(subscription("confirmation", ack=True))) +# await websocket.send(json.dumps(subscription("confirmation", ack=True, +# options={"confirmation_type": "active_quorum", +# "include_election_info": "true", +# "include_block": "false" +# }))) print(await websocket.recv()) # ack @@ -303,34 +338,56 @@ async def analyse_votes(self): blocks = set() block_count = 0 + vbhInterval = time.time() while 1: + if time.time() - vbhInterval > args.vbh: + print('recording vbh') + vbhInterval = time.time() + self.recordVBH() if not websocket.open: print ('Websocket NOT connected. Trying to reconnect.') try: websocket = await asyncio.wait_for(websockets.connect(f"ws://{args.node_url}:{args.socket_port}"), 2) - await websocket.send(json.dumps(subscription("confirmation", ack=True, options={"accounts":[ - account - ]}))) - await websocket.send(json.dumps(subscription("vote", ack=True))) + await websocket.send(json.dumps(subscription("confirmation", ack=True, + options={"accounts":[account], + "confirmation_type": "active_quorum", + "include_election_info": "true"}))) +# Use the following instead of above to listen on all accounts instead of filtered account +# await websocket.send(json.dumps(subscription("confirmation", ack=True, +# options={"confirmation_type": "active_quorum", +# "include_election_info": "true", +# "include_block": "false" +# }))) + await websocket.send(json.dumps(subscription("vote", ack=True, + options={"include_replays": "true", + "include_indeterminate": "true" + }))) except Exception as e: print('Error!: ', e) try: rec = json.loads(await websocket.recv()) +# print(json.dumps(rec)) topic = rec.get("topic", None) if topic: message = rec["message"] if topic == "vote": + timeMilli = int(time.time()*1000) + if timeMilli-int(rec['time']) > 500: + print('Time: '+str(timeMilli)+' - Vote Time: '+rec['time']+' - Diff: '+str(timeMilli-int(rec['time']))) repaccount, vote_count = message["account"], len(message["blocks"]) votes[repaccount][vote_count-1] += 1 + self.vbh[repaccount][vote_count-1] += 1 data = {} data['time'] = rec['time'] data['account'] = message['account'] + data['sequence'] = message['sequence'] data['blocks'] = message['blocks'] self.vote_data.append(data) elif topic == "confirmation": data = {} data['timestamp'] = self.timestamp data['time'] = rec['time'] + data['conf_time'] = message['election_info']['time'] data['account'] = message['account'] data['hash'] = message['hash'] data['confirmation_type'] = message['confirmation_type'] @@ -362,13 +419,17 @@ async def analyse_votes(self): except KeyboardInterrupt: pass -analysis.writeBkup() +#analysis.writeBkup() +analysis.recordVBH() print('\nWriting to vote_hashes.json .....') with open('vote_hashes.json', 'w') as jsonfile: jsonfile.write(json.dumps(analysis.conf_data)) print('\nWriting to vote_data.json .....') with open('vote_data.json', 'w') as jsonfile: jsonfile.write(json.dumps(analysis.block_data)) +print('\nWriting to vbh_interval.json .....') +with open('vbh_interval.json', 'w') as jsonfile: + jsonfile.write(json.dumps(analysis.vote_by_hash)) vote_dict = dict(votes) print('\nWriting to vote_batching.csv .....') with open('vote_batching.csv', 'w') as csvfile: