Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Commit

Permalink
Fix Parquet failure in AWS (#355)
Browse files Browse the repository at this point in the history
Explicitly provide a path for the Parquet files to be stored in S3, instead of
deriving that path from the Zarr path. This requires the use of a new
environment variable for the Lambda function: UG_DIAG_PARQUET. This
should fix the issue with the Parquet files tanking the pipeline, because
that seemed to be caused by including `../` in the path used to
save the files.
  • Loading branch information
esheehan-gsl authored Oct 11, 2023
2 parents 4067621 + 5780906 commit 72494e8
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 30 deletions.
7 changes: 4 additions & 3 deletions src/unified_graphics/etl/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ def lambda_handler(event, context):
logger.warning("Object details missing from event {event}")
return ""

upload_bucket = os.environ["UG_DIAG_ZARR"]
zarr_upload = os.environ["UG_DIAG_ZARR"]
parquet_upload = os.environ["UG_DIAG_PARQUET"]

bucket = event["detail"]["bucket"]["name"]
key = unquote_plus(event["detail"]["object"]["key"])
Expand All @@ -99,11 +100,11 @@ def lambda_handler(event, context):

logger.info(
f"Saving {bucket}:{key} to the database and to the Zarr "
f"store at: {upload_bucket}"
f"store at: {zarr_upload}"
)
engine = create_engine(os.environ["FLASK_SQLALCHEMY_DATABASE_URI"])
with Session(engine) as session:
diag.save(session, upload_bucket, data)
diag.save(session, zarr_upload, parquet_upload, data)
engine.dispose()

logger.info(f"Done processing {bucket}:{key}")
35 changes: 30 additions & 5 deletions src/unified_graphics/etl/diag.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os.path
import re
from collections import namedtuple
from datetime import datetime
Expand Down Expand Up @@ -246,17 +247,28 @@ def prep_dataframe(ds: xr.Dataset) -> pd.DataFrame:
return df


def save(session: Session, path: Union[Path, str], *args: xr.Dataset):
"""Write one or more xarray Datasets to a Zarr at `path`
def save(
session: Session,
zarr_path: Union[Path, str],
parquet_dir: Union[Path, str],
*args: xr.Dataset,
):
"""Write one or more xarray Datasets to a Zarr, Parquet, and PostgreSQL
The `name` and `loop` variables are used along with the
`initialization_time` (non-dimension) coordinates to define the group to
which the Dataset is written in the Zarr.
Parameters
----------
path : Path
session : sqlalchemy.orm.Session
SQLAlchemy database session
zarr_path : Union[Path, str]
The path to the location of the Zarr
parquet_dir : Union[Path, str]
The path to the location where Parquet files are stored
*args : xarray.Dataset
One or more datasets to save
"""
logger.info("Started saving dataset to Zarr and the DB")
for ds in args:
Expand Down Expand Up @@ -314,8 +326,21 @@ def save(session: Session, path: Union[Path, str], *args: xr.Dataset):
analysis.model = wx_model
session.add(analysis)

logger.info(f"Saving dataset to Zarr at: {path}")
ds.to_zarr(path, group=group, mode="a", consolidated=False)
logger.info(f"Saving dataset to Zarr at: {zarr_path}")
ds.to_zarr(zarr_path, group=group, mode="a", consolidated=False)

parquet_path = os.path.join(
parquet_dir,
"_".join((model, background, system, domain, frequency)),
ds.name,
)
logger.info(f"Saving dataframe to Parquet at: {parquet_path}")
prep_dataframe(ds).to_parquet(
parquet_path,
engine="pyarrow",
index=True,
partition_cols=["loop"],
)

logger.info("Saving dataset to Database")
session.commit()
Expand Down
20 changes: 13 additions & 7 deletions tests/etl/test_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,14 @@ def test_handler(
mock_save,
monkeypatch,
):
ug_bucket = "s3://test-bucket/test.zarr"
ug_bucket = "s3://test-bucket/"
ug_zarr = f"{ug_bucket}test.zarr"
dl_bucket = "s3://test-diag-bucket"
key = "diag_t_anl.202301300600.nc4.gz"
db_uri = "postgresql+psycopg://postgres:oranges@localhost:5432/test_uri"

monkeypatch.setenv("UG_DIAG_ZARR", ug_bucket)
monkeypatch.setenv("UG_DIAG_ZARR", ug_zarr)
monkeypatch.setenv("UG_DIAG_PARQUET", ug_bucket)
monkeypatch.setenv("FLASK_SQLALCHEMY_DATABASE_URI", db_uri)

context = {}
Expand All @@ -113,15 +115,17 @@ def test_handler(
mock_load.assert_called_once_with(mock_fetch_record.return_value)
mock_create_engine.assert_called_once_with(db_uri)
mock_save.assert_called_once_with(
mock_session().__enter__(), ug_bucket, mock_load.return_value
mock_session().__enter__(), ug_zarr, ug_bucket, mock_load.return_value
)


def test_handler_no_records(monkeypatch):
context = {}
event = {}
ug_bucket = "s3://test-bucket/test.zarr"
monkeypatch.setenv("UG_DIAG_ZARR", ug_bucket)
ug_bucket = "s3://test-bucket/"
ug_zarr = f"{ug_bucket}test.zarr"
monkeypatch.setenv("UG_DIAG_ZARR", ug_zarr)
monkeypatch.setenv("UG_DIAG_PARQUET", ug_bucket)

result = aws.lambda_handler(event, context)

Expand All @@ -132,11 +136,13 @@ def test_handler_no_records(monkeypatch):
@mock.patch("unified_graphics.etl.diag.load")
@mock.patch("unified_graphics.etl.aws.fetch_record")
def test_handler_skip_second_loop(mock_fetch_record, mock_load, mock_save, monkeypatch):
ug_bucket = "s3://test-bucket/test.zarr"
ug_bucket = "s3://test-bucket/"
ug_zarr = f"{ug_bucket}test.zarr"
dl_bucket = "s3://test-diag-bucket"
key = "diag_t_02.202301300600.nc4.gz"

monkeypatch.setenv("UG_DIAG_ZARR", ug_bucket)
monkeypatch.setenv("UG_DIAG_ZARR", ug_zarr)
monkeypatch.setenv("UG_DIAG_PARQUET", ug_bucket)

context = {}
event = {
Expand Down
26 changes: 11 additions & 15 deletions tests/etl/test_save.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def dataset_to_table(dataset: xr.Dataset) -> pd.DataFrame:

class TestSaveNew:
@pytest.fixture(scope="class", autouse=True)
def dataset(self, model, test_dataset, session, zarr_file):
def dataset(self, model, test_dataset, session, data_path, zarr_file):
(mdl, system, domain, background, frequency) = model
ps = test_dataset(
variable="ps",
Expand All @@ -61,7 +61,7 @@ def dataset(self, model, test_dataset, session, zarr_file):
background=background,
)

diag.save(session, zarr_file, ps)
diag.save(session, zarr_file, data_path, ps)

return ps

Expand All @@ -75,7 +75,6 @@ def test_zarr_created(self, model, dataset, zarr_file):

xr.testing.assert_equal(result, dataset)

@pytest.mark.xfail
def test_parquet_created(self, dataframe, parquet_file):
result = pd.read_parquet(
parquet_file / "ps",
Expand Down Expand Up @@ -111,7 +110,7 @@ def test_analysis_created(self, model, session):

class TestAddVariable:
@pytest.fixture(scope="class", autouse=True)
def dataset(self, model, test_dataset, session, zarr_file):
def dataset(self, model, test_dataset, session, data_path, zarr_file):
(mdl, system, domain, background, frequency) = model
ps = test_dataset(
variable="ps",
Expand All @@ -124,7 +123,7 @@ def dataset(self, model, test_dataset, session, zarr_file):
background=background,
)

diag.save(session, zarr_file, ps)
diag.save(session, zarr_file, data_path, ps)

t = test_dataset(
variable="t",
Expand All @@ -137,7 +136,7 @@ def dataset(self, model, test_dataset, session, zarr_file):
background=background,
)

diag.save(session, zarr_file, t)
diag.save(session, zarr_file, data_path, t)

return (ps, t)

Expand All @@ -151,7 +150,6 @@ def test_zarr(self, dataset, model, zarr_file, variable, expected):
result = xr.open_zarr(zarr_file, group=group, consolidated=False)
xr.testing.assert_equal(result, dataset[expected])

@pytest.mark.xfail
@pytest.mark.parametrize("variable,expected", (("ps", 0), ("t", 1)))
def test_parquet(self, dataframe, parquet_file, variable, expected):
result = pd.read_parquet(
Expand All @@ -168,7 +166,7 @@ def test_analysis_metadata(self, session):

class TestAddLoop:
@pytest.fixture(scope="class", autouse=True)
def dataset(self, model, test_dataset, session, zarr_file):
def dataset(self, model, test_dataset, session, data_path, zarr_file):
(mdl, system, domain, background, frequency) = model
ges = test_dataset(
variable="ps",
Expand All @@ -181,7 +179,7 @@ def dataset(self, model, test_dataset, session, zarr_file):
background=background,
)

diag.save(session, zarr_file, ges)
diag.save(session, zarr_file, data_path, ges)

anl = test_dataset(
variable="ps",
Expand All @@ -194,7 +192,7 @@ def dataset(self, model, test_dataset, session, zarr_file):
background=background,
)

diag.save(session, zarr_file, anl)
diag.save(session, zarr_file, data_path, anl)

return (ges, anl)

Expand All @@ -213,7 +211,6 @@ def test_zarr(self, dataset, model, zarr_file, loop, expected):
result = xr.open_zarr(zarr_file, group=group, consolidated=False)
xr.testing.assert_equal(result, dataset[expected])

@pytest.mark.xfail
@pytest.mark.parametrize("loop,expected", (("ges", 0), ("anl", 1)))
def test_parquet(self, dataframe, parquet_file, loop, expected):
result = pd.read_parquet(
Expand All @@ -230,7 +227,7 @@ def test_analysis_metadata(self, session):

class TestAddAnalysis:
@pytest.fixture(scope="class", autouse=True)
def dataset(self, model, test_dataset, session, zarr_file):
def dataset(self, model, test_dataset, session, data_path, zarr_file):
(mdl, system, domain, background, frequency) = model
first = test_dataset(
variable="ps",
Expand All @@ -243,7 +240,7 @@ def dataset(self, model, test_dataset, session, zarr_file):
background=background,
)

diag.save(session, zarr_file, first)
diag.save(session, zarr_file, data_path, first)

second = test_dataset(
variable="ps",
Expand All @@ -256,7 +253,7 @@ def dataset(self, model, test_dataset, session, zarr_file):
background=background,
)

diag.save(session, zarr_file, second)
diag.save(session, zarr_file, data_path, second)

return (first, second)

Expand All @@ -274,7 +271,6 @@ def test_zarr(self, dataset, model, zarr_file, init_time, expected):
result = xr.open_zarr(zarr_file, group=group, consolidated=False)
xr.testing.assert_equal(result, dataset[expected])

@pytest.mark.xfail
def test_parquet(self, dataframe, parquet_file):
result = pd.read_parquet(
parquet_file / "ps",
Expand Down

0 comments on commit 72494e8

Please sign in to comment.