Skip to content

Commit

Permalink
jobInfo[<int>] converted into dictionary.
Browse files Browse the repository at this point in the history
  • Loading branch information
avatar-lavventura committed Sep 15, 2018
1 parent a0e6ad6 commit d59e6ec
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 30 deletions.
14 changes: 7 additions & 7 deletions Driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,21 +284,21 @@ def isDriverOn(): #{

jobInfo = getJobInfo(clusterAddress, jobKey, index, eBlocBroker, web3)
userID = ""
if not jobInfo: #if not ',' in jobInfo or jobInfo == '':
log("jobInfo is returned as empty list. Geth might be closed", 'red')
if type(jobInfo) is str: #if not ',' in jobInfo or jobInfo == '':
log(jobInfo, 'red')
runFlag = 1
else: #{
log('jobOwner/userID: ' + jobInfo[6])
userID = jobInfo[6].lower()
log('jobOwner/userID: ' + jobInfo['jobOwner'])
userID = jobInfo['jobOwner'].lower()
userExist = isUserExist(userID, eBlocBroker, web3)

if jobInfo[0] == str(lib.job_state_code['COMPLETED']):
if jobInfo['status'] == str(lib.job_state_code['COMPLETED']):
log("Job is already completed.", 'red')
runFlag = 1
if jobInfo[0] == str(lib.job_state_code['REFUNDED']):
if jobInfo['status'] == str(lib.job_state_code['REFUNDED']):
log("Job is refunded.", 'red')
runFlag = 1
if runFlag == 0 and not jobInfo[0] == lib.job_state_code['PENDING']:
if runFlag == 0 and not jobInfo['status'] == lib.job_state_code['PENDING']:
log("Job is already captured and in process or completed.", 'red')
runFlag = 1
if 'False' in strCheck:
Expand Down
4 changes: 2 additions & 2 deletions contract/contracts/eBlocBroker.sol
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ contract eBlocBroker {
/* -----------------------------------------------------EVENTS---------------------------------------------------------*/
/* Records the submitted jobs' information under submitJob() method call.*/
event LogJob(address indexed clusterAddress,
string jobKey,
string indexed jobKey,
uint index,
uint8 storageID,
string desc,
Expand Down Expand Up @@ -403,7 +403,7 @@ contract eBlocBroker {
);

/* Records the refunded jobs' information under refund() method call. */
event LogCancelRefund(address indexed clusterAddress,
event LogCancelRefund(address clusterAddress,
string jobKey,
uint32 index
);
Expand Down
4 changes: 1 addition & 3 deletions contractCalls/LogJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import sys, asyncio, time
from web3.auto import w3


def handle_event(event):
'''
Expand Down Expand Up @@ -69,7 +68,6 @@ def runLogCancelRefund(fromBlock, clusterAddress, eBlocBroker): #{
return log_ret(myFilter, 2)
#}


def logJob(eBlocBroker=None): #{
if eBlocBroker is None: #{
import os
Expand Down Expand Up @@ -99,7 +97,7 @@ def logJob(eBlocBroker=None): #{
if __name__ == '__main__': #{
if len(sys.argv) == 2:
fromBlock = int(sys.argv[1])
clusterAddress = str(sys.argv[2]) # Only obtains jobs that are submitted to the cluster.
clusterAddress = str(sys.argv[2]) # Only obtains jobs that are submitted to the cluster.
logJob()
else:
fromBlock = 954795
Expand Down
25 changes: 21 additions & 4 deletions contractCalls/getJobInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ def getJobInfo(clusterAddress, jobKey, index, eBlocBroker=None, web3=None): #{
#}

clusterAddress = web3.toChecksumAddress(clusterAddress)
res = None
job = None

try:
res = eBlocBroker.functions.getJobInfo(clusterAddress, jobKey, int(index)).call()
job = eBlocBroker.functions.getJobInfo(clusterAddress, jobKey, int(index)).call()
jobDict = {'status': job[0], 'core': job[1], 'startTime': job[2], 'received': job[3], 'coreMinutePrice': job[4], 'coreMinuteGas': job[5], 'jobOwner': job[6]}
#nprint "dict['Name']: ", dict['Name']
except Exception:
return 'Exception: web3.exceptions.BadFunctionCallOutput'
return res
return jobDict
#}

if __name__ == '__main__': #{
Expand All @@ -31,6 +34,20 @@ def getJobInfo(clusterAddress, jobKey, index, eBlocBroker=None, web3=None): #{
clusterAddress = "0x4e4a0750350796164d8defc442a712b7557bf282"
jobKey = "153802737479941507912962421857730686964"
index = 0
jobKey = "QmRsaBEGcqxQcJbBxCi1LN9iz5bDAGDWR6Hx7ZvWqgqmdR" # Long Sleep Job
index = 4

jobInfo = getJobInfo(clusterAddress, jobKey, index)

if type(jobInfo) is dict:
print('{0: <16}'.format('status:') + str(jobInfo['status']))
print('{0: <16}'.format('core"') + str(jobInfo['core']))
print('{0: <16}'.format('startTime"') + str(jobInfo['startTime']))
print('{0: <16}'.format('received:') + str(jobInfo['received']))
print('{0: <16}'.format('coreMinutePrice:') + str(jobInfo['coreMinutePrice']))
print('{0: <16}'.format('coreMinuteGas:') + str(jobInfo['coreMinuteGas']))
print('{0: <16}'.format('jobInfoOwner:') + jobInfo['jobOwner'])

print(getJobInfo(clusterAddress, jobKey, index))
else:
print(jobInfo)
#}
8 changes: 4 additions & 4 deletions driverFunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@ def sbatchCall(userID, resultsFolder, eBlocBroker, web3): #{
resultsFolder + '/' + jobKeyGlobal + '*' + str(indexGlobal) + '*' + str(storageIDGlobal) + '*' + shareTokenGlobal + '.sh']);

jobInfo = getJobInfo(lib.CLUSTER_ID, jobKeyGlobal, int(indexGlobal), eBlocBroker, web3)
jobCoreNum = jobInfo[1]
coreSecondGas = timedelta(seconds=int((int(jobInfo[5]) + 1) * 60)) # Client's requested seconds to run his/her job, 1 minute additional given.
jobCoreNum = str(jobInfo['core'])
coreSecondGas = timedelta(seconds=int((jobInfo['coreMinuteGas'] + 1) * 60)) # Client's requested seconds to run his/her job, 1 minute additional given.
d = datetime(1,1,1) + coreSecondGas
timeLimit = str(int(d.day)-1) + '-' + str(d.hour) + ':' + str(d.minute)

log("timeLimit: " + str(timeLimit) + "| RequestedCoreNum: " + str(jobCoreNum))
log("timeLimit: " + str(timeLimit) + "| RequestedCoreNum: " + jobCoreNum)

# cmd: sudo su - $userID -c "cd $resultsFolder && sbatch -c$jobCoreNum $resultsFolder/${jobKey}*${index}*${storageID}*$shareToken.sh --mail-type=ALL
# SLURM submit job, Real mode -N is used. For Emulator-mode -N use 'sbatch -c'
jobID = subprocess.check_output(['sudo', 'su', '-', userID, '-c',
'cd' + ' ' + resultsFolder + ' && ' + 'sbatch -N' + str(jobCoreNum) + ' ' +
'cd' + ' ' + resultsFolder + ' && ' + 'sbatch -N' + jobCoreNum + ' ' +
resultsFolder + '/' + jobKeyGlobal + '*' + str(indexGlobal) + '*' + str(storageIDGlobal) + '*' + shareTokenGlobal + '.sh' + ' ' +
'--mail-type=ALL']).decode('utf-8').strip()
jobID = jobID.split()[3]
Expand Down
16 changes: 8 additions & 8 deletions endCode.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def removeSourceCode(resultsFolderPrev): #{
filesToRemove = subprocess.check_output(['find', '.', '-type', 'f', '!', '-newer', resultsFolderPrev + '/timestamp.txt']).decode('utf-8').strip()
if filesToRemove is not '' or filesToRemove is not None:
log('\nFiles to be removed: \n' + filesToRemove + '\n')

# cmd: find . -type f ! -newer $resultsFolder/timestamp.txt -delete
subprocess.run(['find', '.', '-type', 'f', '!', '-newer', resultsFolderPrev + '/timestamp.txt', '-delete'])
#}
Expand Down Expand Up @@ -105,8 +105,8 @@ def endCall(jobKey, index, storageID, shareToken, folderName, jobID): #{
#}

log("JOB_INFO: " + ",".join(map(str, jobInfo)))

userID = jobInfo[6].replace("u'", "").replace("'", "").lower()
userID = jobInfo['jobOwner'].lower()
userIDAddr = hashlib.md5(userID.encode('utf-8')).hexdigest() # Convert Ethereum User Address into 32-bits
userInfo = getUserInfo(userID, '1', eBlocBroker, web3)

Expand Down Expand Up @@ -143,11 +143,11 @@ def endCall(jobKey, index, storageID, shareToken, folderName, jobID): #{

log("")

if jobInfo[0] == str(lib.job_state_code['COMPLETED']): #{
if jobInfo['status'] == str(lib.job_state_code['COMPLETED']): #{
log('Job is already get paid.', 'red')
sys.exit()
#}
clientTimeLimit = jobInfo[5]
clientTimeLimit = jobInfo['coreMinuteGas']
log("clientGasMinuteLimit: " + str(clientTimeLimit)) # Clients minuteGas for the job

countTry = 0
Expand All @@ -156,17 +156,17 @@ def endCall(jobKey, index, storageID, shareToken, folderName, jobID): #{
# sys.exit()
countTry += 1
log("Waiting... " + str(countTry * 60) + ' seconds passed.', 'yellow')
if jobInfo[0] == lib.job_state_code['RUNNING']: # It will come here eventually, when setJob() is deployed.
if jobInfo['status'] == lib.job_state_code['RUNNING']: # It will come here eventually, when setJob() is deployed.
log("Job has been started.", 'green')
break # Wait until does values updated on the blockchain

if jobInfo[0] == lib.job_state_code['COMPLETED']:
if jobInfo['status'] == lib.job_state_code['COMPLETED']:
log("Error: Already completed job is received.", 'red')
sys.exit() # Detects an error on the SLURM side

jobInfo = getJobInfo(lib.CLUSTER_ID, jobKey, index, eBlocBroker, web3)
#while jobInfo == "Connection refused" or jobInfo == "" or jobInfo == "Errno" : #{
while not jobInfo:
while not jobInfo: #TODO check
log("Error: Please run geth on the background.", 'red')
jobInfo = getJobInfo(lib.CLUSTER_ID, jobKey, index, eBlocBroker, web3)
time.sleep(5)
Expand Down
11 changes: 9 additions & 2 deletions jobResults.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,15 @@

sum1 += int(res[7]) - int(res[8])
jobInfo = getJobInfo(clusterAddress, jobKey, index, eBlocBroker, web3)
print(str(counter) + " " + res[1] + " " + res[2] + " " + res[3] + " | " + lib.job_state_code[jobInfo[0]] +
"," + jobInfo[1] + "," + jobInfo[2] + "," + jobInfo[3] + "," + jobInfo[4] + "," + jobInfo[5])

print(str(counter) + ' ' + res[1] + ' ' + res[2] + ' ' + res[3] + '|' +
'{0: <16}'.format('status:') + lib.job_state_code[str(jobInfo['status'])] + ' ' +
'{0: <16}'.format('core"') + str(jobInfo['core']) + ' ' +
'{0: <16}'.format('startTime"') + str(jobInfo['startTime']) + ' ' +
'{0: <16}'.format('received:') + str(jobInfo['received']) + ' ' +
'{0: <16}'.format('coreMinutePrice:') + str(jobInfo['coreMinutePrice']) + ' ' +
'{0: <16}'.format('coreMinuteGas:') + str(jobInfo['coreMinuteGas']) + ' ' +
'{0: <16}'.format('jobInfoOwner:') + jobInfo['jobOwner'])
counter += 1
#}
print(counter)
Expand Down

0 comments on commit d59e6ec

Please sign in to comment.