Skip to content

Commit

Permalink
update.
Browse files Browse the repository at this point in the history
  • Loading branch information
avatar-lavventura committed Apr 8, 2018
1 parent c12c9d3 commit 7f2b7e7
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 69 deletions.
21 changes: 10 additions & 11 deletions Driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,22 @@ def isSlurmOn():
check = os.popen("cat $logPath/checkSinfoOut.txt").read()

if not "PARTITION" in str(check):
logTest("-------------------------- \n");
logTest("Error: sinfo returns emprty string, please run:\nsudo bash runSlurm.sh\n");
logTest('Error Message: \n' + check);
logTest("Error: sinfo returns emprty string, please run:\nsudo bash runSlurm.sh\n", "");
logTest('Error Message: \n' + check, "");
sys.exit();

if "sinfo: error" in str(check):
logTest("Error on munged: \n" + check)
logTest("Please Do:\n")
logTest("sudo munged -f")
logTest("/etc/init.d/munge start")
logTest("Error on munged: \n" + check, "")
logTest("Please Do:\n", "")
logTest("sudo munged -f", "")
logTest("/etc/init.d/munge start", "")
sys.exit()

yes = set(['yes', 'y', 'ye']);
no = set(['no' , 'n']);

if constants.WHOAMI == '':
print('Once please run: bash initialize.sh');
if constants.WHOAMI == '' or constants.EBLOCPATH == '' or constants.CLUSTER_ID == '':
print('Once please run: bash initialize.sh');
sys.exit();

isContractExist = os.popen('$contractCallPath/isContractExist.py').read();
Expand Down Expand Up @@ -96,7 +95,7 @@ def isSlurmOn():
sys.exit()
#}

deployedBlockNumber = os.popen('$contractCallPath/getDeployedBlockNumber.py').read();
deployedBlockNumber = os.popen('$contractCallPath/getDeployedBlockNumber.py').read().rstrip('\n');
blockReadFromContract=str(0)

logTest("clusterAddress: " + clusterID, "yellow")
Expand Down Expand Up @@ -211,7 +210,7 @@ def isSlurmOn():
os.environ['jobKey'] = submittedJob[2];
os.environ['index'] = submittedJob[3];

jobInfo = os.popen('$contractCallPath/getJobInfo.py $clusterID $jobKey $index').read().rstrip('\n').replace(" ","")[1:-1];
jobInfo = os.popen('$contractCallPath/getJobInfo.py $clusterID $jobKey $index 2>/dev/null').read().rstrip('\n').replace(" ","")[1:-1];
jobInfo = jobInfo.split(',');

# Checks isAlreadyCaptured job or not. If it is completed job do not obtain it
Expand Down
44 changes: 40 additions & 4 deletions contractCalls/getJobInfo.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,47 @@
#!/usr/bin/env python
#!/usr/bin/env python

import os, json, sys, time
from web3 import Web3
from web3.providers.rpc import HTTPProvider
sys.path.insert(1, os.path.join(sys.path[0], '..')); import constants
os.chdir(sys.path[0]);
os.system('#!/bin/bash source $HOME/.venv-py3/bin/activate')



web3 = Web3(HTTPProvider('http://localhost:' + str(constants.RPC_PORT)))

fileAddr = open("address.json", "r")
contractAddress = fileAddr.read().replace("\n", "")

with open('abi.json', 'r') as abi_definition:
abi = json.load(abi_definition)

contractAddress = web3.toChecksumAddress(contractAddress);
eBlocBroker = web3.eth.contract(contractAddress, abi=abi);

if __name__ == '__main__': #{
if(len(sys.argv) == 4):
clusterAddress = str(sys.argv[1]);
jobKey = str(sys.argv[2]);
index = int(sys.argv[3]);
else:
clusterAddress = "0x6af0204187a93710317542d383a1b547fa42e705";
jobKey = "3d8e2dc2-b855-1036-807f-9dbd8c6b1579=117649886378445811229351254502963812811";
index = 3;
#jobKey = "QmTXyUrHxkf2m85W6Sy6VAMBuZyZAuSDQAbjSgDcLLnEdW";
#index = 4;
clusterAddress = web3.toChecksumAddress(clusterAddress);
print(eBlocBroker.call().getJobInfo(clusterAddress, jobKey, index));
#}




'''
web3 = Web3(HTTPProvider('http://localhost:' + str(constants.RPC_PORT)))
#web3 = Web3(HTTPProvider('http://localhost:8545'))
fileAddr = open("address.json", "r")
contractAddress = fileAddr.read().replace("\n", "")
Expand All @@ -17,7 +52,7 @@
eBlocBroker = web3.eth.contract(contractAddress, abi=abi);
if __name__ == '__main__': #{
if(len(sys.argv) == 4):
if(len(sys.argv) == 4):
clusterAddress = str(sys.argv[1]);
jobKey = str(sys.argv[2]);
index = int(sys.argv[3]);
Expand All @@ -27,6 +62,7 @@
index = 3;
#jobKey = "QmTXyUrHxkf2m85W6Sy6VAMBuZyZAuSDQAbjSgDcLLnEdW";
#index = 4;
clusterAddress = web3.toChecksumAddress(clusterAddress);
print(eBlocBroker.functions.getJobInfo(clusterAddress, jobKey, index).call());
clusterAddress = web3.toChecksumAddress(clusterAddress);
print(eBlocBroker.functions.getJobInfo(clusterAddress, jobKey, index).call());
#}
'''
12 changes: 7 additions & 5 deletions contractCalls/submitJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,28 @@ def isIpfsOn():
accountID = int(sys.argv[10]);
else:
# USER Inputs----------------------------------------------------------------
clusterAddress = "0x6af0204187a93710317542d383a1b547fa42e705";
clusterAddress = "0xda1e61e853bb8d63b1426295f59cb45a34425b63";
clusterAddress = web3.toChecksumAddress(clusterAddress);
blockReadFrom, coreNumber, pricePerMin = eBlocBroker.functions.getClusterInfo(clusterAddress).call();
my_filter = eBlocBroker.eventFilter('LogCluster',{'fromBlock':int(blockReadFrom),'toBlock':int(blockReadFrom) + 1})
jobKey = "3d8e2dc2-b855-1036-807f-9dbd8c6b1579=folderName"; # "QmefdYEriRiSbeVqGvLx15DKh4WqSMVL8nT4BwvsgVZ7a5"
#jobKey = "3d8e2dc2-b855-1036-807f-9dbd8c6b1579=folderName";
jobKey = "QmefdYEriRiSbeVqGvLx15DKh4WqSMVL8nT4BwvsgVZ7a5"
coreNum = 1;
coreGasDay = 0;
coreGasHour = 0;
coreGasMin = 1;
jobDescription = "Science";
storageType = 1;
storageType = 0;
myMiniLockId = "";
accountID = 0;
# ----------------------------------------------------------------------------
if storageType == 0 or storageType == 2:
isIpfsOn();
output = os.popen('ipfs swarm connect ' + my_filter.get_all_entries()[0].args['ipfsAddress']).read();
strVal = my_filter.get_all_entries()[0].args['ipfsAddress'];
print("Trying to connect into: " + strVal);
output = os.popen('ipfs swarm connect ' + strVal).read();
print(output)

sys.exit();
coreMinuteGas = coreGasMin + coreGasHour * 60 + coreGasDay * 1440;
msgValue = coreNum * pricePerMin * coreMinuteGas;

Expand Down
7 changes: 3 additions & 4 deletions driverFunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def sbatchCall(): #{
time.sleep(0.25)

os.system("cp run.sh ${jobKey}_${index}_${folderIndex}_${shareToken}_$miniLockId.sh");
jobInfo = os.popen('$contractCallPath/getJobInfo.py $clusterID $jobKey $index').read().rstrip('\n').replace(" ","")[1:-1];
jobInfo = os.popen('$contractCallPath/getJobInfo.py $clusterID $jobKey $index 2>/dev/null').read().rstrip('\n').replace(" ","")[1:-1];
jobInfo = jobInfo.split(',');
jobCoreNum = jobInfo[1]

Expand Down Expand Up @@ -215,8 +215,7 @@ def driverIpfsCall(jobKey, index, folderType, miniLockId): #{
os.environ['jobSavePath'] = jobSavePath

if not os.path.isdir(jobSavePath): # If folder does not exist
os.environ['mkdirPath'] = jobSavePath;
os.system("mkdir $mkdirPath");
os.system("mkdir -p " + jobSavePath);

os.chdir(jobSavePath);
if os.path.isfile(jobKey):
Expand Down Expand Up @@ -269,7 +268,7 @@ def driverIpfsCall(jobKey, index, folderType, miniLockId): #{
break;
else:
logTest("Error: Please run Parity or Geth on the background.**************************************************************")
jobInfo = os.popen('$contractCallPath/getJobInfo.py $clusterID $jobKey $index').read().rstrip('\n').replace(" ", "")[1:-1];
jobInfo = os.popen('$contractCallPath/getJobInfo.py $clusterID $jobKey $index 2>/dev/null').read().rstrip('\n').replace(" ", "")[1:-1];
jobInfo = jobInfo.split(',');
jobCoreNum = jobInfo[1];
'''
Expand Down
74 changes: 42 additions & 32 deletions endCode.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,63 @@ def logTest(strIn):
def endCall(jobKey, index, storageType, shareToken, miniLockId, folderName):
endTimeStamp = os.popen('date +%s').read();

global jobKeyGlobal; jobKeyGlobal=jobKey
global indexGlobal; indexGlobal=index;
global jobKeyGlobal; jobKeyGlobal = jobKey
global indexGlobal; indexGlobal = index;

os.environ['endTimeStamp'] = endTimeStamp;
logTest("endTimeStamp: " + endTimeStamp)

# Paths--------------------------------------
eblocPath = constants.EBLOCPATH;
contractCallPath = constants.EBLOCPATH + '/contractCalls'; os.environ['contractCallPath'] = contractCallPath;
logPath = constants.LOG_PATH;
programPath = constants.PROGRAM_PATH;
# -------------------------------------------
encodedShareToken = base64.b64encode(shareToken + ':')
#os.chdir(constants.EBLOCPATH + '/contractCalls');

encodedShareToken = '';
if shareToken != '-1':
encodedShareToken = base64.b64encode(shareToken + ':')

header = "var eBlocBroker = require('" + eblocPath + "/eBlocBrokerHeader.js')"; os.environ['header'] = header;
header = "var eBlocBroker = require('" + constants.EBLOCPATH + "/eBlocBrokerHeader.js')"; os.environ['header'] = header;

clusterID = constants.CLUSTER_ID;
os.environ['programPath'] = str(programPath)
os.environ['programPath'] = str(programPath);
os.environ['clusterID'] = clusterID;
os.environ['jobKey'] = jobKey
os.environ['index'] = str(index)
os.environ["IPFS_PATH"] = constants.IPFS_REPO # Default IPFS repo path
os.environ['eblocPath'] = eblocPath
os.environ['encodedShareToken'] = encodedShareToken
os.environ['clientMiniLockId'] = miniLockId
os.environ['jobName'] = folderName
os.environ['storageType'] = str(storageType)
os.environ['jobKey'] = jobKey;
os.environ['index'] = str(index);
os.environ["IPFS_PATH"] = constants.IPFS_REPO; # Default IPFS repo path
os.environ['eblocPath'] = constants.EBLOCPATH;
os.environ['encodedShareToken'] = encodedShareToken;
os.environ['clientMiniLockId'] = miniLockId;
os.environ['jobName'] = folderName;
os.environ['storageType'] = str(storageType);

fDate = open(programPath + '/' + jobKey + "_" + index + '/modifiedDate.txt', 'r')
modifiedDate = fDate.read().rstrip('\n');
os.environ['modifiedDate'] = modifiedDate; fDate.close()
logTest(modifiedDate)
logTest(jobKey + ' ' + index + ' ' + storageType + ' ' + shareToken + ' ' + miniLockId + ' ' + folderName);

logTest("jobKey: " + jobKey);
logTest("index: " + index);
logTest("storageType: " + storageType);
logTest("shareToken: " + shareToken);
logTest("encodedShareToken: " + encodedShareToken);
logTest("miniLockId: " + miniLockId);
logTest("folderName: " + folderName);

jobInfo = os.popen('$contractCallPath/getJobInfo.py $clusterID $jobKey $index 2>/dev/null').read().rstrip('\n').replace(" ","")[1:-1];
logTest(os.popen('echo $contractCallPath/getJobInfo.py $clusterID $jobKey $index').read().rstrip('\n'));

logTest("keyHash: " + jobKey )
logTest("Index: " + index )
logTest("storageType: " + storageType )
logTest("shareToken: |" + shareToken + "|")
logTest("encodedShareToken: |" + encodedShareToken + "|")
logTest("miniLockId: |" + miniLockId + "|")

jobInfo = os.popen('python $contractCallPath/getJobInfo.py $clusterID $jobKey $index').read().rstrip('\n').replace(" ","")[1:-1];
while(True):
if(not(jobInfo == "Connection refused" or jobInfo == "" or jobInfo == "Errno")):
if not(jobInfo == "Connection refused" or jobInfo == "" or jobInfo == "Errno"):
break;
else:
logTest('jobInfo: ' + jobInfo);
logTest("Error: Please run Parity or Geth on the background. or unlock your Cluster Ethereum Account**************************************")
jobInfo = os.popen('python $contractCallPath/getJobInfo.py $clusterID $jobKey $index').read().rstrip('\n').replace(" ","")[1:-1];
logTest(os.popen('echo $contractCallPath/getJobInfo.py $clusterID $jobKey $index').read().rstrip('\n'));
logTest(os.popen('whoami').read().rstrip('\n'));
logTest(os.popen('pwd').read().rstrip('\n'));
logTest("Error: Please run Parity or Geth on the background. *****")
jobInfo = os.popen('$contractCallPath/getJobInfo.py $clusterID $jobKey $index 2>/dev/null').read().rstrip('\n').replace(" ","")[1:-1];
time.sleep(1)

logTest("JOB_INFO:" + jobInfo)
Expand All @@ -86,21 +95,21 @@ def endCall(jobKey, index, storageType, shareToken, miniLockId, folderName):
logTest( "Error: Already completed job...");
sys.exit(); # Detects an error on the SLURM side

jobInfo = os.popen('python $contractCallPath/getJobInfo.py $clusterID $jobKey $index').read().rstrip('\n').replace(" ","")[1:-1];
jobInfo = os.popen('$contractCallPath/getJobInfo.py $clusterID $jobKey $index 2>/dev/null').read().rstrip('\n').replace(" ","")[1:-1];
while(True):
if(not(jobInfo == "Connection refused" or jobInfo == "" or jobInfo == "Errno")):
break;
else:
logTest("Error: Please run Parity or Geth on the background.****************************")
jobInfo = os.popen('python $contractCallPath/getJobInfo.py $clusterID $jobKey $index').read().rstrip('\n').replace(" ","")[1:-1];
jobInfo = os.popen('$contractCallPath/getJobInfo.py $clusterID $jobKey $index 2>/dev/null').read().rstrip('\n').replace(" ","")[1:-1];
time.sleep(1)
jobInfo = jobInfo.split(',');
time.sleep(30) # Short sleep here so this loop is not keeping CPU busy
#}

logTest("jobName: " + str(folderName));
jobId = os.popen("sacct --name $jobName.sh -n | awk '{print $1}' | head -n 1 | sed -r 's/[.batch]+//g' ").read();
os.environ['jobId'] = jobId; ################
os.environ['jobId'] = jobId;
logTest("JOBID ------------> " + str(jobId));

# Here we know that job is already completed
Expand Down Expand Up @@ -221,17 +230,18 @@ def endCall(jobKey, index, storageType, shareToken, miniLockId, folderName):
#os.system("rm -rf " + programPath + '/' + jobKey + "_" + index); # Deleted downloaded code from local since it is not needed anymore
#}
logTest("ReceiptHash: " + transactionHash);
txFile = open(logPath + '/transactions/' + clusterID + '.txt', 'a');
txFile = open(constants.LOG_PATH + '/transactions/' + clusterID + '.txt', 'a');
txFile.write(transactionHash + " end_receiptCheck\n");
txFile.close();

if __name__ == '__main__': #{
#jobKey, index, storageType, shareToken, miniLockId, folderName
jobKey = sys.argv[1];
index = sys.argv[2];
storageType = sys.argv[3];
shareToken = sys.argv[4];
miniLockId = sys.argv[5];
runName = sys.argv[6];
folderName = sys.argv[6];

endCall(jobKey, index, storageType, shareToken, miniLockId, runName)
endCall(jobKey, index, storageType, shareToken, miniLockId, folderName)
#}
8 changes: 4 additions & 4 deletions initialize.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,21 @@ source $HOME/.profile
currentDir=$PWD;
# Folder Setup:--------------------------------------
if [ ! -d $HOME/.eBlocBroker ]; then
mkdir $HOME/.eBlocBroker;
mkdir -p $HOME/.eBlocBroker;
fi

cd $HOME/.eBlocBroker

if [ ! -d transactions ]; then
mkdir transactions
mkdir -p transactions
fi

if [ ! -d ipfsHashes ]; then
mkdir ipfsHashes
mkdir -p ipfsHashes
fi

if [ ! -d endCodeAnalyse ]; then
mkdir endCodeAnalyse
mkdir -p endCodeAnalyse
fi

touch $HOME/.eBlocBroker/transactions/clusterOut.txt
Expand Down
3 changes: 1 addition & 2 deletions jobResults.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@

sum1 += (int(res[7]) - int(res[8]))

jobInfo = os.popen('python $contractCallPath/getJobInfo.py $clusterID $jobKey $index').read().rstrip('\n').replace(" ","")[1:-1];
jobInfo = os.popen('python $contractCallPath/getJobInfo.py $clusterID $jobKey $index 2>/dev/null').read().rstrip('\n').replace(" ","")[1:-1];
r=jobInfo.split(',')

print(str(counter) + " " + res[1] + " " + res[2] + " " + res[3] + " | " + constants.job_state_code[r[0]] + "," + r[1] + "," + r[2] + "," + r[3] + "," + r[4] + "," + r[5]);
counter = counter + 1;

print(counter)
print("GAINED: " + str(sum1));

8 changes: 7 additions & 1 deletion killall.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#!/bin/bash

sudo pkill -f "Driver.py"
sudo pkill -f "endCode.py"

pids=$(ps aux | grep "[e]ndCode" | awk '{print $2}')
for word in $pids
do
sudo kill -9 $word
done


3 changes: 2 additions & 1 deletion runIPFS.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
#!/bin/bash
nohup ipfs daemon > ipfs.out&

ipfs daemon > ipfs.out&
6 changes: 1 addition & 5 deletions runSlurm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@ sudo slurmd
sudo munged -f
sudo /etc/init.d/munge start
slurmdbd &

if [ ! -d /tmp/slurmstate ]; then
mkdir /tmp/slurmstate
fi
mkdir -p /tmp/slurmstate

slurmctld -c
#slurmctld -cDvvvvvv

sinfo
1 change: 1 addition & 0 deletions startCode.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python

import sys, os, constants, time

def startCall( jobKey, index ):
Expand Down

0 comments on commit 7f2b7e7

Please sign in to comment.