diff --git a/pyproject.toml b/pyproject.toml index c023bb5b..539c0710 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ dependencies = [ "eccodes == 2.38.3", "ecmwf-api-client == 1.6.3", "cfgrib == 0.9.14.1", - "dagster-pipes == 1.8.5", + "dagster-pipes == 1.9.3", "joblib == 1.4.2", "numpy == 2.1.0", "ocf-blosc2 == 0.0.11", diff --git a/src/nwp_consumer/internal/repositories/notification_repositories/dagster.py b/src/nwp_consumer/internal/repositories/notification_repositories/dagster.py index 0998f1f8..04e7e5a9 100644 --- a/src/nwp_consumer/internal/repositories/notification_repositories/dagster.py +++ b/src/nwp_consumer/internal/repositories/notification_repositories/dagster.py @@ -12,7 +12,7 @@ import logging from typing import override -from dagster_pipes import PipesContext, open_dagster_pipes +from dagster_pipes import open_dagster_pipes from returns.result import ResultE, Success from nwp_consumer.internal import entities, ports @@ -28,9 +28,8 @@ def notify( self, message: entities.StoreCreatedNotification | entities.StoreAppendedNotification, ) -> ResultE[str]: - with open_dagster_pipes(): - context = PipesContext.get() - context.report_asset_materialization( + with open_dagster_pipes() as pipesctx: + pipesctx.report_asset_materialization( metadata={ "filename": {"raw_value": message.filename, "type": "text"}, "size_mb": {"raw_value": message.size_mb, "type": "float"},