diff --git a/Dockerfile b/Dockerfile index 90f93c83..c6aea5f5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,10 @@ # syntax=docker/dockerfile:1 FROM golang:latest RUN apt-get install -y ca-certificates -RUN wget --no-check-certificate -q "https://dist.ipfs.io/go-ipfs/v0.19.0/go-ipfs_v0.19.0_linux-amd64.tar.gz" \ - && tar -xf "go-ipfs_v0.19.0_linux-amd64.tar.gz" \ - && rm -f go-ipfs_v0.19.0_linux-amd64.tar.gz \ +ARG IPFS_TAG=v0.19.0 +RUN wget --no-check-certificate -q "https://dist.ipfs.io/go-ipfs/"${IPFS_TAG}"/go-ipfs_"${IPFS_TAG}"_linux-amd64.tar.gz" \ + && tar -xf "go-ipfs_"${IPFS_TAG}"_linux-amd64.tar.gz" \ + && rm -f go-ipfs_${IPFS_TAG}_linux-amd64.tar.gz \ && cd go-ipfs \ && make install \ && ./install.sh @@ -89,6 +90,7 @@ ENV PATH="/opt/venv/bin:$PATH" WORKDIR /workspace RUN git clone https://github.com/ebloc/ebloc-broker.git WORKDIR /workspace/ebloc-broker + #: `pip install -e .` takes few minutes RUN git checkout dev >/dev/null 2>&1 \ && git fetch --all --quiet >/dev/null 2>&1 \ diff --git a/broker/.todo.org_archive b/broker/.todo.org_archive index 759b47bd..a9d21480 100644 --- a/broker/.todo.org_archive +++ b/broker/.todo.org_archive @@ -126,3 +126,13 @@ E: Error: connect 12D3KooWQ9eXnXRs5MeCP11tgvcWENsUs4JUMBBESfDX3QDGf548 failurfai * /ip4/68.183.79.44/tcp/4001 failed to negotiate security protocol: peer id mismatch: expected 12D3KooWQ9eXnXRs5MeCP11tgvcWENsUs4JUMBBESfDX3QDGf548, but remote key matches 12D3KooWP77QwRFYkEeat23hGNdDQ36VyCiEP56RaRf585QzauTD #+end_src + +* DONE during job submittion check web3 connection +CLOSED: [2023-04-05 Wed 15:45] +:PROPERTIES: +:ARCHIVE_TIME: 2023-04-05 Wed 15:45 +:ARCHIVE_FILE: ~/ebloc-broker/broker/todo.org +:ARCHIVE_OLPATH: TASKS +:ARCHIVE_CATEGORY: todo +:ARCHIVE_TODO: DONE +:END: diff --git a/broker/Driver.py b/broker/Driver.py index 7cd6a124..d81bad1b 100644 --- a/broker/Driver.py +++ b/broker/Driver.py @@ -406,7 +406,7 @@ def run_driver(given_bn): f"==> account_balance={math.floor(gwei_balance)} [blue]gwei[/blue] ≈ " f"{format(cfg.w3.fromWei(wei_amount, 'ether'), '.2f')} [blue]ether" ) - log(f"==> Ebb_token_balance={Cent(balance_temp)._to()} [blue]usd") + log(f"==> Overall Ebb_token_balance={Cent(balance_temp)._to()} [blue]usd") slurm.pending_jobs_check() first_iteration_flag = True while True: @@ -422,7 +422,7 @@ def run_driver(given_bn): if not first_iteration_flag: console_ruler() if isinstance(balance, int): - cc = Cent(int(balance) - int(balance_temp))._to() + cc = Cent(int(balance) - int(env.config["token_balance"]))._to() log(f"==> since driver started provider_gained_token={cc} usd") current_bn = Ebb.get_block_number() @@ -488,7 +488,11 @@ def reconnect(): def check_connection(): if not network.is_connected(): - reconnect() + try: + reconnect() + except Exception as e: + log(f"E: {e}") + if not network.is_connected(): time.sleep(15) else: diff --git a/broker/Pidfile.py b/broker/Pidfile.py old mode 100755 new mode 100644 diff --git a/broker/_cli/console.py b/broker/_cli/console.py index c2925866..1dcef6bc 100755 --- a/broker/_cli/console.py +++ b/broker/_cli/console.py @@ -27,7 +27,7 @@ def my_inline_function(): app = VulcanoApp() Ebb = cfg.Ebb -_log.IS_WRITE = False +_log.IS_WRITE = False # applies this all libs imported this t1 = threading.Thread(target=my_inline_function) t1.start() diff --git a/broker/_utils/_log.py b/broker/_utils/_log.py index b5a37499..cafca044 100644 --- a/broker/_utils/_log.py +++ b/broker/_utils/_log.py @@ -19,7 +19,10 @@ install() # for rich, show_locals=True # pretty.install() -IS_WRITE = True # if False disable write into file for the process +# if False disable write into file for the process +# heads up applies this all libs imported this +IS_WRITE = True + DRIVER_LOG = None IS_THREADING_MODE_PRINT = False thread_log_files: Dict[str, str] = {} @@ -157,7 +160,7 @@ def br(text, color="white"): def ok(): - return " " + br(" [g]OK[/g] ") + return " " + br(" [g]OK[/g] ") def _console_clear(): diff --git a/broker/_utils/tools.py b/broker/_utils/tools.py index 38715acf..c246e59e 100644 --- a/broker/_utils/tools.py +++ b/broker/_utils/tools.py @@ -573,7 +573,7 @@ def squeue() -> None: if len(f"{squeue_output}\n".split("\n", 1)[1]) > 0: # checks if the squeue output's line number is gretaer than 1 # log("view information about jobs located in the Slurm scheduling queue:", "yellow") - log(f"{squeue_output}{ok()}\n") + log(f"{squeue_output} {ok()}\n") def compare_files(fn1, fn2) -> bool: diff --git a/broker/bash_scripts/clean_for_new_test.sh b/broker/bash_scripts/clean_for_new_test.sh index dc06f794..5ae71064 100755 --- a/broker/bash_scripts/clean_for_new_test.sh +++ b/broker/bash_scripts/clean_for_new_test.sh @@ -24,25 +24,28 @@ for user in $(members eblocbroker | tr " " "\n"); do done BASE="/var/ebloc-broker" -mkdir -p $BASE/to_delete - -mv $BASE/* $BASE/to_delete >/dev/null 2>&1 -DIR=$BASE/to_delete/public -[ -d $DIR ] && mv $BASE/to_delete/public $BASE/ - -DIR=$BASE/to_delete/cache # do not delete files in /var/ebloc-broker/cache/ -[ -d $DIR ] && mv $DIR $BASE/ - -FILE=$BASE/to_delete/slurm_mail_prog.sh # recover slurm_mail_prog.sh -[ -f $FILE ] && mv $FILE $BASE/ +if [[ -d $BASE ]]; then + mkdir -p $BASE/to_delete + + mv $BASE/* $BASE/to_delete >/dev/null 2>&1 + DIR=$BASE/to_delete/public + [ -d $DIR ] && mv $BASE/to_delete/public $BASE/ + + DIR=$BASE/to_delete/cache # do not delete files in /var/ebloc-broker/cache/ + [ -d $DIR ] && mv $DIR $BASE/ + + FILE=$BASE/to_delete/slurm_mail_prog.sh # recover slurm_mail_prog.sh + [ -f $FILE ] && mv $FILE $BASE/ + + find /var/ebloc-broker/to_delete -name "*data_link*" | while read -r i + do + sudo umount -f $i/* >/dev/null 2>&1 + done + sudo rm -rf $BASE/to_delete + rm -f /var/ebloc-broker/cache/*.tar.gz + mkdir -p $BASE/cache +fi -find /var/ebloc-broker/to_delete -name "*data_link*" | while read -r i -do - sudo umount -f $i/* >/dev/null 2>&1 -done -sudo rm -rf $BASE/to_delete -rm -f /var/ebloc-broker/cache/*.tar.gz -mkdir -p $BASE/cache find $HOME/.ebloc-broker/*/* -mindepth 1 ! \ -regex '^./private\(/.*\)?' -delete >/dev/null 2>&1 @@ -62,11 +65,11 @@ rm -f $HOME/.ebloc-broker/ipfs.out rm -f $HOME/.ebloc-broker/ganache.out rm -f $HOME/.ebloc-broker/*.yaml~ -rm -f $BASE/geth_server.out -rm -f $BASE/.node-xmlhttprequest* -rm -f $BASE/ipfs.out -rm -f $BASE/modified_date.txt -rm -f $BASE/package-lock.json +rm -f $HOME/.ebloc-broker/geth_server.out +rm -f $HOME/.ebloc-broker/.node-xmlhttprequest* +rm -f $HOME/.ebloc-broker/ipfs.out +rm -f $HOME/.ebloc-broker/modified_date.txt +rm -f $HOME/.ebloc-broker/package-lock.json rm -rf ~/ebloc-broker/contract/build rm -rf ~/ebloc-broker/contract/reports @@ -91,10 +94,12 @@ done if [ "$(hostname)" = "homevm" ]; then echo "#> ln datasets for homevm" - ~/ebloc-broker/broker/test_setup/ln_medium_data.sh + ~/ebloc-broker/broker/bash_scripts/ln_medium_data.sh fi -echo -e "\n/var/ebloc-broker/" -CURRENT_DIR=$PWD -cd /var/ebloc-broker && fdfind . | as-tree && cd ~ -cd $CURRENT_DIR +if [[ -d $BASE ]]; then + echo -e "\n/var/ebloc-broker/" + CURRENT_DIR=$PWD + cd /var/ebloc-broker && fdfind . | as-tree && cd ~ + cd $CURRENT_DIR +fi diff --git a/broker/test_setup/ln_medium_data.sh b/broker/bash_scripts/ln_medium_data.sh similarity index 100% rename from broker/test_setup/ln_medium_data.sh rename to broker/bash_scripts/ln_medium_data.sh diff --git a/broker/eblocbroker_scripts/Contract.py b/broker/eblocbroker_scripts/Contract.py index 8f610db5..bb7bbab3 100644 --- a/broker/eblocbroker_scripts/Contract.py +++ b/broker/eblocbroker_scripts/Contract.py @@ -7,7 +7,7 @@ from contextlib import suppress from os.path import expanduser from typing import Union -from broker.imports import nc + from halo import Halo from pymongo import MongoClient from web3.exceptions import TransactionNotFound @@ -19,6 +19,7 @@ from broker._utils.yaml import Yaml from broker.config import env from broker.errors import QuietExit, Web3NotConnected +from broker.imports import nc from broker.libs.mongodb import MongoBroker from broker.utils import ipfs_to_bytes32, terminate from brownie.network.account import Account, LocalAccount diff --git a/broker/eblocbroker_scripts/get_block_number.py b/broker/eblocbroker_scripts/get_block_number.py index 2a095b9e..98900b57 100755 --- a/broker/eblocbroker_scripts/get_block_number.py +++ b/broker/eblocbroker_scripts/get_block_number.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +from broker.eblocbroker_scripts.utils import Cent import sys from broker import cfg @@ -21,6 +22,8 @@ def main(): output = Ebb.get_block_number() if is_write_to_file: env.config["block_continue"] = output + balance_temp = Ebb.get_balance(env.PROVIDER_ID) + env.config["token_balance"] = int(balance_temp) else: log(f"block_number={output}") except Exception as e: diff --git a/broker/gdrive/submit.py b/broker/gdrive/submit.py index f8a29319..2645d64d 100755 --- a/broker/gdrive/submit.py +++ b/broker/gdrive/submit.py @@ -31,7 +31,7 @@ def _submit(job, provider, key, requester, required_confs): processed_logs = Ebb._eblocbroker.events.LogJob().processReceipt(tx_receipt, errors=DISCARD) log(vars(processed_logs[0].args)) try: - log(f"job_index={processed_logs[0].args['index']}{ok()}", "bold") + log(f"job_index={processed_logs[0].args['index']} {ok()}") except IndexError: log(f"E: Tx({tx_hash}) is reverted") except Exception as e: diff --git a/broker/imports.py b/broker/imports.py index 87464b61..9a585b70 100644 --- a/broker/imports.py +++ b/broker/imports.py @@ -2,6 +2,7 @@ import sys from contextlib import suppress + from broker import cfg, config from broker._utils.tools import log, print_tb, read_json, run from broker.config import env diff --git a/broker/libs/eudat.py b/broker/libs/eudat.py index 6c54d42e..7bfe32cd 100644 --- a/broker/libs/eudat.py +++ b/broker/libs/eudat.py @@ -148,7 +148,7 @@ def share_single_folder(folder_name, f_id): """Share given folder path with the user.""" if not config.oc.is_shared(folder_name): config.oc.share_file_with_user(folder_name, f_id, remote_user=True, perms=31) - log(f"sharing with [yellow]{f_id}[/yellow]{ok()}") + log(f"sharing with [yellow]{f_id}[/yellow] {ok()}") log("## Requester folder is already shared") @@ -240,7 +240,7 @@ def submit(provider, requester, job, required_confs=1): log(vars(processed_logs[0].args)) try: processed_logs[0].args["index"] - # log(f"[bold]job_index={processed_logs[0].args['index']}{ok()}") + # log(f"[bold]job_index={processed_logs[0].args['index']} {ok()}") except IndexError: log(f"E: Tx({tx_hash}) is reverted") else: diff --git a/broker/libs/gdrive.py b/broker/libs/gdrive.py index bba3eb93..2b4872da 100644 --- a/broker/libs/gdrive.py +++ b/broker/libs/gdrive.py @@ -131,7 +131,7 @@ def upload(folder_to_share, tmp_dir, is_source_code=False): is_file_exist = _list(tar_hash, is_folder=True) if is_file_exist: - log(f"## requested folder {tar_hash} is already uploaded", "bold blue") + log(f"## requested folder {tar_hash} is already uploaded", "blue") log(is_file_exist, "bg") key = is_file_exist.partition("\n")[0].split()[0] is_already_uploaded = True @@ -288,7 +288,7 @@ def parse_gdrive_info(gdrive_info): log(_dict) except: - log(gdrive_info, "bold yellow") + log(gdrive_info, "yellow") def size(key, mime_type, folder_name, gdrive_info, results_folder_prev, code_hashes, is_cached): diff --git a/broker/libs/mongodb.py b/broker/libs/mongodb.py index 17e58cd6..d743260e 100755 --- a/broker/libs/mongodb.py +++ b/broker/libs/mongodb.py @@ -4,11 +4,8 @@ from pymongo import MongoClient from rich.pretty import pprint -from broker._utils import _log from broker._utils._log import log -_log.IS_WRITE = False - class BaseMongoClass: def __init__(self, mc, collection) -> None: @@ -166,7 +163,7 @@ def main(): if args.is_delete_all: ebb_mongo.delete_shared_ids() output = ebb_mongo.delete_all() - log(f"mc['ebloc_broker']['cache'] is_deleted={output}") + log(f"mc['ebloc_broker']['cache'] is_deleted={output}", is_write=False) else: ebb_mongo.find_all_share_id() # output = ebb_mongo.get_job_state_running_tx("QmRD841sowPfgz8u2bMBGA5bYAAMPXxUb4J95H7YjngU4K", 37) diff --git a/broker/test_setup/job_cppr.yaml b/broker/test_setup/job_cppr.yaml index 543b2f26..add4cdb1 100644 --- a/broker/test_setup/job_cppr.yaml +++ b/broker/test_setup/job_cppr.yaml @@ -1,21 +1,21 @@ config: - requester_address: '0x740fcbc6d4e7e5102b8cba29370b93c6de4c786e' - provider_address: '0x4934a70ba8c1c3acfa72e809118bdd9048563a24' + requester_address: '0x4c2aebf67f8cfce387ad0ee3774bb193c4a62ef6' + provider_address: '0x1926b36af775e1312fdebcc46303ecae50d945af' source_code: - storage_id: b2drop + storage_id: ipfs_gpg cache_type: public path: ~/test_eblocbroker/run_cppr storage_hours: 0 data: data1: - hash: f71df9d36cd519d80a3302114779741d + hash: 779745f315060d1bc0cd44b7266fb4da data2: - hash: 1bfca57fe54bc46ba948023f754521d6 + hash: 45281dfec4618e5d20570812dea38760 data3: cache_type: public - path: /home/alper/test_eblocbroker/small/KZ2-sawtooth + path: /home/alper/test_eblocbroker/small/babyface.n6c10 storage_hours: 1 - storage_id: b2drop + storage_id: ipfs_gpg data_transfer_out: 5 jobs: job1: diff --git a/broker/test_setup/job_nas.yaml b/broker/test_setup/job_nas.yaml index 32b014e0..dd2f7f5e 100644 --- a/broker/test_setup/job_nas.yaml +++ b/broker/test_setup/job_nas.yaml @@ -1,8 +1,8 @@ config: - requester_address: '0x2c73a80956516ba9e6005030faed2f8212bc10a3' - provider_address: '0x1926b36af775e1312fdebcc46303ecae50d945af' + requester_address: '0x72c1a89ff3606aa29686ba8d29e28dccff06430a' + provider_address: '0x4934a70ba8c1c3acfa72e809118bdd9048563a24' source_code: - storage_id: ipfs_gpg + storage_id: b2drop cache_type: public path: ~/test_eblocbroker/NPB3.3-SER_source_code storage_hours: 0 diff --git a/broker/test_setup/submit_jobs.py b/broker/test_setup/submit_jobs.py index 04862774..53e89b91 100755 --- a/broker/test_setup/submit_jobs.py +++ b/broker/test_setup/submit_jobs.py @@ -1,29 +1,31 @@ #!/usr/bin/env python3 -from brownie import network import os.path import random import sys import time +from contextlib import suppress from datetime import datetime from pathlib import Path from random import randint -from broker.imports import nc + from pymongo import MongoClient from web3.logs import DISCARD -from contextlib import suppress + from broker import cfg from broker._utils import _log from broker._utils._log import console_ruler, ok from broker._utils.tools import _date, _timestamp, countdown, is_process_on, log, run from broker._utils.web3_tools import get_tx_status from broker._utils.yaml import Yaml +from broker.config import env +from broker.imports import nc from broker.libs import gdrive from broker.libs.mongodb import BaseMongoClass from broker.submit_base import SubmitBase from broker.test_setup.user_set import providers, requesters from broker.utils import print_tb -from broker.config import env +from brownie import network Ebb = cfg.Ebb cfg.IS_FULL_TEST = True @@ -37,7 +39,7 @@ PROVIDER_MAIL = "alper.alimoglu.research2@gmail.com" benchmarks = ["nas", "cppr"] -storage_ids = ["b2drop", "gdrive", "ipfs"] +storage_ids = ["gdrive", "ipfs", "b2drop"] ipfs_types = ["ipfs", "ipfs_gpg"] test_dir = Path.home() / "ebloc-broker" / "broker" / "test_setup" @@ -53,7 +55,7 @@ cfg.TEST_PROVIDERS = providers cfg.NETWORK_ID = "bloxberg_core" -_ruler = "===========================" +_ruler = "=======================" # for provider_addr in providers: # mini_tests_submit(storage_ids, provider_addr) @@ -240,7 +242,7 @@ def check_connection(): if not network.is_connected(): time.sleep(15) else: - log(f"#> bloxberg connection using network_id={cfg.NETWORK_ID}{ok()}", is_write=False) + log(f"#> bloxberg connection using network_id={cfg.NETWORK_ID} {ok()}", is_write=False) def run_job(counter) -> None: @@ -249,8 +251,6 @@ def run_job(counter) -> None: :param counter: counter index to keep track of submitted job number """ for idx, provider_addr in enumerate(providers): - check_connection() - breakpoint() # DEBUG # yaml_cfg["config"]["data"]["data3"]["storage_id"] = random.choice(storage_ids) storage_id = (idx + counter) % len(storage_ids) selected_benchmark = random.choice(benchmarks) @@ -260,14 +260,16 @@ def run_job(counter) -> None: if selected_benchmark == "nas": log( - f"{_ruler} Submitting the job from [cyan]NAS Benchmark[/cyan] to [g]{provider_addr} {_ruler}", + f"{_ruler} Submitting the job from [cyan]NAS Benchmark[/cyan] to [g]{provider_addr}[/g]\t", "bold blue", ) + check_connection() yaml_cfg = Yaml(nas_yaml_fn) benchmark_name = create_nas_job_script() elif selected_benchmark == "cppr": print(" ") - log(f"{_ruler} Submitting [cyan]job with cppr datasets[/cyan] data_set_idx={idx} {_ruler}\t") + log(f"{_ruler} Submitting [cyan]job[/cyan] with [cyan]cppr datasets[/cyan] data_set_idx={idx}\t") + check_connection() # log(f" * Attempting to submit [cyan]job with cppr datasets[/cyan] to_provider=[g]{provider_addr}") yaml_cfg = Yaml(cppr_yam_fn) hash_medium_data_0, hash_medium_data = create_cppr_job_script(idx) @@ -343,7 +345,7 @@ def main(): else: try: counter = 0 - for _ in range(160): + for _ in range(100): for _ in range(2): # submitted as batch is faster run_job(counter) counter += 1 diff --git a/broker/todo.org b/broker/todo.org index c54389be..6fa70204 100644 --- a/broker/todo.org +++ b/broker/todo.org @@ -39,12 +39,21 @@ Failed to share file: googleapi: Error 403: Rate limit exceeded. User message: " ** TODO fetch b2drop empty size ** TODO investigate store web3 ebb object and read reread from it using picle ** TODO -+ hide ipfs progress in submitting jobs - -+ Failed to get about: Get "https://www.googleapis.com/drive/v3/about?alt=json&fields=maxImportSizes%2CmaxUploadSize%2CstorageQuota%2Cuser": oauth2: cannot fetch token: 400 Bad Request +*** hide ipfs progress in submitting jobs +*** Failed to get about: +#+begin_src bash +Get "https://www.googleapis.com/drive/v3/about?alt=json&fields=maxImportSizes%2CmaxUploadSize%2CstorageQuota%2Cuser": oauth2: cannot fetch token: 400 Bad Request Response: { "error": "invalid_grant", "error_description": "Token has been expired or revoked." } +#+end_src -** TODO during job submittion check web3 connection +** TODO freeze at +#+begin_src bash +==> since driver started provider_gained_token=0 usd + * 2023-04-06 08:34:46 waiting new job to come since bn=19922833 +==> current_block=19922819 | sync_from=19922833 +## Waiting block number to be updated, it remains constant at 19922819 +Downloading meta_data.json -> /var/ebloc-broker/b68d6462f9762cfed7bbc7f90e6c6c7f/1KFwXV2UWnJwj25HS3wdrjcQxartboQAW_0/meta_data.json +#+end_src diff --git a/broker/utils.py b/broker/utils.py index 61968904..67be930b 100644 --- a/broker/utils.py +++ b/broker/utils.py @@ -423,6 +423,10 @@ def start_ipfs_daemon(_is_print=False) -> bool: and "Swarm listening on /ip6/" not in line and "WebUI:" not in line and "Initializing" not in line + and "Swarm." not in line + and "ipfs swarm" not in line + and "Computed default" not in line + and line != "" ): print(line)