Skip to content

Commit

Permalink
Fixes #229: use Pytest for all tests (#242)
Browse files Browse the repository at this point in the history
* fixes #229 use Pytest for all tests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
norbline and pre-commit-ci[bot] authored Mar 26, 2024
1 parent 1aea21d commit d62685d
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 135 deletions.
41 changes: 18 additions & 23 deletions tests/test_download.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Tests for satip.download.py."""
import os
import unittest
import pytest

import pandas as pd

Expand All @@ -12,27 +12,22 @@
from satip.utils import format_dt_str


class TestDownload(unittest.TestCase):
class TestDownload():
"""Test case for downloader tests."""

def setUp(self) -> None: # noqa
return super().setUp()

def test_download_eumetsat_data(self): # noqa
# Call the downloader on a very short chunk of data:
self.assertIsNone(
download_eumetsat_data(
download_directory=str(os.getcwd() + "/storage/"),
start_date=format_dt_str("2020-06-01 11:59:00"),
end_date=format_dt_str("2020-06-01 12:02:00"),
user_key=os.environ.get("EUMETSAT_USER_KEY"),
user_secret=os.environ.get("EUMETSAT_USER_SECRET"),
auth_filename=None,
number_of_processes=2,
product=["cloud", "rss"],
enforce_full_days=False,
)
)
assert download_eumetsat_data(
download_directory=str(os.getcwd() + "/storage/"),
start_date=format_dt_str("2020-06-01 11:59:00"),
end_date=format_dt_str("2020-06-01 12:02:00"),
user_key=os.environ.get("EUMETSAT_USER_KEY"),
user_secret=os.environ.get("EUMETSAT_USER_SECRET"),
auth_filename=None,
number_of_processes=2,
product=["cloud", "rss"],
enforce_full_days=False,
) is None

def test_determine_datetime_to_download_files(self):
"""Tests correct day-wise-chunked lists.
Expand All @@ -49,10 +44,10 @@ def test_determine_datetime_to_download_files(self):
print(datetimes)
print(datetimes[0])
print(type(datetimes[0][0]))
self.assertEqual(datetimes[0][0], pd.to_datetime("2020-03-08 11:59:00"))
self.assertEqual(datetimes[0][1], pd.to_datetime("2020-03-09 11:58:00"))
self.assertEqual(datetimes[1][0], pd.to_datetime("2020-03-09 11:59:00"))
self.assertEqual(datetimes[1][1], pd.to_datetime("2020-03-10 11:58:00"))
assert datetimes[0][0] == pd.to_datetime("2020-03-08 11:59:00")
assert datetimes[0][1] == pd.to_datetime("2020-03-09 11:58:00")
assert datetimes[1][0] == pd.to_datetime("2020-03-09 11:59:00")
assert datetimes[1][1] == pd.to_datetime("2020-03-10 11:58:00")

def test_get_missing_datetimes_from_list_of_files(self):
"""Tests padding of datetimes if files present are missing data for given days."""
Expand Down Expand Up @@ -81,4 +76,4 @@ def test_get_missing_datetimes_from_list_of_files(self):
# Note that the function returns datetime-objects, but we defined the expected
# values as pd-datetime. Though their str-repr is different, they still pass
# the equality-check when compared for same dates.
self.assertEqual(boundary, res[i][b])
assert boundary == res[i][b]
132 changes: 62 additions & 70 deletions tests/test_scale_to_zero_to_one.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Tests for scale_to_zero_to_one.py."""
import unittest
import pytest

import numpy as np
import pandas as pd
Expand All @@ -8,101 +8,93 @@
from satip.scale_to_zero_to_one import ScaleToZeroToOne, is_dataset_clean


class TestScaleToZeroToOne(unittest.TestCase):
"""Test class for methods of class scale_to_zero_to_one.ScaleToZeroToOne.
@pytest.fixture
def dataset():
"""Fixture for the dataset setup."""
# Set dimensionality of the fake dataset:
Nx, Ny, Nt = 2, 3, 10

We will set up a mock dataset and try the various methods in ScaleToZeroToOne,
checking whether expected results manifest themselves.
"""
# Generate the fake four-dimensional data:
data = np.zeros((Nx, Ny, Nt, 2))
data[:, :, :, 0] = np.linspace(-10, 10, Nt, endpoint=True) + np.random.rand(Nx, Ny, Nt)
data[:, :, :, 1] = np.linspace(10, -10, Nt, endpoint=True) + np.random.rand(Nx, Ny, Nt)

# Set some random values in the middle to NaN:
data[Nx // 2, Ny // 2, Nt // 2, :] = np.nan

def setUp(self) -> None:
"""Set up for all following tests, where a common dataarray is defined."""
# Set dimensionality of the fake dataset:
Nx, Ny, Nt = 2, 3, 10

# Generate the fake four-dimensional data:
data = np.zeros((Nx, Ny, Nt, 2))
data[:, :, :, 0] = np.linspace(-10, 10, Nt, endpoint=True) + np.random.rand(Nx, Ny, Nt)
data[:, :, :, 1] = np.linspace(10, -10, Nt, endpoint=True) + np.random.rand(Nx, Ny, Nt)

# Set some random values in the middle to NaN:
data[Nx // 2, Ny // 2, Nt // 2, :] = np.nan

self.dataset = xr.DataArray(
data=data,
coords=dict(
lon=(
["x_geostationary", "y_geostationary"],
np.linspace(0, 1.0, Nx, endpoint=True).reshape((Nx, 1)) + np.zeros((Nx, Ny)),
),
lat=(
["x_geostationary", "y_geostationary"],
np.linspace(-1.0, 0.0, Ny, endpoint=True).reshape((1, Ny)) + np.zeros((Nx, Ny)),
),
time=pd.date_range("2019-03-08", periods=Nt),
dataset = xr.DataArray(
data=data,
coords=dict(
lon=(
["x_geostationary", "y_geostationary"],
np.linspace(0, 1.0, Nx, endpoint=True).reshape((Nx, 1)) + np.zeros((Nx, Ny)),
),
dims=["x_geostationary", "y_geostationary", "time", "variable"],
attrs=dict(
description="Some randomly permutated lines in time and space.\
If you find meaning in this, please see a shrink."
lat=(
["x_geostationary", "y_geostationary"],
np.linspace(-1.0, 0.0, Ny, endpoint=True).reshape((1, Ny)) + np.zeros((Nx, Ny)),
),
)
time=pd.date_range("2019-03-08", periods=Nt),
),
dims=["x_geostationary", "y_geostationary", "time", "variable"],
attrs=dict(
description="Some randomly permutated lines in time and space.\
If you find meaning in this, please see a shrink."
),
)

yield dataset

return super().setUp()

def test_fit(self): # noqa: D102
@pytest.mark.usefixtures('dataset')
class TestScaleToZeroToOne:
"""Test class for methods of class scale_to_zero_to_one.ScaleToZeroToOne.
We will set up a mock dataset and try the various methods in ScaleToZeroToOne,
checking whether expected results manifest themselves.
"""

def test_fit(self, dataset):
scaler = ScaleToZeroToOne(
mins=np.asarray([-5, 0]),
maxs=np.asarray([5, 20]),
variable_order=["wrong_var_name_one", "wrong_var_name_two"],
)
scaler.fit(self.dataset, dims=("x_geostationary", "y_geostationary", "time"))
scaler.fit(dataset, dims=("x_geostationary", "y_geostationary", "time"))

# Test whether the min/max-values are logged:
self.assertListEqual(
scaler.mins.values.tolist(),
self.dataset.min(("x_geostationary", "y_geostationary", "time"))
.compute()
.values.tolist(),
)
self.assertListEqual(
scaler.maxs.values.tolist(),
self.dataset.max(("x_geostationary", "y_geostationary", "time"))
.compute()
.values.tolist(),
)
assert scaler.mins.values.tolist() == dataset.min(
("x_geostationary", "y_geostationary", "time")
).compute().values.tolist()
assert scaler.maxs.values.tolist() == dataset.max(
("x_geostationary", "y_geostationary", "time")
).compute().values.tolist()

# Test whether the initially wrong variable names are set correctly now:
self.assertListEqual(scaler.variable_order.tolist(), [0, 1])
assert scaler.variable_order.tolist() == [0, 1]

def test_rescale(self): # noqa: D102
scaler = ScaleToZeroToOne().fit(
self.dataset, dims=("x_geostationary", "y_geostationary", "time")
)
dataset = scaler.rescale(self.dataset)
def test_rescale(self, dataset):
scaler = ScaleToZeroToOne().fit(dataset, dims=("x_geostationary", "y_geostationary", "time"))
dataset = scaler.rescale(dataset)
scaler.fit(dataset, dims=("x_geostationary", "y_geostationary", "time"))

# Assert that all values are between zero and one:
self.assertListEqual(scaler.mins.values.tolist(), [0, 0])
self.assertListEqual(scaler.maxs.values.tolist(), [1, 1])
assert scaler.mins.values.tolist() == [0, 0]
assert scaler.maxs.values.tolist() == [1, 1]

# Are the NaN still in there?
self.assertTrue(np.isnan(dataset).any())
assert np.isnan(dataset).any()

def test_compress_mask(self): # noqa: D102
def test_compress_mask(self, dataset):
# Generate a dataset and rescale it.
# The result should be a dataset which still contains NaNs.
scaler = ScaleToZeroToOne().fit(
self.dataset, dims=("x_geostationary", "y_geostationary", "time")
)
dataset = scaler.rescale(self.dataset)
scaler = ScaleToZeroToOne().fit(dataset, dims=("x_geostationary", "y_geostationary", "time"))
dataset = scaler.rescale(dataset)

# Now compress the dataset and then check if the NaN-values have been replaced with -1:
# Now compress the dataset and then check if the NaNvalues have been replaced with -1:
dataset = scaler.compress_mask(dataset)

self.assertTrue(dataset.min() == -1)
self.assertFalse(np.isnan(dataset).any())
assert dataset.min() == -1
assert not np.isnan(dataset).any()

# While we are at it, lets also test the is_dataset_clean-method:
self.assertTrue(is_dataset_clean(dataset))
# While we are at it, let's also test the is_dataset_clean-method:
assert is_dataset_clean(dataset)
86 changes: 44 additions & 42 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"""
import glob
import os
import unittest
import pytest
from pathlib import Path

import xarray
Expand All @@ -30,56 +30,58 @@
CLOUD_ID = "EO:EUM:DAT:MSG:RSS-CLM"


class TestSatipUtils(unittest.TestCase):
"""Tests for satip.utils."""

def setUp(self) -> None: # noqa D102
# If there is no downloaded RSS-data-set or cloudmask, then download and store them:
if len(list(glob.glob(os.path.join(os.getcwd(), "*.nat")))) == 0:
from satip import eumetsat

download_manager = eumetsat.DownloadManager(
user_key=USER_KEY,
user_secret=USER_SECRET,
data_dir=os.getcwd(),
)
@pytest.fixture(autouse=True)
def setUp() -> None: # noqa D102
# If there is no downloaded RSS-data-set or cloudmask, then download and store them:
if len(list(glob.glob(os.path.join(os.getcwd(), "*.nat")))) == 0:
from satip import eumetsat

# Download one set of RSS data and one cloudmask and store them on disk:
download_manager.download_date_range(
start_date="2020-06-01 11:59:00", end_date="2020-06-01 12:00:00", product_id=RSS_ID
)
download_manager.download_date_range(
start_date="2020-06-01 11:59:00",
end_date="2020-06-01 12:02:00",
product_id=CLOUD_ID,
)
download_manager = eumetsat.DownloadManager(
user_key=USER_KEY,
user_secret=USER_SECRET,
data_dir=os.getcwd(),
)

# Now that we can be sure that those files exist, we can store the filenames.
# Note: The following fields should now contain the full paths to RSS/cloudmask-data.
# As per 01.03.2022, given above date-range, the files you got should be:
# - [path]/MSG3-SEVI-MSG15-0100-NA-20200601115916.810000000Z-NA.nat for the RSS-data
# - [path]/MSG3-SEVI-MSGCLMK-0100-0100-20200601120000.000000000Z-NA.grb for the cloudmask
# However, there is no guarantee that future API-releases will keep the naming stable.
self.rss_filename = list(glob.glob(os.path.join(os.getcwd(), "*.nat")))[0]
self.cloud_mask_filename = list(glob.glob(os.path.join(os.getcwd(), "*.grb")))[0]
# Download one set of RSS data and one cloudmask and store them on disk:
download_manager.download_date_range(
start_date="2020-06-01 11:59:00", end_date="2020-06-01 12:00:00", product_id=RSS_ID
)
download_manager.download_date_range(
start_date="2020-06-01 11:59:00",
end_date="2020-06-01 12:02:00",
product_id=CLOUD_ID,
)

return super().setUp()
# Now that we can be sure that those files exist, we can store the filenames.
# Note: The following fields should now contain the full paths to RSS/cloudmask-data.
# As per 01.03.2022, given above date-range, the files you got should be:
# - [path]/MSG3-SEVI-MSG15-0100-NA-20200601115916.810000000Z-NA.nat for the RSS-data
# - [path]/MSG3-SEVI-MSGCLMK-0100-0100-20200601120000.000000000Z-NA.grb for the cloudmask
# However, there is no guarantee that future API-releases will keep the naming stable.
rss_filename = list(glob.glob(os.path.join(os.getcwd(), "*.nat")))[0]
cloud_mask_filename = list(glob.glob(os.path.join(os.getcwd(), "*.grb")))[0]

return rss_filename, cloud_mask_filename
@pytest.mark.usefixtures("setup")
class TestSatipUtils:
"""Tests for satip.utils."""

@unittest.skip("Skipping as cloud masks are not being used now")
@pytest.mark.skip("Skipping as cloud masks are not being used now")
def test_load_cloudmask_to_dataarray(self): # noqa D102
for area in ["UK", "RSS"]:
cloudmask_dataarray = load_cloudmask_to_dataarray(
Path(self.cloud_mask_filename), temp_directory=Path(os.getcwd()), area=area
)
self.assertEqual(type(cloudmask_dataarray), xarray.DataArray)
assert isinstance(cloudmask_dataarray, xarray.DataArray)

def test_load_native_to_dataarray(self): # noqa D102
for area in ["UK", "RSS"]:
rss_dataarray, hrv_dataarray = load_native_to_dataarray(
Path(self.rss_filename), temp_directory=Path(os.getcwd()), area=area
)
self.assertEqual(type(rss_dataarray), xarray.DataArray)
self.assertEqual(type(hrv_dataarray), xarray.DataArray)
assert isinstance(rss_dataarray, xarray.DataArray)
assert isinstance(hrv_dataarray, xarray.DataArray)

def test_save_dataarray_to_zarr(self): # noqa D102
# The following is a bit ugly, but since we do not want to lump two tests into one
Expand All @@ -98,22 +100,22 @@ def test_save_dataarray_to_zarr(self): # noqa D102
compressor_name="bz2",
zarr_mode="w",
)
self.assertEqual(1, len(list(glob.glob(zarr_path))))
assert len(list(glob.glob(zarr_path))) == 1

def test_data_quality_filter(self):
test = xarray.Dataset({
"data": (("time", "y", "x"), np.zeros((100, 100, 100)))
})

out = data_quality_filter(test, 0.9)
self.assertFalse(out)
assert not out

test = xarray.Dataset({
"data": (("time", "y", "x"), np.ones((100, 100, 100)))
})

out = data_quality_filter(test, 0.9)
self.assertTrue(out)
assert out

def test_get_latest_subdir_path(self):

Expand All @@ -122,11 +124,11 @@ def test_get_latest_subdir_path(self):
if os.path.exists(expected_latest_folder):
os.rmdir(expected_latest_folder)
latest_path = get_latest_subdir_path(data_folder_name)
self.assertEqual(expected_latest_folder, latest_path)
assert expected_latest_folder == latest_path

self.assertFalse(os.path.exists(latest_path))
assert not os.path.exists(latest_path)
latest_path = get_latest_subdir_path(data_folder_name, mkdir=True)
self.assertEqual(expected_latest_folder, latest_path)
self.assertTrue(os.path.exists(latest_path))
assert expected_latest_folder == latest_path
assert os.path.exists(latest_path)
os.rmdir(latest_path)
os.rmdir(data_folder_name)

0 comments on commit d62685d

Please sign in to comment.