Skip to content

Commit

Permalink
service deploy agents in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
solarw committed Jan 10, 2024
1 parent 92ea8f2 commit 7cf5750
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 18 deletions.
90 changes: 74 additions & 16 deletions propel_client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
#
# ------------------------------------------------------------------------------
"""CLI implementation."""
import os
import json
from pathlib import Path
import os
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from functools import wraps
from pathlib import Path
from sys import stdin
from typing import Any, Callable, Dict, Optional

Expand Down Expand Up @@ -484,22 +485,30 @@ def agents_get(obj: ClickAPPObject, name_or_id: str) -> None:
@click.argument("name_or_id", type=str, required=True)
@click.argument("state", type=str, required=True)
@click.option("--timeout", type=int, required=False, default=120)
def agents_wait(obj: ClickAPPObject, name_or_id: str, state: str, timeout: int) -> None:
@click.option("--period", type=int, required=False, default=10)
def agents_wait(
obj: ClickAPPObject, name_or_id: str, state: str, timeout: int, period: int
) -> None:
"""
Wait agent command.
:param obj: ClickAPPObject
:param name_or_id: str
:param state: str
:param timeout: int
:param period: int in seconds
"""
try:
for cur_state in obj.propel_client.agents_wait_for_state_iter(
agent_name_or_id=name_or_id, state=state, timeout=timeout
agent_name_or_id=name_or_id, state=state, timeout=timeout, period=period
):
print("STATE:", cur_state)
click.echo(
f"[Agent: {name_or_id}] state: {cur_state}, waiting for {state} for next {period} seconds"
)
except TimeoutError as e:
raise click.ClickException(f"Timeout during wait for state: {state}") from e
raise click.ClickException(
f"[Agent: {name_or_id}] Timeout during wait for state: {state}"
) from e


@click.command(name="ensure-deleted")
Expand All @@ -516,7 +525,7 @@ def agents_ensure_deleted(obj: ClickAPPObject, name_or_id: str, timeout: int) ->
:param obj: ClickAPPObject
"""
if _is_deleted(obj.propel_client, name_or_id):
print("already deleted")
click.echo(f"[Agent: {name_or_id}] already deleted")
return

obj.propel_client.agents_stop(name_or_id)
Expand All @@ -530,10 +539,12 @@ def agents_ensure_deleted(obj: ClickAPPObject, name_or_id: str, timeout: int) ->
break

if (time.time() - started) < timeout:
raise click.ClickException("timeout!")
raise click.ClickException(
f"[Agent: {name_or_id}] timeout waiting for deleted!"
)
time.sleep(3)

click.echo("Agent was deleted")
click.echo(f"[Agent: {name_or_id}] Agent was deleted")


def _is_deleted(client: PropelClient, name_or_id: str) -> bool:
Expand Down Expand Up @@ -565,6 +576,7 @@ def agents_restart(obj: ClickAPPObject, name_or_id: str) -> None:
:param obj: ClickAPPObject
"""
agent = obj.propel_client.agents_restart(name_or_id)
click.echo(f"[Agent: {name_or_id}] restart triggered.")
print_json(agent)


Expand All @@ -579,6 +591,7 @@ def agents_stop(obj: ClickAPPObject, name_or_id: str) -> None:
:param obj: ClickAPPObject
"""
agent = obj.propel_client.agents_stop(name_or_id)
click.echo(f"[Agent: {name_or_id}] stop triggered.")
print_json(agent)


Expand All @@ -596,6 +609,7 @@ def agents_variables_add(obj: ClickAPPObject, name_or_id: str, variables: str) -
"""
variables_list = variables.split(",") or [] if variables else []
agent = obj.propel_client.agents_variables_add(name_or_id, variables_list)
click.echo(f"[Agent: {name_or_id}] variables added {variables_list}.")
print_json(agent)


Expand All @@ -615,6 +629,7 @@ def agents_variables_remove(
"""
variables_list = variables.split(",") or [] if variables else []
agent = obj.propel_client.agents_variables_remove(name_or_id, variables_list)
click.echo(f"[Agent: {name_or_id}] variables removed {variables_list}.")
print_json(agent)


Expand All @@ -629,6 +644,7 @@ def agents_delete(obj: ClickAPPObject, name_or_id: str) -> None:
:param obj: ClickAPPObject
"""
agent = obj.propel_client.agents_delete(name_or_id)
click.echo(f"[Agent: {name_or_id}] delete triggered.")
print_json(agent)


Expand Down Expand Up @@ -699,8 +715,17 @@ def print_json(data: Dict) -> None:
:param data: dict to print
"""
result = json.dumps(data, indent=4)
click.echo(result)
click.echo(make_json(data))


def make_json(data: Dict) -> str:
"""
Make json.
:param data: dict to print
:return: str
"""
return json.dumps(data, indent=4)


@click.group(name="service")
Expand Down Expand Up @@ -737,27 +762,37 @@ def service_deploy( # pylint: disable=too-many-arguments
timeout: int,
service_dir: str,
) -> None:
"Deploy service with keys ids and variables from service file and env variables."
"""Deploy service with keys ids and variables from service file and env variables."""
keys_list = list(map(int, keys.split(",")))
service_vars = dict(get_env_vars_for_service(Path(service_dir)))
environ_vars_set = set(service_vars.keys()).intersection(set(os.environ.keys()))
variable_names = []

for env_name in sorted(environ_vars_set):
env_value = os.environ.get(env_name)
variable_name = f"{name.upper()}_{env_name}"
variable_names.append(variable_name)
click.echo(f"Create/update variable: {variable_name}: {env_name}={env_value}")
ctx.invoke(variables_create, name=variable_name, key=env_name, value=env_value)

agents = [f"{name}_agent_{idx}" for idx, _ in enumerate(keys_list)]
click.echo(f"Delete agents: {'. '.join(agents)}")
# delete first!
with ThreadPoolExecutor(max_workers=len(agents)) as executor:
for agent_name in agents:
executor.submit(ctx.invoke, agents_ensure_deleted, name_or_id=agent_name)

click.echo(
f"Deploy {len(keys_list)} agents for service with variables {','.join(variable_names)}"
)
for idx, key_id in enumerate(keys_list):
agent_name = f"{name}_agent_{idx}"
click.echo(f"Deploying agent {agent_name} with key {key_id}")
click.echo(
f"[Agent: {agent_name}] Deploying agent {agent_name} with key {key_id}"
)
ctx.invoke(agents_ensure_deleted, name_or_id=name)
ctx.invoke(seats_ensure)
ctx.invoke(
agents_deploy,
agents_create,
key=key_id,
name=agent_name,
variables=",".join(variable_names) if variable_names else None,
Expand All @@ -766,8 +801,31 @@ def service_deploy( # pylint: disable=too-many-arguments
ingress_enabled=ingress_enabled,
service_ipfs_hash=service_ipfs_hash,
tendermint_ingress_enabled=tendermint_ingress_enabled,
timeout=timeout,
)
with ThreadPoolExecutor(max_workers=len(agents)) as executor:
click.echo("Wait agents deployed")
for agent_name in agents:
executor.submit(
ctx.invoke,
agents_wait,
name_or_id=agent_name,
state="DEPLOYED",
timeout=timeout,
)
click.echo("Restart agents")
for agent_name in agents:
ctx.invoke(agents_restart, name_or_id=agent_name)

click.echo("Wait agents restarted")
with ThreadPoolExecutor(max_workers=len(agents)) as executor:
for agent_name in agents:
executor.submit(
ctx.invoke,
agents_wait,
name_or_id=agent_name,
state="STARTED",
timeout=timeout,
)


service_group.add_command(service_deploy)
Expand Down
7 changes: 5 additions & 2 deletions propel_client/utils.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
"""Various utils."""
from itertools import chain
from pathlib import Path

from aea.helpers.env_vars import is_env_variable, ENV_VARIABLE_RE
from aea.helpers.env_vars import ENV_VARIABLE_RE, is_env_variable
from autonomy.configurations.loader import load_service_config


def get_all_env_vars(d):
"""Get env vars from dict."""
for value in d.values():
if is_env_variable(value):
result = ENV_VARIABLE_RE.match(value)
_, var_name, type_str, _, default = result.groups()
_, var_name, _, _, default = result.groups()
yield var_name, default
if isinstance(value, dict):
yield from get_all_env_vars(value)


def get_env_vars_for_service(service_path: Path):
"""Get env vars for service path."""
service = load_service_config(service_path)
return set(chain(*(get_all_env_vars(i) for i in service.overrides)))
1 change: 1 addition & 0 deletions scripts/check_copyright.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from pathlib import Path
from typing import Dict, Iterator, Optional, Tuple, cast


CURRENT_YEAR = datetime.now().year
GIT_PATH = shutil.which("git")
START_YEARS = (2021, 2022, 2023)
Expand Down

0 comments on commit 7cf5750

Please sign in to comment.