From 8dacdc5e7c0c7c858d3a6c3bacbc2a255483fb5a Mon Sep 17 00:00:00 2001 From: avatar-lavventura Date: Sat, 11 Jun 2022 21:39:29 +0000 Subject: [PATCH] Improve Dockerfile. - Fix watch scripts --- Dockerfile | 27 ++-- broker/Driver.py | 12 +- broker/_utils/_log.py | 70 ++++------ broker/_utils/tools.py | 6 +- broker/bash_scripts/clean_for_new_test.sh | 2 +- broker/bash_scripts/decrypt_gpg.sh | 5 +- broker/bash_scripts/fetch_ipfs_hashes.sh | 10 ++ broker/bash_scripts/killall.sh | 2 +- broker/bash_scripts/slurm_mail_prog.sh | 6 +- broker/drivers/driver_gc.py | 10 +- broker/drivers/gdrive.py | 4 +- broker/drivers/storage_class.py | 7 +- broker/eblocbroker_scripts/Contract.py | 3 +- broker/eblocbroker_scripts/get_data_price.py | 64 +++++++-- broker/eblocbroker_scripts/get_job_info.py | 125 +++++++----------- broker/eblocbroker_scripts/job.py | 16 +-- broker/eblocbroker_scripts/log_job.py | 6 +- broker/eblocbroker_scripts/process_payment.py | 4 +- broker/eblocbroker_scripts/register_data.py | 24 ++-- .../eblocbroker_scripts/register_provider.py | 6 +- .../eblocbroker_scripts/update_data_price.py | 14 +- .../update_provider_info.py | 6 +- broker/end_code.py | 4 +- broker/gdrive/README.org | 4 +- broker/gdrive/submit.py | 4 +- broker/imports.py | 4 +- broker/ipfs/submit.py | 2 +- broker/libs/_git.py | 4 +- broker/libs/eudat.py | 6 +- broker/libs/gdrive.py | 2 +- broker/libs/ipfs.py | 4 +- broker/libs/mongodb.py | 4 +- broker/libs/user_setup.py | 4 +- broker/link.py | 12 +- broker/python_scripts/apply_patch.py | 2 +- broker/python_scripts/get_transaction_log.py | 7 +- broker/start_code.py | 2 +- broker/test_setup/README.org | 5 +- broker/test_setup/check_list.org | 12 +- broker/test_setup/datasets.org | 7 +- broker/test_setup/job_cppr.yaml | 11 +- broker/test_setup/job_nas.yaml | 7 +- broker/test_setup/prepare_data.sh | 2 +- broker/test_setup/register_data_files.py | 24 +++- broker/test_setup/submit_jobs.py | 32 ++--- broker/test_setup/users.py | 4 +- broker/test_setup/watch_tests.sh | 17 +-- broker/utils.py | 7 +- .../{eblocbroker_scripts => watch}/watch.py | 37 +++--- .../{eblocbroker_scripts => watch}/watch.sh | 9 +- .../watch_jobs.py | 21 +-- requirements.txt | 4 +- scripts/setup.sh | 6 +- 53 files changed, 360 insertions(+), 339 deletions(-) mode change 100644 => 100755 broker/python_scripts/get_transaction_log.py rename broker/{eblocbroker_scripts => watch}/watch.py (84%) rename broker/{eblocbroker_scripts => watch}/watch.sh (61%) rename broker/{eblocbroker_scripts => watch}/watch_jobs.py (87%) diff --git a/Dockerfile b/Dockerfile index 1bb13ae0..b692c250 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,16 +1,16 @@ # syntax=docker/dockerfile:1 FROM golang:latest RUN apt-get install -y ca-certificates -RUN wget --no-check-certificate -q "https://dist.ipfs.io/go-ipfs/v0.11.0/go-ipfs_v0.11.0_linux-amd64.tar.gz" \ - && tar -xvf "go-ipfs_v0.11.0_linux-amd64.tar.gz" \ - && rm -f go-ipfs_v0.11.0_linux-amd64.tar.gz -WORKDIR go-ipfs -RUN make install \ +RUN wget --no-check-certificate -q "https://dist.ipfs.io/go-ipfs/v0.13.0/go-ipfs_v0.13.0_linux-amd64.tar.gz" \ + && tar -xf "go-ipfs_v0.13.0_linux-amd64.tar.gz" \ + && rm -f go-ipfs_v0.13.0_linux-amd64.tar.gz \ + && cd go-ipfs \ + && make install \ && ./install.sh -RUN git clone https://github.com/prasmussen/gdrive.git /workspace -WORKDIR /workspace/gdrive -RUN go env -w GO111MODULE=auto \ +RUN git clone https://github.com/prasmussen/gdrive.git /workspace/gdrive \ + && cd /workspace/gdrive \ + && go env -w GO111MODULE=auto \ && go get 
github.com/prasmussen/gdrive

FROM python:3.7
@@ -151,7 +151,7 @@ RUN brownie init \
     && cd /workspace//ebloc-broker/contract \
     && brownie compile

-# orginize Slurm files
+# organize slurm files
 RUN chown root:munge -R /etc/munge /etc/munge/munge.key /var/lib/munge  # works but root is alright?
 WORKDIR /var/log/slurm
 WORKDIR /var/run/supervisor
@@ -165,15 +165,16 @@ COPY --chown=slurm docker/slurm/files/slurm/slurmdbd.conf /etc/slurm/slurmdbd.co
 RUN chmod 0600 /etc/slurm/slurmdbd.conf

 ## finally  # sysctl -w net.core.rmem_max=2500000 ?
-RUN ipfs version \
-    && ipfs init --profile=server,badgerds \
+RUN gdrive version \
+    && ipfs version \
+    && ipfs init \
     && ipfs config Reprovider.Strategy roots \
     && ipfs config Routing.Type none \
     && ganache --version \
     && /workspace/ebloc-broker/broker/bash_scripts/ubuntu_clean.sh >/dev/null 2>&1 \
-    && du -sh / 2>&1 | grep -v "cannot" \
     && echo "alias ls='ls -h --color=always -v --author --time-style=long-iso'" >> ~/.bashrc \
-    && echo "export SQUEUE_FORMAT=\"%8i %9u %5P %2t %12M %12l %5D %3C %30j\"v" >> ~/.bashrc
+    && echo "export SQUEUE_FORMAT=\"%8i %9u %5P %2t %12M %12l %5D %3C %30j\"v" >> ~/.bashrc \
+    && du -sh / 2>&1 | grep -v "cannot"

 WORKDIR /workspace/ebloc-broker/broker
 CMD ["/bin/bash"]
diff --git a/broker/Driver.py b/broker/Driver.py
index a5e2e3d4..cd93a3a8 100755
--- a/broker/Driver.py
+++ b/broker/Driver.py
@@ -114,12 +114,12 @@ def _tools(block_continue):  # noqa

         if not output:
             log(
-                f"E: provider's registered gmail=[magenta]{gmail}[/magenta] does not match\n"
-                f"   with the set gdrive's gmail=[magenta]{gdrive_gmail}[/magenta]"
+                f"E: provider's registered gmail=[m]{gmail}[/m] does not match\n"
+                f"   with the set gdrive's gmail=[m]{gdrive_gmail}[/m]"
             )
             raise QuietExit

-        log(f"==> provider_gmail=[magenta]{gmail}")
+        log(f"==> provider_gmail=[m]{gmail}")

     if env.IS_IPFS_USE:
         if not os.path.isfile(env.GPG_PASS_FILE):
@@ -211,8 +211,8 @@ def process_logged_job(self, idx):
         index = self.logged_job.args["index"]
         self.job_block_number = self.logged_job["blockNumber"]
         self.cloud_storage_id = self.logged_job.args["cloudStorageID"]
-        log(f"## job_key=[magenta]{job_key}[/magenta] | index={index}", "b")
-        log(f"   received_block_number={self.job_block_number}", "b")
+        log(f"## job_key=[m]{job_key}[/m] | index={index}", "b")
+        log(f"   received_bn={self.job_block_number}", "b")
         log(f"   tx_hash={self.logged_job['transactionHash'].hex()} | log_index={self.logged_job['logIndex']}", "b")
         log(f"   provider={self.logged_job.args['provider']}", "b")
         log(f"   received={self.logged_job.args['received']}", "b")
@@ -302,7 +302,7 @@ def process_logged_jobs(self):
         except Exception as e:
             print_tb(e)
             log(str(e))
-            breakpoint()  # DEBUG
+            # breakpoint()  # DEBUG


 def run_driver(given_bn):
diff --git a/broker/_utils/_log.py b/broker/_utils/_log.py
index b301f013..22543bed 100644
--- a/broker/_utils/_log.py
+++ b/broker/_utils/_log.py
@@ -24,12 +24,11 @@ thread_log_files: Dict[str, str] = {}
 custom_theme = Theme(
     {
-        "info": "dim cyan",
-        "warning": "magenta",
+        "info": "bold magenta",
+        # "info": "bold dim magenta",
         "danger": "bold red",
         "b": "bold",
         "m": "magenta",
-        # "magenta": "#ff79c6",
     }
 )
 console = Console(theme=custom_theme)
@@ -64,29 +63,23 @@ def __init__(self):
         self.LOG_FILENAME: Union[str, pathlib.Path] = ""
         self.console: Dict[str, Console] = {}

-    def print_color(self, text: str, color=None, is_bold=True, end=None) -> None:
+    def print_color(self, text: str, color=None, is_bold=True, end="\n") -> None:
         """Print string in color format."""
         if text[0:3] in ["==>", "#> ", "## 
"]: if color and text == "==> ": - print(f"[bold {color}]{text[0:3]}[/bold {color}]", end="", flush=True) + console.print(f"[bold][{color}]{text[0:3]}[{color}][/bold]", end="") else: - print(f"[bold blue]{text[0:3]}[/bold blue]", end="", flush=True) + console.print(f"[bold blue]{text[0:3]}[/bold blue]", end="") text = text[3:] elif text[0:2] == "E:": - print("[bold red]E:[/bold red]", end="", flush=True) + console.print("[bold red]E:[/bold red]", end="") text = text[2:] - if end is None: - if is_bold: - print(f"[bold {color}]{text}[/bold {color}]") - else: - print(f"[{color}]{text}[/{color}]") + if is_bold: + console.print(f"[bold][{color}]{text}[{color}][/bold]", end=end) else: - if is_bold: - print(f"[bold {color}]{text}[/bold {color}]", end="", flush=True) - else: - print(f"[{color}]{text}[/{color}]", end="") + console.print(f"[{color}]{text}[/{color}]", end=end) def pre_color_check(self, text, color, is_bold): """Check color for substring.""" @@ -165,14 +158,14 @@ def console_ruler(msg="", character="=", color="cyan", fn=""): ll.console[fn] = Console(file=open(fn, "a"), force_terminal=True, theme=custom_theme) if msg: - console.rule(f"[bold {color}]{msg}", characters=character) - ll.console[fn].rule(f"[bold {color}]{msg}", characters=character) + console.rule(f"[bold][{color}]{msg}", characters=character) + ll.console[fn].rule(f"[bold][{color}]{msg}", characters=character) else: console.rule(characters=character) ll.console[fn].rule(characters=character) -def _log(text, color, is_bold, flush, fn, end, is_write=True, is_output=True): +def _log(text, color, is_bold, fn, end="\n", is_write=True, is_output=True): if not is_output: is_print = is_output else: @@ -192,11 +185,8 @@ def _log(text, color, is_bold, flush, fn, end, is_write=True, is_output=True): if is_print: if not IS_THREADING_MODE_PRINT or threading.current_thread().name == "MainThread": if is_bullet: - print( - f"[bold {_color}]{is_r}{text[:_len]}[/bold {_color}][{color}]{text[_len:]}[/{color}]", - end=end, - flush=flush, - ) + _msg = f"[bold][{_color}]{is_r}{text[:_len]}[/{_color}][/bold][{color}]{text[_len:]}[/{color}]" + console.print(_msg, end=end) else: ll.print_color(str(text), color, is_bold=is_bold, end=end) @@ -209,19 +199,19 @@ def _log(text, color, is_bold, flush, fn, end, is_write=True, is_output=True): if is_write and IS_WRITE: if is_bullet: ll.console[fn].print( - f"[bold {_color}]{is_r}{text[:_len]}[/bold {_color}][{color}]{_text}[/{color}]", + f"[bold][{_color}]{is_r}{text[:_len]}[/{_color}][/bold][{color}]{_text}[/{color}]", end=end, soft_wrap=True, ) else: if color: - ll.console[fn].print(f"[bold {color}]{_text}[/bold {color}]", end="", soft_wrap=True) + ll.console[fn].print(f"[bold][{color}]{_text}[/{color}][/bold]", end=end, soft_wrap=True) else: - ll.console[fn].print(_text, end="", soft_wrap=True) + ll.console[fn].print(_text, end=end, soft_wrap=True) else: text_to_write = "" if is_bullet: - text_to_write = f"[bold {_color}]{is_r}{_text[:_len]}[/bold {_color}][bold]{_text[_len:]}[/bold]" + text_to_write = f"[bold][{_color}]{is_r}{_text[:_len]}[/{_color}][/bold][bold]{_text[_len:]}[/bold]" else: if _color: text_to_write = f"[{_color}]{_text}[/{_color}]" @@ -229,28 +219,17 @@ def _log(text, color, is_bold, flush, fn, end, is_write=True, is_output=True): text_to_write = _text if is_print: - if end == "": - print(text_to_write, end="") - else: - print(text_to_write, flush=flush) + console.print(text_to_write, end=end) if is_write and IS_WRITE: ll.console[fn].print(text_to_write, end=end, soft_wrap=True) - if 
end is None: - if is_write and IS_WRITE: - ll.console[fn].print("") - - if color and is_bullet: - print() - def log( text="", color=None, fn=None, - end=None, - flush=False, + end="\n", is_write=True, where_back=0, is_code=False, @@ -264,9 +243,10 @@ def log( * colors: __ https://rich.readthedocs.io/en/latest/appendix/colors.html#appendix-colors - :param text: string to print - :param color: color of the complete string - :param fn: filename to write + :param end: (str, optional) Character to write at end of output. Defaults to "\\n". + :param text: String to print + :param color: Color of the complete string + :param fn: Filename to write """ is_bold: bool = False if color in ["bold", "b"]: @@ -331,7 +311,7 @@ def log( if is_write and IS_WRITE: ll.console[fn].print(text) else: - _log(text, color, is_bold, flush, fn, end, is_write, is_output) + _log(text, color, is_bold, fn, end, is_write, is_output) def WHERE(back=0): diff --git a/broker/_utils/tools.py b/broker/_utils/tools.py index 6130b5bf..baf49407 100755 --- a/broker/_utils/tools.py +++ b/broker/_utils/tools.py @@ -138,7 +138,7 @@ def print_tb(message=None, is_print_exc=True) -> None: sep_terminate = "raise Terminate" tb_text = "".join(traceback.format_exc()) if sep_terminate in tb_text: - tb_text = tb_text.split(sep_terminate, 1)[0] + "raise [magenta]Terminate[/magenta]()" + tb_text = tb_text.split(sep_terminate, 1)[0] + "raise [m]Terminate[/m]()" if is_print_exc and tb_text != "NoneType: None\n": log(tb_text.rstrip(), "bold", where_back=1) @@ -167,12 +167,12 @@ def _remove(path: str, is_verbose=False) -> None: shutil.rmtree(path) else: if is_verbose: - log(f"warning: {WHERE(1)} Nothing removed, following path does not exist:\n[magenta]{path}") + log(f"warning: {WHERE(1)} Nothing removed, following path does not exist:\n[m]{path}") return if is_verbose: - log(f"#> {WHERE(1)} following path:\n[magenta]{path}[/magenta] is removed") + log(f"#> {WHERE(1)} following path:\n[m]{path}[/m] is removed") except OSError as e: # Suppress the exception if it is a file not found error. # Otherwise, re-raise the exception. 
diff --git a/broker/bash_scripts/clean_for_new_test.sh b/broker/bash_scripts/clean_for_new_test.sh index 33ae29e0..55cfbfe3 100755 --- a/broker/bash_scripts/clean_for_new_test.sh +++ b/broker/bash_scripts/clean_for_new_test.sh @@ -79,7 +79,7 @@ echo "#> Running: ~/ebloc-broker/broker/python_scripts/clean_gdrive.py" ~/ebloc-broker/broker/python_scripts/clean_gdrive.py echo "[ OK ]" -for i in `gpg --list-keys --with-colons --fingerprint | sed -n 's/^fpr:::::::::\([[:alnum:]]\+\):/\1/p'`; do +for i in `gpg --list-keys --with-colons --fingerprint | sed -n 's/^fpr:::::::::\([[:alnum:]]\+\):/\1/p'`; do gpg --batch --delete-key "$i" 2>/dev/null done diff --git a/broker/bash_scripts/decrypt_gpg.sh b/broker/bash_scripts/decrypt_gpg.sh index 66b14126..5151c39c 100755 --- a/broker/bash_scripts/decrypt_gpg.sh +++ b/broker/bash_scripts/decrypt_gpg.sh @@ -1,6 +1,7 @@ #!/bin/bash -for dir in */*; do - gpg --verbose --batch --yes --output=$(echo $dir | rev | cut -c5- | rev) --pinentry-mode loopback --passphrase-file=/home/alper/.ebloc-broker/.gpg_pass.txt --decrypt "$dir" +for fn in */*; do + echo "$fn" + gpg --verbose --batch --yes --output=$(echo $fn | rev | cut -c5- | rev) --pinentry-mode loopback --passphrase-file=/home/alper/.ebloc-broker/.gpg_pass.txt --decrypt "$fn" done rm */*.diff.gz.gpg diff --git a/broker/bash_scripts/fetch_ipfs_hashes.sh b/broker/bash_scripts/fetch_ipfs_hashes.sh index 4545f3ee..ba5bcf0a 100755 --- a/broker/bash_scripts/fetch_ipfs_hashes.sh +++ b/broker/bash_scripts/fetch_ipfs_hashes.sh @@ -1,5 +1,6 @@ #!/bin/bash +# run at requester-node while read p; do ipfs get "$p" done /dev/null 2>&1 cd .. + +cd ipfs_gpg +for fn in */*; do + echo "$fn" + gpg --verbose --batch --yes --output=$(echo $fn | rev | cut -c5- | rev) --pinentry-mode loopback \ + --passphrase-file=/home/alper/.ebloc-broker/.gpg_pass.txt --decrypt "$fn" +done +rm */*.diff.gz.gpg +cd .. 
diff --git a/broker/bash_scripts/killall.sh b/broker/bash_scripts/killall.sh index 872f3600..0d340cd4 100755 --- a/broker/bash_scripts/killall.sh +++ b/broker/bash_scripts/killall.sh @@ -19,4 +19,4 @@ killall python 2> /dev/null killall python3 2> /dev/null echo "## killall all jobs in squeue" squeue | tail -n+2 | awk '{print $1}' | xargs scancel 2> /dev/null -printf "killall for ebloc-broker test [ ${GREEN}OK${NC} ]\n" +printf "killall for ebloc-broker test [ ${GREEN}OK${NC} ]\n" diff --git a/broker/bash_scripts/slurm_mail_prog.sh b/broker/bash_scripts/slurm_mail_prog.sh index 0c0a6606..c527f93b 100755 --- a/broker/bash_scripts/slurm_mail_prog.sh +++ b/broker/bash_scripts/slurm_mail_prog.sh @@ -42,7 +42,7 @@ if [[ $event == *"COMPLETED"* ]] || [[ $event == *"FAILED"* ]]; then fi arg0=$(echo $name | cut -d "$SEP" -f 1) # job_key arg1=$(echo $name | cut -d "$SEP" -f 2) # index - arg2=$(echo $name | cut -d "$SEP" -f 3) # received_block_number + arg2=$(echo $name | cut -d "$SEP" -f 3) # received_bn msg="$state fn=$name\n" msg="${msg}./end_code.py $arg0 $arg1 $arg2 \"$name\" $slurm_job_id" echo $msg | mail -s "Message Subject" $EMAIL @@ -57,7 +57,7 @@ if [[ $event == *"TIMEOUT"* ]]; then name=$(echo "$c" | grep -o -P '(?<=Name=).*(?=.sh Failed)') arg0=$(echo $name | cut -d "$SEP" -f 1) # job_key arg1=$(echo $name | cut -d "$SEP" -f 2) # index - arg2=$(echo $name | cut -d "$SEP" -f 3) # received_block_number + arg2=$(echo $name | cut -d "$SEP" -f 3) # received_bn msg="TIMEOUT fn=$name\n" msg="${msg}./end_code.py $arg0 $arg1 $arg2 \"$name\" $slurm_job_id" echo $msg | mail -s "Message Subject" $EMAIL @@ -72,7 +72,7 @@ if [[ $event == *"CANCELLED"* ]]; then name=$(echo "$c" | grep -o -P '(?<=Name=).*(?=.sh Ended)') arg0=$(echo $name | cut -d "$SEP" -f 1) # job_key arg1=$(echo $name | cut -d "$SEP" -f 2) # index - arg2=$(echo $name | cut -d "$SEP" -f 3) # received_block_number + arg2=$(echo $name | cut -d "$SEP" -f 3) # received_bn msg="CANCELLED fn=$name\n" msg="${msg}./end_code.py $arg0 $arg1 $arg2 \"$name\" $slurm_job_id" echo $msg | mail -s "Message Subject" $EMAIL diff --git a/broker/drivers/driver_gc.py b/broker/drivers/driver_gc.py index ff108602..08c3648f 100644 --- a/broker/drivers/driver_gc.py +++ b/broker/drivers/driver_gc.py @@ -19,12 +19,10 @@ def main(): for document in cursor: # print(document) # TODO: requester paramer as get_storage_duration - received_block_number, storage_duration = Ebb.get_job_storage_duration( - env.PROVIDER_ID, document["sourceCodeHash"] - ) - end_block_time = received_block_number + storage_duration * cfg.ONE_HOUR_BLOCK_DURATION + received_bn, storage_duration = Ebb.get_job_storage_duration(env.PROVIDER_ID, document["sourceCodeHash"]) + end_block_time = received_bn + storage_duration * cfg.ONE_HOUR_BLOCK_DURATION storageID = document["storageID"] - if end_block_time < block_number and received_block_number != 0: + if end_block_time < block_number and received_bn != 0: if storageID in (StorageID.IPFS, StorageID.IPFS_GPG): ipfsHash = document["jobKey"] print(run(["ipfs", "pin", "rm", ipfsHash])) @@ -39,7 +37,7 @@ def main(): print(cached_file_name) _remove(cached_file_name) - print(received_block_number) + print(received_bn) coll.delete_one({"jobKey": ipfsHash}) diff --git a/broker/drivers/gdrive.py b/broker/drivers/gdrive.py index cf9da71a..dce38f21 100755 --- a/broker/drivers/gdrive.py +++ b/broker/drivers/gdrive.py @@ -192,7 +192,7 @@ def get_data_init(self, key, _id, is_job_key=False): mime_type = gdrive.get_file_info(gdrive_output, _type="Mime") 
folder_name = gdrive.get_file_info(gdrive_output, _type="Name")
-        log(f"==> mime_type=[magenta]{mime_type}")
+        log(f"==> mime_type=[m]{mime_type}")
         if is_job_key:
             # key for the sourceCode tar.gz file is obtained
             try:
@@ -241,7 +241,7 @@ def get_data(self, key, _id, is_job_key=False):
             # folder is already stored by its code_hash
             code_hash = name.replace(".tar.gz", "")
             log(f"==> name={name}")
-            log(f"==> mime_type=[magenta]{mime_type}")
+            log(f"==> mime_type=[m]{mime_type}")
         if _id == 0:  # source code folder, ignore downloading result-*
             name = f"{name}.tar.gz"
diff --git a/broker/drivers/storage_class.py b/broker/drivers/storage_class.py
index 4998c369..162295f9 100755
--- a/broker/drivers/storage_class.py
+++ b/broker/drivers/storage_class.py
@@ -229,14 +229,13 @@ def is_run_exists_in_tar(self, tar_path) -> bool:
             )
             if output.count("/") == 1:
                 # main folder should contain the 'run.sh' file
-                log(f"[magenta]./run.sh[/magenta] exists under the parent folder{ok()}", "bold")
+                log(f"[m]./run.sh[/m] exists under the parent folder{ok()}", "bold")
                 return True
             else:
                 log("E: run.sh does not exist under the parent folder")
                 return False
         except:
-            breakpoint()  # DEBUG
-            log(f"E: run.sh does not exist under the tar={tar_path}")
+            log(f"E: `run.sh` file does not exist under the tar={tar_path}")
             return False

     def check_run_sh(self) -> bool:
@@ -349,7 +348,7 @@ def _sbatch_call(self) -> bool:
         timestamp = p2.communicate()[0].decode("utf-8").strip()
         log(f"timestamp={timestamp}, ", "bold", end="")
         write_to_file(self.results_folder_prev / "timestamp.txt", timestamp)
-        log(f"job_received_block_number={job_block_number}", "bold")
+        log(f"job_received_bn={job_block_number}", "bold")
         log("## Adding recevied job into the mongoDB database")
         self.Ebb.mongo_broker.add_item(
             job_key,
diff --git a/broker/eblocbroker_scripts/Contract.py b/broker/eblocbroker_scripts/Contract.py
index 8f3ddc64..56923a51 100644
--- a/broker/eblocbroker_scripts/Contract.py
+++ b/broker/eblocbroker_scripts/Contract.py
@@ -38,8 +38,7 @@ class Contract:
         get_job_code_hashes,
         get_job_info,
         get_job_info_print,
-        get_job_owner,
-        set_job_received_block_number,
+        set_job_received_bn,
         update_job_cores,
     )
     from broker.eblocbroker_scripts.get_provider_info import get_provider_info
diff --git a/broker/eblocbroker_scripts/get_data_price.py b/broker/eblocbroker_scripts/get_data_price.py
index 02f3fe07..d524a3b5 100755
--- a/broker/eblocbroker_scripts/get_data_price.py
+++ b/broker/eblocbroker_scripts/get_data_price.py
@@ -6,21 +6,63 @@
 Ebb = cfg.Ebb


-def get_data_price(provider, source_code_hash):
-    code_hash_bytes = cfg.w3.toBytes(text=source_code_hash)
-    data_block_numbers = Ebb.get_registered_data_bn(provider, code_hash_bytes)
-    (price, commitment_block_duration) = Ebb.get_registered_data_price(
-        provider, code_hash_bytes, data_block_numbers[-1]
+def get_data_price(provider, source_code_hash, is_verbose=True):
+    bn = Ebb.get_block_number()
+    note_msg = ""
+    if isinstance(source_code_hash, bytes):
+        code_hash_bytes = source_code_hash
+    else:
+        code_hash_bytes = cfg.w3.toBytes(text=source_code_hash)
+
+    registered_data_bn_list = Ebb.get_registered_data_bn(provider, code_hash_bytes)
+    if bn > registered_data_bn_list[-1]:
+        data_price_set_bn = registered_data_bn_list[-1]
+    else:
+        data_price_set_bn = registered_data_bn_list[-2]
+        if is_verbose:
+            remaining_min = (registered_data_bn_list[-1] - bn) * 6 / 60
+            note_msg = f"{remaining_min} minutes remaining for the new price to take effect"
+
+    (price, commitment_block_dur) = Ebb.get_registered_data_price(provider, 
code_hash_bytes, data_price_set_bn) + if is_verbose: + log(f" * price={price}") + log(f" * commitment_block_dur={commitment_block_dur}") + + prices = [] + for _bn in registered_data_bn_list: + (price, commitment_block_dur) = Ebb.get_registered_data_price(provider, code_hash_bytes, _bn) + prices.append(price) + + log(f" * registered_data_bn_list={registered_data_bn_list}") + log(f" * prices={prices}") + if note_msg: + log(f"## {note_msg}") + + return price, commitment_block_dur + + +def get_latest_data_price(provider, source_code_hash, is_verbose=True): + if isinstance(source_code_hash, bytes): + code_hash_bytes = source_code_hash + else: + code_hash_bytes = cfg.w3.toBytes(text=source_code_hash) + + registered_data_bn_list = Ebb.get_registered_data_bn(provider, code_hash_bytes) + (price, commitment_block_dur) = Ebb.get_registered_data_price( + provider, code_hash_bytes, registered_data_bn_list[-1] ) - log(f"==> price={price}") - log(f"==> commitment_block_duration={commitment_block_duration}") - log(f"==> data_block_numbers={data_block_numbers}") + if is_verbose: + log(f" * price={price}") + log(f" * commitment_block_dur={commitment_block_dur}") + log(f" * registered_data_bn_list={registered_data_bn_list}") + + return price, commitment_block_dur def main(): - address = "0x29e613b04125c16db3f3613563bfdd0ba24cb629" - code_hash = "9d5d892a63b5758090258300a59eb389" - get_data_price(address, code_hash) + provider_address = "0x29e613b04125c16db3f3613563bfdd0ba24cb629" + code_hash = "050e6cc8dd7e889bf7874689f1e1ead6" + get_data_price(provider_address, code_hash) if __name__ == "__main__": diff --git a/broker/eblocbroker_scripts/get_job_info.py b/broker/eblocbroker_scripts/get_job_info.py index eb25057b..44018164 100755 --- a/broker/eblocbroker_scripts/get_job_info.py +++ b/broker/eblocbroker_scripts/get_job_info.py @@ -1,8 +1,6 @@ #!/usr/bin/env python3 import sys -import traceback -from math import ceil from broker import cfg from broker._utils._log import br, log @@ -14,7 +12,7 @@ def analyze_data(self, key, provider=None): """Obtain information related to source-code data.""" - current_block_number = cfg.Ebb.get_block_number() + current_bn = cfg.Ebb.get_block_number() self.received_block = [] self.storage_duration = [] self.job_info["is_cached"] = {} @@ -36,10 +34,10 @@ def analyze_data(self, key, provider=None): self.job_info["is_cached"][code_hash_str] = False # FIXME double check # if remaining time to cache is 0, then caching is requested for the # related code_hash - if ds.received_block + ds.storage_duration >= current_block_number: - if ds.received_block < current_block_number: + if ds.received_block + ds.storage_duration >= current_bn: + if ds.received_block < current_bn: self.job_info["is_cached"][code_hash_str] = True - elif ds.received_block == current_block_number: + elif ds.received_block == current_bn: if code_hash in self.job_info["is_cached"]: self.job_info["is_cached"][code_hash_str] = True else: @@ -49,57 +47,57 @@ def analyze_data(self, key, provider=None): log(f" * code_hash{br(idx)}=[green]{code_hash_str}") log(f"==> received_block={ds.received_block}") - log(f"==> storage_duration{br(self.job_info['received_block_number'])}={ds.storage_duration}") + log(f"==> storage_duration{br(self.job_info['received_bn'])}={ds.storage_duration}") log(f"==> cloud_storage_id{br(idx)}={StorageID(self.job_info['cloudStorageID'][idx]).name}") log(f"==> cached_type={CacheType(self.job_info['cacheType'][idx]).name}") log(f"==> is_cached={self.job_info['is_cached'][code_hash_str]}") -def 
set_job_received_block_number(self, received_block_number):
-    if not received_block_number:
-        received_block_number = self.deployed_block_number
+def set_job_received_bn(self, received_bn):
+    if not received_bn:
+        received_bn = self.deployed_block_number
         self.to_block = "latest"
     else:
-        self.to_block = int(received_block_number)
+        self.to_block = int(received_bn)

-    if int(received_block_number) > int(self.job_info["received_block_number"]):
-        self.job_info["received_block_number"] = received_block_number
+    if int(received_bn) > int(self.job_info["received_bn"]):
+        self.job_info["received_bn"] = received_bn


 def update_job_cores(self, provider, job_key, index=0, received_bn=0) -> int:
     """Update job cores."""
-    self.set_job_received_block_number(received_bn)
+    self.set_job_received_bn(received_bn)
     try:
         event_filter = self._eblocbroker.events.LogJob.createFilter(
             argument_filters={"provider": str(provider)},
-            fromBlock=int(self.job_info["received_block_number"]),
+            fromBlock=int(self.job_info["received_bn"]),
             toBlock=self.to_block,
         )
         for logged_job in event_filter.get_all_entries():
             if logged_job.args["jobKey"] == job_key and logged_job.args["index"] == int(index):
-                self.job_info["received_block_number"] = received_bn = int(logged_job["blockNumber"])
+                self.job_info["received_bn"] = received_bn = int(logged_job["blockNumber"])
                 self.job_info.update({"core": logged_job.args["core"]})
                 self.job_info.update({"run_time": logged_job.args["runTime"]})
                 self.job_info.update({"cloudStorageID": logged_job.args["cloudStorageID"]})
                 self.job_info.update({"cacheType": logged_job.args["cacheType"]})
                 break
         else:
-            log(f"E: failed to find job({job_key}) to update")
+            log(f"E: failed to find and update job({job_key})")

         return received_bn
     except Exception as e:
-        print_tb(f"E: Failed to update_job_cores.\n{e}")
+        print_tb(f"E: Failed to update job cores.\n{e}")
         raise e


-def get_job_code_hashes(self, provider, job_key, index, received_block_number=0):
-    """code_hashes of the completed job is obtained from its event."""
-    # job_info["received_block_number"]
-    self.set_job_received_block_number(received_block_number)
+def get_job_code_hashes(self, provider, job_key, index, received_bn=0):
+    """Return code hashes of the completed job, obtained from its event."""
+    # job_info["received_bn"]
+    self.set_job_received_bn(received_bn)
     try:
         event_filter = self._eblocbroker.events.LogJob.createFilter(
             argument_filters={"provider": str(provider)},
-            fromBlock=int(self.job_info["received_block_number"]),
+            fromBlock=int(self.job_info["received_bn"]),
             toBlock=self.to_block,
         )
         for logged_job in event_filter.get_all_entries():
@@ -113,23 +111,19 @@ def get_job_code_hashes(self, provider, job_key, index, received_block_number=0)
         raise e


-def get_job_info_print(self, provider, job_key, index, received_block_number):
+def get_job_info_print(self, provider, job_key, index, received_bn):
     Ebb = cfg.Ebb
-    elapsed_time = 0
     result_ipfs_hash = ""
     if self.job_info["result_ipfs_hash"] != empty_bytes32 and self.job_info["result_ipfs_hash"] != "":
         result_ipfs_hash = bytes32_to_ipfs(self.job_info["result_ipfs_hash"])

-    if self.job_info["end_timestamp"]:
-        elapsed_time = int(self.job_info["end_timestamp"]) - int(self.job_info["start_timestamp"])
-
     if isinstance(self.job_info, dict):
         log(f"==> state_code={state.inv_code[self.job_info['stateCode']]}({self.job_info['stateCode']})")
         log(self.job_info)
         if result_ipfs_hash:
             log(f"==> result_ipfs_hash={result_ipfs_hash}")

-        Ebb.get_job_code_hashes(provider, job_key, index, received_block_number)
+        
Ebb.get_job_code_hashes(provider, job_key, index, received_bn) if self.job_info["code_hashes"]: log("code_hashes:", "bold blue") for idx, code_hash in enumerate(self.job_info["code_hashes"]): @@ -152,21 +146,12 @@ def get_job_info_print(self, provider, job_key, index, received_block_number): else: print(self.job_info) - assert elapsed_time >= 0, "elapsed_time is negative" - - -def get_job_owner(self, provider, job_key, index, job_id=0): - job, received, job_owner, data_transfer_in, data_transfer_out = self._get_job_info( - provider, job_key, int(index), int(job_id) - ) - return job_owner - -def get_job_info(self, provider, job_key, index, job_id, received_block_number=0, is_print=True, is_log_print=False): +def get_job_info(self, provider, job_key, index, job_id, received_bn=0, is_print=True, is_log_print=False): """Return information of the job.""" if is_print: fn = "~/ebloc-broker/broker/eblocbroker_scripts/get_job_info.py" - log(f"$ {fn} {provider} {job_key} {index} {job_id} {received_block_number}", "bold cyan", is_code=True) + log(f"$ {fn} {provider} {job_key} {index} {job_id} {received_bn}", "bold cyan", is_code=True) try: provider = cfg.w3.toChecksumAddress(provider) @@ -192,62 +177,58 @@ def get_job_info(self, provider, job_key, index, job_id, received_block_number=0 "price_data_transfer": job_prices[3], "price_storage": job_prices[4], "price_cache": job_prices[5], - "received_block_number": received_block_number, + "received_bn": received_bn, "core": None, "run_time": None, - "actual_run_time": None, + "actual_elapsed_time": None, "cloudStorageID": None, "result_ipfs_hash": "", - "end_timestamp": None, - "refundedGwei": None, - "receivedGwei": None, + "refunded_gwei": None, + "received_gwei": None, "code_hashes": None, "data_transfer_in_to_download": None, "data_transfer_out_used": None, "storage_duration": None, } - received_block_number = self.update_job_cores(provider, job_key, index, received_block_number) - if not received_block_number or received_block_number == self.deployed_block_number: + received_bn = self.update_job_cores(provider, job_key, index, received_bn) + if not received_bn or received_bn == self.deployed_block_number: # First reading from the mongoDB, this will increase the speed to fetch from the logged data - received_block_number_temp = self.mongo_broker.get_job_block_number( - self.job_info["job_owner"], job_key, index - ) - if received_block_number == 0 and received_block_number_temp == 0: - received_block_number = self.deployed_block_number - - if received_block_number > self.deployed_block_number: - self.job_info["received_block_number"] = received_block_number + received_bn_temp = self.mongo_broker.get_job_block_number(self.job_info["job_owner"], job_key, index) + if received_bn == 0 and received_bn_temp == 0: + received_bn = self.deployed_block_number + + if received_bn > self.deployed_block_number: + self.job_info["received_bn"] = received_bn # else: - # to_block = int(received_block_number) + # to_block = int(received_bn) event_filter = self._eblocbroker.events.LogProcessPayment.createFilter( argument_filters={"provider": str(provider)}, - fromBlock=int(received_block_number), + fromBlock=int(received_bn), toBlock="latest", ) for logged_receipt in event_filter.get_all_entries(): if logged_receipt.args["jobKey"] == job_key and logged_receipt.args["index"] == int(index): self.job_info.update({"result_ipfs_hash": logged_receipt.args["resultIpfsHash"]}) - # self.job_info.update({"end_timestamp": logged_receipt.args["endTimestamp"]}) - 
self.job_info.update({"receivedGwei": logged_receipt.args["receivedGwei"]}) - self.job_info.update({"refundedGwei": logged_receipt.args["refundedGwei"]}) + self.job_info.update({"received_gwei": logged_receipt.args["receivedGwei"]}) + self.job_info.update({"refunded_gwei": logged_receipt.args["refundedGwei"]}) self.job_info.update({"data_transfer_in_to_download": logged_receipt.args["dataTransferIn"]}) self.job_info.update({"data_transfer_out_used": logged_receipt.args["dataTransferOut"]}) self.job_info.update({"data_transfer_out_used": logged_receipt.args["dataTransferOut"]}) - self.job_info["actual_run_time"] = ceil( - self.job_info["end_timestamp"] - self.job_info["start_timestamp"] - ) + self.job_info.update({"actual_elapsed_time": logged_receipt.args["elapsedTime"]}) + if self.job_info["result_ipfs_hash"] == empty_bytes32: + self.job_info.update({"result_ipfs_hash": b""}) + break except Exception as e: - log(f"E: Failed to get_job_info: {traceback.format_exc()}") raise e if str(self.job_info["core"]) == "0": raise Exception("Failed to get_job_info: Out of index") if is_log_print: - self.get_job_info_print(provider, job_key, index, received_block_number) + self.get_job_info_print(provider, job_key, index, received_bn) - if self.job_info["storage_duration"] is None: + if not self.job_info["storage_duration"]: self.job_info["storage_duration"] = [] for _ in range(len(self.job_info["cacheType"])): self.job_info["storage_duration"].append(0) @@ -256,7 +237,7 @@ def get_job_info(self, provider, job_key, index, job_id, received_block_number=0 def main(): - received_block_number = 0 + received_bn = 0 job_id = 0 if len(sys.argv) > 3: provider = str(sys.argv[1]) @@ -266,16 +247,12 @@ def main(): job_id = int(sys.argv[4]) if len(sys.argv) == 6: - received_block_number = int(sys.argv[5]) + received_bn = int(sys.argv[5]) else: - log("E: Provide as arguments") + log("E: Provide as arguments") sys.exit(1) - try: - Ebb = cfg.Ebb - Ebb.get_job_info(provider, job_key, index, job_id, received_block_number, is_log_print=True) - except Exception as e: - raise e + cfg.Ebb.get_job_info(provider, job_key, index, job_id, received_bn, is_log_print=True) if __name__ == "__main__": diff --git a/broker/eblocbroker_scripts/job.py b/broker/eblocbroker_scripts/job.py index 333e6c20..7c0d9efc 100755 --- a/broker/eblocbroker_scripts/job.py +++ b/broker/eblocbroker_scripts/job.py @@ -276,7 +276,7 @@ def add_empty_data_item(self): self.storage_hours.append(0) self.storage_ids.append(StorageID.NONE) self.data_transfer_ins.append(0) - self.data_prices_set_block_numbers.append(0) # TODO: calculate from the contract + self.data_prices_set_block_numbers.append(0) def print_before_submit(self): for idx, code_hash in enumerate(self.code_hashes_str): @@ -305,18 +305,17 @@ def _search_best_provider(self, requester, is_verbose=False): selected_provider = provider selected_price = _price - is_all_same = all(x == price_list[0] for x in price_list) - return selected_provider, selected_price, is_all_same + is_all_equal = all(x == price_list[0] for x in price_list) + return selected_provider, selected_price, is_all_equal def search_best_provider(self, requester): - provider_to_share, best_price, is_all_same = self._search_best_provider(requester, is_verbose=True) + provider_to_share, best_price, is_all_equal = self._search_best_provider(requester, is_verbose=True) self.price, *_ = self.cost(provider_to_share, requester) if self.price != best_price: raise Exception(f"job_price={self.price} and best_price={best_price} does not match") - 
if is_all_same: # force to submit given provider address + if is_all_equal: # force to submit given provider address provider_to_share = self.Ebb.w3.toChecksumAddress(self.provider_addr) - # breakpoint() # DEBUG log(f"[green]##[/green] provider_to_share={provider_to_share} | price={best_price}", "bold") return self.Ebb.w3.toChecksumAddress(provider_to_share) @@ -424,7 +423,7 @@ def set_storage_cost(self, is_verbose=False): and ds.is_verified_used ): if is_verbose: - log(f"==> For {bytes32_to_ipfs(code_hash)} cost of storage is not paid") + log(f"==> for {bytes32_to_ipfs(code_hash)} cost of storage is not paid") else: if self.job.data_prices_set_block_numbers[idx] > 0 or self.job.storage_ids[idx] == StorageID.NONE: if self.job.data_prices_set_block_numbers[idx] == 0: @@ -437,12 +436,11 @@ def set_storage_cost(self, is_verbose=False): data_price_set_bn = self.job.data_prices_set_block_numbers[idx] # if true, registered data's price should be considered for storage - output = self.ebb.getRegisteredDataPrice( + (data_price, *_) = self.Ebb.get_registered_data_price( self.job.provider, code_hash, data_price_set_bn, ) - data_price = output[0] self.storage_cost += data_price self.registered_data_cost_list[_code_hash] = data_price self.registered_data_cost += data_price diff --git a/broker/eblocbroker_scripts/log_job.py b/broker/eblocbroker_scripts/log_job.py index 1f9e36d9..f9cdfd45 100755 --- a/broker/eblocbroker_scripts/log_job.py +++ b/broker/eblocbroker_scripts/log_job.py @@ -35,7 +35,7 @@ def handle_event(logged_jobs): log(f"received={job.args['received']}") for value in job.args["sourceCodeHash"]: sourceCodeHash = job.args["sourceCodeHash"][value] - log(f"source_code_hash{br(value)} => {bytes32_to_ipfs(sourceCodeHash)}") + log(f"code_hash{br(value)} => {bytes32_to_ipfs(sourceCodeHash)}") console_ruler() @@ -103,8 +103,8 @@ def main(): from_block = int(sys.argv[1]) provider = str(sys.argv[2]) # Only obtains jobs that are submitted to the provider. 
else:
-        from_block = 13172386
-        provider = "0x57b60037b82154ec7149142c606ba024fbb0f991"
+        from_block = 15867616
+        provider = "0x1926b36af775e1312fdebcc46303ecae50d945af"

     handle_event(logged_jobs=Ebb.run_log_job(from_block, provider))
diff --git a/broker/eblocbroker_scripts/process_payment.py b/broker/eblocbroker_scripts/process_payment.py
index dfbfc66c..327b8b7a 100755
--- a/broker/eblocbroker_scripts/process_payment.py
+++ b/broker/eblocbroker_scripts/process_payment.py
@@ -23,7 +23,7 @@ def process_payment(
     data_transfer_out,
     core,
     run_time,
-    received_block_number=0,
+    received_bn=0,
 ):
     """Process payment of the received job."""
     if not result_ipfs_hash:
@@ -42,7 +42,7 @@ def process_payment(
     if len(result_ipfs_hash) != 46 and cloud_storage_id in (StorageID.IPFS, StorageID.IPFS_GPG):
         raise Exception("Result ipfs's length does not match with its original length, check your job_key")

-    self.get_job_info(env.PROVIDER_ID, job_key, index, job_id, received_block_number, is_print=False)
+    self.get_job_info(env.PROVIDER_ID, job_key, index, job_id, received_bn, is_print=False)
     if self.job_info["stateCode"] == state.code["COMPLETED"]:
         log(f"warning: job ({job_key},{index},{job_id}) is completed and already get paid")
         sys.exit(1)
diff --git a/broker/eblocbroker_scripts/register_data.py b/broker/eblocbroker_scripts/register_data.py
index acdfbf9d..3b4340cc 100755
--- a/broker/eblocbroker_scripts/register_data.py
+++ b/broker/eblocbroker_scripts/register_data.py
@@ -1,11 +1,10 @@
 #!/usr/bin/env python3

-from contextlib import suppress
-
 from broker import cfg
 from broker._utils.tools import log
 from broker._utils.web3_tools import get_tx_status
 from broker.config import env
+from broker.eblocbroker_scripts.get_data_price import get_latest_data_price
 from broker.errors import QuietExit
 from broker.utils import print_tb

@@ -23,23 +22,26 @@ def _register_data(source_code_hash, data_price, commitment_dur):
         log(f"warning: provider [green]{env.PROVIDER_ID}[/green]'s orcid id is not authenticated yet")
         raise QuietExit

-    source_code_hash_bytes = cfg.w3.toBytes(text=source_code_hash)
-    with suppress(Exception):
-        (price, _commitment_dur) = cfg.Ebb.get_registered_data_price(env.PROVIDER_ID, source_code_hash_bytes, 0)
-        bn = cfg.Ebb.get_registered_data_bn(env.PROVIDER_ID, source_code_hash_bytes)
+    code_hash_bytes = cfg.w3.toBytes(text=source_code_hash)
+    try:
+        (price, _commitment_dur) = get_latest_data_price(env.PROVIDER_ID, code_hash_bytes, is_verbose=False)
+        bn = cfg.Ebb.get_registered_data_bn(env.PROVIDER_ID, code_hash_bytes)
         if bn[0] == 0:
-            log(f"E: registered block number returns zero for {source_code_hash_bytes}")
+            log(f"E: registered block number returns zero for {code_hash_bytes}")
             is_exit = True

         log(
-            f"## data([green]{source_code_hash}[/green]) is already registerered.\n"
-            "Use [blue]./update_data_price.py[/blue] to update its price"
+            f"## data([green]{source_code_hash}[/green]) is already registered"
+            # "\nUse [blue]./update_data_price.py[/blue] to update its price"
         )
         if data_price == price:
             is_exit = True
         else:
             log("## Update price")
             is_update = True
+    except Exception as e:
+        print_tb(e)
+        breakpoint()  # DEBUG

     if is_exit:
         raise QuietExit
@@ -50,9 +52,9 @@ def _register_data(source_code_hash, data_price, commitment_dur):

     try:
         if not is_update:
-            tx = Ebb.register_data(source_code_hash_bytes, data_price, commitment_dur)
+            tx = Ebb.register_data(code_hash_bytes, data_price, commitment_dur)
         else:
-            tx = Ebb.update_data_price(source_code_hash_bytes, data_price, commitment_dur)
+            tx = 
Ebb.update_data_price(code_hash_bytes, data_price, commitment_dur)

         get_tx_status(Ebb.tx_id(tx))
     except QuietExit as e:
diff --git a/broker/eblocbroker_scripts/register_provider.py b/broker/eblocbroker_scripts/register_provider.py
index 19eab9fe..88259d3c 100755
--- a/broker/eblocbroker_scripts/register_provider.py
+++ b/broker/eblocbroker_scripts/register_provider.py
@@ -61,14 +61,14 @@ def get_ipfs_id() -> str:
     except ipfshttpclient.exceptions.ConnectionError:
         log(
             "E: Failed to establish a new connection to IPFS, please run it on the background.\n"
-            "Please run [magenta]~/ebloc-broker/broker/_daemons/ipfs.py"
+            "Please run [m]~/ebloc-broker/broker/_daemons/ipfs.py"
         )
         sys.exit(1)
     except Exception as e:
         print_tb(e)
         log(
             "E: Failed to establish a new connection to IPFS, please run it on the background.\n"
-            "Please run [magenta]~/ebloc-broker/broker/_daemons/ipfs.py"
+            "Please run [m]~/ebloc-broker/broker/_daemons/ipfs.py"
         )
         sys.exit(1)

@@ -80,7 +80,7 @@ def get_ipfs_id() -> str:


 def error_msg(key, yaml_fn):
-    log(f"E: [blue]{key}[/blue] is empty in [magenta]{yaml_fn}")
+    log(f"E: [blue]{key}[/blue] is empty in [m]{yaml_fn}")


 def register_provider_wrapper(yaml_fn):
diff --git a/broker/eblocbroker_scripts/update_data_price.py b/broker/eblocbroker_scripts/update_data_price.py
index 66a4b93b..206a5f7a 100755
--- a/broker/eblocbroker_scripts/update_data_price.py
+++ b/broker/eblocbroker_scripts/update_data_price.py
@@ -14,22 +14,20 @@ def _update_data_price():
         log(f"warning: Provider {env.PROVIDER_ID} is not registered.\n")
         raise QuietExit

-    source_code_hash = "b6aaf03752dc68d625fc57b451faa2bf"
-    new_data_price = 21
+    code_hash = "050e6cc8dd7e889bf7874689f1e1ead6"
+    new_data_price = 20
     commitment_block_duration = 600
-    source_code_hash_bytes = cfg.w3.toBytes(text=source_code_hash)
+    code_hash_bytes = cfg.w3.toBytes(text=code_hash)
     try:
-        (price, _commitment_block_duration) = cfg.Ebb.get_registered_data_price(
-            env.PROVIDER_ID, source_code_hash_bytes, 0
-        )
+        (price, _commitment_block_duration) = cfg.Ebb.get_registered_data_price(env.PROVIDER_ID, code_hash_bytes, 0)
         if price == new_data_price and _commitment_block_duration == commitment_block_duration:
-            log(f"## data([green]{source_code_hash}[/green]) already registerered with the given values")
+            log(f"## data([green]{code_hash}[/green]) already registered with the given values")
             raise QuietExit
     except:
         raise QuietExit

     try:
-        tx = Ebb.update_data_price(source_code_hash_bytes, new_data_price, commitment_block_duration)
+        tx = Ebb.update_data_price(code_hash_bytes, new_data_price, commitment_block_duration)
         get_tx_status(Ebb.tx_id(tx))
     except QuietExit:
         pass
diff --git a/broker/eblocbroker_scripts/update_provider_info.py b/broker/eblocbroker_scripts/update_provider_info.py
index 4bf15284..3f52468e 100755
--- a/broker/eblocbroker_scripts/update_provider_info.py
+++ b/broker/eblocbroker_scripts/update_provider_info.py
@@ -57,10 +57,10 @@ def update_provider_info(self, gpg_fingerprint, gmail, f_id, ipfs_id):
         gpg_fingerprint = cfg.ipfs.get_gpg_fingerprint(env.GMAIL)
         f_id = env.OC_USER

-    log(f"## gmail=[magenta]{env.GMAIL}")
+    log(f"## gmail=[m]{env.GMAIL}")
     log(f"## gpg_fingerprint={gpg_fingerprint}")
-    log(f"## ipfs_id=[magenta]{ipfs_id}")
-    log(f"## fid=[magenta]{f_id}")
+    log(f"## ipfs_id=[m]{ipfs_id}")
+    log(f"## fid=[m]{f_id}")
     try:
         cfg.ipfs.is_gpg_published(gpg_fingerprint)
         tx_hash = Ebb.update_provider_info(gpg_fingerprint, env.GMAIL, f_id, ipfs_id)
diff --git a/broker/end_code.py b/broker/end_code.py
index f374e067..7ebf43e1 100755
--- 
a/broker/end_code.py +++ b/broker/end_code.py @@ -139,7 +139,7 @@ def upload(self, key, is_job_key): raise Exception(f"{WHERE(1)} E: {key} does not have a match, meta_data={meta_data}. {e}") from e mime_type = gdrive.get_file_info(gdrive_info, "Mime") - log(f"mime_type=[magenta]{mime_type}", "bold") + log(f"mime_type=[m]{mime_type}", "bold") self.data_transfer_out += calculate_size(self.patch_file) log(f"data_transfer_out={self.data_transfer_out} MB =>" f" rounded={int(self.data_transfer_out)} MB", "bold") if "folder" in mime_type: @@ -559,7 +559,7 @@ def run(self): self.elapsed_time = run_time[self.job_id] log(f"finalized_elapsed_time={self.elapsed_time}", "bold") - log("## job_info=", "bold magenta", end="") + log("## job_info=", "info", end="") log(pprint.pformat(self.job_info), "bold") try: self.get_cloud_storage_class(0).initialize(self) diff --git a/broker/gdrive/README.org b/broker/gdrive/README.org index e528627b..d4e33b36 100644 --- a/broker/gdrive/README.org +++ b/broker/gdrive/README.org @@ -1,7 +1,7 @@ * Installation -link: -https://stackoverflow.com/questions/65396850/how-to-handle-app-is-temporarily-blocked-from-logging-in-with-your-google-accou/65507155#65507155 +- [[https://console.cloud.google.com/apis/credentials]] +- https://stackoverflow.com/questions/65396850/how-to-handle-app-is-temporarily-blocked-from-logging-in-with-your-google-accou/65507155#65507155 ~@tellowkrinkle's [comment][1] help me to solve the issue~ diff --git a/broker/gdrive/submit.py b/broker/gdrive/submit.py index 21f4b49e..a41876a9 100755 --- a/broker/gdrive/submit.py +++ b/broker/gdrive/submit.py @@ -48,7 +48,7 @@ def _submit(job, provider, key, requester, required_confs): def _share_folders(folder_ids_to_share, provider_gmail): for folder_id in folder_ids_to_share: cmd = ["gdrive", "share", folder_id, "--role", "writer", "--type", "user", "--email", provider_gmail] - log(f"share_output=[magenta]{run(cmd)}", "bold") + log(f"share_output=[m]{run(cmd)}", "bold") def submit_gdrive(job: Job, is_pass=False, required_confs=1): @@ -78,7 +78,7 @@ def submit_gdrive(job: Job, is_pass=False, required_confs=1): try: job.Ebb.is_provider_valid(provider_addr_to_submit) provider_info = job.Ebb.get_provider_info(provider_addr_to_submit) - # log(f"## provider_addr_to_submit=[magenta]{provider_addr_to_submit}") + # log(f"## provider_addr_to_submit=[m]{provider_addr_to_submit}") log(f"==> provider's available_core_num={provider_info['available_core_num']}") log(f"==> provider's price_core_min={provider_info['price_core_min']}") provider_gmail = provider_info["gmail"] diff --git a/broker/imports.py b/broker/imports.py index b3f2e4f9..a5491f1f 100755 --- a/broker/imports.py +++ b/broker/imports.py @@ -124,7 +124,7 @@ def connect_into_eblocbroker() -> None: try: log( "warning: [green]bloxberg[/green] key is added into the " - "[magenta]~/.brownie/network-config.yaml[/magenta] file. Please try again." + "[m]~/.brownie/network-config.yaml[/m] file. Please try again." 
)
             network.connect("bloxberg")
         except KeyError:
@@ -133,7 +133,7 @@ def connect_into_eblocbroker() -> None:
             project = project.load(env.CONTRACT_PROJECT_PATH)
             config.ebb = project.eBlocBroker.at(env.CONTRACT_ADDRESS)
             config.ebb.contract_address = cfg.w3.toChecksumAddress(env.CONTRACT_ADDRESS)
-            #: for the contract's events
+            #: required to fetch the contract's events
             config._eblocbroker = cfg.w3.eth.contract(env.CONTRACT_ADDRESS, abi=read_abi_file())
         except Exception as e:
             print_tb(e)
diff --git a/broker/ipfs/submit.py b/broker/ipfs/submit.py
index e6d36299..2a33417b 100755
--- a/broker/ipfs/submit.py
+++ b/broker/ipfs/submit.py
@@ -59,7 +59,7 @@ def pre_check(job: Job, requester):
             sys.exit()

     if not os.path.isfile(env.GPG_PASS_FILE):
-        log(f"E: Please store your gpg password in the [magenta]{env.GPG_PASS_FILE}[/magenta]\nfile for decrypting")
+        log(f"E: Please store your gpg password in the [m]{env.GPG_PASS_FILE}[/m]\nfile for decrypting")
         raise QuietExit

     start_ipfs_daemon()
diff --git a/broker/libs/_git.py b/broker/libs/_git.py
index db30d9a1..425f86d1 100644
--- a/broker/libs/_git.py
+++ b/broker/libs/_git.py
@@ -127,7 +127,7 @@ def diff_patch(path: Path, source_code_hash, index, target_path, home_dir):
     patch_upload_fn = f"{patch_name}.gz"  # file to be uploaded as zip
     patch_file = f"{target_path}/{patch_upload_fn}"

-    log(f"patch_path=[magenta]{patch_upload_fn}", "bold")
+    log(f"patch_path=[m]{patch_upload_fn}", "bold")
     try:
         run(["env", f"HOME={home_dir}", "git", "add", "-A"])
         diff_and_gzip(patch_file, home_dir)
@@ -242,7 +242,7 @@ def apply_patch(git_folder, patch_file, is_gpg=False):
     with cd(git_folder):
         base_name = path_leaf(patch_file)
-        log(f"==> [magenta]{base_name}")
+        log(f"==> [m]{base_name}")
         # folder_name = base_name_split[2]
         #
         # base_name_split = base_name.split("_")
diff --git a/broker/libs/eudat.py b/broker/libs/eudat.py
index e0f73b2b..c2dd6396 100755
--- a/broker/libs/eudat.py
+++ b/broker/libs/eudat.py
@@ -130,9 +130,7 @@ def login(user, password_path: Path, fn: str) -> None:
         f = open(fn, "rb")
         config.oc = pickle.load(f)
         try:
-            status_str = (
-                f"[bold]Login into owncloud from the dumped_object=[magenta]{fn}[/magenta] [yellow]...[/yellow]"
-            )
+            status_str = f"[bold]Login into owncloud from the dumped_object=[m]{fn}[/m] [yellow]...[/yellow]"
             with cfg.console.status(status_str):
                 config.oc.get_config()

@@ -308,7 +306,7 @@ def _submit(provider, requester, job, required_confs=1):
     # provider_addr_to_submit = provider
     provider_addr_to_submit = job.search_best_provider(requester)
     provider_info = job.Ebb.get_provider_info(provider_addr_to_submit)
-    log(f"==> provider_fid=[magenta]{provider_info['f_id']}")
+    log(f"==> provider_fid=[m]{provider_info['f_id']}")
     _share_folders(provider_info, requester_name, folders_hash)
     # print(job.code_hashes)
     try:
diff --git a/broker/libs/gdrive.py b/broker/libs/gdrive.py
index 088f98ca..7e50f140 100755
--- a/broker/libs/gdrive.py
+++ b/broker/libs/gdrive.py
@@ -298,7 +298,7 @@ def size(key, mime_type, folder_name, gdrive_info, results_folder_prev, code_has

     try:
         output = get_file_id(key)
-        log(f"==> data_id=[magenta]{key}")
+        log(f"==> data_id=[m]{key}")
         log(output, "bold green")
         data_files_id = fetch_grive_output(output, "meta_data.json")
         if not data_files_id:
diff --git a/broker/libs/ipfs.py b/broker/libs/ipfs.py
index 271e73aa..04e09e80 100755
--- a/broker/libs/ipfs.py
+++ b/broker/libs/ipfs.py
@@ -168,7 +168,7 @@ def gpg_encrypt(self, from_gpg_fingerprint, recipient_gpg_fingerprint, target):
     for attempt in range(5):
         try:
             cmd = ["gpg", 
"--keyserver", "hkps://keyserver.ubuntu.com", "--recv-key", recipient_gpg_fingerprint] - log(f"{br(attempt)} cmd: [magenta]{' '.join(cmd)}", "bold") + log(f"{br(attempt)} cmd: [m]{' '.join(cmd)}", "bold") run(cmd, suppress_stderr=True) # this may not work if it is requested too much in short time break except Exception as e: @@ -189,7 +189,7 @@ def gpg_encrypt(self, from_gpg_fingerprint, recipient_gpg_fingerprint, target): encrypt_target, ] run(cmd) - log(f"==> gpg_file=[magenta]{encrypted_file_target}") + log(f"==> gpg_file=[m]{encrypted_file_target}") return encrypted_file_target except Exception as e: print_tb(e) diff --git a/broker/libs/mongodb.py b/broker/libs/mongodb.py index 99dc9cd0..a3b4751f 100755 --- a/broker/libs/mongodb.py +++ b/broker/libs/mongodb.py @@ -57,7 +57,7 @@ def add_item(self, job_key, index, source_code_hash_list, requester_id, timestam "requester_addr": job_info["job_owner"], "requester_id": requester_id, "source_code_hash": source_code_hash_list, - "received_block_number": job_info["received_block_number"], + "received_bn": job_info["received_bn"], "timestamp": timestamp, "cloudStorageID": cloud_storage_id, "storage_duration": job_info["storage_duration"], @@ -124,7 +124,7 @@ def add_item_share_id(self, key, share_id, share_token): def get_job_block_number(self, requester_addr, key, index) -> int: cursor = self.collection.find({"requester_addr": requester_addr.lower(), "job_key": key, "index": index}) for document in cursor: - return document["received_block_number"] + return document["received_bn"] return 0 diff --git a/broker/libs/user_setup.py b/broker/libs/user_setup.py index 7e6dae9e..db70791f 100755 --- a/broker/libs/user_setup.py +++ b/broker/libs/user_setup.py @@ -62,7 +62,7 @@ def set_folder_permission(path, user_name, slurm_user): def user_add(user_address, basedir, slurm_user): user_address = user_address.lower() - log(f"#> adding user=[magenta]{user_address}[/magenta]", end="") + log(f"#> adding user=[m]{user_address}[/m]", end="") try: # convert ethereum user address into 32-bits user_name = hashlib.md5(user_address.encode("utf-8")).hexdigest() log(f" | user_name={user_name}", "bold") @@ -92,7 +92,7 @@ def user_add(user_address, basedir, slurm_user): add_user_to_slurm(user_name) # force to add user to slurm mkdir(f"{user_dir}/cache") else: - log(f"## [magenta]{user_address}[/magenta] => [blue]{user_name}[/blue] has already been created") + log(f"## [m]{user_address}[/m] => [blue]{user_name}[/blue] has already been created") def main(): diff --git a/broker/link.py b/broker/link.py index 46e81ba8..f2528148 100755 --- a/broker/link.py +++ b/broker/link.py @@ -89,7 +89,7 @@ def link_folders(self, paths=None): log() folder_new_hash = generate_md5sum(dest) - assert folder_hash == folder_new_hash, "hash of original and linked folder does not match" + assert folder_hash == folder_new_hash, "hash of the original and the linked folder does not match" def check_link_folders(folders_to_share, registered_data_files, source_code_path, is_pass=False): @@ -114,7 +114,7 @@ def check_link_folders(folders_to_share, registered_data_files, source_code_path print("") if not is_pass: question_yes_no( - "#> Would you like to continue with linked folder path in your `[magenta]run.sh[/magenta]` file?\n" + "#> Would you like to continue with linked folder path in your `[m]run.sh[/m]` file?\n" "If no, please feel free to update your run.sh file and continue", is_exit=True, ) @@ -123,11 +123,11 @@ def check_link_folders(folders_to_share, registered_data_files, source_code_path 
def test_with_small_dataset(value):
     fn = os.path.expanduser("~/test_eblocbroker/run_cppr/run.sh")
     with open(fn, "r+") as file:
-        filedata = file.read()
+        file_data = file.read()

-    changed_filedata = filedata.replace("DATA_HASH='change_folder_hash'", f"DATA_HASH='{value}'")
+    changed_file_data = file_data.replace("DATA_HASH='change_folder_hash'", f"DATA_HASH='{value}'")
     with open(fn, "w+") as file:
-        file.write(changed_filedata)
+        file.write(changed_file_data)


 def check_linked_data(folders_target, folder_link, source_code_path="", is_pass=False):
@@ -147,7 +147,7 @@ def check_linked_data(folders_target, folder_link, source_code_path="", is_pass=
     if not is_pass:
         print("")
         question_yes_no(
-            "#> Would you like to continue with linked folder path in your `[magenta]run.sh[/magenta]` file?\n"
+            "#> Would you like to continue with the linked folder path in your `[m]run.sh[/m]` file?\n"
             "If no, feel free to update your run.sh file and continue",
             is_exit=True,
         )
diff --git a/broker/python_scripts/apply_patch.py b/broker/python_scripts/apply_patch.py
index 7f06b668..2cb44e6f 100755
--- a/broker/python_scripts/apply_patch.py
+++ b/broker/python_scripts/apply_patch.py
@@ -26,7 +26,7 @@ def appy_patch(base_dir, patch_fn):
         if patch_file.endswith(".diff.gz"):
             extract_gzip(patch_file)
     else:
-        log(f"==> [magenta]{diff_file_name}[/magenta] exists")
+        log(f"==> [m]{diff_file_name}[/m] exists")

     try:
         git.apply_patch(base_dir, patch_file.replace(".gz", ""), is_gpg=False)
diff --git a/broker/python_scripts/get_transaction_log.py b/broker/python_scripts/get_transaction_log.py
old mode 100644
new mode 100755
index 937815de..8e5fb975
--- a/broker/python_scripts/get_transaction_log.py
+++ b/broker/python_scripts/get_transaction_log.py
@@ -7,7 +7,8 @@
 from broker import cfg
 from broker.utils import log

-if __name__ == "__main__":
+
+def main():
     if len(sys.argv) == 2:
         tx_hash = str(sys.argv[1])
         event = "LogJob"
@@ -25,3 +26,7 @@
     processed_logs = cfg.Ebb.eBlocBroker.events.LogReceipt().processReceipt(tx_receipt, errors=DISCARD)
     log(vars(processed_logs[0].args))
     log("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/broker/start_code.py b/broker/start_code.py
index 7610c271..3c47601a 100755
--- a/broker/start_code.py
+++ b/broker/start_code.py
@@ -29,7 +29,7 @@ def start_call(job_key, index, slurm_job_id) -> None:
     Ebb.mongo_broker.set_job_state_pid(str(job_key), int(index), pid)
     _log.ll.LOG_FILENAME = env.LOG_PATH / "transactions" / env.PROVIDER_ID.lower() / f"{job_key}_{index}.txt"
     # _log.ll.IS_PRINT = False
-    log(f"~/ebloc-broker/broker/start_code.py {job_key} {index} {slurm_job_id}", "bold magenta")
+    log(f"~/ebloc-broker/broker/start_code.py {job_key} {index} {slurm_job_id}", "info")
     job_id = 0  # TODO: should be obtained from the user's input
     _, _, error = popen_communicate(["scontrol", "show", "job", slurm_job_id])
     if "slurm_load_jobs error: Invalid job id specified" in str(error):
diff --git a/broker/test_setup/README.org b/broker/test_setup/README.org
index b3d8f521..d8bd6192 100644
--- a/broker/test_setup/README.org
+++ b/broker/test_setup/README.org
@@ -2,11 +2,10 @@

 ** gdrive

-
-  *For providers:* Download from ~My Driver~ as folders.

-:IMPORTANT: DO NOT download from ~Storage~ section, it download ~meta_data.json~ separately.
+:IMPORTANT: DO NOT download from ~Storage~ section, it downloads ~meta_data.json~ separately

-- *For requesters:* Download from ~Storage~ zipped patches.
+- *For requesters:* Download from ~Storage~ zipped patches ---------------------------------------------------------------------------- diff --git a/broker/test_setup/check_list.org b/broker/test_setup/check_list.org index 366935c5..ca855695 100644 --- a/broker/test_setup/check_list.org +++ b/broker/test_setup/check_list.org @@ -2,20 +2,20 @@ * TASKS -- [X] Download test_eblocbroker -- [X] contract transactions +- [X] download ~test_eblocbroker~ +- [X] contract's transactions from bloxberg | tasks | req | goo0 | goo1 | goo2 | goo3 | |------------------+-----+------+------+------+------| -| tx_from_bloxberg | [X] | [X] | [X] | [X] | [X] | -| ebb.tar.gz | | | | | | -| mongo_fetch | | | | | | +| tx_from_bloxberg | [-] | [] | [] | [] | [] | +| ebb.tar.gz | [X] | [] | [] | [] | [] | +| mongo_fetch | [] | [] | [] | [] | [] | |------------------+-----+------+------+------+------| | patches | req | goo0 | goo1 | goo2 | goo3 | |----------+-----+------+------+------+------| | gdrive | [X] | [X] | [X] | [X] | [X] | -| eudat | [] | [] | [] | [] | [] | +| eudat | [X] | [-] | [-] | [-] | [-] | | ipfs | [] | [] | [] | [] | [] | | ipfs_gpg | [] | [] | [] | [] | [] | |----------+-----+------+------+------+------| diff --git a/broker/test_setup/datasets.org b/broker/test_setup/datasets.org index bbe23023..117f2144 100644 --- a/broker/test_setup/datasets.org +++ b/broker/test_setup/datasets.org @@ -16,7 +16,7 @@ ebe53bd498a9f6446cd77d9252a9847c LB07-bunny-med.tbz2 f82aa511f8631bfc9a82fe6fa30f4b52 LB07-bunny-sml.tbz2 761691119cedfb9836a78a08742b14cc liver.n6c100.tbz2 f93b9a9f63447e0e086322b8416d4a39 liver.n6c10.tbz2 - +bfc83d9f6d5c3d68ca09499190851e86 bone.n26c10.tbz2 #+end_src #+begin_src bash @@ -48,7 +48,6 @@ fe801973c5b22ef6861f2ea79dc1eb9c babyface.n26c10.tbz2 4613abc322e8f2fdeae9a5dd10f17540 BL06-gargoyle-lrg.tbz2 dd0fbccccf7a198681ab838c67b68fbf bone.n6c100.tbz2 45281dfec4618e5d20570812dea38760 bone.n6c10.tbz2 -fa64e96bcee96dbc480a1495bddbf53c LB07-bunny-lrg.tbz2 8f6faf6cfd245cae1b5feb11ae9eb3cf liver.n26c100.tbz2 1bfca57fe54bc46ba948023f754521d6 liver.n26c10.tbz2 #+end_src @@ -64,7 +63,6 @@ wget -bqc https://vision.cs.uwaterloo.ca/files/BL06-camel-lrg.tbz2 wget -bqc https://vision.cs.uwaterloo.ca/files/BL06-gargoyle-lrg.tbz2 wget -bqc https://vision.cs.uwaterloo.ca/files/bone.n6c100.tbz2 wget -bqc https://vision.cs.uwaterloo.ca/files/bone.n6c10.tbz2 -wget -bqc https://vision.cs.uwaterloo.ca/files/LB07-bunny-lrg.tbz2 wget -bqc https://vision.cs.uwaterloo.ca/files/liver.n26c100.tbz2 wget -bqc https://vision.cs.uwaterloo.ca/files/liver.n26c10.tbz2 wget -bqc https://vision.cs.uwaterloo.ca/files/babyface.n6c100.tbz2 @@ -83,8 +81,9 @@ wget -bqc https://vision.cs.uwaterloo.ca/files/LB07-bunny-sml.tbz2 wget -bqc https://vision.cs.uwaterloo.ca/files/LB07-bunny-med.tbz2 wget -bqc https://vision.cs.uwaterloo.ca/files/liver.n6c10.tbz2 wget -bqc https://vision.cs.uwaterloo.ca/files/liver.n6c100.tbz2 +wget -bqc https://vision.cs.uwaterloo.ca/files/bone.n26c10.tbz2 #+end_src #+begin_src bash -for i in `ls *.tbz2` ; do echo $i ; done +for i in `command ls *.tbz2` ; do tarx "$i" ; done #+end_src diff --git a/broker/test_setup/job_cppr.yaml b/broker/test_setup/job_cppr.yaml index 81008fbc..1ff51e03 100644 --- a/broker/test_setup/job_cppr.yaml +++ b/broker/test_setup/job_cppr.yaml @@ -1,6 +1,6 @@ config: - requester_address: '0x24056c57e3b0933d2fa7d83fb9667d0efdfae64d' - provider_address: '0x29e613b04125c16db3f3613563bfdd0ba24cb629' + requester_address: '0x8b7356a95c8ba1846eb963fd127741730f666ba8' + provider_address: 
'0x51e2b36469cdbf58863db70cc38652da84d20c67'
   source_code:
     storage_id: eudat
     cache_type: public
@@ -8,12 +8,12 @@ config:
     storage_hours: 0
   data:
     data1:
-      hash: 0d6c3288ef71d89fb93734972d4eb903
+      hash: f71df9d36cd519d80a3302114779741d
     data2:
-      hash: 4613abc322e8f2fdeae9a5dd10f17540
+      hash: 1bfca57fe54bc46ba948023f754521d6
     data3:
       cache_type: public
-      path: /home/alper/test_eblocbroker/dataset_zip/small/KZ2-tsukuba
+      path: /home/alper/test_eblocbroker/dataset_zip/small/BL06-gargoyle-sml
       storage_hours: 1
       storage_id: eudat
   data_transfer_out: 5
@@ -21,3 +21,4 @@ config:
   job1:
     cores: 1
     run_time: 60
+    provider_addr: '0x4934a70ba8c1c3acfa72e809118bdd9048563a24'
diff --git a/broker/test_setup/job_nas.yaml b/broker/test_setup/job_nas.yaml
index 2d571ecd..267ed983 100644
--- a/broker/test_setup/job_nas.yaml
+++ b/broker/test_setup/job_nas.yaml
@@ -1,8 +1,8 @@
 config:
-  requester_address: '0x77c0b42b5c358ff7c97e268794b9ff6a278a0f1e'
-  provider_address: '0x29e613b04125c16db3f3613563bfdd0ba24cb629'
+  requester_address: '0xe2969f599cb904e9a808ec7218bc14fcfa346965'
+  provider_address: '0x4934a70ba8c1c3acfa72e809118bdd9048563a24'
   source_code:
-    storage_id: eudat
+    storage_id: ipfs_gpg
     cache_type: public
     path: ~/test_eblocbroker/NPB3.3-SER_source_code
     storage_hours: 0
@@ -11,3 +11,4 @@ config:
   job1:
     cores: 1
     run_time: 60
+    provider_addr: '0x1926b36af775e1312fdebcc46303ecae50d945af'
diff --git a/broker/test_setup/prepare_data.sh b/broker/test_setup/prepare_data.sh
index 84c50eb8..285afb05 100755
--- a/broker/test_setup/prepare_data.sh
+++ b/broker/test_setup/prepare_data.sh
@@ -9,7 +9,6 @@ rename_all() {
     mv -v BL06-gargoyle-lrg 4613abc322e8f2fdeae9a5dd10f17540
     mv -v bone.n6c100 dd0fbccccf7a198681ab838c67b68fbf
     mv -v bone.n6c10 45281dfec4618e5d20570812dea38760
-    mv -v LB07-bunny-lrg fa64e96bcee96dbc480a1495bddbf53c
     mv -v liver.n26c100 8f6faf6cfd245cae1b5feb11ae9eb3cf
     mv -v liver.n26c10 1bfca57fe54bc46ba948023f754521d6
     mv -v babyface.n6c100 f1de03edab51f281815c3c1e5ecb88c6
@@ -28,6 +27,7 @@ rename_all() {
     mv -v LB07-bunny-sml f82aa511f8631bfc9a82fe6fa30f4b52
     mv -v liver.n6c100 761691119cedfb9836a78a08742b14cc
     mv -v liver.n6c10 f93b9a9f63447e0e086322b8416d4a39
+    mv -v bone.n26c10 bfc83d9f6d5c3d68ca09499190851e86
 }

 extract () {
diff --git a/broker/test_setup/register_data_files.py b/broker/test_setup/register_data_files.py
index d561bf6a..74d8b97e 100755
--- a/broker/test_setup/register_data_files.py
+++ b/broker/test_setup/register_data_files.py
@@ -3,7 +3,9 @@
 import time
 from contextlib import suppress

-from broker._utils._log import log, ok
+from broker._utils._log import log
+from broker.config import env
+from broker.eblocbroker_scripts.get_data_price import get_data_price
 from broker.eblocbroker_scripts.register_data import _register_data

 hashes_small = [
@@ -36,28 +38,36 @@
     "779745f315060d1bc0cd44b7266fb4da",  # B
     "dd0fbccccf7a198681ab838c67b68fbf",  # C
     "45281dfec4618e5d20570812dea38760",  # C
-    "fa64e96bcee96dbc480a1495bddbf53c",  # C
+    "bfc83d9f6d5c3d68ca09499190851e86",  # C
     "8f6faf6cfd245cae1b5feb11ae9eb3cf",  # D
     "1bfca57fe54bc46ba948023f754521d6",  # D
     "f71df9d36cd519d80a3302114779741d",  # D
 ]


+def print_prices(hashes):
+    for code_hash in hashes:
+        (price, _commitment_dur) = get_data_price(env.PROVIDER_ID, code_hash, is_verbose=False)
+        log(f"{code_hash}={price}")
+
+
 def register_data_files(data_price, hashes):
-    commitment_dur = 600
+    log(f"#> registering {len(hashes)} data files")
     for code_hash in hashes:
         with suppress(Exception):
_register_data(code_hash, data_price, commitment_dur=600) time.sleep(1) - log() - log(f"#> registering data {len(hashes_small)} files{ok()}") - def main(): # register_data_files(data_price=1, hashes=hashes_small) register_data_files(data_price=20, hashes=hashes_medium_1) + log() register_data_files(data_price=30, hashes=hashes_medium_2) + log() + print_prices(hashes_medium_1) + log() + print_prices(hashes_medium_2) if __name__ == "__main__": diff --git a/broker/test_setup/submit_jobs.py b/broker/test_setup/submit_jobs.py index 28c84444..08d69e5d 100755 --- a/broker/test_setup/submit_jobs.py +++ b/broker/test_setup/submit_jobs.py @@ -22,7 +22,6 @@ from broker.test_setup.user_set import providers, requesters from broker.utils import print_tb -# yaml_files = ["job_nas.yaml"] Ebb = cfg.Ebb cfg.IS_FULL_TEST = True is_mini_test = True @@ -32,12 +31,11 @@ _log.ll.LOG_FILENAME = Path.home() / ".ebloc-broker" / "test.log" benchmarks = ["nas", "cppr"] -benchmarks = ["cppr"] storage_ids = ["eudat", "gdrive", "ipfs"] ipfs_ids = ["ipfs", "ipfs_gpg"] -# for provider_address in providers: -# mini_tests_submit(storage_ids, provider_address) +# for provider_addr in providers: +# mini_tests_submit(storage_ids, provider_addr) # if is_mini_test: # benchmarks = ["cppr"] @@ -98,7 +96,7 @@ def create_cppr_job_script(idx): registered_data_hashes_medium[2] = [ "dd0fbccccf7a198681ab838c67b68fbf", # C "45281dfec4618e5d20570812dea38760", # C - "fa64e96bcee96dbc480a1495bddbf53c", # C + "bfc83d9f6d5c3d68ca09499190851e86", # C ] registered_data_hashes_medium[3] = [ "8f6faf6cfd245cae1b5feb11ae9eb3cf", # D @@ -189,12 +187,12 @@ def create_nas_job_script(is_small=False): return benchmark_name -def mini_tests_submit(storage_ids, provider_address): +def mini_tests_submit(storage_ids, provider_addr): is_pass = True required_confs = 0 yaml_fn = Path.home() / "ebloc-broker" / "broker" / "test_setup" / "job_nas.yaml" yaml_cfg = Yaml(yaml_fn) - yaml_cfg["config"]["provider_address"] = provider_address + yaml_cfg["config"]["provider_address"] = provider_addr for storage_id in storage_ids: yaml_cfg["config"]["source_code"]["storage_id"] = storage_id benchmark_name = create_nas_job_script(is_small=True) @@ -208,7 +206,7 @@ def mini_tests_submit(storage_ids, provider_address): if processed_logs: job_result = vars(processed_logs[0].args) job_result["tx_hash"] = tx_hash - job_result["submitted_job_kind"] = f"nas_{benchmark_name}" + job_result["job_kind"] = f"nas_{benchmark_name}" log(job_result) except IndexError: log(f"E: Tx({tx_hash}) is reverted") @@ -219,7 +217,7 @@ def run_job(counter) -> None: :param counter: counter index to keep track of submitted job number """ - for idx, provider_address in enumerate(providers): + for idx, provider_addr in enumerate(providers): # yaml_cfg["config"]["data"]["data3"]["storage_id"] = random.choice(storage_ids) storage_id = (idx + counter) % len(storage_ids) selected_benchmark = random.choice(benchmarks) @@ -228,12 +226,13 @@ def run_job(counter) -> None: storage = random.choice(ipfs_ids) if selected_benchmark == "nas": - log(f" * Submitting job from [cyan]NAS Benchmark[/cyan] to [green]{provider_address}", "bold blue") + log(f" * Submitting job from [cyan]NAS Benchmark[/cyan] to [green]{provider_addr}", "bold blue") yaml_cfg = Yaml(nas_yaml_fn) benchmark_name = create_nas_job_script() elif selected_benchmark == "cppr": - log(f" * Submitting [cyan]job with cppr datasets[/cyan] to provider=[green]{provider_address}", "bold blue") + log(f" * Submitting [cyan]job with cppr datasets[/cyan] to 
provider=[green]{provider_addr}", "bold blue")
             yaml_cfg = Yaml(cppr_yam_fn)
+            log(f"data_set_idx={idx}")
             hash_medium_data_0, hash_medium_data = create_cppr_job_script(idx)
             yaml_cfg["config"]["data"]["data1"]["hash"] = hash_medium_data_0
             yaml_cfg["config"]["data"]["data2"]["hash"] = hash_medium_data
@@ -244,7 +243,7 @@ def run_job(counter) -> None:
             yaml_cfg["config"]["data"]["data3"]["path"] = str(small_datasets / dir_name)

         yaml_cfg["config"]["source_code"]["storage_id"] = storage
-        yaml_cfg["config"]["provider_address"] = provider_address
+        yaml_cfg["config"]["provider_address"] = provider_addr
         try:
             submit_base = SubmitBase(yaml_cfg.path)
             submission_date = _date()
@@ -262,9 +261,9 @@ def run_job(counter) -> None:
             job_result["submit_timestamp"] = submission_timestamp
             job_result["tx_hash"] = tx_hash
             if selected_benchmark == "nas":
-                job_result["submitted_job_kind"] = f"{selected_benchmark}_{benchmark_name}"
+                job_result["job_kind"] = f"{selected_benchmark}_{benchmark_name}"
             elif selected_benchmark == "cppr":
-                job_result["submitted_job_kind"] = f"{selected_benchmark}_{hash_medium_data_0}_{hash_medium_data}"
+                job_result["job_kind"] = f"{selected_benchmark}_{hash_medium_data_0}_{hash_medium_data}"

             ebb_mongo.add_item(tx_hash, job_result)
             log(job_result)
@@ -272,7 +271,6 @@ def run_job(counter) -> None:
             countdown(seconds=5, is_verbose=True)
         except Exception as e:
             print_tb(e)
-            breakpoint()  # DEBUG


 def main():
@@ -290,9 +288,11 @@ def main():
             run_job(counter)
             counter += 1

-        sleep_duration = randint(200, 400)
+        sleep_duration = randint(250, 450)
         countdown(sleep_duration)

+    log(f"#> number_of_submitted_jobs={counter}")
+

 if __name__ == "__main__":
     try:
diff --git a/broker/test_setup/users.py b/broker/test_setup/users.py
index 2c6073e0..f66fa6b4 100755
--- a/broker/test_setup/users.py
+++ b/broker/test_setup/users.py
@@ -15,7 +15,7 @@
 _collect_account = collect_account.replace("0x", "")
 fn = str(Path(expanduser("~/.brownie/accounts")) / _collect_account)
 _collect_account = Ebb.brownie_load_account(fn, "alper")
-log(f"collect_account={Ebb._get_balance(collect_account)}", "bold")
+log(f"## collect_account={Ebb._get_balance(collect_account)}")


 def balances(accounts, is_verbose=False):
@@ -27,7 +27,7 @@ def balances(accounts, is_verbose=False):
             print(fn)

         account = Ebb.brownie_load_account(str(fn), "alper")
-        log(Ebb._get_balance(account), "magenta")
+        log(Ebb._get_balance(account), "m")


 def collect_all_into_base():
diff --git a/broker/test_setup/watch_tests.sh b/broker/test_setup/watch_tests.sh
index 7c895221..569730d0 100755
--- a/broker/test_setup/watch_tests.sh
+++ b/broker/test_setup/watch_tests.sh
@@ -1,22 +1,19 @@
 #!/bin/bash
-VENV=$HOME/venv
-source $VENV/bin/activate
-num=$(ps aux | grep -E "[w]atch.py" | grep -v -e "grep" -e "emacsclient" -e "flycheck_" | wc -l)
 provider_1="0x29e613b04125c16db3f3613563bfdd0ba24cb629"
 provider_2="0x1926b36af775e1312fdebcc46303ecae50d945af"
 provider_3="0x4934a70ba8c1c3acfa72e809118bdd9048563a24"
 provider_4="0x51e2b36469cdbf58863db70cc38652da84d20c67"
+num=$(ps aux | grep -E "[w]atch.py" | grep -v -e "grep" -e "emacsclient" -e "flycheck_" | wc -l)
 if [ $num -ge 1 ]; then
-    echo "warning: watch.py is already running, count="$num
+    echo "warning: watch.py is already running"
 else
-    rm -f ~/.ebloc-broker/watch_*.out
-    ~/ebloc-broker/broker/eblocbroker_scripts/watch.py $provider_1 >/dev/null &
-    ~/ebloc-broker/broker/eblocbroker_scripts/watch.py $provider_2 >/dev/null &
-    ~/ebloc-broker/broker/eblocbroker_scripts/watch.py $provider_3 >/dev/null &
-    ~/ebloc-broker/broker/eblocbroker_scripts/watch.py $provider_4 >/dev/null &
+    rm -f ~/.ebloc-broker/watch.out ~/.ebloc-broker/watch_*.out
+    ~/ebloc-broker/broker/watch/watch.py $provider_1 >/dev/null &
+    ~/ebloc-broker/broker/watch/watch.py $provider_2 >/dev/null &
+    ~/ebloc-broker/broker/watch/watch.py $provider_3 >/dev/null &
+    ~/ebloc-broker/broker/watch/watch.py $provider_4 >/dev/null &
 fi
-
 watch --color head -n 15 \
      ~/.ebloc-broker/watch_$provider_1.out \
      ~/.ebloc-broker/watch_$provider_2.out \
diff --git a/broker/utils.py b/broker/utils.py
index ca329bd5..5368eeac 100755
--- a/broker/utils.py
+++ b/broker/utils.py
@@ -258,6 +258,9 @@ def string_to_bytes32(hash_str: str):

 def bytes32_to_ipfs(bytes_array):
     """Convert bytes_array into IPFS hash format."""
+    if bytes_array in (b"", ""):
+        return ""
+
     if isinstance(bytes_array, bytes):
         merge = Qm + bytes_array
         return base58.b58encode(merge).decode("utf-8")
@@ -337,7 +340,7 @@ def is_gzip_file_empty(fn):
     if bool(int(size)):
         return False

-    log(f"==> Created gzip file is empty:\n    [magenta]{fn}[/magenta]")
+    log(f"==> Created gzip file is empty:\n    [m]{fn}[/m]")
     return True
@@ -514,7 +517,7 @@ def question_yes_no(message, is_exit=False):
     if "[Y/n]:" not in message:
         message = f"{message} [Y/n]: "

-    log(text=message, end="", flush=True)
+    log(text=message, end="")
     getch = _Getch()
     while True:
         choice = getch().lower()
diff --git a/broker/eblocbroker_scripts/watch.py b/broker/watch/watch.py
similarity index 84%
rename from broker/eblocbroker_scripts/watch.py
rename to broker/watch/watch.py
index 55e1bb6d..9085440c 100755
--- a/broker/eblocbroker_scripts/watch.py
+++ b/broker/watch/watch.py
@@ -34,7 +34,10 @@ def get_eth_address_from_cfg():

 def watch(eth_address="", from_block=None):
-    from_block = 15394725
+    if not from_block:
+        from_block = Ebb.get_block_number() - cfg.ONE_DAY_BLOCK_DURATION
+
+    from_block = 15867616  # NOTE: hardcoded start block; overrides the default computed above
     if not eth_address:
         try:
             eth_address = get_eth_address_from_cfg()
@@ -42,9 +45,6 @@ def watch(eth_address="", from_block=None):
             log(f"E: {e}\neth_address is empty, run as: ./watch.py <eth_address>")
             sys.exit(1)

-    if not from_block:
-        from_block = Ebb.get_block_number() - cfg.ONE_DAY_BLOCK_DURATION
-
     is_provider = True
     watch_fn = Path.home() / ".ebloc-broker" / f"watch_{eth_address}.out"
     _log.ll.LOG_FILENAME = watch_fn
@@ -74,6 +74,7 @@ def watch(eth_address="", from_block=None):
             columns = 80

         columns_size = int(int(columns) / 2 - 9)
+        header = f"   [bold yellow]{'{:<44}'.format('KEY')} INDEX STATUS[/bold yellow]"
         job_full = ""
         job_count = 0
         completed_count = 0
@@ -94,23 +95,23 @@ def watch(eth_address="", from_block=None):
                 is_print=False,
             )
             state_val = state.inv_code[_job["stateCode"]]
-            _color = "magenta"
+            c = "magenta"
             if state_val == "COMPLETED":
-                _color = "green"
+                c = "green"
                 completed_count += 1

             job_full = (
-                f" [bold blue]*[/bold blue] [bold]{_job['job_key']}[/bold] {_job['index']} {_job['provider']} "
-                f"[bold {_color}]{state_val}[/bold {_color}]\n{job_full}"
+                f" [bold blue]*[/bold blue] [bold white]{'{:<48}'.format(_job['job_key'])}[/bold white] "
+                f"{_job['index']} [bold {c}]{state_val}[/bold {c}]\n{job_full}"
             )

         if not watch_only_jobs:
             job_ruler = (
                 "[green]" + "=" * columns_size + "[bold cyan] jobs [/bold cyan]" + "=" * columns_size + "[/green]"
             )
-            job_full = f"{job_ruler}\n{job_full}".rstrip()
+            job_full = f"{job_ruler}\n{header}\n{job_full}".rstrip()
         else:
-            job_full = job_full.rstrip()
+            job_full = f"{header}\n{job_full}".rstrip()

         is_connected = Ebb.is_web3_connected()
         _console_clear()
@@ -122,7 +123,7 @@ def
watch(eth_address="", from_block=None):
     if not watch_only_jobs:
         providers = Ebb.get_providers()
         columns_size = int(int(columns) / 2 - 12)
-        log("\r" + "=" * columns_size + "[bold cyan] providers [/bold cyan]" + "=" * columns_size, "green")
+        log("\r" + "=" * columns_size + "[bold] providers [/bold]" + "=" * columns_size, "green")
         for k, v in providers_info.items():
             log(f"** provider_address={k}", end="\r")
             log(v, end="\r")
@@ -132,13 +133,17 @@ def watch(eth_address="", from_block=None):
         time.sleep(2)


+def main():
+    eth_address = None
+    if len(sys.argv) == 2:
+        eth_address = sys.argv[1]
+
+    watch(eth_address)
+
+
 if __name__ == "__main__":
     try:
-        eth_address = None
-        if len(sys.argv) == 2:
-            eth_address = sys.argv[1]
-
-        watch(eth_address)
+        main()
     except KeyboardInterrupt:
         sys.exit(1)
     except Exception as e:
diff --git a/broker/eblocbroker_scripts/watch.sh b/broker/watch/watch.sh
similarity index 61%
rename from broker/eblocbroker_scripts/watch.sh
rename to broker/watch/watch.sh
index b1e5f728..3692e8b9 100755
--- a/broker/eblocbroker_scripts/watch.sh
+++ b/broker/watch/watch.sh
@@ -1,14 +1,11 @@
 #!/bin/bash
-VENV=$HOME/venv
-source $VENV/bin/activate
-num=$(ps aux | grep -E "[w]atch.py" | grep -v -e "grep" -e "emacsclient" -e "flycheck_" | wc -l)
 address="0x378181ce7b07e8dd749c6f42772574441b20e35f"
+num=$(ps aux | grep -E "[w]atch.py" | grep -v -e "grep" -e "emacsclient" -e "flycheck_" | wc -l)
 if [ $num -ge 1 ]; then
-    echo "warning: watch.py is already running, count="$num
+    echo "warning: watch.py is already running"
 else
-    rm -f ~/.ebloc-broker/watch.out
-    rm -f ~/.ebloc-broker/watch_*.out
+    rm -f ~/.ebloc-broker/watch.out ~/.ebloc-broker/watch_*.out
     ./watch.py $address >/dev/null &
 fi
 watch --color cat ~/.ebloc-broker/watch_$address.out
diff --git a/broker/eblocbroker_scripts/watch_jobs.py b/broker/watch/watch_jobs.py
similarity index 87%
rename from broker/eblocbroker_scripts/watch_jobs.py
rename to broker/watch/watch_jobs.py
index a2f11c2f..69ba7ac9 100755
--- a/broker/eblocbroker_scripts/watch_jobs.py
+++ b/broker/watch/watch_jobs.py
@@ -14,7 +14,7 @@

 def watch(eth_address="", from_block=None):
-    from_block = 15394725
+    from_block = 15867616  # NOTE: hardcoded start block; overrides the from_block argument
     # if not eth_address:
     #     # TODO: pull from cfg
     #     eth_address = "0xeab50158e8e51de21616307a99c9604c1c453a02"
@@ -61,21 +61,24 @@ def watch(eth_address="", from_block=None):
                 is_print=False,
             )
             if print_only_ipfs_result_hashes:
-                if _job["result_ipfs_hash"] != empty_bytes32 and _job["result_ipfs_hash"] != "":
-                    result_ipfs_hash = bytes32_to_ipfs(_job["result_ipfs_hash"])
-                    log(result_ipfs_hash)
+                if _job["result_ipfs_hash"] != empty_bytes32 and _job["result_ipfs_hash"] not in (b"", ""):
+                    log(bytes32_to_ipfs(_job["result_ipfs_hash"]))
                     # log(f"{_job['job_key']} {_job['index']} {result_ipfs_hash}")
             else:
                 log(_job)


+def main():
+    eth_address = None
+    if len(sys.argv) == 2:
+        eth_address = sys.argv[1]
+
+    watch(eth_address)
+
+
 if __name__ == "__main__":
     try:
-        eth_address = None
-        if len(sys.argv) == 2:
-            eth_address = sys.argv[1]
-
-        watch(eth_address)
+        main()
     except KeyboardInterrupt:
         sys.exit(1)
     except Exception as e:
diff --git a/requirements.txt b/requirements.txt
index 6c92e36e..138ae355 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -493,7 +493,7 @@ multipart==0.2.4
 multitasking==0.0.9
     # via
     #   yfinance
-mypy==0.942
+mypy==0.961
     # via
     #   flake8-mypy
 mypy-extensions==0.4.3
@@ -814,7 +814,7 @@ restrictedpython==5.0
     # via
     #   accesscontrol
 rfc3986==1.4.0
-rich==12.3.0
+rich==12.4.4
 rlp==2.0.1
     # via
     #   eth-account
diff --git
a/scripts/setup.sh b/scripts/setup.sh
index 379ef3f6..6be94621 100755
--- a/scripts/setup.sh
+++ b/scripts/setup.sh
@@ -31,9 +31,7 @@ git pull --rebase -v

 # nodejs
 # ======
-output=$(node -v)
-if [ "$output" == "" ];then
-    # curl -sL https://deb.nodesource.com/setup_14.x | sudo bash -
+if [ "$(node -v)" == "" ]; then
     curl -fsSL https://deb.nodesource.com/setup_17.x | sudo -E bash -
     sudo apt-get install -y nodejs
     node -v
@@ -83,7 +81,7 @@ install_ipfs () {
         echo ipfs_current_version=v$ipfs_current_version
     fi
     cd /tmp
-    version="0.11.0"
+    version="0.13.0"
     echo "version_to_download=v"$version
     if [[ "$ipfs_current_version" == "$version" ]]; then
         echo "$GREEN##$NC Latest version is already downloaded"
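
For reference, the conversion behind the empty-input guard added to `bytes32_to_ipfs` in broker/utils.py can be sketched standalone. This is a minimal sketch, not the repo's implementation: it assumes `Qm` is the module's two-byte sha2-256 multihash prefix (0x12, 0x20), which is what makes every IPFS v0 CID begin with "Qm", and the final passthrough branch is a guess at the part of the function the diff elides.

#+begin_src python
# sketch only -- mirrors the shape of broker/utils.bytes32_to_ipfs
import base58

Qm = b"\x12\x20"  # assumed multihash header: sha2-256, 32-byte digest


def bytes32_to_ipfs(bytes_array):
    """Convert a bytes32 digest read from the contract into an IPFS v0 CID."""
    if bytes_array in (b"", ""):
        return ""  # the guard added in this patch: empty input yields no CID

    if isinstance(bytes_array, bytes):
        # prepend the multihash header, then base58-encode the 34-byte value
        return base58.b58encode(Qm + bytes_array).decode("utf-8")

    return bytes_array  # assumed: non-bytes input passes through unchanged


# a 32-byte digest round-trips to a "Qm..." string; empty input stays empty
assert bytes32_to_ipfs(bytes(32)).startswith("Qm")
assert bytes32_to_ipfs(b"") == ""
#+end_src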