diff --git a/tests/test_download.py b/tests/test_download.py index 2621d5ba..23d79a43 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -1,6 +1,6 @@ """Tests for satip.download.py.""" import os -import unittest +import pytest import pandas as pd @@ -12,27 +12,22 @@ from satip.utils import format_dt_str -class TestDownload(unittest.TestCase): +class TestDownload(): """Test case for downloader tests.""" - def setUp(self) -> None: # noqa - return super().setUp() - def test_download_eumetsat_data(self): # noqa # Call the downloader on a very short chunk of data: - self.assertIsNone( - download_eumetsat_data( - download_directory=str(os.getcwd() + "/storage/"), - start_date=format_dt_str("2020-06-01 11:59:00"), - end_date=format_dt_str("2020-06-01 12:02:00"), - user_key=os.environ.get("EUMETSAT_USER_KEY"), - user_secret=os.environ.get("EUMETSAT_USER_SECRET"), - auth_filename=None, - number_of_processes=2, - product=["cloud", "rss"], - enforce_full_days=False, - ) - ) + assert download_eumetsat_data( + download_directory=str(os.getcwd() + "/storage/"), + start_date=format_dt_str("2020-06-01 11:59:00"), + end_date=format_dt_str("2020-06-01 12:02:00"), + user_key=os.environ.get("EUMETSAT_USER_KEY"), + user_secret=os.environ.get("EUMETSAT_USER_SECRET"), + auth_filename=None, + number_of_processes=2, + product=["cloud", "rss"], + enforce_full_days=False, + ) is None def test_determine_datetime_to_download_files(self): """Tests correct day-wise-chunked lists. 
@@ -49,10 +44,10 @@ def test_determine_datetime_to_download_files(self): print(datetimes) print(datetimes[0]) print(type(datetimes[0][0])) - self.assertEqual(datetimes[0][0], pd.to_datetime("2020-03-08 11:59:00")) - self.assertEqual(datetimes[0][1], pd.to_datetime("2020-03-09 11:58:00")) - self.assertEqual(datetimes[1][0], pd.to_datetime("2020-03-09 11:59:00")) - self.assertEqual(datetimes[1][1], pd.to_datetime("2020-03-10 11:58:00")) + assert datetimes[0][0] == pd.to_datetime("2020-03-08 11:59:00") + assert datetimes[0][1] == pd.to_datetime("2020-03-09 11:58:00") + assert datetimes[1][0] == pd.to_datetime("2020-03-09 11:59:00") + assert datetimes[1][1] == pd.to_datetime("2020-03-10 11:58:00") def test_get_missing_datetimes_from_list_of_files(self): """Tests padding of datetimes if files present are missing data for given days.""" @@ -81,4 +76,4 @@ def test_get_missing_datetimes_from_list_of_files(self): # Note that the function returns datetime-objects, but we defined the expected # values as pd-datetime. Though their str-repr is different, they still pass # the equality-check when compared for same dates. - self.assertEqual(boundary, res[i][b]) + assert boundary == res[i][b] diff --git a/tests/test_scale_to_zero_to_one.py b/tests/test_scale_to_zero_to_one.py index f04d14f2..288efbec 100644 --- a/tests/test_scale_to_zero_to_one.py +++ b/tests/test_scale_to_zero_to_one.py @@ -1,5 +1,5 @@ """Tests for scale_to_zero_to_one.py.""" -import unittest +import pytest import numpy as np import pandas as pd @@ -8,101 +8,93 @@ from satip.scale_to_zero_to_one import ScaleToZeroToOne, is_dataset_clean -class TestScaleToZeroToOne(unittest.TestCase): - """Test class for methods of class scale_to_zero_to_one.ScaleToZeroToOne. 
+@pytest.fixture +def dataset(): + """Fixture for the dataset setup.""" + # Set dimensionality of the fake dataset: + Nx, Ny, Nt = 2, 3, 10 - We will set up a mock dataset and try the various methods in ScaleToZeroToOne, - checking whether expected results manifest themselves. - """ + # Generate the fake four-dimensional data: + data = np.zeros((Nx, Ny, Nt, 2)) + data[:, :, :, 0] = np.linspace(-10, 10, Nt, endpoint=True) + np.random.rand(Nx, Ny, Nt) + data[:, :, :, 1] = np.linspace(10, -10, Nt, endpoint=True) + np.random.rand(Nx, Ny, Nt) + + # Set some random values in the middle to NaN: + data[Nx // 2, Ny // 2, Nt // 2, :] = np.nan - def setUp(self) -> None: - """Set up for all following tests, where a common dataarray is defined.""" - # Set dimensionality of the fake dataset: - Nx, Ny, Nt = 2, 3, 10 - - # Generate the fake four-dimensional data: - data = np.zeros((Nx, Ny, Nt, 2)) - data[:, :, :, 0] = np.linspace(-10, 10, Nt, endpoint=True) + np.random.rand(Nx, Ny, Nt) - data[:, :, :, 1] = np.linspace(10, -10, Nt, endpoint=True) + np.random.rand(Nx, Ny, Nt) - - # Set some random values in the middle to NaN: - data[Nx // 2, Ny // 2, Nt // 2, :] = np.nan - - self.dataset = xr.DataArray( - data=data, - coords=dict( - lon=( - ["x_geostationary", "y_geostationary"], - np.linspace(0, 1.0, Nx, endpoint=True).reshape((Nx, 1)) + np.zeros((Nx, Ny)), - ), - lat=( - ["x_geostationary", "y_geostationary"], - np.linspace(-1.0, 0.0, Ny, endpoint=True).reshape((1, Ny)) + np.zeros((Nx, Ny)), - ), - time=pd.date_range("2019-03-08", periods=Nt), + dataset = xr.DataArray( + data=data, + coords=dict( + lon=( + ["x_geostationary", "y_geostationary"], + np.linspace(0, 1.0, Nx, endpoint=True).reshape((Nx, 1)) + np.zeros((Nx, Ny)), ), - dims=["x_geostationary", "y_geostationary", "time", "variable"], - attrs=dict( - description="Some randomly permutated lines in time and space.\ - If you find meaning in this, please see a shrink." 
+ lat=( + ["x_geostationary", "y_geostationary"], + np.linspace(-1.0, 0.0, Ny, endpoint=True).reshape((1, Ny)) + np.zeros((Nx, Ny)), ), - ) + time=pd.date_range("2019-03-08", periods=Nt), + ), + dims=["x_geostationary", "y_geostationary", "time", "variable"], + attrs=dict( + description="Some randomly permutated lines in time and space.\ + If you find meaning in this, please see a shrink." + ), + ) + + yield dataset - return super().setUp() - def test_fit(self): # noqa: D102 +@pytest.mark.usefixtures('dataset') +class TestScaleToZeroToOne: + """Test class for methods of class scale_to_zero_to_one.ScaleToZeroToOne. + We will set up a mock dataset and try the various methods in ScaleToZeroToOne, + checking whether expected results manifest themselves. + """ + + def test_fit(self, dataset): scaler = ScaleToZeroToOne( mins=np.asarray([-5, 0]), maxs=np.asarray([5, 20]), variable_order=["wrong_var_name_one", "wrong_var_name_two"], ) - scaler.fit(self.dataset, dims=("x_geostationary", "y_geostationary", "time")) + scaler.fit(dataset, dims=("x_geostationary", "y_geostationary", "time")) # Test whether the min/max-values are logged: - self.assertListEqual( - scaler.mins.values.tolist(), - self.dataset.min(("x_geostationary", "y_geostationary", "time")) - .compute() - .values.tolist(), - ) - self.assertListEqual( - scaler.maxs.values.tolist(), - self.dataset.max(("x_geostationary", "y_geostationary", "time")) - .compute() - .values.tolist(), - ) + assert scaler.mins.values.tolist() == dataset.min( + ("x_geostationary", "y_geostationary", "time") + ).compute().values.tolist() + assert scaler.maxs.values.tolist() == dataset.max( + ("x_geostationary", "y_geostationary", "time") + ).compute().values.tolist() # Test whether the initially wrong variable names are set correctly now: - self.assertListEqual(scaler.variable_order.tolist(), [0, 1]) + assert scaler.variable_order.tolist() == [0, 1] - def test_rescale(self): # noqa: D102 - scaler = ScaleToZeroToOne().fit( - self.dataset, 
dims=("x_geostationary", "y_geostationary", "time") - ) - dataset = scaler.rescale(self.dataset) + def test_rescale(self, dataset): + scaler = ScaleToZeroToOne().fit(dataset, dims=("x_geostationary", "y_geostationary", "time")) + dataset = scaler.rescale(dataset) scaler.fit(dataset, dims=("x_geostationary", "y_geostationary", "time")) # Assert that all values are between zero and one: - self.assertListEqual(scaler.mins.values.tolist(), [0, 0]) - self.assertListEqual(scaler.maxs.values.tolist(), [1, 1]) + assert scaler.mins.values.tolist() == [0, 0] + assert scaler.maxs.values.tolist() == [1, 1] # Are the NaN still in there? - self.assertTrue(np.isnan(dataset).any()) + assert np.isnan(dataset).any() - def test_compress_mask(self): # noqa: D102 + def test_compress_mask(self, dataset): # Generate a dataset and rescale it. # The result should be a dataset which still contains NaNs. - scaler = ScaleToZeroToOne().fit( - self.dataset, dims=("x_geostationary", "y_geostationary", "time") - ) - dataset = scaler.rescale(self.dataset) + scaler = ScaleToZeroToOne().fit(dataset, dims=("x_geostationary", "y_geostationary", "time")) + dataset = scaler.rescale(dataset) - # Now compress the dataset and then check if the NaN-values have been replaced with -1: + # Now compress the dataset and then check if the NaN-values have been replaced with -1: dataset = scaler.compress_mask(dataset) - self.assertTrue(dataset.min() == -1) - self.assertFalse(np.isnan(dataset).any()) + assert dataset.min() == -1 + assert not np.isnan(dataset).any() - # While we are at it, lets also test the is_dataset_clean-method: - self.assertTrue(is_dataset_clean(dataset)) + # While we are at it, let's also test the is_dataset_clean-method: + assert is_dataset_clean(dataset) diff --git a/tests/test_utils.py b/tests/test_utils.py index 0eb47ba5..ca98d3e1 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -9,7 +9,7 @@ """ import glob import os -import unittest +import pytest from pathlib import Path 
import xarray @@ -30,56 +30,58 @@ CLOUD_ID = "EO:EUM:DAT:MSG:RSS-CLM" -class TestSatipUtils(unittest.TestCase): - """Tests for satip.utils.""" - - def setUp(self) -> None: # noqa D102 - # If there is no downloaded RSS-data-set or cloudmask, then download and store them: - if len(list(glob.glob(os.path.join(os.getcwd(), "*.nat")))) == 0: - from satip import eumetsat - download_manager = eumetsat.DownloadManager( - user_key=USER_KEY, - user_secret=USER_SECRET, - data_dir=os.getcwd(), - ) +@pytest.fixture(autouse=True) +def setUp() -> None: # noqa D102 + # If there is no downloaded RSS-data-set or cloudmask, then download and store them: + if len(list(glob.glob(os.path.join(os.getcwd(), "*.nat")))) == 0: + from satip import eumetsat - # Download one set of RSS data and one cloudmask and store them on disk: - download_manager.download_date_range( - start_date="2020-06-01 11:59:00", end_date="2020-06-01 12:00:00", product_id=RSS_ID - ) - download_manager.download_date_range( - start_date="2020-06-01 11:59:00", - end_date="2020-06-01 12:02:00", - product_id=CLOUD_ID, - ) + download_manager = eumetsat.DownloadManager( + user_key=USER_KEY, + user_secret=USER_SECRET, + data_dir=os.getcwd(), + ) - # Now that we can be sure that those files exist, we can store the filenames. - # Note: The following fields should now contain the full paths to RSS/cloudmask-data. - # As per 01.03.2022, given above date-range, the files you got should be: - # - [path]/MSG3-SEVI-MSG15-0100-NA-20200601115916.810000000Z-NA.nat for the RSS-data - # - [path]/MSG3-SEVI-MSGCLMK-0100-0100-20200601120000.000000000Z-NA.grb for the cloudmask - # However, there is no guarantee that future API-releases will keep the naming stable. 
- self.rss_filename = list(glob.glob(os.path.join(os.getcwd(), "*.nat")))[0] - self.cloud_mask_filename = list(glob.glob(os.path.join(os.getcwd(), "*.grb")))[0] + # Download one set of RSS data and one cloudmask and store them on disk: + download_manager.download_date_range( + start_date="2020-06-01 11:59:00", end_date="2020-06-01 12:00:00", product_id=RSS_ID + ) + download_manager.download_date_range( + start_date="2020-06-01 11:59:00", + end_date="2020-06-01 12:02:00", + product_id=CLOUD_ID, + ) - return super().setUp() + # Now that we can be sure that those files exist, we can store the filenames. + # Note: The following fields should now contain the full paths to RSS/cloudmask-data. + # As per 01.03.2022, given above date-range, the files you got should be: + # - [path]/MSG3-SEVI-MSG15-0100-NA-20200601115916.810000000Z-NA.nat for the RSS-data + # - [path]/MSG3-SEVI-MSGCLMK-0100-0100-20200601120000.000000000Z-NA.grb for the cloudmask + # However, there is no guarantee that future API-releases will keep the naming stable. 
+ TestSatipUtils.rss_filename = list(glob.glob(os.path.join(os.getcwd(), "*.nat")))[0] + TestSatipUtils.cloud_mask_filename = list(glob.glob(os.path.join(os.getcwd(), "*.grb")))[0] + + return None +@pytest.mark.usefixtures("setUp") +class TestSatipUtils: + """Tests for satip.utils.""" - @unittest.skip("Skipping as cloud masks are not being used now") + @pytest.mark.skip("Skipping as cloud masks are not being used now") def test_load_cloudmask_to_dataarray(self): # noqa D102 for area in ["UK", "RSS"]: cloudmask_dataarray = load_cloudmask_to_dataarray( Path(self.cloud_mask_filename), temp_directory=Path(os.getcwd()), area=area ) - self.assertEqual(type(cloudmask_dataarray), xarray.DataArray) + assert isinstance(cloudmask_dataarray, xarray.DataArray) def test_load_native_to_dataarray(self): # noqa D102 for area in ["UK", "RSS"]: rss_dataarray, hrv_dataarray = load_native_to_dataarray( Path(self.rss_filename), temp_directory=Path(os.getcwd()), area=area ) - self.assertEqual(type(rss_dataarray), xarray.DataArray) - self.assertEqual(type(hrv_dataarray), xarray.DataArray) + assert isinstance(rss_dataarray, xarray.DataArray) + assert isinstance(hrv_dataarray, xarray.DataArray) def test_save_dataarray_to_zarr(self): # noqa D102 # The following is a bit ugly, but since we do not want to lump two tests into one @@ -98,7 +100,7 @@ def test_save_dataarray_to_zarr(self): # noqa D102 compressor_name="bz2", zarr_mode="w", ) - self.assertEqual(1, len(list(glob.glob(zarr_path)))) + assert len(list(glob.glob(zarr_path))) == 1 def test_data_quality_filter(self): test = xarray.Dataset({ @@ -106,14 +108,14 @@ def test_data_quality_filter(self): }) out = data_quality_filter(test, 0.9) - self.assertFalse(out) + assert not out test = xarray.Dataset({ "data": (("time", "y", "x"), np.ones((100, 100, 100))) }) out = data_quality_filter(test, 0.9) - self.assertTrue(out) + assert out def test_get_latest_subdir_path(self): @@ -122,11 +124,11 @@ def test_get_latest_subdir_path(self): if 
os.path.exists(expected_latest_folder): os.rmdir(expected_latest_folder) latest_path = get_latest_subdir_path(data_folder_name) - self.assertEqual(expected_latest_folder, latest_path) + assert expected_latest_folder == latest_path - self.assertFalse(os.path.exists(latest_path)) + assert not os.path.exists(latest_path) latest_path = get_latest_subdir_path(data_folder_name, mkdir=True) - self.assertEqual(expected_latest_folder, latest_path) - self.assertTrue(os.path.exists(latest_path)) + assert expected_latest_folder == latest_path + assert os.path.exists(latest_path) os.rmdir(latest_path) os.rmdir(data_folder_name)