From 17d4b143e2f65026a2e5bc98de1efcb371814c80 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Mon, 4 Nov 2024 16:07:33 +0000 Subject: [PATCH] feat(service): Enable concurrency specification --- Containerfile | 3 ++- src/nwp_consumer/__init__.py | 2 ++ .../repositories/model_repositories/ecmwf_realtime.py | 2 +- .../repositories/model_repositories/test_noaa_gfs.py | 1 + src/nwp_consumer/internal/services/archiver_service.py | 9 +++++++-- src/nwp_consumer/internal/services/consumer_service.py | 7 ++++++- 6 files changed, 19 insertions(+), 5 deletions(-) diff --git a/Containerfile b/Containerfile index 864e8f49..44bb9324 100644 --- a/Containerfile +++ b/Containerfile @@ -31,7 +31,7 @@ COPY pyproject.toml /_lock/ # This layer is cached until uv.lock or pyproject.toml change. # Delete any unwanted parts of the installed packages to reduce size RUN --mount=type=cache,target=/root/.cache \ - apt-get update && apt-get install build-essential -y && \ + apt-get update && apt-get install gcc -y && \ echo "Creating virtualenv at /venv" && \ conda create -qy -p /venv python=3.12 numcodecs RUN which gcc @@ -49,6 +49,7 @@ RUN echo "Installing dependencies into /venv" && \ COPY . /src RUN --mount=type=cache,target=/root/.cache \ uv pip install --no-deps --python=$UV_PROJECT_ENVIRONMENT /src +RUN uv pip install dllist && python # Copy the virtualenv into a distroless image # * These are small images that only contain the runtime dependencies diff --git a/src/nwp_consumer/__init__.py b/src/nwp_consumer/__init__.py index 4aaa887d..2cba477d 100644 --- a/src/nwp_consumer/__init__.py +++ b/src/nwp_consumer/__init__.py @@ -23,6 +23,8 @@ +-------------------------------+-------------------------------------+---------------------------------------------+ | MODEL_REPOSITORY | The model repository to use. | ceda-metoffice-global | +-------------------------------+-------------------------------------+---------------------------------------------+ +| CONCURRENCY | Whether to use concurrency. | True | ++-------------------------------+-------------------------------------+---------------------------------------------+ Development Documentation diff --git a/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py b/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py index 24f5995a..0e439ca7 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py @@ -198,7 +198,7 @@ def _download(self, url: str) -> ResultE[pathlib.Path]: ).with_suffix(".grib").expanduser() # Only download the file if not already present - if not local_path.exists(): + if not local_path.exists() or local_path.stat().st_size == 0: local_path.parent.mkdir(parents=True, exist_ok=True) log.debug("Requesting file from S3 at: '%s'", url) diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_gfs.py b/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_gfs.py index 00a7494e..bd4e6b0d 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_gfs.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_gfs.py @@ -108,3 +108,4 @@ class TestCase: max_step=max(NOAAGFSS3ModelRepository.model().expected_coordinates.step), ) self.assertEqual(result, t.expected) + diff --git a/src/nwp_consumer/internal/services/archiver_service.py b/src/nwp_consumer/internal/services/archiver_service.py index 1cbe3d7a..6fb37cae 100644 --- a/src/nwp_consumer/internal/services/archiver_service.py +++ b/src/nwp_consumer/internal/services/archiver_service.py @@ -2,10 +2,11 @@ import dataclasses import logging +import os import pathlib from typing import TYPE_CHECKING, override -from joblib import Parallel +from joblib import Parallel, cpu_count from returns.result import Failure, ResultE, Success from nwp_consumer.internal import entities, ports @@ -86,8 +87,12 @@ def archive(self, year: int, month: int) -> ResultE[pathlib.Path]: amr = amr_result.unwrap() # Create a generator to fetch and process raw data + n_jobs: int = max(cpu_count() - 1, self.mr.repository().max_connections) + if os.getenv("CONCURRENCY", "True").capitalize() == "False": + n_jobs = 1 + log.debug(f"Downloading using {n_jobs} concurrent thread(s)") da_result_generator = Parallel( - n_jobs=self.mr.repository().max_connections - 1, + n_jobs=n_jobs, prefer="threads", return_as="generator_unordered", )(amr.fetch_init_data(it=it)) diff --git a/src/nwp_consumer/internal/services/consumer_service.py b/src/nwp_consumer/internal/services/consumer_service.py index 4ac41390..d72179df 100644 --- a/src/nwp_consumer/internal/services/consumer_service.py +++ b/src/nwp_consumer/internal/services/consumer_service.py @@ -3,6 +3,7 @@ import dataclasses import datetime as dt import logging +import os import pathlib from typing import override @@ -72,9 +73,13 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]: )) amr = amr_result.unwrap() + n_jobs: int = max(cpu_count() - 1, self.mr.repository().max_connections) + if os.getenv("CONCURRENCY", "True").capitalize() == "False": + n_jobs = 1 + log.debug(f"Downloading using {n_jobs} concurrent thread(s)") fetch_result_generator = Parallel( # TODO - fix segfault when using multiple threads - n_jobs=max(cpu_count() - 1, self.mr.repository().max_connections), + n_jobs=n_jobs, prefer="threads", return_as="generator_unordered", )(amr.fetch_init_data(it=it))