diff --git a/broker/Driver.py b/broker/Driver.py index d81bad1b..6a151854 100644 --- a/broker/Driver.py +++ b/broker/Driver.py @@ -496,7 +496,7 @@ def check_connection(): if not network.is_connected(): time.sleep(15) else: - log(f"bloxberg connection{ok()}", is_write=False) + log(f"bloxberg connection {ok()}", is_write=False) def _run_driver(given_bn, lock): diff --git a/broker/_cli/helper.py b/broker/_cli/helper.py index 5096c993..c5bd1764 100644 --- a/broker/_cli/helper.py +++ b/broker/_cli/helper.py @@ -107,6 +107,7 @@ def register(self): def authenticate_orc_id(self): # FIXME: missing functionality obj = self.subparsers.add_parser("auth_orc_id", help="Authenticate orcid") + obj.add_argument("eth_address", type=str, help="Ethereum address of the user") def submit(self): obj = self.subparsers.add_parser("submit", help="Submit job") diff --git a/broker/_daemons/ipfs.py b/broker/_daemons/ipfs.py index 7a5414cf..0a8c6d4e 100755 --- a/broker/_daemons/ipfs.py +++ b/broker/_daemons/ipfs.py @@ -53,7 +53,7 @@ def main(): cfg.ipfs.remove_lock_files() _run() else: - log(f"## [g]IPFS[/g] daemon is already running{ok()}") + log(f"## [g]IPFS[/g] daemon is already running {ok()}") sys.exit(100) diff --git a/broker/_utils/yaml.py b/broker/_utils/yaml.py index d880a67c..51bd258d 100755 --- a/broker/_utils/yaml.py +++ b/broker/_utils/yaml.py @@ -104,7 +104,7 @@ def __init__(self, path, auto_dump=True): self.changed = False self.yaml = YAML() self.yaml.indent(mapping=2, sequence=4, offset=2) - with FileLock(self.fp_lock, timeout=1): + with FileLock(self.fp_lock, timeout=2): _remove(self.path_temp) if self.auto_dump: if self.path.exists(): diff --git a/broker/_watch/watch.py b/broker/_watch/watch.py index ff92caff..6dfa3df3 100755 --- a/broker/_watch/watch.py +++ b/broker/_watch/watch.py @@ -21,6 +21,7 @@ is_while = True # to fetch on-going results is_provider = True +cfg.NETWORK_ID = "bloxberg_core" is_csv = False analyze_long_test = False @@ -257,7 +258,7 @@ def main(): if len(sys.argv) == 2: eth_address = sys.argv[1] - from_block = 19874011 + from_block = 19926110 watch(eth_address, from_block) diff --git a/broker/_watch/watch_tests.sh b/broker/_watch/watch_tests.sh index 52871c42..9596f661 100755 --- a/broker/_watch/watch_tests.sh +++ b/broker/_watch/watch_tests.sh @@ -8,22 +8,19 @@ providers=("0x29e613b04125c16db3f3613563bfdd0ba24cb629" num=$(ps aux | grep -E "[w]atch.py" | grep -v -e "grep" -e "emacsclient" -e "flycheck_" | wc -l) if [ $num -ge 1 ]; then - echo "warning: `watch.py` is already running" + echo -e "#> 'watch' process is already running" else rm -f ~/.ebloc-broker/watch.out ~/.ebloc-broker/watch_*.out \ ~/ebloc-broker/broker/_watch/provider_*.txt for i in "${!providers[@]}"; do + touch ~/.ebloc-broker/watch_${providers[$i]}.out ~/ebloc-broker/broker/_watch/watch.py "${providers[$i]}" >/dev/null & + sleep 1 done fi -watch --color head -n 15 \ +clear +watch --color head -n 16 \ ~/.ebloc-broker/watch_${providers[0]}.out \ ~/.ebloc-broker/watch_${providers[1]}.out \ ~/.ebloc-broker/watch_${providers[2]}.out - -: ' commented -cat ~/.ebloc-broker/watch_$provider_1.out -cat ~/.ebloc-broker/watch_$provider_2.out -cat ~/.ebloc-broker/watch_$provider_3.out -' diff --git a/broker/bash_scripts/clean_for_new_test.sh b/broker/bash_scripts/clean_for_new_test.sh index 5ae71064..d55cdbaa 100755 --- a/broker/bash_scripts/clean_for_new_test.sh +++ b/broker/bash_scripts/clean_for_new_test.sh @@ -103,3 +103,6 @@ if [[ -d $BASE ]]; then cd /var/ebloc-broker && fdfind . | as-tree && cd ~ cd $CURRENT_DIR fi + +echo "" +cat ~/.brownie/network-config.yaml| grep bloxberg diff --git a/broker/cfg.py b/broker/cfg.py index f0d7df5b..1b423a17 100644 --- a/broker/cfg.py +++ b/broker/cfg.py @@ -24,7 +24,7 @@ IS_SEARCH_BEST_PROVIDER_VERBOSE = True TEST_PROVIDERS = None TX_LOG_VERBOSE = False -NETWORK_ID = "bloxberg" +NETWORK_ID = "bloxberg" # "bloxberg_core" class EBB: diff --git a/broker/drivers/b2drop.py b/broker/drivers/b2drop.py index 257915c3..b47872ce 100644 --- a/broker/drivers/b2drop.py +++ b/broker/drivers/b2drop.py @@ -169,14 +169,14 @@ def _download_folder(self, results_folder_prev, folder_name): "--show-progres", "--progress=bar:force:noscroll", ] - log(" ".join(cmd), is_code=True, color="bold yellow") + log(" ".join(cmd), is_code=True, color="yellow") run(cmd) with cd(results_folder_prev): run(["unzip", "-o", "-j", download_fn]) _remove(download_fn) self.tar_downloaded_path[folder_name] = cached_tar_fn - log(f"## download file from B2DROP{ok()}") + log(f"## download file from B2DROP {ok()}") return except: log("E: Failed to download B2DROP file via wget.\nTrying `config.oc.get_file()` approach...") @@ -291,7 +291,7 @@ def get_share_token(self, f_id): # TODO: if added before or some do nothing if Ebb.mongo_broker.add_item_share_id(share_key, value["share_id"], value["share_token"]): # adding into mongoDB for future usage - log(f"#> [g]{share_key}[/g] is added into mongoDB{ok()}") + log(f"#> [g]{share_key}[/g] is added into mongoDB {ok()}") except Exception as e: print_tb(e) log(f"E: {e}") @@ -370,7 +370,7 @@ def run(self) -> bool: time.sleep(0.25) def _run(self) -> bool: - log(f"{br(get_date())} new job has been received through B2DROP: {self.job_key} {self.index} ", "bold cyan") + log(f"{br(get_date())} new job has been received through B2DROP: {self.job_key} {self.index} ", "cyan") # TODO: refund check try: provider_info = Ebb.get_provider_info(self.logged_job.args["provider"]) @@ -422,7 +422,7 @@ def _run(self) -> bool: except KeyError: try: shared_id = Ebb.mongo_broker.find_shareid_item(key=share_key) - log(f"#> reading from mongo_broker{ok()}") + log(f"#> reading from mongo_broker {ok()}") log(shared_id) except Exception as e: print_tb(e) diff --git a/broker/drivers/storage_class.py b/broker/drivers/storage_class.py index d4b32d95..db2e679f 100644 --- a/broker/drivers/storage_class.py +++ b/broker/drivers/storage_class.py @@ -222,7 +222,7 @@ def is_run_exists_in_tar(self, tar_path) -> bool: .strip() ) if output.count("/") == 1: - log(f"[m]./run.sh[/m] exists under the parent folder{ok()}", "bold") + log(f"[m]./run.sh[/m] exists under the parent folder {ok()}", "bold") return True else: log("E: run.sh does not exist under the parent folder") diff --git a/broker/eblocbroker_scripts/Contract.py b/broker/eblocbroker_scripts/Contract.py index bb7bbab3..3ad626c6 100644 --- a/broker/eblocbroker_scripts/Contract.py +++ b/broker/eblocbroker_scripts/Contract.py @@ -101,9 +101,6 @@ def _setup(self, is_brownie=False): self.eBlocBroker, self.w3, self._eblocbroker = connect() except Exception as e: print_tb(e) - sys.exit(1) - - ebb = None # contract object def brownie_load_account(self, fn="", password="alper"): """Load accounts from Brownie for Bloxberg.""" diff --git a/broker/eblocbroker_scripts/get_block_number.py b/broker/eblocbroker_scripts/get_block_number.py index 98900b57..34079964 100755 --- a/broker/eblocbroker_scripts/get_block_number.py +++ b/broker/eblocbroker_scripts/get_block_number.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -from broker.eblocbroker_scripts.utils import Cent import sys from broker import cfg @@ -22,8 +21,8 @@ def main(): output = Ebb.get_block_number() if is_write_to_file: env.config["block_continue"] = output - balance_temp = Ebb.get_balance(env.PROVIDER_ID) - env.config["token_balance"] = int(balance_temp) + #: used in cleaning for a new test + env.config["token_balance"] = int(Ebb.get_balance(env.PROVIDER_ID)) else: log(f"block_number={output}") except Exception as e: diff --git a/broker/eblocbroker_scripts/log_job.py b/broker/eblocbroker_scripts/log_job.py index 2a06d84a..70bef82b 100755 --- a/broker/eblocbroker_scripts/log_job.py +++ b/broker/eblocbroker_scripts/log_job.py @@ -62,6 +62,7 @@ def log_loop(event_filter, poll_interval: int = 6): with suppress(Exception): bn = cfg.Ebb.get_block_number() + print() sleep_duration = 0 while True: # watch_bn() # this may cause timeout error over time diff --git a/broker/imports.py b/broker/imports.py index 9a585b70..1512d8b5 100644 --- a/broker/imports.py +++ b/broker/imports.py @@ -97,14 +97,14 @@ def connect_to_eblocbroker() -> None: cfg.w3 = network.web3 except: - from broker.python_scripts import add_bloxberg_into_network_config + # from broker.python_scripts import add_bloxberg_into_network_config - add_bloxberg_into_network_config.main() + # add_bloxberg_into_network_config.main() try: - log( - "warning: [green]bloxberg[/green] key is added into the " - "[m]~/.brownie/network-config.yaml[/m] file. Please try again." - ) + # log( + # "warning: [green]bloxberg[/green] key is added into the " + # "[m]~/.brownie/network-config.yaml[/m] file. Please try again." + # ) network.connect(cfg.NETWORK_ID) except KeyError: sys.exit(1) diff --git a/broker/libs/eudat.py b/broker/libs/eudat.py index 7bfe32cd..a7034498 100644 --- a/broker/libs/eudat.py +++ b/broker/libs/eudat.py @@ -6,7 +6,6 @@ import pickle import shutil import subprocess -import sys import time from contextlib import suppress from pathlib import Path diff --git a/broker/libs/mongodb.py b/broker/libs/mongodb.py index d743260e..9e049e27 100755 --- a/broker/libs/mongodb.py +++ b/broker/libs/mongodb.py @@ -4,6 +4,7 @@ from pymongo import MongoClient from rich.pretty import pprint + from broker._utils._log import log @@ -149,6 +150,9 @@ def is_received(self, requester_addr, key, index, is_print=False) -> bool: def main(): + # from broker._utils import _log + + # _log.IS_WRITE = False mc = MongoClient() ebb_mongo = MongoBroker(mc, mc["ebloc_broker"]["cache"]) parser = argparse.ArgumentParser(description="Process MongoDB.") diff --git a/broker/start_code.py b/broker/start_code.py index 2d12de1b..13e73c01 100755 --- a/broker/start_code.py +++ b/broker/start_code.py @@ -47,7 +47,7 @@ def start_call(key, index, slurm_job_id) -> None: p2.stdout.close() # type: ignore date = p3.communicate()[0].decode("utf-8").strip() start_ts = check_output(["date", "-d", date, "+'%s'"]).strip().decode("utf-8").strip("'") - log(f"{env.EBB_SCRIPTS}/set_job_state_running.py {key} {index} {job_id} {start_ts}", "bold white") + log(f"{env.EBB_SCRIPTS}/set_job_state_running.py {key} {index} {job_id} {start_ts}", is_code=True) log(f"#> pid={pid}") for attempt in range(10): if attempt > 0: diff --git a/broker/test_setup/register_data_files.py b/broker/test_setup/register_data_files.py index 24dcfba7..1e142065 100755 --- a/broker/test_setup/register_data_files.py +++ b/broker/test_setup/register_data_files.py @@ -63,7 +63,7 @@ def register_data_files(data_price, hashes): def main(): - ## register_data_files(data_price=1, hashes=hashes_small) + # register_data_files(data_price=1, hashes=hashes_small) # register_data_files(data_price=Cent("0.0002 usd"), hashes=hashes_medium_1) # register_data_files(data_price=Cent("0.0003 usd"), hashes=hashes_medium_2) diff --git a/broker/test_setup/submit_jobs.py b/broker/test_setup/submit_jobs.py index 53e89b91..500b7c0e 100755 --- a/broker/test_setup/submit_jobs.py +++ b/broker/test_setup/submit_jobs.py @@ -356,7 +356,6 @@ def main(): log(f"#> number_of_submitted_jobs={counter}") except Exception as e: - print(str(e)) print_tb(e) @@ -365,5 +364,6 @@ def main(): main() except Exception as e: print_tb(e) + log("end") except KeyboardInterrupt: sys.exit(1)