From 2d1b08e9dae6e5fcd22329b26e78eb45a97998e8 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 13 Dec 2024 09:44:31 +0100 Subject: [PATCH] refactor with proper values --- .../modules/comp_scheduler/_scheduler_dask.py | 4 +- .../modules/dask_client.py | 6 +-- .../modules/osparc_variables/substitutions.py | 4 +- .../simcore_service_director_v2/utils/dask.py | 6 +-- services/director-v2/tests/unit/conftest.py | 14 ++++-- .../tests/unit/test_modules_dask_client.py | 50 +++++++++---------- .../tests/unit/with_dbs/test_utils_dask.py | 6 +-- 7 files changed, 50 insertions(+), 40 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py index d3230b7d638..70fcc7a4923 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_dask.py @@ -129,7 +129,9 @@ async def _start_tasks( hardware_info=task.hardware_info, callback=wake_up_callback, metadata=comp_run.metadata, - run_id=comp_run.run_id, + resource_tracking_run_id=get_resource_tracking_run_id( + user_id, project_id, node_id, comp_run.iteration + ), ) for node_id, task in scheduled_tasks.items() ), diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index 1cfcab0859b..7ca2b9e2d6f 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -48,7 +48,7 @@ from models_library.projects_nodes_io import NodeID from models_library.resource_tracker import HardwareInfo from models_library.users import UserID -from pydantic import PositiveInt, TypeAdapter, ValidationError +from pydantic import TypeAdapter, ValidationError from pydantic.networks import AnyUrl from servicelib.logging_utils import log_catch from settings_library.s3 import S3Settings @@ -293,7 +293,7 @@ async def send_computation_tasks( remote_fct: ContainerRemoteFct | None = None, metadata: RunMetadataDict, hardware_info: HardwareInfo, - run_id: PositiveInt, + resource_tracking_run_id: str, ) -> list[PublishedComputationTask]: """actually sends the function remote_fct to be remotely executed. if None is kept then the default function that runs container will be started. @@ -397,7 +397,7 @@ async def send_computation_tasks( node_id=node_id, node_image=node_image, metadata=metadata, - run_id=run_id, + resource_tracking_run_id=resource_tracking_run_id, ) task_owner = dask_utils.compute_task_owner( user_id, project_id, node_id, metadata.get("project_metadata", {}) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py b/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py index 7dcc3b32c59..5c551665dd2 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/osparc_variables/substitutions.py @@ -21,7 +21,7 @@ from models_library.services_types import RunID from models_library.users import UserID from models_library.utils.specs_substitution import SpecsSubstitutionsResolver -from pydantic import BaseModel, PositiveInt +from pydantic import BaseModel from servicelib.fastapi.app_state import SingletonInAppStateMixin from servicelib.logging_utils import log_context @@ -225,7 +225,7 @@ async def resolve_and_substitute_session_variables_in_specs( product_name: str, project_id: ProjectID, node_id: NodeID, - run_id: RunID | PositiveInt, + run_id: RunID | str, ) -> dict[str, Any]: table = OsparcSessionVariablesTable.get_from_app_state(app) resolver = SpecsSubstitutionsResolver(specs, upgrade=False) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index 425486303f6..a267632a98c 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -27,7 +27,7 @@ from models_library.projects_nodes_io import NodeID, NodeIDStr from models_library.services import ServiceKey, ServiceVersion from models_library.users import UserID -from pydantic import AnyUrl, ByteSize, PositiveInt, TypeAdapter, ValidationError +from pydantic import AnyUrl, ByteSize, TypeAdapter, ValidationError from servicelib.logging_utils import log_catch, log_context from simcore_sdk import node_ports_v2 from simcore_sdk.node_ports_common.exceptions import ( @@ -342,7 +342,7 @@ async def compute_task_envs( node_id: NodeID, node_image: Image, metadata: RunMetadataDict, - run_id: PositiveInt, + resource_tracking_run_id: str, ) -> ContainerEnvsDict: product_name = metadata.get("product_name", UNDEFINED_DOCKER_LABEL) task_envs = node_image.envs @@ -361,7 +361,7 @@ async def compute_task_envs( product_name=product_name, project_id=project_id, node_id=node_id, - run_id=run_id, + run_id=resource_tracking_run_id, ) # NOTE: see https://github.com/ITISFoundation/osparc-simcore/issues/3638 # we currently do not validate as we are using illegal docker key names with underscores diff --git a/services/director-v2/tests/unit/conftest.py b/services/director-v2/tests/unit/conftest.py index 4d93b679239..2ec1fb8955c 100644 --- a/services/director-v2/tests/unit/conftest.py +++ b/services/director-v2/tests/unit/conftest.py @@ -22,11 +22,14 @@ from models_library.generated_models.docker_rest_api import ( ServiceSpec as DockerServiceSpec, ) +from models_library.projects import ProjectID +from models_library.projects_nodes_io import NodeID from models_library.service_settings_labels import SimcoreServiceLabels from models_library.services import RunID, ServiceKey, ServiceKeyVersion, ServiceVersion from models_library.services_enums import ServiceState +from models_library.users import UserID from models_library.utils._original_fastapi_encoders import jsonable_encoder -from pydantic import PositiveInt, TypeAdapter +from pydantic import TypeAdapter from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.s3 import S3Settings @@ -34,6 +37,9 @@ from simcore_service_director_v2.constants import DYNAMIC_SIDECAR_SCHEDULER_DATA_LABEL from simcore_service_director_v2.core.settings import AppSettings from simcore_service_director_v2.models.dynamic_services_scheduler import SchedulerData +from simcore_service_director_v2.modules.comp_scheduler._utils import ( + get_resource_tracking_run_id, +) @pytest.fixture @@ -341,5 +347,7 @@ async def async_docker_client() -> AsyncIterable[aiodocker.Docker]: @pytest.fixture -def comp_task_run_id() -> PositiveInt: - return 42 +def resource_tracking_run_id( + user_id: UserID, project_id: ProjectID, node_id: NodeID +) -> str: + return get_resource_tracking_run_id(user_id, project_id, node_id, 42) diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index 176475baa60..97ab940e206 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -46,7 +46,7 @@ from models_library.projects_nodes_io import NodeID from models_library.resource_tracker import HardwareInfo from models_library.users import UserID -from pydantic import AnyUrl, ByteSize, PositiveInt, TypeAdapter +from pydantic import AnyUrl, ByteSize, TypeAdapter from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.s3 import S3Settings @@ -442,7 +442,7 @@ async def test_send_computation_task( task_labels: ContainerLabelsDict, empty_hardware_info: HardwareInfo, faker: Faker, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): _DASK_EVENT_NAME = faker.pystr() @@ -504,7 +504,7 @@ def fake_sidecar_fct( ), metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) assert node_id_to_job_ids assert len(node_id_to_job_ids) == 1 @@ -561,7 +561,7 @@ async def test_computation_task_is_persisted_on_dask_scheduler( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): """rationale: When a task is submitted to the dask backend, a dask future is returned. @@ -597,7 +597,7 @@ def fake_sidecar_fct( remote_fct=fake_sidecar_fct, metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 @@ -653,7 +653,7 @@ async def test_abort_computation_tasks( faker: Faker, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): _DASK_EVENT_NAME = faker.pystr() @@ -692,7 +692,7 @@ def fake_remote_fct( remote_fct=fake_remote_fct, metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 @@ -744,7 +744,7 @@ async def test_failed_task_returns_exceptions( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): # NOTE: this must be inlined so that the test works, # the dask-worker must be able to import the function @@ -765,7 +765,7 @@ def fake_failing_sidecar_fct( remote_fct=fake_failing_sidecar_fct, metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 @@ -808,7 +808,7 @@ async def test_send_computation_task_with_missing_resources_raises( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): # remove the workers that can handle gpu scheduler_info = dask_client.backend.client.scheduler_info() @@ -835,7 +835,7 @@ async def test_send_computation_task_with_missing_resources_raises( remote_fct=None, metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) mocked_user_completed_cb.assert_not_called() @@ -854,7 +854,7 @@ async def test_send_computation_task_with_hardware_info_raises( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, hardware_info: HardwareInfo, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): # NOTE: running on the default cluster will raise missing resources with pytest.raises(MissingComputationalResourcesError): @@ -866,7 +866,7 @@ async def test_send_computation_task_with_hardware_info_raises( remote_fct=None, metadata=comp_run_metadata, hardware_info=hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) mocked_user_completed_cb.assert_not_called() @@ -884,7 +884,7 @@ async def test_too_many_resources_send_computation_task( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): # create an image that needs a huge amount of CPU image = Image( @@ -908,7 +908,7 @@ async def test_too_many_resources_send_computation_task( remote_fct=None, metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) mocked_user_completed_cb.assert_not_called() @@ -925,7 +925,7 @@ async def test_disconnected_backend_raises_exception( mocked_storage_service_api: respx.MockRouter, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): # DISCONNECT THE CLUSTER await dask_spec_local_cluster.close() # type: ignore @@ -938,7 +938,7 @@ async def test_disconnected_backend_raises_exception( remote_fct=None, metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) mocked_user_completed_cb.assert_not_called() @@ -958,7 +958,7 @@ async def test_changed_scheduler_raises_exception( unused_tcp_port_factory: Callable, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): # change the scheduler (stop the current one and start another at the same address) scheduler_address = URL(dask_spec_local_cluster.scheduler_address) @@ -988,7 +988,7 @@ async def test_changed_scheduler_raises_exception( remote_fct=None, metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) mocked_user_completed_cb.assert_not_called() @@ -1006,7 +1006,7 @@ async def test_get_tasks_status( fail_remote_fct: bool, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): # NOTE: this must be inlined so that the test works, # the dask-worker must be able to import the function @@ -1034,7 +1034,7 @@ def fake_remote_fct( remote_fct=fake_remote_fct, metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 @@ -1089,7 +1089,7 @@ async def test_dask_sub_handlers( fake_task_handlers: TaskHandlers, comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): dask_client.register_handlers(fake_task_handlers) _DASK_START_EVENT = "start" @@ -1119,7 +1119,7 @@ def fake_remote_fct( remote_fct=fake_remote_fct, metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 @@ -1164,7 +1164,7 @@ async def test_get_cluster_details( comp_run_metadata: RunMetadataDict, empty_hardware_info: HardwareInfo, faker: Faker, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): cluster_details = await dask_client.get_cluster_details() assert cluster_details @@ -1201,7 +1201,7 @@ def fake_sidecar_fct( ), metadata=comp_run_metadata, hardware_info=empty_hardware_info, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) assert published_computation_task assert len(published_computation_task) == 1 diff --git a/services/director-v2/tests/unit/with_dbs/test_utils_dask.py b/services/director-v2/tests/unit/with_dbs/test_utils_dask.py index d350a609f99..d33a8617ff5 100644 --- a/services/director-v2/tests/unit/with_dbs/test_utils_dask.py +++ b/services/director-v2/tests/unit/with_dbs/test_utils_dask.py @@ -35,7 +35,7 @@ from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID, SimCoreFileLink, SimcoreS3FileID from models_library.users import UserID -from pydantic import ByteSize, PositiveInt, TypeAdapter +from pydantic import ByteSize, TypeAdapter from pydantic.networks import AnyUrl from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict @@ -647,7 +647,7 @@ async def test_compute_task_envs( run_metadata: RunMetadataDict, input_task_envs: ContainerEnvsDict, expected_computed_task_envs: ContainerEnvsDict, - comp_task_run_id: PositiveInt, + resource_tracking_run_id: str, ): sleeper_task: CompTaskAtDB = published_project.tasks[1] sleeper_task.image.envs = input_task_envs @@ -659,6 +659,6 @@ async def test_compute_task_envs( node_id=sleeper_task.node_id, node_image=sleeper_task.image, metadata=run_metadata, - run_id=comp_task_run_id, + resource_tracking_run_id=resource_tracking_run_id, ) assert task_envs == expected_computed_task_envs