LogJob updated.
avatar-lavventura committed Sep 17, 2018
1 parent 8c37d68 commit 48a2c7a
Showing 4 changed files with 129 additions and 96 deletions.
12 changes: 8 additions & 4 deletions Driver.py
@@ -319,21 +319,25 @@ def isDriverOn(): #{
elif str(loggedJobs[i].args['storageID']) == '0':
log("New job has been received. IPFS call |" + time.ctime(), "green")
driverFunc.driverIpfsCall(loggedJobs[i].args['jobKey'], str(loggedJobs[i].args['index']),
str(loggedJobs[i].args['storageID'], eBlocBroker, web3), hashlib.md5(userID.encode('utf-8')).hexdigest())
str(loggedJobs[i].args['storageID']), hashlib.md5(userID.encode('utf-8')).hexdigest(),
eBlocBroker, web3)
elif str(loggedJobs[i].args['storageID']) == '1':
log("New job has been received. EUDAT call |" + time.ctime(), "green")
driverFunc.driverEudatCall(loggedJobs[i].args['jobKey'], str(loggedJobs[i].args['index']), userInfo[4],
hashlib.md5(userID.encode('utf-8')).hexdigest(), eBlocBroker, web3)
hashlib.md5(userID.encode('utf-8')).hexdigest(),
eBlocBroker, web3)
#thread.start_new_thread(driverFunc.driverEudatCall, (loggedJobs[i].args['jobKey'], str(loggedJobs[i].args['index'])))
elif str(loggedJobs[i].args['storageID']) == '2':
log("New job has been received. IPFS with miniLock call |" + time.ctime(), "green")
driverFunc.driverIpfsCall(loggedJobs[i].args['jobKey'], str(loggedJobs[i].args['index']),
str(loggedJobs[i].args['storageID'], eBlocBroker, web3), hashlib.md5(userID.encode('utf-8')).hexdigest())
str(loggedJobs[i].args['storageID']), hashlib.md5(userID.encode('utf-8')).hexdigest(),
eBlocBroker, web3)
#thread.start_new_thread(driverFunc.driverIpfsCall, (loggedJobs[i].args['jobKey'], str(loggedJobs[i].args['index']), str(loggedJobs[i].args['storageID']), submittedJob[5]))
elif str(loggedJobs[i].args['storageID']) == '3':
log("New job has been received. GitHub call |" + time.ctime(), "green")
driverFunc.driverGithubCall(loggedJobs[i].args['jobKey'], str(loggedJobs[i].args['index']),
str(loggedJobs[i].args['storageID'], eBlocBroker, web3), hashlib.md5(userID.encode('utf-8')).hexdigest())
str(loggedJobs[i].args['storageID']), hashlib.md5(userID.encode('utf-8')).hexdigest(),
eBlocBroker, web3)
elif str(loggedJobs[i].args['storageID']) == '4':
log("New job has been received. Googe Drive call |" + time.ctime(), "green")
driverFunc.driverGdriveCall(loggedJobs[i].args['jobKey'], str(loggedJobs[i].args['index']), str(loggedJobs[i].args['storageID']),
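For context, the fix above moves eBlocBroker and web3 out of the str(...) call, where they had been nested by mistake, and appends them as the final two parameters of each driverFunc call. A minimal sketch of the corrected dispatch, assuming the per-storage driverFunc signatures match the calls shown in the diff (e.g. driverIpfsCall(jobKey, index, storageID, userID, eBlocBroker, web3)); the helper name and the userInfo[4] indexing are illustrative only:

import hashlib

def dispatch_logged_job(job, userID, userInfo, driverFunc, eBlocBroker, web3):
    """Route one LogJob event to the storage-specific driver call (hypothetical helper)."""
    userHash = hashlib.md5(userID.encode('utf-8')).hexdigest()
    jobKey = job.args['jobKey']
    index = str(job.args['index'])
    storageID = str(job.args['storageID'])

    if storageID in ('0', '2'):    # IPFS, or IPFS with miniLock
        driverFunc.driverIpfsCall(jobKey, index, storageID, userHash, eBlocBroker, web3)
    elif storageID == '1':         # EUDAT; userInfo[4] assumed to hold the user's EUDAT identifier
        driverFunc.driverEudatCall(jobKey, index, userInfo[4], userHash, eBlocBroker, web3)
    elif storageID == '3':         # GitHub
        driverFunc.driverGithubCall(jobKey, index, storageID, userHash, eBlocBroker, web3)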
119 changes: 74 additions & 45 deletions contractCalls/LogJob.py
@@ -6,15 +6,16 @@

import sys, asyncio, time
from web3.auto import w3


# -----------------
def handle_event(event):
'''
print(event.blockNumber)
print(event.args.clusterAddress)
print(event.args.jobKey)
print(event.args.index)
print(event.args.storageID)
'''

# print(event.blockNumber)
# print(event.args.clusterAddress)
# print(event.args.jobKey)
# print(event.args.index)
# print(event.args.storageID)

print(event)

def log_loop(event_filter, poll_interval): #{
@@ -33,7 +34,41 @@ def log_loop(event_filter, poll_interval): #{
#}
#}

def log_ret(event_filter, poll_interval): #{
def logJob(eBlocBroker=None): #{
if eBlocBroker is None: #{
import os
sys.path.insert(1, os.path.join(sys.path[0], '..'))
from imports import connectEblocBroker
eBlocBroker = connectEblocBroker()
#}

loggedJobs = runLogJob(fromBlock, clusterAddress, eBlocBroker)

# print(myFilter.filter_params)
if len(loggedJobs) == 0:
log_loop(myFilter, 2)
else: #{
# print(myFilter.get_all_entries())
for i in range(0, len(loggedJobs)):
print(loggedJobs[i])
print(loggedJobs[i]['blockNumber'])
print(loggedJobs[i].args['clusterAddress'])
print(loggedJobs[i].args['jobKey'])
print(loggedJobs[i].args['index'])
print(loggedJobs[i].args['storageID'])
print(loggedJobs[i].args['desc'])
#}
#}
# -----------------

def getEbloBroker(): #{
import os
sys.path.insert(1, os.path.join(sys.path[0], '..'))
from imports import connectEblocBroker
return connectEblocBroker()
#}

def logReturn(event_filter, poll_interval): #{
while True: #{
loggedJobs = event_filter.get_new_entries()
if len(loggedJobs) > 0:
@@ -42,7 +77,11 @@ def log_ret(event_filter, poll_interval): #{
#}
#}
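The body of logReturn is collapsed in this view; a hedged reconstruction of the full polling helper, assuming it mirrors log_loop and the way runLogJob consumes its return value (return the first non-empty batch of new entries, otherwise sleep poll_interval seconds):

import time

def logReturn(event_filter, poll_interval):
    """Block until the filter yields at least one new entry, then return the batch."""
    while True:
        loggedJobs = event_filter.get_new_entries()
        if len(loggedJobs) > 0:
            return loggedJobs
        time.sleep(poll_interval)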

def runLogJob(fromBlock, clusterAddress, eBlocBroker): #{
def runLogJob(fromBlock, clusterAddress, eBlocBroker=None): #{
if eBlocBroker is None: #{
eBlocBroker = getEbloBroker()
#}

myFilter = eBlocBroker.events.LogJob.createFilter(
fromBlock=int(fromBlock),
argument_filters={'clusterAddress': str(clusterAddress)}
@@ -52,55 +91,45 @@
if len(loggedJobs) > 0:
return loggedJobs
else:
return log_ret(myFilter, 2)
return logReturn(myFilter, 2)
#}

def runLogCancelRefund(fromBlock, clusterAddress, eBlocBroker): #{
def runLogCancelRefund(fromBlock, clusterAddress, eBlocBroker=None): #{
if eBlocBroker is None: #{
eBlocBroker = getEbloBroker()
#}
myFilter = eBlocBroker.events.LogCancelRefund.createFilter(
fromBlock=int(fromBlock),
# argument_filters={'clusterAddress': str(clusterAddress)}
# argument_filters={'clusterAddress': str(clusterAddress)} #TODO: uncomment
)
loggedJobs = myFilter.get_all_entries()

if len(loggedJobs) > 0:
return loggedJobs
else:
return log_ret(myFilter, 2)
#}

def logJob(eBlocBroker=None): #{
if eBlocBroker is None: #{
import os
sys.path.insert(1, os.path.join(sys.path[0], '..'))
from imports import connectEblocBroker
eBlocBroker = connectEblocBroker()
#}

loggedJobs = run(fromBlock, clusterAddress, eBlocBroker)

# print(myFilter.filter_params)
if len(loggedJobs) == 0:
log_loop(myFilter, 2)
else: #{
# print(myFilter.get_all_entries())
for i in range(0, len(loggedJobs)):
print(loggedJobs[i])
print(loggedJobs[i]['blockNumber'])
print(loggedJobs[i].args['clusterAddress'])
print(loggedJobs[i].args['jobKey'])
print(loggedJobs[i].args['index'])
print(loggedJobs[i].args['storageID'])
print(loggedJobs[i].args['desc'])
#}
return logReturn(myFilter, 2)
#}

if __name__ == '__main__': #{
if len(sys.argv) == 2:
if len(sys.argv) == 2: #{
fromBlock = int(sys.argv[1])
clusterAddress = str(sys.argv[2]) # Only obtains jobs that are submitted to the cluster.
logJob()
else:
#}
else: #{
fromBlock = 954795
clusterAddress = '0x4e4a0750350796164d8defc442a712b7557bf282'
logJob()
clusterAddress = '0x4e4a0750350796164d8defc442a712b7557bf282'
#}

loggedJobs = runLogJob(fromBlock, clusterAddress)
for i in range(0, len(loggedJobs)):
print(loggedJobs[i])
print(loggedJobs[i]['blockNumber'])
print(loggedJobs[i].args['clusterAddress'])
print(loggedJobs[i].args['jobKey'])
print(loggedJobs[i].args['index'])
print(loggedJobs[i].args['storageID'])
print(loggedJobs[i].args['desc'])
#}
#}
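After this refactor runLogJob can be imported and called with eBlocBroker left as None, in which case it creates its own connection through getEbloBroker(). A hedged usage sketch; the import path assumes the contractCalls directory is importable as a package, and the block number and cluster address are the example values from the diff:

# Fetch LogJob events submitted to one cluster and print the fields the script inspects.
from contractCalls.LogJob import runLogJob

fromBlock = 954795
clusterAddress = '0x4e4a0750350796164d8defc442a712b7557bf282'

loggedJobs = runLogJob(fromBlock, clusterAddress)  # eBlocBroker=None -> connection created internally
for job in loggedJobs:
    print(job['blockNumber'], job.args['jobKey'], job.args['index'],
          job.args['storageID'], job.args['desc'])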


88 changes: 44 additions & 44 deletions contractCalls/submitJob.py
@@ -9,58 +9,27 @@
web3 = getWeb3()
eBlocBroker = connectEblocBroker(web3)

if __name__ == '__main__': #{
if len(sys.argv) == 10: #{
clusterAddress = web3.toChecksumAddress(str(sys.argv[1]))
blockReadFrom, coreNumber, pricePerMin = eBlocBroker.call().getClusterInfo(clusterAddress)
my_filter = eBlocBroker.eventFilter('LogCluster',{'fromBlock':int(blockReadFrom),'toBlock':int(blockReadFrom) + 1})
jobKey = str(sys.argv[2])
coreNum = int(sys.argv[3])
coreGasDay = int(sys.argv[4])
coreGasHour = int(sys.argv[5])
coreGasMin = int(sys.argv[6])
jobDescription = str(sys.argv[7])
storageID = int(sys.argv[8])
accountID = int(sys.argv[9])
#}
else: #{
# USER Inputs ================================================================
clusterAddress = web3.toChecksumAddress("0x4e4a0750350796164D8DefC442a712B7557BF282") #POA
# clusterAddress = web3.toChecksumAddress("0x75a4c787c5c18c587b284a904165ff06a269b48c") #POW

blockReadFrom, coreNumber, pricePerMin = eBlocBroker.functions.getClusterInfo(clusterAddress).call()
my_filter = eBlocBroker.eventFilter('LogCluster',{'fromBlock':int(blockReadFrom),'toBlock':int(blockReadFrom) + 1})
#jobKey = "3d8e2dc2-b855-1036-807f-9dbd8c6b1579=folderName"
jobKey = "QmRsaBEGcqxQcJbBxCi1LN9iz5bDAGDWR6Hx7ZvWqgqmdR"
# jobKey = "QmRsaBEGcqxQcJbBxCi1LN9iz5bDAGDWR6Hx7ZvWqgqmdR" # Long Sleep Job.
# jobKey = "QmefdYEriRiSbeVqGvLx15DKh4WqSMVL8nT4BwvsgVZ7a5" #"1-R0MoQj7Xfzu3pPnTqpfLUzRMeCTg6zG"
coreNum = 1
coreGasDay = 0
coreGasHour = 0
coreGasMin = 1
jobDescription = "Science"
storageID = 0
accountID = 0
# =============================================================================
#}


def submitJob(clusterAddress, jobKey, coreNum, coreGasDay, coreGasHour, coreGasMin, jobDescription, storageID, accountID): #{
clusterAddress = web3.toChecksumAddress(clusterAddress) #POA
# clusterAddress = web3.toChecksumAddress("0x75a4c787c5c18c587b284a904165ff06a269b48c") #POW
blockReadFrom, coreNumber, pricePerMin = eBlocBroker.functions.getClusterInfo(clusterAddress).call()
my_filter = eBlocBroker.eventFilter('LogCluster',{'fromBlock':int(blockReadFrom),'toBlock':int(blockReadFrom) + 1})

if not eBlocBroker.functions.isClusterExist(clusterAddress).call(): #{
print("Requested cluster's Ethereum Address \"" + clusterAddress + "\" does not exist.")
sys.exit()
return "Requested cluster's Ethereum Address \"" + clusterAddress + "\" does not exist."
#}

fromAccount = web3.eth.accounts[accountID]
fromAccount = web3.toChecksumAddress(fromAccount)

blockReadFrom, orcid = eBlocBroker.functions.getUserInfo(fromAccount).call()
if not eBlocBroker.functions.isUserExist(fromAccount).call(): #{
print("Requested user's Ethereum Address \"" + fromAccount + "\" does not exist.")
sys.exit()
return "Requested user's Ethereum Address \"" + fromAccount + "\" does not exist."
#}

if str(eBlocBroker.functions.isOrcIdVerified(orcid).call()) == '0': #{
print('User\'s orcid: ' + orcid + ' is not verified.')
sys.exit()
return 'User\'s orcid: ' + orcid + ' is not verified.'
#}

if storageID == 0 or storageID == 2: #{
@@ -76,15 +45,46 @@
msgValue = coreNum * pricePerMin * coreMinuteGas

if (storageID == 0 and len(jobKey) != 46) or (storageID == 2 and len(jobKey) != 46) or (storageID == 4 and len(jobKey) != 33): #{
print("jobKey's length does not match with its original length. Please check your jobKey.")
sys.exit()
return "jobKey's length does not match with its original length. Please check your jobKey."
#}

gasLimit = 4500000
if coreNum <= coreNumber and len(jobDescription) < 128 and int(storageID) < 5 and len(jobKey) <= 64 and coreMinuteGas != 0: #{
tx = eBlocBroker.transact({"from": fromAccount, "value": msgValue, "gas": gasLimit}).submitJob(clusterAddress, jobKey, coreNum, jobDescription, coreMinuteGas, storageID)
print('Tx: ' + tx.hex())
return 'Tx: ' + tx.hex()
#print('Value: ' + str(msgValue))
#print(clusterAddress + " " + jobKey + " " + str(coreNum) + " " + jobDescription + " " + str(coreMinuteGas) + " " + str(storageID))
#}
#}

if __name__ == '__main__': #{
if len(sys.argv) == 10: #{
clusterAddress = str(sys.argv[1])
jobKey = str(sys.argv[2])
coreNum = int(sys.argv[3])
coreGasDay = int(sys.argv[4])
coreGasHour = int(sys.argv[5])
coreGasMin = int(sys.argv[6])
jobDescription = str(sys.argv[7])
storageID = int(sys.argv[8])
accountID = int(sys.argv[9])
#}
else: #{
# USER Inputs ================================================================
clusterAddress = '0x4e4a0750350796164D8DefC442a712B7557BF282'
#jobKey = "3d8e2dc2-b855-1036-807f-9dbd8c6b1579=folderName"
jobKey = "QmRsaBEGcqxQcJbBxCi1LN9iz5bDAGDWR6Hx7ZvWqgqmdR"
# jobKey = "QmRsaBEGcqxQcJbBxCi1LN9iz5bDAGDWR6Hx7ZvWqgqmdR" # Long Sleep Job.
# jobKey = "QmefdYEriRiSbeVqGvLx15DKh4WqSMVL8nT4BwvsgVZ7a5" #"1-R0MoQj7Xfzu3pPnTqpfLUzRMeCTg6zG"
coreNum = 1
coreGasDay = 0
coreGasHour = 0
coreGasMin = 1
jobDescription = "Science"
storageID = 0
accountID = 0
# =============================================================================
#}
ret = submitJob(clusterAddress, jobKey, coreNum, coreGasDay, coreGasHour, coreGasMin, jobDescription, storageID, accountID)
print(ret)
#}
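With the argument parsing moved under __main__, submitJob can also be called directly from Python. A hedged invocation sketch using the default values from the diff; the import path assumes the contractCalls package layout shown above and a reachable node behind getWeb3()/connectEblocBroker():

# Submit a single-core, one-minute IPFS job (storageID 0) to the example cluster.
from contractCalls.submitJob import submitJob

ret = submitJob(
    clusterAddress='0x4e4a0750350796164D8DefC442a712B7557BF282',
    jobKey='QmRsaBEGcqxQcJbBxCi1LN9iz5bDAGDWR6Hx7ZvWqgqmdR',  # IPFS hash of the job folder
    coreNum=1,
    coreGasDay=0, coreGasHour=0, coreGasMin=1,
    jobDescription='Science',
    storageID=0,    # 0 = IPFS
    accountID=0)    # index into web3.eth.accounts
print(ret)          # transaction hash on success, otherwise an error message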
6 changes: 3 additions & 3 deletions driverFunc.py
@@ -41,7 +41,7 @@ def log(strIn, color=''): #{
print(stylize(strIn, fg(color)))
else:
print(strIn)

txFile = open(lib.LOG_PATH + '/transactions/clusterOut.txt', 'a')
txFile.write(strIn + "\n")
txFile.close()
@@ -350,7 +350,7 @@ def driverIpfsCall(jobKey, index, storageID, userID, eBlocBroker, web3): #{
indexGlobal = index
storageIDGlobal = storageID

lib.isIpfsOn(os, time)
lib.isIpfsOn()

resultsFolder = lib.PROGRAM_PATH + "/" + userID + "/" + jobKey + "_" + index + '/JOB_TO_RUN'
resultsFolderPrev = lib.PROGRAM_PATH + "/" + userID + "/" + jobKey + "_" + index
@@ -368,7 +368,7 @@

ipfsCallCounter = 0
# cmd: bash $eblocPath/ipfsStat.sh $jobKey
isIPFSHashExist = subprocess.check_output(['bash', lib.EBLOCPATH + '/ipfsStat.sh', jobKey]).decode('utf-8').split()
isIPFSHashExist = subprocess.check_output(['bash', lib.EBLOCPATH + '/ipfsStat.sh', jobKey]).decode('utf-8').strip()
log(isIPFSHashExist)

if "CumulativeSize" in isIPFSHashExist: #{
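The .split() to .strip() change matters for the 'CumulativeSize' in isIPFSHashExist test below it: against a list produced by split(), the in operator requires an exact token match, while against a stripped string it is a substring search. A minimal sketch of the intended check, assuming ipfsStat.sh prints ipfs object stat style output (which contains a CumulativeSize line when the hash is reachable); the helper name is hypothetical:

import subprocess

def ipfs_hash_exists(jobKey, eblocPath):
    """Return True if the IPFS daemon can resolve jobKey (hypothetical helper)."""
    output = subprocess.check_output(
        ['bash', eblocPath + '/ipfsStat.sh', jobKey]).decode('utf-8').strip()
    # Substring test on the full text; with .split() the membership test would
    # only succeed if 'CumulativeSize' appeared as a standalone token, which the
    # colon-separated stat output does not guarantee.
    return 'CumulativeSize' in output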
