Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

187316441 modify engine params for run submission #6

Merged
merged 11 commits into from
Apr 29, 2024
46 changes: 38 additions & 8 deletions dnastack/cli/workbench/runs_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import click
from click import style

from dnastack.cli.workbench.utils import get_ewes_client
from dnastack.cli.workbench.utils import get_ewes_client, NoDefaultEngineError, \
UnableToFindParameterError
from dnastack.client.workbench.ewes.models import ExtendedRunListOptions, ExtendedRunRequest, BatchRunRequest, \
MinimalExtendedRunWithOutputs, MinimalExtendedRunWithInputs, TaskListOptions, State
MinimalExtendedRunWithOutputs, MinimalExtendedRunWithInputs, TaskListOptions, State, ExecutionEngineListOptions
from dnastack.client.workbench.ewes.models import LogType
from dnastack.cli.helpers.command.decorator import command
from dnastack.cli.helpers.command.spec import ArgumentSpec
Expand Down Expand Up @@ -438,8 +439,9 @@ def write_logs(iterable: Iterable[bytes], writer):
name='default_workflow_engine_parameters',
arg_names=['--engine-params'],
help='Set the global engine parameters for all runs that are to be submitted. '
'Engine params can be specified as a KV pair, inlined JSON, or as a json file preceded by the "@"'
'symbol.',
'Engine params can be specified as inlined JSON, json file preceded by the "@" symbol, '
'KV pair, parameter preset ID, or as a comma-separated-list containing any of those types '
'(e.g. my-preset-id,key=value,\'{"literal":"json"}\',@file.json).',
as_option=True,
default=None,
required=False
Expand Down Expand Up @@ -478,8 +480,8 @@ def write_logs(iterable: Iterable[bytes], writer):
),
ArgumentSpec(
name='overrides',
help='Additional arguments to set input values for all runs. The override values can be any JSON-like value'
'such as inline JSON, command separated key value pairs or'
help='Additional arguments to set input values for all runs. The override values can be any '
'JSON-like value such as inline JSON, comma separated key value pairs or '
'a json file referenced preceded by the "@" symbol.',
as_option=False,
default=None,
Expand All @@ -506,15 +508,43 @@ def submit_batch(context: Optional[str],

ewes_client = get_ewes_client(context_name=context, endpoint_id=endpoint_id, namespace=namespace)

def get_default_engine_id():
    """Return the id of the namespace's default execution engine.

    Raises:
        NoDefaultEngineError: if no engine in the namespace is flagged as default.
    """
    available_engines = ewes_client.list_engines(ExecutionEngineListOptions())
    default_id = next((candidate.id for candidate in available_engines if candidate.default), None)
    if default_id is not None:
        return default_id
    raise NoDefaultEngineError("No default engine found. Please specify an engine id using the --engine flag "
                               "or in the workflow engine parameters list using ENGINE_ID_KEY=....")

if default_workflow_engine_parameters:
[param_ids_list, kv_pairs_list, json_literals_list,
files_list] = default_workflow_engine_parameters.extract_arguments_list()

param_presets = merge_param_json_data(kv_pairs_list, json_literals_list, files_list)

if param_ids_list:
if not engine_id:
engine_id = get_default_engine_id()
for param_id in param_ids_list:
try:
param_preset = ewes_client.get_engine_param_preset(engine_id, param_id)
merge(param_presets, param_preset.preset_values)
except Exception as e:
raise UnableToFindParameterError(f"Unable to find engine parameter preset with id {param_id}. {e}")

default_workflow_engine_parameters = param_presets
else:
default_workflow_engine_parameters = None

batch_request: BatchRunRequest = BatchRunRequest(
workflow_url=workflow_url,
workflow_type="WDL",
engine_id=engine_id,
default_workflow_engine_parameters=default_workflow_engine_parameters.parsed_value() if default_workflow_engine_parameters else None,
default_workflow_engine_parameters=default_workflow_engine_parameters,
default_workflow_params=default_workflow_params.parsed_value() if default_workflow_params else None,
default_tags=tags.parsed_value() if tags else None,
run_requests=list()

)

for workflow_param in workflow_params:
Expand Down
19 changes: 12 additions & 7 deletions dnastack/cli/workbench/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from dnastack.cli.config.context import ContextCommandHandler
from dnastack.cli.helpers.client_factory import ConfigurationBasedClientFactory


DEFAULT_WORKBENCH_DESTINATION = "workbench.omics.ai"


Expand All @@ -18,7 +17,7 @@ def _populate_workbench_endpoint():


def get_user_client(context_name: Optional[str] = None,
endpoint_id: Optional[str] = None) -> WorkbenchUserClient:
endpoint_id: Optional[str] = None) -> WorkbenchUserClient:
factory: ConfigurationBasedClientFactory = container.get(ConfigurationBasedClientFactory)
try:
return factory.get(WorkbenchUserClient, endpoint_id=endpoint_id, context_name=context_name)
Expand All @@ -30,8 +29,6 @@ def get_user_client(context_name: Optional[str] = None,
def get_ewes_client(context_name: Optional[str] = None,
endpoint_id: Optional[str] = None,
namespace: Optional[str] = None) -> EWesClient:


if not namespace:
user_client = get_user_client(context_name=context_name, endpoint_id=endpoint_id)
namespace = user_client.get_user_config().default_namespace
Expand Down Expand Up @@ -59,8 +56,6 @@ def get_workflow_client(context_name: Optional[str] = None,
return factory.get(WorkflowClient, endpoint_id=endpoint_id, context_name=context_name, namespace=namespace)




class UnableToMergeJsonError(RuntimeError):
def __init__(self, key):
super().__init__(f'Unable to merge key {key}. The value for {key} must be of type dict, str, int or float')
Expand Down Expand Up @@ -90,6 +85,16 @@ class UnableToCreateFilePathError(Exception):
def __init__(self, message: str):
super().__init__(message)


class UnableToWriteToFileError(Exception):
    """Raised when file contents cannot be written to the target path."""

    def __init__(self, message: str):
        # The original called super().__init__(message) twice; the duplicate
        # call was redundant and has been removed.
        super().__init__(message)


class UnableToFindParameterError(Exception):
    """Raised when an engine parameter preset with the given id cannot be found."""

    def __init__(self, message: str):
        super().__init__(message)

class NoDefaultEngineError(Exception):
    """Raised when no default engine exists and no engine id was supplied."""

    def __init__(self, message: str):
        super().__init__(message)
78 changes: 68 additions & 10 deletions dnastack/common/json_argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import traceback
from enum import Enum
from io import UnsupportedOperation
from typing import List, Dict, Union
from typing import List, Dict, Union, Tuple

# from dnastack.cli.workbench.utils import UnableToDecodeFileError, UnableToDecodeJSONDataError
from dnastack.common.logger import get_logger
from dnastack.feature_flags import in_global_debug_mode

Expand All @@ -19,17 +20,18 @@
if in_global_debug_mode:
logger.warning("Could start importing httpie modules but failed to finish it.")
traceback.print_exc()
logger.warning("This module will be imported but there will be no guarantee that the dependant code will work normally.")

logger.warning(
"This module will be imported but there will be no guarantee that the dependant code will work normally.")

JSONType = Union[str, bool, int, list, dict]
KV_PAIR_SEPARATOR = ","
LIST_SEPARATOR = ","


class ArgumentType(str, Enum):
    """Classification of a CLI argument's payload format."""
    JSON_LITERAL_PARAM_TYPE = "JSON_LITERAL"  # inline JSON object/array
    FILE = "FILE"  # file reference (typically "@path" — confirm against get_argument_type)
    KV_PARAM_TYPE = "KEY_VALUE"  # key=value pair
    STRING_PARAM_TYPE = "STRING_PARAM"  # plain string, e.g. an engine param preset id
    UNKNOWN_PARAM_TYPE = "UNKNOWN"  # could not be classified

Expand Down Expand Up @@ -58,10 +60,32 @@ class JsonLike(FileOrValue):
def parsed_value(self) -> JSONType:
    """Parse the underlying raw value into a JSON-like Python object.

    Key=value arguments are split on commas and parsed pairwise; every
    other argument type is decoded as a JSON document.
    """
    raw = super().value()
    if self.argument_type != ArgumentType.KV_PARAM_TYPE:
        return json.loads(raw)
    return parse_kv_arguments(split_arguments_list(raw))

def extract_arguments_list(self) -> Tuple[List[str], List[str], List[str], List[str]]:
    """Classify each comma-separated argument of this value by its type.

    Returns:
        Four lists, in order: parameter preset ids (plain strings),
        key=value pairs, inline JSON literals, and file references.

    Raises:
        ValueError: if an argument cannot be classified.
    """
    params_ids: List[str] = []
    kv_pairs: List[str] = []
    json_literals: List[str] = []
    files: List[str] = []
    # The argument types are mutually exclusive, so dispatch through a
    # table instead of the original chain of independent `if` tests.
    buckets = {
        ArgumentType.STRING_PARAM_TYPE: params_ids,
        ArgumentType.KV_PARAM_TYPE: kv_pairs,
        ArgumentType.JSON_LITERAL_PARAM_TYPE: json_literals,
        ArgumentType.FILE: files,
    }
    for arg in split_arguments_list(self.raw_value):
        bucket = buckets.get(get_argument_type(arg))
        if bucket is None:
            # ArgumentType.UNKNOWN_PARAM_TYPE (or any future unhandled type)
            raise ValueError(f"Cannot recognize parameter: {arg}")
        bucket.append(arg)
    return params_ids, kv_pairs, json_literals, files


def merge(base, override_dict, path=None):
"""
Expand All @@ -83,9 +107,10 @@ def merge(base, override_dict, path=None):
return base


def split_kv_pairs(kv_pairs: str) -> List[str]:
kv_pairs = kv_pairs.replace("\\,", "%2C")
return [kv_pair.replace("\\,", ",").replace("%2C", ",") for kv_pair in kv_pairs.split(KV_PAIR_SEPARATOR)]
def split_arguments_list(args_list: str) -> List[str]:
    """Split a comma-separated argument string, honoring escaped commas.

    A backslash-escaped comma (``\\,``) is temporarily encoded as ``%2C``
    so it survives the split, then restored to a literal comma in each piece.

    NOTE(review): any literal ``%2C`` already present in the input is also
    turned into a comma — confirm callers never pass URL-encoded values.
    """
    encoded = args_list.replace("\\,", "%2C")
    # The original also re-replaced "\\," after splitting, but none can
    # remain once the encoding above has run; only "%2C" needs restoring.
    return [piece.replace("%2C", ",") for piece in encoded.split(LIST_SEPARATOR)]


def is_json_object_or_array_string(string: str) -> bool:
Expand All @@ -111,7 +136,7 @@ def get_argument_type(argument: str) -> str:
return ArgumentType.JSON_LITERAL_PARAM_TYPE
if "=" in argument:
return ArgumentType.KV_PARAM_TYPE
return ArgumentType.UNKNOWN_PARAM_TYPE
return ArgumentType.STRING_PARAM_TYPE


def parse_kv_arguments(arguments: List[str]) -> Union[List[JSONType], Dict[str, JSONType]]:
Expand Down Expand Up @@ -140,7 +165,7 @@ def parse_and_merge_arguments(arguments: List[JsonLike]) -> Dict[str, JSONType]:
kv_arguments = list()
for argument in arguments:
if argument.argument_type == ArgumentType.KV_PARAM_TYPE:
kv_arguments.extend(split_kv_pairs(argument.value()))
kv_arguments.extend(split_arguments_list(argument.value()))
elif argument.argument_type == ArgumentType.UNKNOWN_PARAM_TYPE:
raise ValueError(f"Cannot merge non json value from argument: {argument}")
else:
Expand All @@ -150,3 +175,36 @@ def parse_and_merge_arguments(arguments: List[JsonLike]) -> Dict[str, JSONType]:
if kv_arguments_result:
merge(arguments_results, kv_arguments_result)
return arguments_results


def merge_param_json_data(kv_pairs_list: List[str], json_literals_list: List[str],
                          files_list: List[str]) -> Dict[str, JSONType]:
    """Merge engine parameters from JSON files, inline literals, and KV pairs.

    Sources are merged in that order, so later sources (literals, then KV
    pairs) override keys with the same names supplied by earlier ones.

    Raises:
        ValueError: if any source cannot be parsed or merged.
    """
    param_presets: Dict[str, JSONType] = dict()

    # Files first: their JSON content is overwritten by any later keys with
    # the same names. Iterating `x or []` tolerates both None and empty
    # lists without the redundant truthiness guards the original carried.
    for file_path in files_list or []:
        try:
            file_json = json.loads(read_file_content(file_path))
            merge(param_presets, file_json)
        except Exception as e:
            raise ValueError(f"Failed to parse and merge the JSON file {file_path}. {e}") from e

    for json_literal in json_literals_list or []:
        try:
            merge(param_presets, json.loads(json_literal))
        except Exception as e:
            raise ValueError(f"Failed to parse and merge the JSON literal {json_literal}. {e}") from e

    if kv_pairs_list:
        try:
            merge(param_presets, parse_kv_arguments(kv_pairs_list))
        except Exception as e:
            raise ValueError(f"Failed to parse and merge the key value pairs {kv_pairs_list}. "
                             f"Ensure they are each separated with a comma. {e}") from e

    return param_presets
73 changes: 72 additions & 1 deletion tests/cli/test_workbench.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import io
import json
import logging
import os
import shutil
Expand All @@ -10,7 +11,8 @@

from dnastack.alpha.client.workflow.models import Workflow, WorkflowVersion
from dnastack.client.workbench.ewes.models import ExtendedRunStatus, ExtendedRun, BatchActionResult, BatchRunResponse, \
MinimalExtendedRunWithInputs, MinimalExtendedRun, MinimalExtendedRunWithOutputs, ExecutionEngine, EngineParamPreset
MinimalExtendedRunWithInputs, MinimalExtendedRun, MinimalExtendedRunWithOutputs, ExecutionEngine, EngineParamPreset, \
BatchRunRequest
from .base import WorkbenchCliTestCase


Expand Down Expand Up @@ -353,6 +355,75 @@ def test_submit_batch_with_multiple_params():

test_submit_batch_with_multiple_params()

def test_submit_batch_with_engine_key_value_param():
    # Submit a one-run batch whose --engine-params is a single KV pair and
    # verify the pair round-trips into the run's workflow engine parameters.
    submitted_batch = BatchRunResponse(**self.simple_invoke(
        'workbench', 'runs', 'submit',
        '--url', hello_world_workflow_url,
        '--engine-params', 'key=value'
    ))
    self.assertEqual(len(submitted_batch.runs), 1, 'Expected exactly one run to be submitted.')
    described_runs = [ExtendedRun(**described_run) for described_run in self.simple_invoke(
        'workbench', 'runs', 'describe',
        submitted_batch.runs[0].run_id
    )]
    self.assertEqual(len(described_runs), 1, f'Expected exactly one run. Found {described_runs}')
    self.assertEqual(described_runs[0].request.workflow_engine_parameters, {'key': 'value'},
                     f'Expected workflow engine params to be exactly the same. '
                     f'Found {described_runs[0].request.workflow_engine_parameters}')

test_submit_batch_with_engine_key_value_param()

def test_submit_batch_with_engine_preset_param():
    # Submit a batch whose --engine-params is only a preset id; the CLI is
    # expected to resolve the preset's values (plus the engine id) server-side.
    submitted_batch = BatchRunResponse(**self.simple_invoke(
        'workbench', 'runs', 'submit',
        '--url', hello_world_workflow_url,
        '--engine-params', self.engine_params.id,
    ))
    self.assertEqual(len(submitted_batch.runs), 1, 'Expected exactly one run to be submitted.')
    described_runs = [ExtendedRun(**described_run) for described_run in self.simple_invoke(
        'workbench', 'runs', 'describe',
        submitted_batch.runs[0].run_id
    )]

    self.assertEqual(len(described_runs), 1, f'Expected exactly one run. Found {described_runs}')
    # Expected result: the default engine id plus every preset value.
    processed_engine_params = {'engine_id': self.execution_engine.id}
    processed_engine_params.update(self.engine_params.preset_values)
    self.assertEqual(described_runs[0].request.workflow_engine_parameters, processed_engine_params,
                     f'Expected workflow engine params to be exactly the same. '
                     f'Found {described_runs[0].request.workflow_engine_parameters}')

test_submit_batch_with_engine_preset_param()

def test_submit_batch_with_engine_mixed_param_types():
    # Exercise every --engine-params form at once in one comma-separated
    # list: KV pairs, a preset id, an inline JSON literal, and an @-file.
    submitted_batch = BatchRunResponse(**self.simple_invoke(
        'workbench', 'runs', 'submit',
        '--url', hello_world_workflow_url,
        '--engine-params', f'goodbye=moon,'
                           f'hello=world,'
                           f'{self.engine_params.id},'
                           f'{json.dumps({"hello":"world"})},'
                           f'@{input_json_file}',
    ))
    self.assertEqual(len(submitted_batch.runs), 1, 'Expected exactly one run to be submitted.')
    described_runs = [ExtendedRun(**described_run) for described_run in self.simple_invoke(
        'workbench', 'runs', 'describe',
        submitted_batch.runs[0].run_id
    )]

    self.assertEqual(len(described_runs), 1, f'Expected exactly one run. Found {described_runs}')
    # Merged expectation: engine id + file contents + literal/KV overrides
    # + preset values ('test.hello.name' presumably comes from
    # input_json_file — verify against the fixture).
    processed_engine_params = {
        'engine_id': self.execution_engine.id,
        'test.hello.name': 'bar',
        'hello': 'world',
        'goodbye': 'moon',
        **self.engine_params.preset_values
    }
    self.assertEqual(described_runs[0].request.workflow_engine_parameters, processed_engine_params,
                     f'Expected workflow engine params to be exactly the same. '
                     f'Found {described_runs[0].request.workflow_engine_parameters}')

test_submit_batch_with_engine_mixed_param_types()

def test_workflows_list(self):
result = [Workflow(**workflow) for workflow in self.simple_invoke(
'workbench', 'workflows', 'list'
Expand Down
4 changes: 2 additions & 2 deletions tests/exam_helper_for_workbench.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class BaseWorkbenchTestCase(WithTestUserTestCase):
).json())))
namespace: str = None
hello_world_workflow: Workflow = None
engine_param = {
engine_params = {
"id": "presetId",
"name": "presetName",
"preset_values": {
Expand Down Expand Up @@ -215,7 +215,7 @@ def _create_execution_engine(cls, session: HttpSession) -> ExecutionEngine:
def _add_execution_engine_parameter(cls, session: HttpSession, engine_id: str) -> EngineParamPreset:
response = session.post(urljoin(cls.workbench_base_url,
f'/services/ewes-service/{cls.namespace}/engines/{engine_id}/param-presets'),
json=cls.engine_param)
json=cls.engine_params)
return EngineParamPreset(**response.json())

@classmethod
Expand Down
Loading