Skip to content

Commit

Permalink
updated vote_analysis.py to handle node disconnects better
Browse files Browse the repository at this point in the history
  • Loading branch information
Srayman committed Aug 31, 2019
1 parent d8a4dd4 commit 227b0cc
Showing 1 changed file with 74 additions and 37 deletions.
111 changes: 74 additions & 37 deletions vote_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
import aiohttp
import math
import config
import mpmath
import copy
#from aiohttp_requests import requests
from collections import defaultdict
from sys import exit
from time import sleep
from datetime import datetime
from mpmath import mp

parser = argparse.ArgumentParser()
parser.add_argument('-host', '--node_url', type=str, help='Nano node url', default=config.host)
Expand Down Expand Up @@ -51,7 +49,7 @@

VOTE_BY_HASH_MAX = 12
votes = defaultdict(lambda: [0]*VOTE_BY_HASH_MAX)
VOTE_COUNT_MAX = 60
VOTE_COUNT_MAX = 200
counts = defaultdict(lambda: [0]*VOTE_COUNT_MAX)

#Rename existing file
Expand Down Expand Up @@ -98,6 +96,12 @@ def blockcreate(account, representative, balance, link, previous, work):

def process(block):
return {'action':'process', 'block':block}

def republish(hash):
return {'action':'republish', 'hash':hash}

def confirm(hash):
return {'action':'block_confirm', 'hash':hash}

def accountrpc(account):
return {'action':'account_history', 'account':account, 'count':'1'}
Expand All @@ -110,7 +114,10 @@ def pending(account):

def workget(account):
return {'action':'work_get', 'wallet':wallet, 'account':account}


def workgenerate(hash,account):
return {'action':'work_generate', 'hash':hash}

def dpowworkget(hash, account):
return {'hash':hash, 'user':config.dpow_user, 'api_key':config.dpow_api_key, 'account':account}

Expand All @@ -119,26 +126,27 @@ async def getWork(self, hash, account):
async with aiohttp.ClientSession(json_serialize=json.dumps) as session:
res = await session.post(config.dpow_url, json=dpowworkget(hash, account))
res_js = await res.json()
if not res_js["work"]: print(res_js)
if 'work' not in res_js: print(res_js)
self.work = res_js["work"]
else:
async with aiohttp.ClientSession(json_serialize=json.dumps) as session:
res = await session.post(f"http://{args.node_url}:{args.node_port}", json=workget(account))
res = await session.post(f"http://{args.node_url}:{args.node_port}", json=workgenerate(hash,account))
res_js = await res.json()
if not res_js["work"]: print(res_js)
if 'work' not in res_js: print(res_js)
self.work = res_js["work"]

class VoteAnalysis():
def __init__(self):
self.vote_data = []
self.conf_data = []
self.hashes = []
self.confirmedHashes = []
self.block_data = defaultdict(dict)
self.hash = None
self.work = None
self.balance = None
self.pending = None
self.timestamp = math.floor(time.time()*1000)
self.timestamp = None

def writeBkup(self):
# Enable votes.json for debugging all votes compared to filtered votes
Expand Down Expand Up @@ -176,7 +184,7 @@ async def monitor_send(self):
block_count = 0
while 1:
self.timestamp = math.floor(time.time()*1000)
if len(self.hashes) > block_count:
if len(self.hashes) > block_count or len(self.vote_data) > 1000:
block_count = len(self.hashes)
self.hash = self.hashes[-1]
self.writeBkup()
Expand All @@ -201,14 +209,14 @@ async def periodic_send(self):
res_js = await res.json()
self.balance = res_js['balance']
while 1:
if self.hash in self.hashes or not self.hashes:
if self.hash in self.hashes or self.timestamp == None:
if self.hashes: self.writeBkup()
try:
await getWork(self, self.hash, account)
self.timestamp = math.floor(time.time()*1000)
if send == 1:
print(str(time.time())+' - Sending ... Work: '+self.work)
self.balance = mp.nstr(mp.fsub(mp.mpmathify(self.balance),1),64)[:-2]
self.balance = int(self.balance)-1
async with aiohttp.ClientSession(json_serialize=json.dumps) as session:
res = await session.post(f"http://{args.node_url}:{args.node_port}", json=blockcreate(account, config.representative, self.balance, account, self.hash, self.work))
res_js = await res.json()
Expand All @@ -218,22 +226,25 @@ async def periodic_send(self):
res_js = await res.json()
if 'hash' in res_js:
self.hash = res_js['hash']
self.hashes.append(self.hash)
send = 0
else:
print(res_js)
print(blockcreate(account, config.representative, self.balance, account, self.hash, self.work))
else:
print(res_js)
# async with aiohttp.ClientSession(json_serialize=json.dumps) as session:
# res = await session.post(f"http://{args.node_url}:{args.node_port}", json=sendrpc(account, self.work))
# res_js = await res.json()
# if 'block' in res_js:
# self.hash = res_js["block"]
# self.hashes.append(self.hash)
# send = 0
# else:
# print(res_js)
else:
print(str(time.time())+' - Receiving ... Work: '+self.work)
self.balance = mp.nstr(mp.fadd(mp.mpmathify(self.balance),1),64)[:-2]
self.balance = int(self.balance)+1
async with aiohttp.ClientSession(json_serialize=json.dumps) as session:
res = await session.post(f"http://{args.node_url}:{args.node_port}", json=blockcreate(account, config.representative, self.balance, self.hash, self.hash, self.work))
res_js = await res.json()
Expand All @@ -243,6 +254,7 @@ async def periodic_send(self):
res_js = await res.json()
if 'hash' in res_js:
self.hash = res_js['hash']
self.hashes.append(self.hash)
send = 1
else:
print(res_js)
Expand All @@ -253,51 +265,76 @@ async def periodic_send(self):
# res_js = await res.json()
# if 'block' in res_js:
# self.hash = res_js["block"]
# self.hashes.append(self.hash)
# send = 1
# else:
# print(res_js)
except Exception as e: print(traceback.format_exc())
except Exception as e:
print("Error Sending or Receiving")
# print(traceback.format_exc())
# print(res_js)
print(str(time.time())+" - Hash: "+self.hash)
else:
print("Hash Not Confirmed")
# async with aiohttp.ClientSession(json_serialize=json.dumps) as session:
# res = await session.post(f"http://{args.node_url}:{args.node_port}", json=confirm(self.hash))
# res_js = await res.json()
# print(res_js)
await asyncio.sleep(args.delay)

async def analyse_votes(self):
async with websockets.connect(f"ws://{args.node_url}:{args.socket_port}") as websocket:
await websocket.send(json.dumps(subscription("confirmation", ack=True, options={"accounts":[
account
]})))
print(await websocket.recv()) # ack

# Use the following instead of above to listen on all accounts instead of filtered account
# await websocket.send(json.dumps(subscription("confirmation", ack=True)))

print(await websocket.recv()) # ack

await websocket.send(json.dumps(subscription("vote", ack=True)))
print(await websocket.recv()) # ack

blocks = set()
block_count = 0
while 1:
rec = json.loads(await websocket.recv())
topic = rec.get("topic", None)
if topic:
message = rec["message"]
if topic == "vote":
repaccount, vote_count = message["account"], len(message["blocks"])
votes[repaccount][vote_count-1] += 1
data = {}
data['time'] = rec['time']
data['account'] = message['account']
data['blocks'] = message['blocks']
self.vote_data.append(data)
elif topic == "confirmation":
data = {}
data['timestamp'] = self.timestamp
data['time'] = rec['time']
data['account'] = message['account']
data['hash'] = message['hash']
data['confirmation_type'] = message['confirmation_type']
self.conf_data.append(data)
self.hashes.append(message['hash'])
print("{} - {} blocks confirmed".format(str(time.time()), len(self.conf_data)))
if not websocket.open:
print ('Websocket NOT connected. Trying to reconnect.')
try:
websocket = await asyncio.wait_for(websockets.connect(f"ws://{args.node_url}:{args.socket_port}"), 2)
await websocket.send(json.dumps(subscription("confirmation", ack=True, options={"accounts":[
account
]})))
await websocket.send(json.dumps(subscription("vote", ack=True)))
except Exception as e:
print('Error!: ', e)
try:
rec = json.loads(await websocket.recv())
topic = rec.get("topic", None)
if topic:
message = rec["message"]
if topic == "vote":
repaccount, vote_count = message["account"], len(message["blocks"])
votes[repaccount][vote_count-1] += 1
data = {}
data['time'] = rec['time']
data['account'] = message['account']
data['blocks'] = message['blocks']
self.vote_data.append(data)
elif topic == "confirmation":
data = {}
data['timestamp'] = self.timestamp
data['time'] = rec['time']
data['account'] = message['account']
data['hash'] = message['hash']
data['confirmation_type'] = message['confirmation_type']
self.conf_data.append(data)
self.confirmedHashes.append(message['hash'])
print("{} - {} blocks sent".format(str(time.time()), len(self.hashes)))
print("{} - {} blocks confirmed".format(str(time.time()), len(self.conf_data)))
except Exception as e:
print('Error!: ', e)

analysis = VoteAnalysis()
try:
Expand Down

0 comments on commit 227b0cc

Please sign in to comment.