diff --git a/.depreciated/imports.py b/.depreciated/imports.py
new file mode 100644
index 00000000..a8f8b312
--- /dev/null
+++ b/.depreciated/imports.py
@@ -0,0 +1,68 @@
+#!/usr/bin/env python3
+
+from web3 import IPCProvider, Web3
+from web3.middleware import geth_poa_middleware
+from web3.providers.rpc import HTTPProvider
+
+from broker.utils import is_geth_on, run, terminate
+
+
+def _connect_to_web3() -> None:
+    """Connect to web3 of the corresponding ethereum blockchain.
+
+    * bloxberg:
+    __ https://bloxberg.org
+    """
+    if env.IS_GETH_TUNNEL or not env.IS_EBLOCPOA:
+        if env.IS_BLOXBERG:
+            # TODO you can use brownie's connected web3?
+            cfg.w3 = Web3(HTTPProvider("https://core.bloxberg.org"))
+        else:
+            cfg.w3 = Web3(HTTPProvider(f"http://localhost:{env.RPC_PORT}"))
+    else:
+        cfg.w3 = Web3(IPCProvider(env.DATADIR.joinpath("geth.ipc")))
+        #: inject the poa compatibility middleware to the innermost layer
+        cfg.w3.middleware_onion.inject(geth_poa_middleware, layer=0)
+
+
+def connect_to_web3() -> None:
+    """Connect to the private ethereum network using web3.
+
+    Note that you should create only one RPC Provider per process, as it
+    recycles underlying TCP/IP network connections between your process and
+    Ethereum node
+    """
+    if cfg.w3.isConnected():
+        return
+
+    web3_ipc_fn = env.DATADIR.joinpath("geth.ipc")
+    for _ in range(5):
+        _connect_to_web3()
+        if not cfg.w3.isConnected():
+            try:
+                if env.IS_GETH_TUNNEL:
+                    raise Exception("Web3ConnectError: try tunnelling: ssh -f -N -L 8545:localhost:8545 username@")
+
+                if env.IS_BLOXBERG:
+                    log("E: web3 is not connected into [green]BLOXBERG[/green]")
+                else:
+                    is_geth_on()
+            except QuietExit:
+                pass
+            except Exception as e:
+                print_tb(e)
+                sys.exit(1)
+
+            if not env.IS_GETH_TUNNEL and not env.IS_BLOXBERG:
+                log(
+                    "E: If web3 is not connected please start geth server and give permission \n"
+                    "to /private/geth.ipc file doing: ",
+                    end="",
+                )
+                log(f"sudo chown $(logname) {web3_ipc_fn}", "green")
+                log(f"#> running `sudo chown $(whoami) {web3_ipc_fn}`")
+                run(["sudo", "chown", env.WHOAMI, web3_ipc_fn])
+        else:
+            break
+    else:
+        terminate(is_traceback=False)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index d2b96f6a..dde52807 100755
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -22,16 +22,17 @@ repos:
         entry: bash -c 'mypy "$@" || true' --
 
   - repo: https://github.com/psf/black
-    rev: 22.3.0  # Replace by any tag/version: https://github.com/psf/black/tags
+    rev: 23.3.0  # Replace by any tag/version: https://github.com/psf/black/tags
    hooks:
      - id: black
        name: black
-       exclude: ^.old_work/
+       exclude: ^.depreciated/
 
   - repo: https://gitlab.com/pycqa/flake8
    rev: 3.8.3
    hooks:
      - id: flake8
 
+exclude: ".depreciated/.*|.old_work/.*"
 default_language_version:
  python: python3
diff --git a/broker/Driver.py b/broker/Driver.py
index 6a151854..16a515a4 100644
--- a/broker/Driver.py
+++ b/broker/Driver.py
@@ -14,12 +14,14 @@ from typing import List
 import zc.lockfile
+from halo import Halo
 from ipdb import launch_ipdb_on_exception
 
 from broker import cfg, config
+from broker._import import check_connection
 from broker._utils import _log
-from broker._utils._log import console_ruler, log, ok
+from broker._utils._log import console_ruler, log
-from broker._utils.tools import is_process_on, kill_process_by_name, print_tb, squeue
+from broker._utils.tools import _date, is_process_on, kill_process_by_name, print_tb, squeue
 from broker.config import env, setup_logger
 from broker.drivers.b2drop import B2dropClass
 from broker.drivers.gdrive import GdriveClass
@@ -31,6 +33,7 @@ from broker.lib import eblocbroker_function_call, pre_check, run_storage_thread, session_start_msg, state
 from broker.libs import eudat, gdrive, slurm
 from broker.libs.user_setup import give_rwe_access, user_add
+from broker.python_scripts.add_bloxberg_into_network_config import read_network_config
 from broker.utils import (
     StorageID,
     check_ubuntu_packages,
@@ -259,7 +262,9 @@ def process_logged_job(self, idx):
             log("==> [yellow]job_info:", "bold")
             log(self.job_info)
         except Exception as e:
-            print_tb(e)
+            if "Max retries exceeded with url" not in str(e):
+                print_tb(e)
+
             return
 
         for job in range(1, len(self.job_info["core"])):
@@ -317,11 +322,21 @@ def process_logged_jobs(self):
             except JobException as e:
                 log(str(e))
             except Exception as e:
-                print_tb(e)
+                if "Max retries exceeded with url" not in str(e):
+                    print_tb(e)
+
                 log(str(e))
                 # breakpoint()  # DEBUG
 
 
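+# poll the chain every 4 seconds until the node's reported head reaches
+# `bn_read`; run_driver() below calls this while a Halo spinner is shown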
+def check_bn_progress(current_bn, bn_read):
+    while current_bn < int(bn_read):
+        current_bn = Ebb.get_block_number()
+        time.sleep(4)
+
+    return current_bn
+
+
 def run_driver(given_bn):
     """Run the main driver script for eblocbroker on the background."""
     # dummy sudo command to get the password when session starts for only to
@@ -430,15 +445,10 @@ def run_driver(given_bn):
             log(f" * {get_date()} waiting new job to come since bn={bn_read}")
             log(f"==> current_block={current_bn} | sync_from={bn_read} ", end="")
-            flag = True
-            while current_bn < int(bn_read):
-                current_bn = Ebb.get_block_number()
-                if flag:
-                    log()
-                    log(f"## Waiting block number to be updated, it remains constant at {current_bn}")
-
-                flag = False
-                time.sleep(2)
+            if current_bn < int(bn_read):
+                msg = f"warning: waiting block number to be updated, it remains constant at {current_bn} "
+                with Halo(text=msg, text_color="yellow", spinner="line", placement="right"):
+                    current_bn = check_bn_progress(current_bn, bn_read)
 
         bn_read = str(bn_read)  # reading events' block number has been updated
         if not first_iteration_flag:
@@ -458,9 +468,8 @@ def run_driver(given_bn):
                 bn_read = env.config["block_continue"] = current_bn
     except Exception as e:
         if "Filter not found" in str(e) or "Read timed out" in str(e):
-            # HTTPSConnectionPool(host='core.bloxberg.org', port=443): Read timed out. (read timeout=10)
             try:
-                log()
+                # HTTPSConnectionPool(host='core.bloxberg.org', port=443): Read timed out. (read timeout=10)
                 nc(env.BLOXBERG_HOST, 8545)
             except Exception:
                 log(f"E: Failed to make TCP connection to {env.BLOXBERG_HOST}")
@@ -473,32 +482,6 @@ def run_driver(given_bn):
         raise e
 
 
-def reconnect():
-    log(f"E: {network.show_active()} is not connected through {env.BLOXBERG_HOST}")
-    if cfg.NETWORK_ID == "bloxberg":
-        cfg.NETWORK_ID = "bloxberg_core"
-    elif cfg.NETWORK_ID == "bloxberg_core":
-        with suppress(Exception):
-            nc("alpy-bloxberg.duckdns.org", 8545)
-
-        cfg.NETWORK_ID = "bloxberg"
-
-    log(f"Trying at {cfg.NETWORK_ID} ...")
-    network.connect(cfg.NETWORK_ID)
-
-
-def check_connection():
-    if not network.is_connected():
-        try:
-            reconnect()
-        except Exception as e:
-            log(f"E: {e}")
-
-        if not network.is_connected():
-            time.sleep(15)
-    else:
-        log(f"bloxberg connection {ok()}", is_write=False)
-
-
 def _run_driver(given_bn, lock):
     config.logging = setup_logger(_log.DRIVER_LOG)
     while True:
@@ -517,7 +500,7 @@ def _run_driver(given_bn, lock):
                 print_tb(e)
 
             check_connection()
-            console_ruler(character="*")
+            log(f"#> -=-=-=-=-=-=-=-=-=- [g]RESTARTING[/g] {_date()} -=-=-=-=-=-=-=-=-=- [blue]<#", is_write=False)
             continue
         finally:
             with suppress(Exception):
@@ -548,7 +531,9 @@ def main(args):
     log(f"rootdir: {os.getcwd()}", h=False)
     log(f"logfile: {_log.DRIVER_LOG}", h=False)
-    log(f"Attached to host RPC client listening at '{env.BLOXBERG_HOST}'", h=False)
+    bloxberg_host = read_network_config(cfg.NETWORK_ID)
+    log(f"Attached to host RPC client listening at '{bloxberg_host}'", h=False)
+    print()
     is_driver_on(process_count=1, is_print=False)
     try:
diff --git a/broker/_daemons/ganache.py b/broker/_daemons/ganache.py
index 64898350..d19a259b 100755
--- a/broker/_daemons/ganache.py
+++ b/broker/_daemons/ganache.py
@@ -41,13 +41,6 @@ def main():
     if len(sys.argv) == 2:
         port = int(sys.argv[1])
 
-    # try:
-    #     npm_package = "ganache-cli"
-    #     if not is_npm_installed(npm_package):
-    #         log(f"E: {npm_package} is not installed within npm")
-    #         sys.exit()
-    # except Exception as e:
-    #     print_tb(e)
     if not is_ganache_on(8547):
         run(port)
diff --git a/broker/_import.py b/broker/_import.py
new file mode 100644
index 00000000..a6e1e42c
--- /dev/null
+++ b/broker/_import.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python3
+
+import time
+from contextlib import suppress
+
+from broker import cfg
+from broker._utils._log import ok
+from broker.config import env
+from broker.imports import nc
+from broker.utils import log
+from brownie import network
+
+
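+# toggle cfg.NETWORK_ID between "bloxberg" and "bloxberg_core" and reconnect
+# brownie to it; before falling back to "bloxberg", probe the host on :8545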
", end="") + network.connect(cfg.NETWORK_ID) + log(ok()) + + +def check_connection(): + if not network.is_connected(): + try: + reconnect() + except Exception as e: + log(f"E: {e}") + + if not network.is_connected(): + time.sleep(15) + else: + log(f"bloxberg connection through [g]{cfg.NETWORK_ID}[/g] {ok()}", is_write=False) diff --git a/broker/_utils/_log.py b/broker/_utils/_log.py index cafca044..21a413b3 100644 --- a/broker/_utils/_log.py +++ b/broker/_utils/_log.py @@ -327,7 +327,7 @@ def log( text = base_str.join(textwrap.wrap(text, 120, break_long_words=False, break_on_hyphens=False)) if is_wrap: - text = "\n".join(textwrap.wrap(text, 80, break_long_words=False, break_on_hyphens=False)) + text = "\n".join(textwrap.wrap(text, 120, break_long_words=False, break_on_hyphens=False)) if is_write and IS_WRITE: if threading.current_thread().name != "MainThread" and cfg.IS_THREADING_ENABLED: diff --git a/broker/_utils/tools.py b/broker/_utils/tools.py index c246e59e..42bf7cbc 100644 --- a/broker/_utils/tools.py +++ b/broker/_utils/tools.py @@ -112,6 +112,7 @@ def PrintException() -> str: def print_tb(message=None, is_print_exc=True) -> None: """Log the traceback.""" + log(f"{WHERE()} ", end="") if message: if isinstance(message, QuietExit): if str(message): diff --git a/broker/_watch/watch.py b/broker/_watch/watch.py index 6dfa3df3..9ab7a286 100755 --- a/broker/_watch/watch.py +++ b/broker/_watch/watch.py @@ -258,7 +258,7 @@ def main(): if len(sys.argv) == 2: eth_address = sys.argv[1] - from_block = 19926110 + from_block = 19954408 watch(eth_address, from_block) diff --git a/broker/bash_scripts/clean_for_new_test.sh b/broker/bash_scripts/clean_for_new_test.sh index d55cdbaa..87d9dded 100755 --- a/broker/bash_scripts/clean_for_new_test.sh +++ b/broker/bash_scripts/clean_for_new_test.sh @@ -15,7 +15,7 @@ clean_gdrive () { # update block.continue.txt with the current block number python3 -uB $HOME/ebloc-broker/broker/eblocbroker_scripts/get_block_number.py True -squeue | tail -n+2 | awk '{print $1}' | xargs scancel 2> /dev/null +# timeout 2 squeue | tail -n+2 | awk '{print $1}' | xargs scancel 2> /dev/null # remove created users users for user in $(members eblocbroker | tr " " "\n"); do @@ -79,7 +79,6 @@ rm -f /tmp/run/driver_popen.pid >/dev/null 2>&1 rm -f ~/.ebloc-broker/.oc_client.pckl rm -f /var/ebloc-broker/cache/*.tar.gz - # unpin and remove all IPFS content from my machine # ipfs pin ls --type recursive | cut -d' ' -f1 | ifne xargs -n1 ipfs pin rm # ipfs repo gc diff --git a/broker/bash_scripts/killall.sh b/broker/bash_scripts/killall.sh index 25439b18..b0204e55 100755 --- a/broker/bash_scripts/killall.sh +++ b/broker/bash_scripts/killall.sh @@ -12,8 +12,8 @@ done killall python 2> /dev/null killall python3 2> /dev/null -echo "## killall all jobs in squeue" -squeue + +timeout 2 squeue && echo "## killall all jobs in squeue" if [ $? 
+timeout 2 squeue && echo "## killall all jobs in squeue"
 if [ $? -eq 0 ]; then
     squeue | tail -n+2 | awk '{print $1}' | ifne xargs scancel 2> /dev/null
 fi
diff --git a/broker/config.py b/broker/config.py
index eac3b8e2..231090c9 100644
--- a/broker/config.py
+++ b/broker/config.py
@@ -72,9 +72,8 @@ def __init__(self) -> None:
         self.GDRIVE = self.cfg["gdrive"]
         self.DATADIR = Path(self.cfg["datadir"])
         self.LOG_DIR = Path(self.cfg["log_path"])
-
         try:
-            self.BLOXBERG_HOST = read_network_config(Path.home() / ".brownie" / "network-config.yaml")
+            self.BLOXBERG_HOST = read_network_config(cfg.NETWORK_ID)
         except:
             self.BLOXBERG_HOST = "https://core.bloxberg.org"
diff --git a/broker/eblocbroker_scripts/Contract.py b/broker/eblocbroker_scripts/Contract.py
index 3ad626c6..eb1e5dbb 100644
--- a/broker/eblocbroker_scripts/Contract.py
+++ b/broker/eblocbroker_scripts/Contract.py
@@ -14,7 +14,7 @@ from web3.types import TxReceipt
 
 from broker import cfg
-from broker._utils._log import ok
+from broker._utils._log import WHERE, ok
 from broker._utils.tools import exit_after, log, print_tb, without_keys
 from broker._utils.yaml import Yaml
 from broker.config import env
@@ -25,10 +25,11 @@
 from brownie.network.account import Account, LocalAccount
 from brownie.network.transaction import TransactionReceipt
 
+# from brownie.network.gas.strategies import GasNowStrategy
 # from brownie.network.gas.strategies import LinearScalingStrategy
 
-GAS_PRICE = 1.13  # was 1
-EXIT_AFTER = 120  # seconds
+GAS_PRICE = 1.14  # was 1 => 1.13
+EXIT_AFTER = 180  # seconds
 
 
 class Base:
@@ -75,7 +76,7 @@ def __init__(self, is_brownie=False) -> None:
             self.mongo_broker = MongoBroker(mc, mc["ebloc_broker"]["cache"])
 
         # self.gas_limit = "max"  # 300000
         self.ops = {}
-        self.max_retries = 10
+        self.max_retries = 3
         self.required_confs = 1
         self._from = ""
         #: tx cost exceeds current gas limit. Limit: 9990226, got:
@@ -356,7 +357,8 @@ def timeout(self, func, *args):
             raise Exception(f"Method {method} is not implemented") from e
 
     def timeout_wrapper(self, method, *args):
-        for _ in range(self.max_retries):
+        idx = 0
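+        # retry the call up to max_retries times; the `continue` branches
+        # below bump the gas price and retry without consuming a retry slot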
+        while idx < self.max_retries:
             self.ops = {
                 "from": self._from,
                 "gas": self.gas,
@@ -385,6 +387,7 @@ def timeout_wrapper(self, method, *args):
 
                 if "Try increasing the gas price" in str(e):
                     self.gas_price *= 1.13
+                    continue
 
                 if "Execution reverted" in str(e) or "Insufficient funds" in str(e):
                     print_tb(e)
@@ -406,6 +409,7 @@ def timeout_wrapper(self, method, *args):
             except KeyboardInterrupt:
                 log("warning: Timeout Awaiting Transaction in the mempool")
                 self.gas_price *= 1.13
+                continue
             except Exception as e:
                 log(f"Exception: {e}")
                 # brownie.exceptions.TransactionError: Tx dropped without known replacement
@@ -413,40 +417,51 @@ def timeout_wrapper(self, method, *args):
                     self.gas_price *= 1.13
                     log("Sleep 15 seconds, will try again...")
                     time.sleep(15)
-                else:
-                    raise e
+                    continue
+
+                raise e
+
+            idx += 1
 
     ################
     # Transactions #
     ################
     def _submit_job(self, required_confs, requester, job_price, *args) -> "TransactionReceipt":
         self.gas_price = GAS_PRICE
-        for _ in range(self.max_retries):
+        method_name = "submitJob"
+        idx = 0
+        while idx < self.max_retries:
             self.ops = {
                 "gas": self.gas,
                 "gas_price": f"{self.gas_price} gwei",
                 "from": requester,
                 "allow_revert": True,
                 "required_confs": required_confs,
-                # "value": self.w3.toWei(job_price, "gwei"),  # not required anymore
             }
             try:
-                return self.timeout("submitJob", *args)
+                return self.timeout(method_name, *args)
             except ValueError as e:
-                log(f"E: {e}")
-                if "Insufficient funds" in str(e):
-                    raise e
-
-                if "Execution reverted" in str(e):
+                log(f"E: {WHERE()} {e}")
+                if "Insufficient funds" in str(e) or "Execution reverted" in str(e):
                     raise e
 
                 if "Transaction cost exceeds current gas limit" in str(e):
                     self.gas -= 10000
-            except KeyboardInterrupt as e:
+                    continue
+
+                if "Try increasing the gas price" in str(e):
+                    self.gas_price *= 1.13
+                    continue
+            except KeyboardInterrupt as e:  # acts as Exception
+                if str(e):
+                    log(f"E: {WHERE()} {e}", is_wrap=True)
+
                 if "Awaiting Transaction in the mempool" in str(e):
                     log("warning: Timeout Awaiting Transaction in the mempool")
                     self.gas_price *= 1.13
+                    continue
+
+            idx += 1
 
         raise Exception("No valid Tx receipt is generated")
 
     def deposit_storage(self, data_owner, code_hash, _from) -> "TransactionReceipt":
diff --git a/broker/eblocbroker_scripts/get_block_number.py b/broker/eblocbroker_scripts/get_block_number.py
index 34079964..35ec396a 100755
--- a/broker/eblocbroker_scripts/get_block_number.py
+++ b/broker/eblocbroker_scripts/get_block_number.py
@@ -8,6 +8,7 @@
 from broker.utils import print_tb
 
 logging = setup_logger()
+# cfg.NETWORK_ID = "bloxberg_core"
 
 
 def main():
diff --git a/broker/eblocbroker_scripts/submit_job.py b/broker/eblocbroker_scripts/submit_job.py
index 4495684c..54936bd7 100644
--- a/broker/eblocbroker_scripts/submit_job.py
+++ b/broker/eblocbroker_scripts/submit_job.py
@@ -1,7 +1,5 @@
 #!/usr/bin/env python3
 
-import sys
-
 from broker import cfg
 from broker._utils._log import br
 from broker._utils.tools import log, print_tb
@@ -88,7 +86,7 @@ def check_before_submit(self, provider, _from, provider_info, key, job):
 
     if is_use_ipfs:
         if not is_ipfs_on():
-            sys.exit()
+            raise Exception("ipfs daemon is not running in the background")
 
         try:
             ipfs.swarm_connect(provider_info["ipfs_address"])
diff --git a/broker/end_code.py b/broker/end_code.py
index 6816d349..1126a013 100755
--- a/broker/end_code.py
+++ b/broker/end_code.py
@@ -556,7 +556,7 @@ def run(self):
             self.elapsed_time = run_time[self.job_id]
             log(f"finalized_elapsed_time={self.elapsed_time}")
 
-        log("## job_info=", "info", end="")
+        log("job_info=", end="")
         log(pprint.pformat(self.job_info))
         try:
             self.get_cloud_storage_class(0).initialize(self)
diff --git a/broker/errors.py b/broker/errors.py
index 9992aef8..591b3136 100644
--- a/broker/errors.py
+++ b/broker/errors.py
@@ -27,9 +27,9 @@ class Timeout(Exception):
     """Timeout."""
 
 
-class Web3NotConnected(Exception):
-    """Web3 is not connected."""
-
-
 class QuietExit(Exception):
     """Exit quietly without printing the trace."""
+
+
+class Web3NotConnected(Exception):
+    """Web3 is not connected."""
diff --git a/broker/imports.py b/broker/imports.py
index 1512d8b5..b83952fb 100644
--- a/broker/imports.py
+++ b/broker/imports.py
@@ -25,7 +25,9 @@ def _ping(host) -> bool:
     """Return True if host (str) responds to a ping request.
 
     Remember that a host may not respond to a ping (ICMP) request even if the host name is valid.
-    Building the command. Ex: "ping -c 1 google.com"
+
+    Building the command.
+
+    Ex: "ping -c 1 google.com"
     """
     cmd = ["ping", "-c", "1", host]
     pr, output, e = popen_communicate(cmd)  # noqa
@@ -58,13 +60,14 @@
 
 
 def connect_to_eblocbroker() -> None:
-    """Connect to eBlocBroker smart contract in the given private blockchain."""
+    """Connect to the eBlocBroker smart contract through the given private blockchain."""
+    from brownie import network
+
     if config.ebb:
         return
 
     if not env.EBLOCPATH:
-        log("E: env.EBLOCPATH variable is empty")
-        raise QuietExit
+        raise QuietExit("env.EBLOCPATH variable is empty")
 
     try:
         if env.IS_EBLOCPOA:
@@ -72,8 +75,6 @@ def connect_to_eblocbroker() -> None:
             config._eblocbroker = config.ebb
             config.ebb.contract_address = cfg.w3.toChecksumAddress(env.CONTRACT_ADDRESS)
         elif env.IS_BLOXBERG and not cfg.IS_BROWNIE_TEST:
-            from brownie import network
-
             try:
                 network.connect(cfg.NETWORK_ID)
                 if not network.is_connected():
@@ -119,73 +120,3 @@ def connect_to_eblocbroker() -> None:
     except Exception as e:
         print_tb(e)
         raise e
-
-
-# depreciated
-
-"""
-from web3 import IPCProvider, Web3
-from web3.middleware import geth_poa_middleware
-from web3.providers.rpc import HTTPProvider
-from broker.utils import is_geth_on, run, terminate
-
-def _connect_to_web3() -> None:
-    "Connect to web3 of the corresponding ethereum blockchain.
-
-    * bloxberg:
-    __ https://bloxberg.org
-    "
-    if env.IS_GETH_TUNNEL or not env.IS_EBLOCPOA:
-        if env.IS_BLOXBERG:
-            # TODO you can use brownie's connected web3?
-            cfg.w3 = Web3(HTTPProvider("https://core.bloxberg.org"))
-        else:
-            cfg.w3 = Web3(HTTPProvider(f"http://localhost:{env.RPC_PORT}"))
-    else:
-        cfg.w3 = Web3(IPCProvider(env.DATADIR.joinpath("geth.ipc")))
-        #: inject the poa compatibility middleware to the innermost layer
-        cfg.w3.middleware_onion.inject(geth_poa_middleware, layer=0)
-
-
-def connect_to_web3() -> None:
-    "Connect to the private ethereum network using web3.
-
-    Note that you should create only one RPC Provider per process, as it
-    recycles underlying TCP/IP network connections between your process and
-    Ethereum node
-    "
-    if cfg.w3.isConnected():
-        return
-
-    web3_ipc_fn = env.DATADIR.joinpath("geth.ipc")
-    for _ in range(5):
-        _connect_to_web3()
-        if not cfg.w3.isConnected():
-            try:
-                if env.IS_GETH_TUNNEL:
-                    raise Exception("Web3ConnectError: try tunnelling: ssh -f -N -L 8545:localhost:8545 username@")
-
-                if env.IS_BLOXBERG:
-                    log("E: web3 is not connected into [green]BLOXBERG[/green]")
-                else:
-                    is_geth_on()
-            except QuietExit:
-                pass
-            except Exception as e:
-                print_tb(e)
-                sys.exit(1)
-
-            if not env.IS_GETH_TUNNEL and not env.IS_BLOXBERG:
-                log(
-                    "E: If web3 is not connected please start geth server and give permission \n"
-                    "to /private/geth.ipc file doing: ",
-                    end="",
-                )
-                log(f"sudo chown $(logname) {web3_ipc_fn}", "green")
-                log(f"#> running `sudo chown $(whoami) {web3_ipc_fn}`")
-                run(["sudo", "chown", env.WHOAMI, web3_ipc_fn])
-        else:
-            break
-    else:
-        terminate(is_traceback=False)
-"""
diff --git a/broker/ipfs/job_example.yaml b/broker/ipfs/job_example.yaml
index 8e03c689..a0bf40b5 100644
--- a/broker/ipfs/job_example.yaml
+++ b/broker/ipfs/job_example.yaml
@@ -1,19 +1,19 @@
 config:
-    requester_address: '0x0636278CBD420368b1238ab204b1073df9cC1c5c'
-    provider_address: '0x29e613b04125c16db3f3613563bfdd0ba24cb629'
-    source_code:
-        cache_type: public
-        path: ~/test_eblocbroker/test_data/base/source_code
-        storage_hours: 0
-        storage_id: ipfs
-    data:
-        data1:
-            cache_type: public
-            path: ~/test_eblocbroker/test_data/base/data/data1
-            storage_hours: 1
-            storage_id: ipfs
-    data_transfer_out: 2
-    jobs:
-        job1:
-            cores: 1
-            run_time: 1
+  requester_address: '0x0636278CBD420368b1238ab204b1073df9cC1c5c'
+  provider_address: '0x29e613b04125c16db3f3613563bfdd0ba24cb629'
+  source_code:
+    cache_type: public
+    path: ~/test_eblocbroker/test_data/base/source_code
+    storage_hours: 0
+    storage_id: ipfs
+  data:
+    data1:
+      cache_type: public
+      path: ~/test_eblocbroker/test_data/base/data/data1
+      storage_hours: 1
+      storage_id: ipfs
+  data_transfer_out: 2
+  jobs:
+    job1:
+      cores: 1
+      run_time: 1
diff --git a/broker/ipfs/submit.py b/broker/ipfs/submit.py
index a425e726..f8f4d174 100755
--- a/broker/ipfs/submit.py
+++ b/broker/ipfs/submit.py
@@ -54,8 +54,7 @@ def pre_check(job: Job, requester):
     job.check_account_status(requester)
     is_bin_installed("ipfs")
     if not is_dpkg_installed("pigz"):
-        log("E: Install [g]pigz[/g].\nsudo apt install -y pigz")
-        sys.exit()
+        raise Exception("E: Install [g]pigz[/g].\nsudo apt install -y pigz")
 
     if not os.path.isfile(env.GPG_PASS_FILE):
         log(f"E: Please store your gpg password in the [m]{env.GPG_PASS_FILE}[/m] file for decryption", is_wrap=True)
@@ -111,13 +110,12 @@ def _submit(provider_addr, job, requester, targets, required_confs):
 
 
 def submit_ipfs(job: Job, is_pass=False, required_confs=1):
-    log(f"==> attempting to submit job ({job.source_code_path})")
+    log(f"==> attempting to submit job ({job.source_code_path}) using [green]ipfs[/green]")
     requester = Ebb.w3.toChecksumAddress(job.requester_addr)
     try:
         pre_check(job, requester)
     except Exception as e:
-        print_tb(e)
-        sys.exit()
+        raise e
 
     main_storage_id = job.storage_ids[0]
     job.folders_to_share = job.paths
@@ -127,8 +125,7 @@ def submit_ipfs(job: Job, is_pass=False, required_confs=1):
     elif main_storage_id == StorageID.IPFS_GPG:
         log("==> submitting source code through [blue]IPFS_GPG[/blue]")
     else:
-        log("E: Please provide IPFS or IPFS_GPG storage type for the source code")
code") - sys.exit(1) + raise Exception("Please provide IPFS or IPFS_GPG storage type for the source code") # provider_info = Ebb.get_provider_info(job.provider_addr) targets = [] @@ -155,8 +152,7 @@ def submit_ipfs(job: Job, is_pass=False, required_confs=1): provider_info = Ebb.get_provider_info(provider_addr) provider_gpg_fingerprint = provider_info["gpg_fingerprint"] if not provider_gpg_fingerprint: - log("E: Provider did not register any GPG fingerprint") - sys.exit(1) + raise Exception("E: Provider did not register any GPG fingerprint") log(f"==> provider_gpg_fingerprint={provider_gpg_fingerprint}") for idx, folder in enumerate(job.folders_to_share): @@ -171,8 +167,7 @@ def submit_ipfs(job: Job, is_pass=False, required_confs=1): log(f"==> gpg_file={target}") targets.append(target) #: created gpg file will be removed since its already in ipfs except Exception as e: - print_tb(e) - sys.exit(1) + raise e job = _ipfs_add(job, target, idx) else: @@ -203,6 +198,6 @@ def main(): try: main() except KeyboardInterrupt: - sys.exit(1) + pass except Exception as e: print_tb(e) diff --git a/broker/libs/gdrive.py b/broker/libs/gdrive.py index 2b4872da..89cbc7a2 100644 --- a/broker/libs/gdrive.py +++ b/broker/libs/gdrive.py @@ -4,7 +4,6 @@ import os import shutil import subprocess -import sys from contextlib import suppress from pathlib import Path @@ -59,8 +58,8 @@ def submit(_from, job): job.keys[tar_hash] = folder_key if job.tmp_dir == "": - print_tb("job.tmp_dir is empty") - sys.exit() + # print_tb("job.tmp_dir is empty") + raise Exception("'job.tmp_dir' is empty") _dump_dict_to_file(data_files_json_path, job.keys) data_json = read_json(data_files_json_path) @@ -103,7 +102,7 @@ def submit(_from, job): break if _id: - log("## updating meta_data ", end="") + log("## updating meta_data ") update_meta_data_gdrive(_id, data_files_json_path) log(ok()) @@ -166,7 +165,8 @@ def delete_all(_type="all"): output = run(["/usr/local/bin/gdrive", "delete", "--recursive", line.split()[0]]) print(output) except Exception as e: - log(f"E {e}") + if str(e) != "": + log(f"E: {e}") # else: # with suppress(Exception): # run(["gdrive", "delete", line.split()[0]]) @@ -265,7 +265,7 @@ def get_data_key_ids(results_folder_prev) -> bool: def update_meta_data_gdrive(key, path): output = get_file_id(key) meta_data_key = fetch_grive_output(output, "meta_data.json") - log(f"\n\t`gdrive update {meta_data_key} {path}`", h=False, end="") + log(f"$ gdrive update {meta_data_key} {path}", is_code=True, h=False, end="") run(["gdrive", "update", meta_data_key, path]) diff --git a/broker/python_scripts/add_bloxberg_into_network_config.py b/broker/python_scripts/add_bloxberg_into_network_config.py index 1620f8f1..a981a294 100755 --- a/broker/python_scripts/add_bloxberg_into_network_config.py +++ b/broker/python_scripts/add_bloxberg_into_network_config.py @@ -8,12 +8,12 @@ from broker._utils._log import log -def read_network_config(fn) -> str: +def read_network_config(network_id, fn=Path.home() / ".brownie" / "network-config.yaml") -> str: config_data, _, _ = ruamel.yaml.util.load_yaml_guess_indent(open(fn)) for _config in config_data["live"]: if _config["name"] == "Ethereum": for network in _config["networks"]: - if "bloxberg" in network["name"]: + if network_id in network["id"]: return network["host"] raise Exception diff --git a/broker/python_scripts/add_data.py b/broker/python_scripts/add_data.py index d9adbdba..f58d4780 100755 --- a/broker/python_scripts/add_data.py +++ b/broker/python_scripts/add_data.py @@ -1,11 +1,9 @@ 
     config_data, _, _ = ruamel.yaml.util.load_yaml_guess_indent(open(fn))
     for _config in config_data["live"]:
         if _config["name"] == "Ethereum":
             for network in _config["networks"]:
-                if "bloxberg" in network["name"]:
+                if network_id in network["id"]:
                     return network["host"]
 
     raise Exception
diff --git a/broker/python_scripts/add_data.py b/broker/python_scripts/add_data.py
index d9adbdba..f58d4780 100755
--- a/broker/python_scripts/add_data.py
+++ b/broker/python_scripts/add_data.py
@@ -1,11 +1,9 @@
 #!/usr/bin/env python3
 
 import os
-import sys
 from os.path import expanduser
 
 from broker import cfg
-from broker._utils.tools import print_tb
 
 
 def add_to_ipfs(results_folder):
@@ -14,8 +12,7 @@ def add_to_ipfs(results_folder):
         result_ipfs_hash = cfg.ipfs.add(results_folder)
         print(result_ipfs_hash)
     except Exception as e:
-        print_tb(e)
-        sys.exit()
+        raise e
 
     if os.path.isdir(results_folder):
         basename = os.path.basename(os.path.normpath(results_folder))
diff --git a/broker/test_setup/job_cppr.yaml b/broker/test_setup/job_cppr.yaml
index add4cdb1..8de2cb23 100644
--- a/broker/test_setup/job_cppr.yaml
+++ b/broker/test_setup/job_cppr.yaml
@@ -1,21 +1,21 @@
 config:
-  requester_address: '0x4c2aebf67f8cfce387ad0ee3774bb193c4a62ef6'
-  provider_address: '0x1926b36af775e1312fdebcc46303ecae50d945af'
+  requester_address: '0x378181ce7b07e8dd749c6f42772574441b20e35f'
+  provider_address: '0x29e613b04125c16db3f3613563bfdd0ba24cb629'
   source_code:
-    storage_id: ipfs_gpg
+    storage_id: ipfs
     cache_type: public
     path: ~/test_eblocbroker/run_cppr
     storage_hours: 0
   data:
     data1:
-      hash: 779745f315060d1bc0cd44b7266fb4da
+      hash: 0d6c3288ef71d89fb93734972d4eb903
     data2:
-      hash: 45281dfec4618e5d20570812dea38760
+      hash: fe801973c5b22ef6861f2ea79dc1eb9c
     data3:
       cache_type: public
-      path: /home/alper/test_eblocbroker/small/babyface.n6c10
+      path: /home/alper/test_eblocbroker/small/LB07-bunny-sml
       storage_hours: 1
-      storage_id: ipfs_gpg
+      storage_id: ipfs
   data_transfer_out: 5
   jobs:
     job1:
diff --git a/broker/test_setup/job_nas.yaml b/broker/test_setup/job_nas.yaml
index dd2f7f5e..174a14cf 100644
--- a/broker/test_setup/job_nas.yaml
+++ b/broker/test_setup/job_nas.yaml
@@ -1,8 +1,8 @@
 config:
-  requester_address: '0x72c1a89ff3606aa29686ba8d29e28dccff06430a'
-  provider_address: '0x4934a70ba8c1c3acfa72e809118bdd9048563a24'
+  requester_address: '0x92086404471d0e5453f856363358967308b37cd5'
+  provider_address: '0x1926b36af775e1312fdebcc46303ecae50d945af'
   source_code:
-    storage_id: b2drop
+    storage_id: ipfs
     cache_type: public
     path: ~/test_eblocbroker/NPB3.3-SER_source_code
     storage_hours: 0
diff --git a/broker/test_setup/register_requesters.py b/broker/test_setup/register_requesters.py
index 0484e605..5bb86308 100755
--- a/broker/test_setup/register_requesters.py
+++ b/broker/test_setup/register_requesters.py
@@ -10,11 +10,12 @@
 def main():
     yaml_fn = "~/ebloc-broker/broker/test_setup/requester.yaml"
-    for user in requesters:
+    for idx, user in enumerate(requesters):
         yaml_user = Yaml(yaml_fn)
         yaml_user["cfg"]["eth_address"] = user
         with suppress(Exception):
             #: could also be used for updating requesters as well
+            print(f"[ counter={idx} ] ", end="")
             tx_hash = cfg.Ebb.register_requester(yaml_fn, is_question=False)
             if tx_hash:
                 get_tx_status(tx_hash)
diff --git a/broker/test_setup/requester.yaml b/broker/test_setup/requester.yaml
index e3201f53..025955e2 100644
--- a/broker/test_setup/requester.yaml
+++ b/broker/test_setup/requester.yaml
@@ -1,4 +1,4 @@
 cfg:
-    eth_address: '0xab608a70d0b4a3a288dd68a1661cdb8b6c742672'
-    gmail: alper.alimoglu.research2@gmail.com
-    oc_username: dbae8f40-20da-4f6f-b466-d434793320be
+  eth_address: '0xab608a70d0b4a3a288dd68a1661cdb8b6c742672'
+  gmail: alper.alimoglu.research2@gmail.com
+  oc_username: dbae8f40-20da-4f6f-b466-d434793320be
diff --git a/broker/test_setup/submit_jobs.py b/broker/test_setup/submit_jobs.py
index 500b7c0e..41b13a19 100755
--- a/broker/test_setup/submit_jobs.py
+++ b/broker/test_setup/submit_jobs.py
@@ -4,7 +4,6 @@
 import random
 import sys
 import time
-from contextlib import suppress
 from datetime import datetime
 from pathlib import Path
 from random import randint
@@ -13,19 +12,17 @@
 from web3.logs import DISCARD
 
 from broker import cfg
+from broker._import import check_connection
 from broker._utils import _log
-from broker._utils._log import console_ruler, ok
+from broker._utils._log import console_ruler
 from broker._utils.tools import _date, _timestamp, countdown, is_process_on, log, run
 from broker._utils.web3_tools import get_tx_status
 from broker._utils.yaml import Yaml
-from broker.config import env
-from broker.imports import nc
 from broker.libs import gdrive
 from broker.libs.mongodb import BaseMongoClass
 from broker.submit_base import SubmitBase
 from broker.test_setup.user_set import providers, requesters
 from broker.utils import print_tb
-from brownie import network
 
 Ebb = cfg.Ebb
 cfg.IS_FULL_TEST = True
@@ -39,7 +36,7 @@
 PROVIDER_MAIL = "alper.alimoglu.research2@gmail.com"
 
 benchmarks = ["nas", "cppr"]
-storage_ids = ["gdrive", "ipfs", "b2drop"]
+storage_ids = ["ipfs", "gdrive", "b2drop"]
 ipfs_types = ["ipfs", "ipfs_gpg"]
 
 test_dir = Path.home() / "ebloc-broker" / "broker" / "test_setup"
@@ -54,8 +51,9 @@
 providers = ["0x29e613b04125c16db3f3613563bfdd0ba24cb629"]  # noqa
 cfg.TEST_PROVIDERS = providers
-cfg.NETWORK_ID = "bloxberg_core"
+# cfg.NETWORK_ID = "bloxberg_core"
 _ruler = "======================="
+FIRST_CYCLE = True
 
 # for provider_addr in providers:
 #     mini_tests_submit(storage_ids, provider_addr)
@@ -223,52 +221,34 @@
         log(f"E: Tx({tx_hash}) is reverted")
 
 
-def reconnect():
-    log(f"E: {network.show_active()} is not connected through {env.BLOXBERG_HOST}")
-    if cfg.NETWORK_ID == "bloxberg":
-        cfg.NETWORK_ID = "bloxberg_core"
-    elif cfg.NETWORK_ID == "bloxberg_core":
-        with suppress(Exception):
-            nc("alpy-bloxberg.duckdns.org", 8545)
-
-        cfg.NETWORK_ID = "bloxberg"
-
-    log(f"Trying at {cfg.NETWORK_ID} ...")
-    network.connect(cfg.NETWORK_ID)
-
-
-def check_connection():
-    if not network.is_connected():
-        reconnect()
-        if not network.is_connected():
-            time.sleep(15)
-    else:
-        log(f"#> bloxberg connection using network_id={cfg.NETWORK_ID} {ok()}", is_write=False)
-
-
-def run_job(counter) -> None:
+def run_job(counter, cycleid) -> None:
     """Submit single job.
 
     :param counter: counter index to keep track of submitted job number
     """
+    global FIRST_CYCLE  # type: ignore
     for idx, provider_addr in enumerate(providers):
         # yaml_cfg["config"]["data"]["data3"]["storage_id"] = random.choice(storage_ids)
         storage_id = (idx + counter) % len(storage_ids)
         selected_benchmark = random.choice(benchmarks)
         storage = storage_ids[storage_id]
         if storage == "ipfs":
-            storage = random.choice(ipfs_types)
+            if FIRST_CYCLE:
+                storage = "ipfs"
+                selected_benchmark = "cppr"
+            else:
+                storage = random.choice(ipfs_types)
+
+        if not FIRST_CYCLE:
+            print(" ")
 
         if selected_benchmark == "nas":
-            log(
-                f"{_ruler} Submitting the job from [cyan]NAS Benchmark[/cyan] to [g]{provider_addr}[/g]\t",
-                "bold blue",
-            )
+            log(f"{_ruler} Submitting the job from [c]NAS Benchmark[/c] to [g]{provider_addr}[/g] cycle={cycleid}\t")
             check_connection()
             yaml_cfg = Yaml(nas_yaml_fn)
             benchmark_name = create_nas_job_script()
         elif selected_benchmark == "cppr":
-            print(" ")
-            log(f"{_ruler} Submitting [cyan]job[/cyan] with [cyan]cppr datasets[/cyan] data_set_idx={idx}\t")
+            log(f"{_ruler} Submitting [c]job[/c] with [c]cppr datasets[/c] data_set_idx={idx} cycle={cycleid}\t")
             check_connection()
             # log(f" * Attempting to submit [cyan]job with cppr datasets[/cyan] to_provider=[g]{provider_addr}")
             yaml_cfg = Yaml(cppr_yam_fn)
@@ -286,9 +266,14 @@ def run_job(counter) -> None:
         submit_base = SubmitBase(yaml_cfg.path)
         submission_date = _date()
         submission_timestamp = _timestamp()
-        requester_address = random.choice(requesters).lower()
-        yaml_cfg["config"]["requester_address"] = requester_address
-        log(f"requester={requester_address}")
+        if FIRST_CYCLE:
+            requester = requesters[0].lower()
+            FIRST_CYCLE = False
+        else:
+            requester = random.choice(requesters).lower()
+
+        yaml_cfg["config"]["requester_address"] = requester
+        log(f"requester={requester}")
         tx_hash = submit_base.submit(is_pass=True)
         log(f"tx_hash={tx_hash}")
         tx_receipt = get_tx_status(tx_hash, is_verbose=True)
@@ -341,13 +326,13 @@ def main():
             # breakpoint()  # DEBUG
 
     if is_mini_test:
-        run_job(0)
+        run_job(0, cycleid=0)
     else:
         try:
             counter = 0
-            for _ in range(100):
+            for idx in range(80):
                 for _ in range(2):  # submitted as batch is faster
-                    run_job(counter)
+                    run_job(counter, idx)
                     counter += 1
 
                 time.sleep(2)
@@ -357,6 +342,7 @@ def main():
         log(f"#> number_of_submitted_jobs={counter}")
     except Exception as e:
         print_tb(e)
+        check_connection()
 
 
 if __name__ == "__main__":
diff --git a/broker/test_setup/users.py b/broker/test_setup/users.py
index ec6ce095..48b8d60b 100755
--- a/broker/test_setup/users.py
+++ b/broker/test_setup/users.py
@@ -84,13 +84,13 @@ def main():
     owner = Ebb.get_owner()
     log(f"owner_balance ({owner.lower()})=", "bold", end="")
     balances([owner], is_verbose=True)
-    # balances(providers)
-    # balances(requesters)
+    balances(providers)
+    balances(requesters)
     #
     # collect_all_into_base()
     # transfer_eth(["0xd118b6ef83ccf11b34331f1e7285542ddf70bc49"], "0.5 ether")
     # transfer_eth(providers, "0.4 ether")
-    transfer_eth(requesters, "0.11 ether")
+    # transfer_eth(requesters, "0.11 ether")
     # transfer_eth(extra_users, "0.1 ether")
diff --git a/broker/todo.org b/broker/todo.org
index 6fa70204..96661ad1 100644
--- a/broker/todo.org
+++ b/broker/todo.org
@@ -57,3 +57,110 @@ Response: {
 ## Waiting block number to be updated, it remains constant at 19922819
 Downloading meta_data.json -> /var/ebloc-broker/b68d6462f9762cfed7bbc7f90e6c6c7f/1KFwXV2UWnJwj25HS3wdrjcQxartboQAW_0/meta_data.json
 #+end_src
+
+
+** TODO max retry
+
+#+begin_src bash
+E: HTTPConnectionPool(host='alpy-bloxberg.duckdns.org', port=8545): Max retries exceeded with url: / (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))
+E: bloxberg is not connected through http://alpy-bloxberg.duckdns.org:8545
+Trying at bloxberg_core... Exception in thread Thread-2:
+Traceback (most recent call last):
+  File "/home/alper/venv/lib/python3.8/site-packages/urllib3/connection.py", line 174, in _new_conn
+    conn = connection.create_connection(
+  File "/home/alper/venv/lib/python3.8/site-packages/urllib3/util/connection.py", line 95, in create_connection
+    raise err
+  File "/home/alper/venv/lib/python3.8/site-packages/urllib3/util/connection.py", line 85, in create_connection
+    sock.connect(sa)
+ConnectionRefusedError: [Errno 111] Connection refused
+
+During handling of the above exception, another exception occurred:
+
+Traceback (most recent call last):
+  File "/home/alper/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 703, in urlopen
+    httplib_response = self._make_request(
+  File "/home/alper/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 398, in _make_request
+    conn.request(method, url, **httplib_request_kw)
+  File "/home/alper/venv/lib/python3.8/site-packages/urllib3/connection.py", line 239, in request
+    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
+  File "/usr/lib/python3.8/http/client.py", line 1256, in request
+    self._send_request(method, url, body, headers, encode_chunked)
+  File "/usr/lib/python3.8/http/client.py", line 1302, in _send_request
+    self.endheaders(body, encode_chunked=encode_chunked)
+  File "/usr/lib/python3.8/http/client.py", line 1251, in endheaders
+    self._send_output(message_body, encode_chunked=encode_chunked)
+  File "/usr/lib/python3.8/http/client.py", line 1011, in _send_output
+    self.send(msg)
+  File "/usr/lib/python3.8/http/client.py", line 951, in send
+    self.connect()
+  File "/home/alper/venv/lib/python3.8/site-packages/urllib3/connection.py", line 205, in connect
+    conn = self._new_conn()
+  File "/home/alper/venv/lib/python3.8/site-packages/urllib3/connection.py", line 186, in _new_conn
+    raise NewConnectionError(
+urllib3.exceptions.NewConnectionError: : Failed to establish a new connection: [Errno 111] Connection refused
+
+During handling of the above exception, another exception occurred:
+
+Traceback (most recent call last):
+  File "/home/alper/venv/lib/python3.8/site-packages/requests/adapters.py", line 489, in send
+    resp = conn.urlopen(
+  File "/home/alper/venv/lib/python3.8/site-packages/urllib3/connectionpool.py", line 787, in urlopen
+    retries = retries.increment(
+  File "/home/alper/venv/lib/python3.8/site-packages/urllib3/util/retry.py", line 592, in increment
+    raise MaxRetryError(_pool, url, error or ResponseError(cause))
+urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='alpy-bloxberg.duckdns.org', port=8545): Max retries exceeded with url: / (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))
+
+During handling of the above exception, another exception occurred:
+
+Traceback (most recent call last):
+  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
+    self.run()
+  File "/usr/lib/python3.8/threading.py", line 870, in run
+    self._target(*self._args, **self._kwargs)
+  File "/home/alper/venv/lib/python3.8/site-packages/brownie/network/middlewares/caching.py", line 156, in block_filter_loop
+    new_blocks = self.block_filter.get_new_entries()
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/_utils/filters.py", line 160, in get_new_entries
+    log_entries = self._filter_valid_entries(self.eth_module.get_filter_changes(self.filter_id))
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/module.py", line 57, in caller
+    result = w3.manager.request_blocking(method_str,
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/manager.py", line 197, in request_blocking
+    response = self._make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/manager.py", line 150, in _make_request
+    return request_func(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/brownie/network/middlewares/caching.py", line 202, in process_request
+    return make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/middleware/formatting.py", line 94, in middleware
+    response = make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/middleware/gas_price_strategy.py", line 90, in middleware
+    return make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/middleware/formatting.py", line 94, in middleware
+    response = make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/middleware/attrdict.py", line 33, in middleware
+    response = make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/middleware/formatting.py", line 94, in middleware
+    response = make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/middleware/formatting.py", line 94, in middleware
+    response = make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/middleware/formatting.py", line 94, in middleware
+    response = make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/middleware/buffered_gas_estimate.py", line 40, in middleware
+    return make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/brownie/network/middlewares/catch_tx_revert.py", line 24, in process_request
+    result = make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/middleware/exception_retry_request.py", line 105, in middleware
+    return make_request(method, params)
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/providers/rpc.py", line 88, in make_request
+    raw_response = make_post_request(
+  File "/home/alper/venv/lib/python3.8/site-packages/web3/_utils/request.py", line 112, in make_post_request
+    response = session.post(endpoint_uri, data=data, *args, **kwargs)  # type: ignore
+  File "/home/alper/venv/lib/python3.8/site-packages/requests/sessions.py", line 635, in post
+    return self.request("POST", url, data=data, json=json, **kwargs)
+  File "/home/alper/venv/lib/python3.8/site-packages/requests/sessions.py", line 587, in request
+    resp = self.send(prep, **send_kwargs)
+  File "/home/alper/venv/lib/python3.8/site-packages/requests/sessions.py", line 701, in send
+    r = adapter.send(request, **kwargs)
+  File "/home/alper/venv/lib/python3.8/site-packages/requests/adapters.py", line 565, in send
+    raise ConnectionError(e, request=request)
+requests.exceptions.ConnectionError: HTTPConnectionPool(host='alpy-bloxberg.duckdns.org', port=8545): Max retries exceeded with url: / (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused'))
+#+end_src
diff --git a/docs/conf.py b/docs/conf.py
index 8cd76384..5f144a97 100755
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -97,6 +97,7 @@
 # The name of the Pygments (syntax highlighting) style to use.
 pygments_style = "sphinx"
 
+
 # custom solidity lexer
 def setup(sphinx):
     sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
diff --git a/requirements.txt b/requirements.txt
index f05bfa11..d273c865 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -494,7 +494,7 @@ multimapping==4.1
 multipart==0.2.4
     # via
     #   zope-publisher
-mypy==1.1.1
+mypy==1.2.0
     # via
     #   flake8-mypy
 mypy-extensions==1.0.0