Skip to content

Commit

Permalink
Add minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
avatar-lavventura committed Apr 8, 2023
1 parent 68ec85c commit 7a2e26e
Show file tree
Hide file tree
Showing 31 changed files with 379 additions and 263 deletions.
68 changes: 68 additions & 0 deletions .depreciated/imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python3

from web3 import IPCProvider, Web3
from web3.middleware import geth_poa_middleware
from web3.providers.rpc import HTTPProvider

from broker.utils import is_geth_on, run, terminate


def _connect_to_web3() -> None:
"""Connect to web3 of the corresponding ethereum blockchain.
* bloxberg:
__ https://bloxberg.org
"""
if env.IS_GETH_TUNNEL or not env.IS_EBLOCPOA:
if env.IS_BLOXBERG:
# TODO you can use brownie's connected web3?
cfg.w3 = Web3(HTTPProvider("https://core.bloxberg.org"))
else:
cfg.w3 = Web3(HTTPProvider(f"http://localhost:{env.RPC_PORT}"))
else:
cfg.w3 = Web3(IPCProvider(env.DATADIR.joinpath("geth.ipc")))
#: inject the poa compatibility middleware to the innermost layer
cfg.w3.middleware_onion.inject(geth_poa_middleware, layer=0)


def connect_to_web3() -> None:
"""Connect to the private ethereum network using web3.
Note that you should create only one RPC Provider per process, as it
recycles underlying TCP/IP network connections between your process and
Ethereum node
"""
if cfg.w3.isConnected():
return

web3_ipc_fn = env.DATADIR.joinpath("geth.ipc")
for _ in range(5):
_connect_to_web3()
if not cfg.w3.isConnected():
try:
if env.IS_GETH_TUNNEL:
raise Exception("Web3ConnectError: try tunnelling: ssh -f -N -L 8545:localhost:8545 username@<ip>")

if env.IS_BLOXBERG:
log("E: web3 is not connected into [green]BLOXBERG[/green]")
else:
is_geth_on()
except QuietExit:
pass
except Exception as e:
print_tb(e)
sys.exit(1)

if not env.IS_GETH_TUNNEL and not env.IS_BLOXBERG:
log(
"E: If web3 is not connected please start geth server and give permission \n"
"to /private/geth.ipc file doing: ",
end="",
)
log(f"sudo chown $(logname) {web3_ipc_fn}", "green")
log(f"#> running `sudo chown $(whoami) {web3_ipc_fn}`")
run(["sudo", "chown", env.WHOAMI, web3_ipc_fn])
else:
break
else:
terminate(is_traceback=False)
5 changes: 3 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ repos:
entry: bash -c 'mypy "$@" || true' --

- repo: https://github.com/psf/black
rev: 22.3.0 # Replace by any tag/version: https://github.com/psf/black/tags
rev: 23.3.0 # Replace by any tag/version: https://github.com/psf/black/tags
hooks:
- id: black
name: black
exclude: ^.old_work/
exclude: ^.depreciated/

- repo: https://gitlab.com/pycqa/flake8
rev: 3.8.3
hooks:
- id: flake8
exclude: ".depreciated/.*|.old_work/.*"

default_language_version:
python: python3
71 changes: 28 additions & 43 deletions broker/Driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
from typing import List

import zc.lockfile
from halo import Halo
from ipdb import launch_ipdb_on_exception

from broker import cfg, config
from broker._import import check_connection
from broker._utils import _log
from broker._utils._log import console_ruler, log, ok
from broker._utils.tools import is_process_on, kill_process_by_name, print_tb, squeue
from broker._utils._log import console_ruler, log
from broker._utils.tools import _date, is_process_on, kill_process_by_name, print_tb, squeue
from broker.config import env, setup_logger
from broker.drivers.b2drop import B2dropClass
from broker.drivers.gdrive import GdriveClass
Expand All @@ -31,6 +33,7 @@
from broker.lib import eblocbroker_function_call, pre_check, run_storage_thread, session_start_msg, state
from broker.libs import eudat, gdrive, slurm
from broker.libs.user_setup import give_rwe_access, user_add
from broker.python_scripts.add_bloxberg_into_network_config import read_network_config
from broker.utils import (
StorageID,
check_ubuntu_packages,
Expand Down Expand Up @@ -259,7 +262,9 @@ def process_logged_job(self, idx):
log("==> [yellow]job_info:", "bold")
log(self.job_info)
except Exception as e:
print_tb(e)
if "Max retries exceeded with url" not in str(e):
print_tb(e)

return

for job in range(1, len(self.job_info["core"])):
Expand Down Expand Up @@ -317,11 +322,21 @@ def process_logged_jobs(self):
except JobException as e:
log(str(e))
except Exception as e:
print_tb(e)
if "Max retries exceeded with url" not in str(e):
print_tb(e)

log(str(e))
# breakpoint() # DEBUG


def check_bn_progress(current_bn, bn_read):
while current_bn < int(bn_read):
current_bn = Ebb.get_block_number()
time.sleep(4)

return current_bn


def run_driver(given_bn):
"""Run the main driver script for eblocbroker on the background."""
# dummy sudo command to get the password when session starts for only to
Expand Down Expand Up @@ -430,15 +445,10 @@ def run_driver(given_bn):
log(f" * {get_date()} waiting new job to come since bn={bn_read}")
log(f"==> current_block={current_bn} | sync_from={bn_read} ", end="")

flag = True
while current_bn < int(bn_read):
current_bn = Ebb.get_block_number()
if flag:
log()
log(f"## Waiting block number to be updated, it remains constant at {current_bn}")

flag = False
time.sleep(2)
if current_bn < int(bn_read):
msg = f"warning: waiting block number to be updated, it remains constant at {current_bn} "
with Halo(text=msg, text_color="yellow", spinner="line", placement="right"):
current_bn = check_bn_progress(current_bn, bn_read)

bn_read = str(bn_read) # reading events' block number has been updated
if not first_iteration_flag:
Expand All @@ -458,9 +468,8 @@ def run_driver(given_bn):
bn_read = env.config["block_continue"] = current_bn
except Exception as e:
if "Filter not found" in str(e) or "Read timed out" in str(e):
# HTTPSConnectionPool(host='core.bloxberg.org', port=443): Read timed out. (read timeout=10)
try:
log()
# HTTPSConnectionPool(host='core.bloxberg.org', port=443): Read timed out. (read timeout=10)
nc(env.BLOXBERG_HOST, 8545)
except Exception:
log(f"E: Failed to make TCP connecton to {env.BLOXBERG_HOST}")
Expand All @@ -473,32 +482,6 @@ def run_driver(given_bn):
raise e


def reconnect():
log(f"E: {network.show_active()} is not connected through {env.BLOXBERG_HOST}")
if cfg.NETWORK_ID == "bloxberg":
cfg.NETWORK_ID = "bloxberg_core"
elif cfg.NETWORK_ID == "bloxberg_core":
with suppress(Exception):
nc("alpy-bloxberg.duckdns.org", 8545)
cfg.NETWORK_ID = "bloxberg"

log(f"Trying at {cfg.NETWORK_ID} ...")
network.connect(cfg.NETWORK_ID)


def check_connection():
if not network.is_connected():
try:
reconnect()
except Exception as e:
log(f"E: {e}")

if not network.is_connected():
time.sleep(15)
else:
log(f"bloxberg connection {ok()}", is_write=False)


def _run_driver(given_bn, lock):
config.logging = setup_logger(_log.DRIVER_LOG)
while True:
Expand All @@ -517,7 +500,7 @@ def _run_driver(given_bn, lock):
print_tb(e)

check_connection()
console_ruler(character="*")
log(f"#> -=-=-=-=-=-=-=-=-=- [g]RESTARTING[/g] {_date()} -=-=-=-=-=-=-=-=-=- [blue]<#", is_write=False)
continue
finally:
with suppress(Exception):
Expand Down Expand Up @@ -548,7 +531,9 @@ def main(args):

log(f"rootdir: {os.getcwd()}", h=False)
log(f"logfile: {_log.DRIVER_LOG}", h=False)
log(f"Attached to host RPC client listening at '{env.BLOXBERG_HOST}'", h=False)
bloxberg_host = read_network_config(cfg.NETWORK_ID)
log(f"Attached to host RPC client listening at '{bloxberg_host}'", h=False)

print()
is_driver_on(process_count=1, is_print=False)
try:
Expand Down
7 changes: 0 additions & 7 deletions broker/_daemons/ganache.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ def main():
if len(sys.argv) == 2:
port = int(sys.argv[1])

# try:
# npm_package = "ganache-cli"
# if not is_npm_installed(npm_package):
# log(f"E: {npm_package} is not installed within npm")
# sys.exit()
# except Exception as e:
# print_tb(e)
if not is_ganache_on(8547):
run(port)

Expand Down
38 changes: 38 additions & 0 deletions broker/_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env python3

import time
from contextlib import suppress

from broker import cfg
from broker._utils._log import ok
from broker.config import env
from broker.imports import nc
from broker.utils import log
from brownie import network


def reconnect():
log(f"E: {network.show_active()} is not connected through {env.BLOXBERG_HOST}")
if cfg.NETWORK_ID == "bloxberg":
cfg.NETWORK_ID = "bloxberg_core"
elif cfg.NETWORK_ID == "bloxberg_core":
with suppress(Exception):
nc("alpy-bloxberg.duckdns.org", 8545)
cfg.NETWORK_ID = "bloxberg"

log(f"Trying at {cfg.NETWORK_ID}... ", end="")
network.connect(cfg.NETWORK_ID)
log(ok())


def check_connection():
if not network.is_connected():
try:
reconnect()
except Exception as e:
log(f"E: {e}")

if not network.is_connected():
time.sleep(15)
else:
log(f"bloxberg connection through [g]{cfg.NETWORK_ID}[/g] {ok()}", is_write=False)
2 changes: 1 addition & 1 deletion broker/_utils/_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def log(
text = base_str.join(textwrap.wrap(text, 120, break_long_words=False, break_on_hyphens=False))

if is_wrap:
text = "\n".join(textwrap.wrap(text, 80, break_long_words=False, break_on_hyphens=False))
text = "\n".join(textwrap.wrap(text, 120, break_long_words=False, break_on_hyphens=False))

if is_write and IS_WRITE:
if threading.current_thread().name != "MainThread" and cfg.IS_THREADING_ENABLED:
Expand Down
1 change: 1 addition & 0 deletions broker/_utils/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def PrintException() -> str:

def print_tb(message=None, is_print_exc=True) -> None:
"""Log the traceback."""
log(f"{WHERE()} ", end="")
if message:
if isinstance(message, QuietExit):
if str(message):
Expand Down
2 changes: 1 addition & 1 deletion broker/_watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def main():
if len(sys.argv) == 2:
eth_address = sys.argv[1]

from_block = 19926110
from_block = 19954408
watch(eth_address, from_block)


Expand Down
3 changes: 1 addition & 2 deletions broker/bash_scripts/clean_for_new_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ clean_gdrive () {

# update block.continue.txt with the current block number
python3 -uB $HOME/ebloc-broker/broker/eblocbroker_scripts/get_block_number.py True
squeue | tail -n+2 | awk '{print $1}' | xargs scancel 2> /dev/null
# timeout 2 squeue | tail -n+2 | awk '{print $1}' | xargs scancel 2> /dev/null

# remove created users users
for user in $(members eblocbroker | tr " " "\n"); do
Expand Down Expand Up @@ -79,7 +79,6 @@ rm -f /tmp/run/driver_popen.pid >/dev/null 2>&1
rm -f ~/.ebloc-broker/.oc_client.pckl
rm -f /var/ebloc-broker/cache/*.tar.gz


# unpin and remove all IPFS content from my machine
# ipfs pin ls --type recursive | cut -d' ' -f1 | ifne xargs -n1 ipfs pin rm
# ipfs repo gc
Expand Down
4 changes: 2 additions & 2 deletions broker/bash_scripts/killall.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ done

killall python 2> /dev/null
killall python3 2> /dev/null
echo "## killall all jobs in squeue"
squeue

timeout 2 squeue && echo "## killall all jobs in squeue"
if [ $? -eq 0 ]; then
squeue | tail -n+2 | awk '{print $1}' | ifne xargs scancel 2> /dev/null
fi
Expand Down
3 changes: 1 addition & 2 deletions broker/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@ def __init__(self) -> None:
self.GDRIVE = self.cfg["gdrive"]
self.DATADIR = Path(self.cfg["datadir"])
self.LOG_DIR = Path(self.cfg["log_path"])

try:
self.BLOXBERG_HOST = read_network_config(Path.home() / ".brownie" / "network-config.yaml")
self.BLOXBERG_HOST = read_network_config(cfg.NETWORK_ID)
except:
self.BLOXBERG_HOST = "https://core.bloxberg.org"

Expand Down
Loading

0 comments on commit 7a2e26e

Please sign in to comment.