Skip to content

Commit

Permalink
Improve watch scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
avatar-lavventura committed Apr 7, 2023
1 parent 95f2e63 commit 68ec85c
Show file tree
Hide file tree
Showing 19 changed files with 37 additions and 35 deletions.
2 changes: 1 addition & 1 deletion broker/Driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ def check_connection():
if not network.is_connected():
time.sleep(15)
else:
log(f"bloxberg connection{ok()}", is_write=False)
log(f"bloxberg connection {ok()}", is_write=False)


def _run_driver(given_bn, lock):
Expand Down
1 change: 1 addition & 0 deletions broker/_cli/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def register(self):
def authenticate_orc_id(self):
# FIXME: missing functionality
obj = self.subparsers.add_parser("auth_orc_id", help="Authenticate orcid")
obj.add_argument("eth_address", type=str, help="Ethereum address of the user")

def submit(self):
obj = self.subparsers.add_parser("submit", help="Submit job")
Expand Down
2 changes: 1 addition & 1 deletion broker/_daemons/ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def main():
cfg.ipfs.remove_lock_files()
_run()
else:
log(f"## [g]IPFS[/g] daemon is already running{ok()}")
log(f"## [g]IPFS[/g] daemon is already running {ok()}")
sys.exit(100)


Expand Down
2 changes: 1 addition & 1 deletion broker/_utils/yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def __init__(self, path, auto_dump=True):
self.changed = False
self.yaml = YAML()
self.yaml.indent(mapping=2, sequence=4, offset=2)
with FileLock(self.fp_lock, timeout=1):
with FileLock(self.fp_lock, timeout=2):
_remove(self.path_temp)
if self.auto_dump:
if self.path.exists():
Expand Down
3 changes: 2 additions & 1 deletion broker/_watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
is_while = True # to fetch on-going results
is_provider = True

cfg.NETWORK_ID = "bloxberg_core"
is_csv = False
analyze_long_test = False

Expand Down Expand Up @@ -257,7 +258,7 @@ def main():
if len(sys.argv) == 2:
eth_address = sys.argv[1]

from_block = 19874011
from_block = 19926110
watch(eth_address, from_block)


Expand Down
13 changes: 5 additions & 8 deletions broker/_watch/watch_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,19 @@ providers=("0x29e613b04125c16db3f3613563bfdd0ba24cb629"

num=$(ps aux | grep -E "[w]atch.py" | grep -v -e "grep" -e "emacsclient" -e "flycheck_" | wc -l)
if [ $num -ge 1 ]; then
echo "warning: `watch.py` is already running"
echo -e "#> 'watch' process is already running"
else
rm -f ~/.ebloc-broker/watch.out ~/.ebloc-broker/watch_*.out \
~/ebloc-broker/broker/_watch/provider_*.txt
for i in "${!providers[@]}"; do
touch ~/.ebloc-broker/watch_${providers[$i]}.out
~/ebloc-broker/broker/_watch/watch.py "${providers[$i]}" >/dev/null &
sleep 1
done
fi

watch --color head -n 15 \
clear
watch --color head -n 16 \
~/.ebloc-broker/watch_${providers[0]}.out \
~/.ebloc-broker/watch_${providers[1]}.out \
~/.ebloc-broker/watch_${providers[2]}.out

: ' commented
cat ~/.ebloc-broker/watch_$provider_1.out
cat ~/.ebloc-broker/watch_$provider_2.out
cat ~/.ebloc-broker/watch_$provider_3.out
'
3 changes: 3 additions & 0 deletions broker/bash_scripts/clean_for_new_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,6 @@ if [[ -d $BASE ]]; then
cd /var/ebloc-broker && fdfind . | as-tree && cd ~
cd $CURRENT_DIR
fi

echo ""
cat ~/.brownie/network-config.yaml| grep bloxberg
2 changes: 1 addition & 1 deletion broker/cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
IS_SEARCH_BEST_PROVIDER_VERBOSE = True
TEST_PROVIDERS = None
TX_LOG_VERBOSE = False
NETWORK_ID = "bloxberg"
NETWORK_ID = "bloxberg" # "bloxberg_core"


class EBB:
Expand Down
10 changes: 5 additions & 5 deletions broker/drivers/b2drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@ def _download_folder(self, results_folder_prev, folder_name):
"--show-progres",
"--progress=bar:force:noscroll",
]
log(" ".join(cmd), is_code=True, color="bold yellow")
log(" ".join(cmd), is_code=True, color="yellow")
run(cmd)
with cd(results_folder_prev):
run(["unzip", "-o", "-j", download_fn])

_remove(download_fn)
self.tar_downloaded_path[folder_name] = cached_tar_fn
log(f"## download file from B2DROP{ok()}")
log(f"## download file from B2DROP {ok()}")
return
except:
log("E: Failed to download B2DROP file via wget.\nTrying `config.oc.get_file()` approach...")
Expand Down Expand Up @@ -291,7 +291,7 @@ def get_share_token(self, f_id):
# TODO: if added before or some do nothing
if Ebb.mongo_broker.add_item_share_id(share_key, value["share_id"], value["share_token"]):
# adding into mongoDB for future usage
log(f"#> [g]{share_key}[/g] is added into mongoDB{ok()}")
log(f"#> [g]{share_key}[/g] is added into mongoDB {ok()}")
except Exception as e:
print_tb(e)
log(f"E: {e}")
Expand Down Expand Up @@ -370,7 +370,7 @@ def run(self) -> bool:
time.sleep(0.25)

def _run(self) -> bool:
log(f"{br(get_date())} new job has been received through B2DROP: {self.job_key} {self.index} ", "bold cyan")
log(f"{br(get_date())} new job has been received through B2DROP: {self.job_key} {self.index} ", "cyan")
# TODO: refund check
try:
provider_info = Ebb.get_provider_info(self.logged_job.args["provider"])
Expand Down Expand Up @@ -422,7 +422,7 @@ def _run(self) -> bool:
except KeyError:
try:
shared_id = Ebb.mongo_broker.find_shareid_item(key=share_key)
log(f"#> reading from mongo_broker{ok()}")
log(f"#> reading from mongo_broker {ok()}")
log(shared_id)
except Exception as e:
print_tb(e)
Expand Down
2 changes: 1 addition & 1 deletion broker/drivers/storage_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def is_run_exists_in_tar(self, tar_path) -> bool:
.strip()
)
if output.count("/") == 1:
log(f"[m]./run.sh[/m] exists under the parent folder{ok()}", "bold")
log(f"[m]./run.sh[/m] exists under the parent folder {ok()}", "bold")
return True
else:
log("E: run.sh does not exist under the parent folder")
Expand Down
3 changes: 0 additions & 3 deletions broker/eblocbroker_scripts/Contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ def _setup(self, is_brownie=False):
self.eBlocBroker, self.w3, self._eblocbroker = connect()
except Exception as e:
print_tb(e)
sys.exit(1)

ebb = None # contract object

def brownie_load_account(self, fn="", password="alper"):
"""Load accounts from Brownie for Bloxberg."""
Expand Down
5 changes: 2 additions & 3 deletions broker/eblocbroker_scripts/get_block_number.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/usr/bin/env python3

from broker.eblocbroker_scripts.utils import Cent
import sys

from broker import cfg
Expand All @@ -22,8 +21,8 @@ def main():
output = Ebb.get_block_number()
if is_write_to_file:
env.config["block_continue"] = output
balance_temp = Ebb.get_balance(env.PROVIDER_ID)
env.config["token_balance"] = int(balance_temp)
#: used in cleaning for a new test
env.config["token_balance"] = int(Ebb.get_balance(env.PROVIDER_ID))
else:
log(f"block_number={output}")
except Exception as e:
Expand Down
1 change: 1 addition & 0 deletions broker/eblocbroker_scripts/log_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def log_loop(event_filter, poll_interval: int = 6):
with suppress(Exception):
bn = cfg.Ebb.get_block_number()

print()
sleep_duration = 0
while True:
# watch_bn() # this may cause timeout error over time
Expand Down
12 changes: 6 additions & 6 deletions broker/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ def connect_to_eblocbroker() -> None:

cfg.w3 = network.web3
except:
from broker.python_scripts import add_bloxberg_into_network_config
# from broker.python_scripts import add_bloxberg_into_network_config

add_bloxberg_into_network_config.main()
# add_bloxberg_into_network_config.main()
try:
log(
"warning: [green]bloxberg[/green] key is added into the "
"[m]~/.brownie/network-config.yaml[/m] file. Please try again."
)
# log(
# "warning: [green]bloxberg[/green] key is added into the "
# "[m]~/.brownie/network-config.yaml[/m] file. Please try again."
# )
network.connect(cfg.NETWORK_ID)
except KeyError:
sys.exit(1)
Expand Down
1 change: 0 additions & 1 deletion broker/libs/eudat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pickle
import shutil
import subprocess
import sys
import time
from contextlib import suppress
from pathlib import Path
Expand Down
4 changes: 4 additions & 0 deletions broker/libs/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from pymongo import MongoClient
from rich.pretty import pprint

from broker._utils._log import log


Expand Down Expand Up @@ -149,6 +150,9 @@ def is_received(self, requester_addr, key, index, is_print=False) -> bool:


def main():
# from broker._utils import _log

# _log.IS_WRITE = False
mc = MongoClient()
ebb_mongo = MongoBroker(mc, mc["ebloc_broker"]["cache"])
parser = argparse.ArgumentParser(description="Process MongoDB.")
Expand Down
2 changes: 1 addition & 1 deletion broker/start_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def start_call(key, index, slurm_job_id) -> None:
p2.stdout.close() # type: ignore
date = p3.communicate()[0].decode("utf-8").strip()
start_ts = check_output(["date", "-d", date, "+'%s'"]).strip().decode("utf-8").strip("'")
log(f"{env.EBB_SCRIPTS}/set_job_state_running.py {key} {index} {job_id} {start_ts}", "bold white")
log(f"{env.EBB_SCRIPTS}/set_job_state_running.py {key} {index} {job_id} {start_ts}", is_code=True)
log(f"#> pid={pid}")
for attempt in range(10):
if attempt > 0:
Expand Down
2 changes: 1 addition & 1 deletion broker/test_setup/register_data_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def register_data_files(data_price, hashes):


def main():
## register_data_files(data_price=1, hashes=hashes_small)
# register_data_files(data_price=1, hashes=hashes_small)
# register_data_files(data_price=Cent("0.0002 usd"), hashes=hashes_medium_1)
# register_data_files(data_price=Cent("0.0003 usd"), hashes=hashes_medium_2)

Expand Down
2 changes: 1 addition & 1 deletion broker/test_setup/submit_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ def main():

log(f"#> number_of_submitted_jobs={counter}")
except Exception as e:
print(str(e))
print_tb(e)


Expand All @@ -365,5 +364,6 @@ def main():
main()
except Exception as e:
print_tb(e)
log("end")
except KeyboardInterrupt:
sys.exit(1)

0 comments on commit 68ec85c

Please sign in to comment.