Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

style(consumer): Convert to railway programming #211

Merged
merged 7 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies = [
"numpy == 2.1.0",
"ocf-blosc2 == 0.0.11",
"psutil == 6.0.0",
"returns == 0.23.0",
"returns == 0.24.0",
"s3fs == 2024.9.0",
"xarray == 2024.9.0",
"zarr == 2.18.3"
Expand Down
13 changes: 3 additions & 10 deletions src/nwp_consumer/cmd/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
from typing import NamedTuple

from nwp_consumer.internal import handlers, ports, repositories, services
from nwp_consumer.internal import handlers, ports, repositories

log = logging.getLogger("nwp-consumer")

Expand Down Expand Up @@ -61,17 +61,10 @@ def parse_env() -> Adaptors:

def run_cli() -> None:
"""Entrypoint for the CLI handler."""
# TODO: InfoUseCase
adaptors = parse_env()
c = handlers.CLIHandler(
consumer_usecase=services.ConsumerService(
model_repository=adaptors.model_repository,
notification_repository=adaptors.notification_repository,
),
archiver_usecase=services.ArchiverService(
model_repository=adaptors.model_repository,
notification_repository=adaptors.notification_repository,
),
model_adaptor=adaptors.model_repository,
notification_adaptor=adaptors.notification_repository,
)
returncode: int = c.run()
sys.exit(returncode)
Expand Down
4 changes: 3 additions & 1 deletion src/nwp_consumer/internal/entities/performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class PerformanceMonitor(Thread):
memory_buffer: list[int]
cpu_buffer: list[float]
start_time: float
end_time: float
end_time: float | None
stop: bool = True

def __enter__(self) -> None:
Expand Down Expand Up @@ -60,6 +60,8 @@ def get_usage(self) -> tuple[int, float]:

def get_runtime(self) -> int:
"""Get the runtime of the thread in seconds."""
if self.end_time is None:
return int(time.time() - self.start_time)
return int(self.end_time - self.start_time)

def run(self) -> None:
Expand Down
61 changes: 34 additions & 27 deletions src/nwp_consumer/internal/handlers/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,27 @@
import datetime as dt
import logging

from returns.result import Failure, Success
from returns.result import Failure, ResultE

from nwp_consumer.internal import ports
from nwp_consumer.internal import ports, services

log = logging.getLogger("nwp-consumer")


class CLIHandler:
"""CLI driving actor."""

model_adaptor: type[ports.ModelRepository]
notification_adaptor: type[ports.NotificationRepository]

def __init__(
self,
consumer_usecase: ports.ConsumeUseCase,
archiver_usecase: ports.ArchiveUseCase,
) -> None:
model_adaptor: type[ports.ModelRepository],
notification_adaptor: type[ports.NotificationRepository],
) -> None:
"""Create a new instance."""
self._consumer_usecase = consumer_usecase
self._archiver_usecase = archiver_usecase

self.model_adaptor = model_adaptor
self.notification_adaptor = notification_adaptor

@property
def parser(self) -> argparse.ArgumentParser:
Expand Down Expand Up @@ -82,30 +84,35 @@ def run(self) -> int:
args = self.parser.parse_args()
match args.command:
case "consume":
result = self._consumer_usecase.consume(it=args.init_time)

match result:
case Failure(e):
log.error(f"Failed to consume NWP data: {e}")
return 1
case Success(path):
log.info(f"Successfully consumed NWP data to '{path}'")
return 0
service_result = services.ConsumerService.from_adaptors(
model_adaptor=self.model_adaptor,
notification_adaptor=self.notification_adaptor,
)
result: ResultE[str] = service_result.do(
consume_result
for service in service_result
for consume_result in service.consume(period=args.init_time)
)
if isinstance(result, Failure):
log.error(f"Failed to consume NWP data: {result!s}")
return 1

case "archive":
result = self._archiver_usecase.archive(year=args.year, month=args.month)

match result:
case Failure(e):
log.error(f"Failed to archive NWP data: {e}")
return 1
case Success(path):
log.info(f"Successfully archived NWP data to '{path}'")
return 0
service_result = services.ConsumerService.from_adaptors(
model_adaptor=self.model_adaptor,
notification_adaptor=self.notification_adaptor,
)
result = service_result.do(
consume_result
for service in service_result
for consume_result in service.consume(period=args.init_time)
)
if isinstance(result, Failure):
log.error(f"Failed to archive NWP data: {result!s}")
return 1

case "info":
log.error("Info command is coming soon! :)")
return 0

case _:
log.error(f"Unknown command: {args.command}")
Expand Down
6 changes: 2 additions & 4 deletions src/nwp_consumer/internal/ports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@
in the `repositories` module.
"""

from .services import ConsumeUseCase, ArchiveUseCase
from .repositories import ModelRepository, ZarrRepository, NotificationRepository
from .services import ConsumeUseCase
from .repositories import ModelRepository, NotificationRepository

__all__ = [
"ConsumeUseCase",
"ArchiveUseCase",
"ModelRepository",
"ZarrRepository",
"NotificationRepository",
]
11 changes: 0 additions & 11 deletions src/nwp_consumer/internal/ports/repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import abc
import datetime as dt
import logging
import pathlib
from collections.abc import Callable, Iterator

import xarray as xr
Expand Down Expand Up @@ -124,16 +123,6 @@ def model() -> entities.ModelMetadata:
"""Metadata about the model."""
pass


class ZarrRepository(abc.ABC):
"""Interface for a repository that stores Zarr NWP data."""

@abc.abstractmethod
def save(self, src: pathlib.Path, dst: pathlib.Path) -> ResultE[str]:
"""Save NWP store data in the repository."""
pass


class NotificationRepository(abc.ABC):
"""Interface for a repository that sends notifications.

Expand Down
55 changes: 7 additions & 48 deletions src/nwp_consumer/internal/ports/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@
These interfaces define the signatures that *driving* actors must conform to
in order to interact with the core.

Also sometimes referred to as *primary ports*.
Sometimes referred to as *primary ports*.
"""

import abc
import datetime as dt

from returns.result import ResultE

from nwp_consumer.internal import entities


class ConsumeUseCase(abc.ABC):
"""Interface for the consumer use case.
Expand All @@ -24,16 +22,15 @@ class ConsumeUseCase(abc.ABC):


@abc.abstractmethod
def consume(self, it: dt.datetime | None = None) -> ResultE[str]:
"""Consume NWP data to Zarr format for desired init time.
def consume(self, period: dt.datetime | dt.date | None = None) -> ResultE[str]:
"""Consume NWP data to Zarr format for desired time period.

Where possible the implementation should be as memory-efficient as possible.
The designs of the repository methods also enable parallel processing within
the implementation.

Args:
it: The initialization time for which to consume data.
If None, the latest available forecast should be consumed.
period: The period for which to gather init time data.

Returns:
The path to the produced Zarr store.
Expand All @@ -46,51 +43,13 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[str]:
pass

@abc.abstractmethod
def postprocess(self, options: entities.PostProcessOptions) -> ResultE[str]:
"""Postprocess the produced Zarr according to given options."""
pass


class ArchiveUseCase(abc.ABC):
"""Interface for the archive use case.

Defines the business-critical methods for the following use cases:

- 'A user should be able to archive NWP data for a given time period.'
"""

@abc.abstractmethod
def archive(self, year: int, month: int) -> ResultE[str]:
"""Archive NWP data to Zarr format for the given month.
def archive(self, period: dt.date) -> ResultE[str]:
"""Archive NWP data to Zarr format for desired time period.

Args:
year: The year for which to archive data.
month: The month for which to archive data.
period: The period for which to gather init time data.

Returns:
The path to the produced Zarr store.
"""
pass

class InfoUseCase(abc.ABC):
"""Interface for the notification use case.

Defines the business-critical methods for the following use cases:

- 'A user should be able to retrieve information about the service.'
"""

@abc.abstractmethod
def available_models(self) -> list[str]:
"""Get a list of available models."""
pass

@abc.abstractmethod
def model_repository_info(self) -> str:
"""Get information about the model repository."""
pass

@abc.abstractmethod
def model_info(self) -> str:
"""Get information about the model."""
pass
Loading
Loading