diff --git a/propel_client/cli.py b/propel_client/cli.py index c9639bf..a0f6918 100644 --- a/propel_client/cli.py +++ b/propel_client/cli.py @@ -17,12 +17,13 @@ # # ------------------------------------------------------------------------------ """CLI implementation.""" -import os import json -from pathlib import Path +import os import time +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from functools import wraps +from pathlib import Path from sys import stdin from typing import Any, Callable, Dict, Optional @@ -484,7 +485,10 @@ def agents_get(obj: ClickAPPObject, name_or_id: str) -> None: @click.argument("name_or_id", type=str, required=True) @click.argument("state", type=str, required=True) @click.option("--timeout", type=int, required=False, default=120) -def agents_wait(obj: ClickAPPObject, name_or_id: str, state: str, timeout: int) -> None: +@click.option("--period", type=int, required=False, default=10) +def agents_wait( + obj: ClickAPPObject, name_or_id: str, state: str, timeout: int, period: int +) -> None: """ Wait agent command. @@ -492,14 +496,19 @@ def agents_wait(obj: ClickAPPObject, name_or_id: str, state: str, timeout: int) :param name_or_id: str :param state: str :param timeout: int + :param period: int in seconds """ try: for cur_state in obj.propel_client.agents_wait_for_state_iter( - agent_name_or_id=name_or_id, state=state, timeout=timeout + agent_name_or_id=name_or_id, state=state, timeout=timeout, period=period ): - print("STATE:", cur_state) + click.echo( + f"[Agent: {name_or_id}] state: {cur_state}, waiting for {state} for next {period} seconds" + ) except TimeoutError as e: - raise click.ClickException(f"Timeout during wait for state: {state}") from e + raise click.ClickException( + f"[Agent: {name_or_id}] Timeout during wait for state: {state}" + ) from e @click.command(name="ensure-deleted") @@ -516,7 +525,7 @@ def agents_ensure_deleted(obj: ClickAPPObject, name_or_id: str, timeout: int) -> :param obj: ClickAPPObject """ if _is_deleted(obj.propel_client, name_or_id): - print("already deleted") + click.echo(f"[Agent: {name_or_id}] already deleted") return obj.propel_client.agents_stop(name_or_id) @@ -530,10 +539,12 @@ def agents_ensure_deleted(obj: ClickAPPObject, name_or_id: str, timeout: int) -> break if (time.time() - started) < timeout: - raise click.ClickException("timeout!") + raise click.ClickException( + f"[Agent: {name_or_id}] timeout waiting for deleted!" + ) time.sleep(3) - click.echo("Agent was deleted") + click.echo(f"[Agent: {name_or_id}] Agent was deleted") def _is_deleted(client: PropelClient, name_or_id: str) -> bool: @@ -565,6 +576,7 @@ def agents_restart(obj: ClickAPPObject, name_or_id: str) -> None: :param obj: ClickAPPObject """ agent = obj.propel_client.agents_restart(name_or_id) + click.echo(f"[Agent: {name_or_id}] restart triggered.") print_json(agent) @@ -579,6 +591,7 @@ def agents_stop(obj: ClickAPPObject, name_or_id: str) -> None: :param obj: ClickAPPObject """ agent = obj.propel_client.agents_stop(name_or_id) + click.echo(f"[Agent: {name_or_id}] stop triggered.") print_json(agent) @@ -596,6 +609,7 @@ def agents_variables_add(obj: ClickAPPObject, name_or_id: str, variables: str) - """ variables_list = variables.split(",") or [] if variables else [] agent = obj.propel_client.agents_variables_add(name_or_id, variables_list) + click.echo(f"[Agent: {name_or_id}] variables added {variables_list}.") print_json(agent) @@ -615,6 +629,7 @@ def agents_variables_remove( """ variables_list = variables.split(",") or [] if variables else [] agent = obj.propel_client.agents_variables_remove(name_or_id, variables_list) + click.echo(f"[Agent: {name_or_id}] variables removed {variables_list}.") print_json(agent) @@ -629,6 +644,7 @@ def agents_delete(obj: ClickAPPObject, name_or_id: str) -> None: :param obj: ClickAPPObject """ agent = obj.propel_client.agents_delete(name_or_id) + click.echo(f"[Agent: {name_or_id}] delete triggered.") print_json(agent) @@ -699,8 +715,17 @@ def print_json(data: Dict) -> None: :param data: dict to print """ - result = json.dumps(data, indent=4) - click.echo(result) + click.echo(make_json(data)) + + +def make_json(data: Dict) -> str: + """ + Make json. + + :param data: dict to print + :return: str + """ + return json.dumps(data, indent=4) @click.group(name="service") @@ -737,12 +762,11 @@ def service_deploy( # pylint: disable=too-many-arguments timeout: int, service_dir: str, ) -> None: - "Deploy service with keys ids and variables from service file and env variables." + """Deploy service with keys ids and variables from service file and env variables.""" keys_list = list(map(int, keys.split(","))) service_vars = dict(get_env_vars_for_service(Path(service_dir))) environ_vars_set = set(service_vars.keys()).intersection(set(os.environ.keys())) variable_names = [] - for env_name in sorted(environ_vars_set): env_value = os.environ.get(env_name) variable_name = f"{name.upper()}_{env_name}" @@ -750,14 +774,25 @@ def service_deploy( # pylint: disable=too-many-arguments click.echo(f"Create/update variable: {variable_name}: {env_name}={env_value}") ctx.invoke(variables_create, name=variable_name, key=env_name, value=env_value) + agents = [f"{name}_agent_{idx}" for idx, _ in enumerate(keys_list)] + click.echo(f"Delete agents: {'. '.join(agents)}") + # delete first! + with ThreadPoolExecutor(max_workers=len(agents)) as executor: + for agent_name in agents: + executor.submit(ctx.invoke, agents_ensure_deleted, name_or_id=agent_name) + click.echo( f"Deploy {len(keys_list)} agents for service with variables {','.join(variable_names)}" ) for idx, key_id in enumerate(keys_list): agent_name = f"{name}_agent_{idx}" - click.echo(f"Deploying agent {agent_name} with key {key_id}") + click.echo( + f"[Agent: {agent_name}] Deploying agent {agent_name} with key {key_id}" + ) + ctx.invoke(agents_ensure_deleted, name_or_id=name) + ctx.invoke(seats_ensure) ctx.invoke( - agents_deploy, + agents_create, key=key_id, name=agent_name, variables=",".join(variable_names) if variable_names else None, @@ -766,8 +801,31 @@ def service_deploy( # pylint: disable=too-many-arguments ingress_enabled=ingress_enabled, service_ipfs_hash=service_ipfs_hash, tendermint_ingress_enabled=tendermint_ingress_enabled, - timeout=timeout, ) + with ThreadPoolExecutor(max_workers=len(agents)) as executor: + click.echo("Wait agents deployed") + for agent_name in agents: + executor.submit( + ctx.invoke, + agents_wait, + name_or_id=agent_name, + state="DEPLOYED", + timeout=timeout, + ) + click.echo("Restart agents") + for agent_name in agents: + ctx.invoke(agents_restart, name_or_id=agent_name) + + click.echo("Wait agents restarted") + with ThreadPoolExecutor(max_workers=len(agents)) as executor: + for agent_name in agents: + executor.submit( + ctx.invoke, + agents_wait, + name_or_id=agent_name, + state="STARTED", + timeout=timeout, + ) service_group.add_command(service_deploy) diff --git a/propel_client/utils.py b/propel_client/utils.py index 9b3289c..1a2ce1c 100644 --- a/propel_client/utils.py +++ b/propel_client/utils.py @@ -1,20 +1,23 @@ +"""Various utils.""" from itertools import chain from pathlib import Path -from aea.helpers.env_vars import is_env_variable, ENV_VARIABLE_RE +from aea.helpers.env_vars import ENV_VARIABLE_RE, is_env_variable from autonomy.configurations.loader import load_service_config def get_all_env_vars(d): + """Get env vars from dict.""" for value in d.values(): if is_env_variable(value): result = ENV_VARIABLE_RE.match(value) - _, var_name, type_str, _, default = result.groups() + _, var_name, _, _, default = result.groups() yield var_name, default if isinstance(value, dict): yield from get_all_env_vars(value) def get_env_vars_for_service(service_path: Path): + """Get env vars for service path.""" service = load_service_config(service_path) return set(chain(*(get_all_env_vars(i) for i in service.overrides))) diff --git a/scripts/check_copyright.py b/scripts/check_copyright.py index 156cfc9..a667fa8 100755 --- a/scripts/check_copyright.py +++ b/scripts/check_copyright.py @@ -39,6 +39,7 @@ from pathlib import Path from typing import Dict, Iterator, Optional, Tuple, cast + CURRENT_YEAR = datetime.now().year GIT_PATH = shutil.which("git") START_YEARS = (2021, 2022, 2023)