Skip to content

Commit

Permalink
refactor with proper values
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Neagu committed Dec 13, 2024
1 parent ff3298c commit 2d1b08e
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ async def _start_tasks(
hardware_info=task.hardware_info,
callback=wake_up_callback,
metadata=comp_run.metadata,
run_id=comp_run.run_id,
resource_tracking_run_id=get_resource_tracking_run_id(
user_id, project_id, node_id, comp_run.iteration
),
)
for node_id, task in scheduled_tasks.items()
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from models_library.projects_nodes_io import NodeID
from models_library.resource_tracker import HardwareInfo
from models_library.users import UserID
from pydantic import PositiveInt, TypeAdapter, ValidationError
from pydantic import TypeAdapter, ValidationError
from pydantic.networks import AnyUrl
from servicelib.logging_utils import log_catch
from settings_library.s3 import S3Settings
Expand Down Expand Up @@ -293,7 +293,7 @@ async def send_computation_tasks(
remote_fct: ContainerRemoteFct | None = None,
metadata: RunMetadataDict,
hardware_info: HardwareInfo,
run_id: PositiveInt,
resource_tracking_run_id: str,
) -> list[PublishedComputationTask]:
"""actually sends the function remote_fct to be remotely executed. if None is kept then the default
function that runs container will be started.
Expand Down Expand Up @@ -397,7 +397,7 @@ async def send_computation_tasks(
node_id=node_id,
node_image=node_image,
metadata=metadata,
run_id=run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
task_owner = dask_utils.compute_task_owner(
user_id, project_id, node_id, metadata.get("project_metadata", {})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from models_library.services_types import RunID
from models_library.users import UserID
from models_library.utils.specs_substitution import SpecsSubstitutionsResolver
from pydantic import BaseModel, PositiveInt
from pydantic import BaseModel
from servicelib.fastapi.app_state import SingletonInAppStateMixin
from servicelib.logging_utils import log_context

Expand Down Expand Up @@ -225,7 +225,7 @@ async def resolve_and_substitute_session_variables_in_specs(
product_name: str,
project_id: ProjectID,
node_id: NodeID,
run_id: RunID | PositiveInt,
run_id: RunID | str,
) -> dict[str, Any]:
table = OsparcSessionVariablesTable.get_from_app_state(app)
resolver = SpecsSubstitutionsResolver(specs, upgrade=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from models_library.projects_nodes_io import NodeID, NodeIDStr
from models_library.services import ServiceKey, ServiceVersion
from models_library.users import UserID
from pydantic import AnyUrl, ByteSize, PositiveInt, TypeAdapter, ValidationError
from pydantic import AnyUrl, ByteSize, TypeAdapter, ValidationError
from servicelib.logging_utils import log_catch, log_context
from simcore_sdk import node_ports_v2
from simcore_sdk.node_ports_common.exceptions import (
Expand Down Expand Up @@ -342,7 +342,7 @@ async def compute_task_envs(
node_id: NodeID,
node_image: Image,
metadata: RunMetadataDict,
run_id: PositiveInt,
resource_tracking_run_id: str,
) -> ContainerEnvsDict:
product_name = metadata.get("product_name", UNDEFINED_DOCKER_LABEL)
task_envs = node_image.envs
Expand All @@ -361,7 +361,7 @@ async def compute_task_envs(
product_name=product_name,
project_id=project_id,
node_id=node_id,
run_id=run_id,
run_id=resource_tracking_run_id,
)
# NOTE: see https://github.com/ITISFoundation/osparc-simcore/issues/3638
# we currently do not validate as we are using illegal docker key names with underscores
Expand Down
14 changes: 11 additions & 3 deletions services/director-v2/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,24 @@
from models_library.generated_models.docker_rest_api import (
ServiceSpec as DockerServiceSpec,
)
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.service_settings_labels import SimcoreServiceLabels
from models_library.services import RunID, ServiceKey, ServiceKeyVersion, ServiceVersion
from models_library.services_enums import ServiceState
from models_library.users import UserID
from models_library.utils._original_fastapi_encoders import jsonable_encoder
from pydantic import PositiveInt, TypeAdapter
from pydantic import TypeAdapter
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
from settings_library.s3 import S3Settings
from simcore_sdk.node_ports_v2 import FileLinkType
from simcore_service_director_v2.constants import DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL
from simcore_service_director_v2.core.settings import AppSettings
from simcore_service_director_v2.models.dynamic_services_scheduler import SchedulerData
from simcore_service_director_v2.modules.comp_scheduler._utils import (
get_resource_tracking_run_id,
)


@pytest.fixture
Expand Down Expand Up @@ -341,5 +347,7 @@ async def async_docker_client() -> AsyncIterable[aiodocker.Docker]:


@pytest.fixture
def comp_task_run_id() -> PositiveInt:
return 42
def resource_tracking_run_id(
user_id: UserID, project_id: ProjectID, node_id: NodeID
) -> str:
return get_resource_tracking_run_id(user_id, project_id, node_id, 42)
50 changes: 25 additions & 25 deletions services/director-v2/tests/unit/test_modules_dask_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from models_library.projects_nodes_io import NodeID
from models_library.resource_tracker import HardwareInfo
from models_library.users import UserID
from pydantic import AnyUrl, ByteSize, PositiveInt, TypeAdapter
from pydantic import AnyUrl, ByteSize, TypeAdapter
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
from settings_library.s3 import S3Settings
Expand Down Expand Up @@ -442,7 +442,7 @@ async def test_send_computation_task(
task_labels: ContainerLabelsDict,
empty_hardware_info: HardwareInfo,
faker: Faker,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
_DASK_EVENT_NAME = faker.pystr()

Expand Down Expand Up @@ -504,7 +504,7 @@ def fake_sidecar_fct(
),
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
assert node_id_to_job_ids
assert len(node_id_to_job_ids) == 1
Expand Down Expand Up @@ -561,7 +561,7 @@ async def test_computation_task_is_persisted_on_dask_scheduler(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
"""rationale:
When a task is submitted to the dask backend, a dask future is returned.
Expand Down Expand Up @@ -597,7 +597,7 @@ def fake_sidecar_fct(
remote_fct=fake_sidecar_fct,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
Expand Down Expand Up @@ -653,7 +653,7 @@ async def test_abort_computation_tasks(
faker: Faker,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
_DASK_EVENT_NAME = faker.pystr()

Expand Down Expand Up @@ -692,7 +692,7 @@ def fake_remote_fct(
remote_fct=fake_remote_fct,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
Expand Down Expand Up @@ -744,7 +744,7 @@ async def test_failed_task_returns_exceptions(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
# NOTE: this must be inlined so that the test works,
# the dask-worker must be able to import the function
Expand All @@ -765,7 +765,7 @@ def fake_failing_sidecar_fct(
remote_fct=fake_failing_sidecar_fct,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
Expand Down Expand Up @@ -808,7 +808,7 @@ async def test_send_computation_task_with_missing_resources_raises(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
# remove the workers that can handle gpu
scheduler_info = dask_client.backend.client.scheduler_info()
Expand All @@ -835,7 +835,7 @@ async def test_send_computation_task_with_missing_resources_raises(
remote_fct=None,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
mocked_user_completed_cb.assert_not_called()

Expand All @@ -854,7 +854,7 @@ async def test_send_computation_task_with_hardware_info_raises(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
# NOTE: running on the default cluster will raise missing resources
with pytest.raises(MissingComputationalResourcesError):
Expand All @@ -866,7 +866,7 @@ async def test_send_computation_task_with_hardware_info_raises(
remote_fct=None,
metadata=comp_run_metadata,
hardware_info=hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
mocked_user_completed_cb.assert_not_called()

Expand All @@ -884,7 +884,7 @@ async def test_too_many_resources_send_computation_task(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
# create an image that needs a huge amount of CPU
image = Image(
Expand All @@ -908,7 +908,7 @@ async def test_too_many_resources_send_computation_task(
remote_fct=None,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)

mocked_user_completed_cb.assert_not_called()
Expand All @@ -925,7 +925,7 @@ async def test_disconnected_backend_raises_exception(
mocked_storage_service_api: respx.MockRouter,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
# DISCONNECT THE CLUSTER
await dask_spec_local_cluster.close() # type: ignore
Expand All @@ -938,7 +938,7 @@ async def test_disconnected_backend_raises_exception(
remote_fct=None,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
mocked_user_completed_cb.assert_not_called()

Expand All @@ -958,7 +958,7 @@ async def test_changed_scheduler_raises_exception(
unused_tcp_port_factory: Callable,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
# change the scheduler (stop the current one and start another at the same address)
scheduler_address = URL(dask_spec_local_cluster.scheduler_address)
Expand Down Expand Up @@ -988,7 +988,7 @@ async def test_changed_scheduler_raises_exception(
remote_fct=None,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
mocked_user_completed_cb.assert_not_called()

Expand All @@ -1006,7 +1006,7 @@ async def test_get_tasks_status(
fail_remote_fct: bool,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
# NOTE: this must be inlined so that the test works,
# the dask-worker must be able to import the function
Expand Down Expand Up @@ -1034,7 +1034,7 @@ def fake_remote_fct(
remote_fct=fake_remote_fct,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
Expand Down Expand Up @@ -1089,7 +1089,7 @@ async def test_dask_sub_handlers(
fake_task_handlers: TaskHandlers,
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
dask_client.register_handlers(fake_task_handlers)
_DASK_START_EVENT = "start"
Expand Down Expand Up @@ -1119,7 +1119,7 @@ def fake_remote_fct(
remote_fct=fake_remote_fct,
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
Expand Down Expand Up @@ -1164,7 +1164,7 @@ async def test_get_cluster_details(
comp_run_metadata: RunMetadataDict,
empty_hardware_info: HardwareInfo,
faker: Faker,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
cluster_details = await dask_client.get_cluster_details()
assert cluster_details
Expand Down Expand Up @@ -1201,7 +1201,7 @@ def fake_sidecar_fct(
),
metadata=comp_run_metadata,
hardware_info=empty_hardware_info,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
assert published_computation_task
assert len(published_computation_task) == 1
Expand Down
6 changes: 3 additions & 3 deletions services/director-v2/tests/unit/with_dbs/test_utils_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID, SimCoreFileLink, SimcoreS3FileID
from models_library.users import UserID
from pydantic import ByteSize, PositiveInt, TypeAdapter
from pydantic import ByteSize, TypeAdapter
from pydantic.networks import AnyUrl
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
Expand Down Expand Up @@ -647,7 +647,7 @@ async def test_compute_task_envs(
run_metadata: RunMetadataDict,
input_task_envs: ContainerEnvsDict,
expected_computed_task_envs: ContainerEnvsDict,
comp_task_run_id: PositiveInt,
resource_tracking_run_id: str,
):
sleeper_task: CompTaskAtDB = published_project.tasks[1]
sleeper_task.image.envs = input_task_envs
Expand All @@ -659,6 +659,6 @@ async def test_compute_task_envs(
node_id=sleeper_task.node_id,
node_image=sleeper_task.image,
metadata=run_metadata,
run_id=comp_task_run_id,
resource_tracking_run_id=resource_tracking_run_id,
)
assert task_envs == expected_computed_task_envs

0 comments on commit 2d1b08e

Please sign in to comment.