diff --git a/benchmarks/asv.conf.json b/benchmarks/asv.conf.json
index 70f7f55..f473bdd 100644
--- a/benchmarks/asv.conf.json
+++ b/benchmarks/asv.conf.json
@@ -14,6 +14,9 @@
     "branches": [
         "HEAD"
     ],
+    "install_command": [
+        "python -mpip install -r requirements.txt {wheel_file}"
+    ],
     "build_command": [
         "python -m build --wheel -o {build_cache_dir} {build_dir}"
     ],
diff --git a/pyproject.toml b/pyproject.toml
index f8d2800..75d2547 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,4 +42,9 @@ target-version = ["py38"]
 
 [tool.isort]
 profile = "black"
-line_length = 110
\ No newline at end of file
+line_length = 110
+
+[tool.pytest.ini_options]
+markers = [
+    "dask: mark tests as having a dask client runtime dependency",
+]
\ No newline at end of file
diff --git a/src/hipscat_cloudtests/__init__.py b/src/hipscat_cloudtests/__init__.py
index c8236d7..cf375ef 100644
--- a/src/hipscat_cloudtests/__init__.py
+++ b/src/hipscat_cloudtests/__init__.py
@@ -2,5 +2,5 @@
 
 __all__ = ["greetings", "meaning"]
 
-from .file_checks import assert_text_file_matches
+from .file_checks import assert_parquet_file_ids, assert_text_file_matches
 from .temp_cloud_directory import TempCloudDirectory
diff --git a/src/hipscat_cloudtests/file_checks.py b/src/hipscat_cloudtests/file_checks.py
index 7db867e..7e366e9 100644
--- a/src/hipscat_cloudtests/file_checks.py
+++ b/src/hipscat_cloudtests/file_checks.py
@@ -2,6 +2,8 @@
 
 import re
 
+import numpy.testing as npt
+import pandas as pd
 from hipscat.io.file_io.file_io import load_text_file
 from hipscat.io.file_io.file_pointer import does_file_or_directory_exist
 
@@ -35,3 +37,30 @@ def assert_text_file_matches(expected_lines, file_name, storage_options: dict = None):
         assert re.match(expected, contents[i]), (
             f"files do not match at line {i+1} " f"(actual: [{contents[i]}] vs expected: [{expected}])"
         )
+
+
+def assert_parquet_file_ids(
+    file_name, id_column, expected_ids, resort_ids=True, storage_options: dict = None
+):
+    """
+    Convenience method to read a parquet file and compare the object IDs to
+    a list of expected objects.
+
+    Args:
+        file_name (str): fully-specified path of the file to read
+        id_column (str): column in the parquet file to read IDs from
+        expected_ids (:obj:`int[]`): list of expected ids in `id_column`
+        resort_ids (bool): should we re-sort the ids? if False, we will check that the ordering
+            is the same between the read IDs and expected_ids
+        storage_options (dict): dictionary of filesystem storage options
+    """
+    data_frame = pd.read_parquet(file_name, engine="pyarrow", storage_options=storage_options)
+    assert id_column in data_frame.columns
+    ids = data_frame[id_column].tolist()
+    if resort_ids:
+        ids.sort()
+        expected_ids = sorted(expected_ids)  # sorted copy, to avoid mutating the caller's list
+
+    assert len(ids) == len(expected_ids), f"object list not the same size ({len(ids)} vs {len(expected_ids)})"
+
+    npt.assert_array_equal(ids, expected_ids)
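
A typical call to the new helper looks like the following sketch; the bucket path and credentials are placeholders for illustration, not values from this change:

    from hipscat_cloudtests import assert_parquet_file_ids

    # Placeholder path and credentials, for illustration only.
    storage_options = {"account_name": "...", "account_key": "..."}
    assert_parquet_file_ids(
        "abfs://my-bucket/catalog/Norder=0/Dir=0/Npix=11.parquet",
        "id",
        list(range(700, 831)),
        storage_options=storage_options,
    )
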
diff --git a/src/hipscat_cloudtests/temp_cloud_directory.py b/src/hipscat_cloudtests/temp_cloud_directory.py
index 044f788..e052908 100644
--- a/src/hipscat_cloudtests/temp_cloud_directory.py
+++ b/src/hipscat_cloudtests/temp_cloud_directory.py
@@ -2,6 +2,7 @@
 to some unit test execution."""
 
 import os
+import time
 
 import shortuuid
 from hipscat.io.file_io import file_io
@@ -36,10 +37,25 @@ def __enter__(self):
         """
         my_uuid = shortuuid.uuid()
-        self.temp_path = os.path.join(self.prefix_path, f"{self.method_name}{my_uuid}")
+        self.temp_path = os.path.join(self.prefix_path, f"{self.method_name}-{my_uuid}")
         return self.temp_path
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        """Recursively delete the created resources."""
+        """Recursively delete the created resources.
+
+        This will try to delete 3 times, with exponential backoff.
+        We give up after the third attempt."""
+        sleep_time = 2
         if self.temp_path:
-            file_io.remove_directory(self.temp_path, ignore_errors=True, storage_options=self.storage_options)
+            for attempt_number in range(3):
+                ## Attempt the recursive delete; cloud removals can fail transiently.
+                try:
+                    file_io.remove_directory(self.temp_path, storage_options=self.storage_options)
+                    return
+                except RuntimeError:
+                    if attempt_number == 2:
+                        print(f"Failed to remove directory {self.temp_path}. Giving up.")
+                        return
+                    print(f"Failed to remove directory {self.temp_path}. Trying again.")
+                    time.sleep(sleep_time)
+                    sleep_time *= 2
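
For reference, the context manager is used as in this sketch (prefix and credentials are placeholders); on exit, removal is retried up to 3 times, sleeping 2 then 4 seconds between attempts:

    from hipscat_cloudtests import TempCloudDirectory

    # Placeholder bucket prefix and credentials, for illustration only.
    with TempCloudDirectory(
        "abfs://my-bucket/hipscat_import/tmp",
        "my_test",
        {"account_name": "...", "account_key": "..."},
    ) as temp_path:
        ...  # write test artifacts under the unique temp_path
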
diff --git a/tests/conftest.py b/tests/conftest.py
index 40628f7..8c09079 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -41,6 +41,11 @@ def example_cloud_storage_options(cloud):
 
 @pytest.fixture
-def small_sky_dir_local():
-    cloud_test_path = os.path.dirname(__file__)
-    return os.path.join(cloud_test_path, "data", SMALL_SKY_DIR_NAME)
+def local_data_dir():
+    local_data_path = os.path.dirname(__file__)
+    return os.path.join(local_data_path, "data")
+
+
+@pytest.fixture
+def small_sky_dir_local(local_data_dir):
+    return os.path.join(local_data_dir, SMALL_SKY_DIR_NAME)
 
diff --git a/tests/data/small_sky_parts/catalog_00_of_05.csv b/tests/data/small_sky_parts/catalog_00_of_05.csv
new file mode 100644
index 0000000..cb2e278
--- /dev/null
+++ b/tests/data/small_sky_parts/catalog_00_of_05.csv
@@ -0,0 +1,26 @@
+id,ra,dec,ra_error,dec_error
+700,282.5,-58.5,0,0
+701,299.5,-48.5,0,0
+702,310.5,-27.5,0,0
+703,286.5,-69.5,0,0
+704,326.5,-45.5,0,0
+705,335.5,-32.5,0,0
+706,297.5,-36.5,0,0
+707,308.5,-69.5,0,0
+708,307.5,-37.5,0,0
+709,294.5,-45.5,0,0
+710,341.5,-39.5,0,0
+711,305.5,-49.5,0,0
+712,288.5,-49.5,0,0
+713,298.5,-41.5,0,0
+714,303.5,-37.5,0,0
+715,280.5,-35.5,0,0
+716,305.5,-60.5,0,0
+717,303.5,-43.5,0,0
+718,292.5,-60.5,0,0
+719,344.5,-39.5,0,0
+720,344.5,-47.5,0,0
+721,314.5,-34.5,0,0
+722,350.5,-58.5,0,0
+723,315.5,-68.5,0,0
+724,323.5,-41.5,0,0
\ No newline at end of file
diff --git a/tests/data/small_sky_parts/catalog_01_of_05.csv b/tests/data/small_sky_parts/catalog_01_of_05.csv
new file mode 100644
index 0000000..091e961
--- /dev/null
+++ b/tests/data/small_sky_parts/catalog_01_of_05.csv
@@ -0,0 +1,27 @@
+id,ra,dec,ra_error,dec_error
+725,308.5,-41.5,0,0
+726,341.5,-37.5,0,0
+727,301.5,-44.5,0,0
+728,328.5,-47.5,0,0
+729,299.5,-59.5,0,0
+730,322.5,-61.5,0,0
+731,343.5,-52.5,0,0
+732,337.5,-39.5,0,0
+733,329.5,-65.5,0,0
+734,348.5,-66.5,0,0
+735,299.5,-65.5,0,0
+736,303.5,-52.5,0,0
+737,316.5,-33.5,0,0
+738,345.5,-64.5,0,0
+739,332.5,-57.5,0,0
+740,306.5,-33.5,0,0
+741,303.5,-38.5,0,0
+742,348.5,-45.5,0,0
+743,307.5,-25.5,0,0
+744,349.5,-39.5,0,0
+745,337.5,-38.5,0,0
+746,283.5,-31.5,0,0
+747,327.5,-61.5,0,0
+748,296.5,-63.5,0,0
+749,293.5,-55.5,0,0
+750,338.5,-67.5,0,0
diff --git a/tests/data/small_sky_parts/catalog_02_of_05.csv b/tests/data/small_sky_parts/catalog_02_of_05.csv
new file mode 100644
index 0000000..befddf5
--- /dev/null
+++ b/tests/data/small_sky_parts/catalog_02_of_05.csv
@@ -0,0 +1,27 @@
+id,ra,dec,ra_error,dec_error
+751,330.5,-44.5,0,0
+752,291.5,-34.5,0,0
+753,307.5,-45.5,0,0
+754,313.5,-30.5,0,0
+755,303.5,-38.5,0,0
+756,319.5,-35.5,0,0
+757,346.5,-34.5,0,0
+758,325.5,-53.5,0,0
+759,290.5,-48.5,0,0
+760,320.5,-53.5,0,0
+761,329.5,-29.5,0,0
+762,327.5,-51.5,0,0
+763,306.5,-38.5,0,0
+764,297.5,-45.5,0,0
+765,306.5,-35.5,0,0
+766,310.5,-63.5,0,0
+767,314.5,-29.5,0,0
+768,297.5,-60.5,0,0
+769,307.5,-42.5,0,0
+770,285.5,-29.5,0,0
+771,348.5,-67.5,0,0
+772,348.5,-64.5,0,0
+773,293.5,-50.5,0,0
+774,281.5,-54.5,0,0
+775,321.5,-54.5,0,0
+776,344.5,-63.5,0,0
diff --git a/tests/data/small_sky_parts/catalog_03_of_05.csv b/tests/data/small_sky_parts/catalog_03_of_05.csv
new file mode 100644
index 0000000..5aaced6
--- /dev/null
+++ b/tests/data/small_sky_parts/catalog_03_of_05.csv
@@ -0,0 +1,27 @@
+id,ra,dec,ra_error,dec_error
+777,307.5,-39.5,0,0
+778,313.5,-36.5,0,0
+779,347.5,-29.5,0,0
+780,326.5,-52.5,0,0
+781,330.5,-46.5,0,0
+782,290.5,-39.5,0,0
+783,286.5,-42.5,0,0
+784,338.5,-40.5,0,0
+785,296.5,-44.5,0,0
+786,336.5,-33.5,0,0
+787,320.5,-47.5,0,0
+788,283.5,-61.5,0,0
+789,287.5,-45.5,0,0
+790,286.5,-35.5,0,0
+791,312.5,-28.5,0,0
+792,320.5,-69.5,0,0
+793,289.5,-58.5,0,0
+794,300.5,-66.5,0,0
+795,306.5,-58.5,0,0
+796,320.5,-33.5,0,0
+797,308.5,-62.5,0,0
+798,316.5,-36.5,0,0
+799,313.5,-31.5,0,0
+800,299.5,-37.5,0,0
+801,309.5,-50.5,0,0
+802,304.5,-49.5,0,0
diff --git a/tests/data/small_sky_parts/catalog_04_of_05.csv b/tests/data/small_sky_parts/catalog_04_of_05.csv
new file mode 100644
index 0000000..6bbab46
--- /dev/null
+++ b/tests/data/small_sky_parts/catalog_04_of_05.csv
@@ -0,0 +1,29 @@
+id,ra,dec,ra_error,dec_error
+803,336.5,-25.5,0,0
+804,322.5,-66.5,0,0
+805,297.5,-52.5,0,0
+806,312.5,-29.5,0,0
+807,303.5,-60.5,0,0
+808,320.5,-40.5,0,0
+809,283.5,-34.5,0,0
+810,301.5,-59.5,0,0
+811,315.5,-68.5,0,0
+812,346.5,-60.5,0,0
+813,349.5,-37.5,0,0
+814,312.5,-33.5,0,0
+815,283.5,-68.5,0,0
+816,288.5,-69.5,0,0
+817,318.5,-48.5,0,0
+818,300.5,-55.5,0,0
+819,313.5,-35.5,0,0
+820,286.5,-46.5,0,0
+821,330.5,-52.5,0,0
+822,301.5,-54.5,0,0
+823,338.5,-45.5,0,0
+824,305.5,-28.5,0,0
+825,315.5,-30.5,0,0
+826,335.5,-69.5,0,0
+827,310.5,-40.5,0,0
+828,330.5,-26.5,0,0
+829,314.5,-35.5,0,0
+830,306.5,-50.5,0,0
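
The five part files carry 25 + 26 + 26 + 26 + 28 = 131 rows (ids 700 through 830), which is where the total_rows == 131 assertions in the import tests below come from. A quick sanity check over a local checkout, assuming only pandas:

    import glob
    import os

    import pandas as pd

    parts = sorted(glob.glob(os.path.join("tests", "data", "small_sky_parts", "*.csv")))
    row_counts = [len(pd.read_csv(part)) for part in parts]
    assert row_counts == [25, 26, 26, 26, 28]
    assert sum(row_counts) == 131
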
diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py
new file mode 100644
index 0000000..55f917b
--- /dev/null
+++ b/tests/hipscat_import/conftest.py
@@ -0,0 +1,71 @@
+import os
+import os.path
+
+import pytest
+from dask.distributed import Client
+
+DATA_DIR_NAME = "data"
+SMALL_SKY_DIR_NAME = "small_sky"
+
+
+@pytest.fixture(scope="session", name="dask_client")
+def dask_client():
+    """Create a single client for use by all unit test cases."""
+    client = Client()
+    yield client
+    client.close()
+
+
+def pytest_collection_modifyitems(items):
+    """Modify dask unit tests to
+    - ignore event loop deprecation warnings
+    - have a longer default timeout (15 seconds instead of the usual 1 second)
+    - require use of the `dask_client` fixture, even if it's not requested
+
+    Individual tests that will be particularly long-running can still override
+    the default timeout, by using an annotation like:
+
+        @pytest.mark.dask(timeout=10)
+        def test_long_running():
+            ...
+    """
+    first_dask = True
+    for item in items:
+        timeout = None
+        for mark in item.iter_markers(name="dask"):
+            timeout = 15
+            if "timeout" in mark.kwargs:
+                timeout = int(mark.kwargs.get("timeout"))
+        if timeout:
+            if first_dask:
+                ## The first test requires more time to set up the dask client
+                timeout += 10
+                first_dask = False
+            item.add_marker(pytest.mark.timeout(timeout))
+            item.add_marker(pytest.mark.usefixtures("dask_client"))
+            item.add_marker(pytest.mark.filterwarnings("ignore::DeprecationWarning"))
+
+
+@pytest.fixture
+def tmp_dir_cloud(example_cloud_path):
+    return os.path.join(example_cloud_path, "hipscat_import", "tmp")
+
+
+@pytest.fixture
+def test_data_dir_cloud(example_cloud_path):
+    return os.path.join(example_cloud_path, "hipscat_import", DATA_DIR_NAME)
+
+
+@pytest.fixture
+def small_sky_parts_dir_cloud(test_data_dir_cloud):
+    return os.path.join(test_data_dir_cloud, "small_sky_parts")
+
+
+@pytest.fixture
+def small_sky_parts_dir_local(local_data_dir):
+    return os.path.join(local_data_dir, "small_sky_parts")
+
+
+@pytest.fixture
+def small_sky_catalog_dir_cloud(test_data_dir_cloud):
+    return os.path.join(test_data_dir_cloud, "small_sky")
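
A minimal marked test, as a sketch of what the collection hook produces (the test name and body here are hypothetical): the `dask` mark picks up the default 15-second timeout, the forced `dask_client` fixture, and the deprecation-warning filter, and requesting the session-scoped fixture directly gives the test a handle on the client.

    import pytest

    @pytest.mark.dask
    def test_client_round_trip(dask_client):
        # Hypothetical test body: submit trivial work to the shared client.
        assert dask_client.submit(sum, [1, 2, 3]).result() == 6
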
diff --git a/tests/hipscat_import/test_run_catalog_import.py b/tests/hipscat_import/test_run_catalog_import.py
new file mode 100644
index 0000000..573c4b6
--- /dev/null
+++ b/tests/hipscat_import/test_run_catalog_import.py
@@ -0,0 +1,106 @@
+"""Functional tests for catalog import"""
+
+import os
+
+import hipscat_import.catalog.run_import as runner
+import pytest
+from hipscat.catalog.catalog import Catalog
+from hipscat_import.catalog.arguments import ImportArguments
+from hipscat_import.catalog.file_readers import CsvReader
+
+from hipscat_cloudtests import TempCloudDirectory, assert_parquet_file_ids
+
+
+@pytest.mark.dask
+def test_catalog_import_write_to_cloud(
+    dask_client,
+    small_sky_parts_dir_local,
+    tmp_dir_cloud,
+    example_cloud_storage_options,
+    tmp_path,
+):
+    """Using local CSV files, write a new catalog to the cloud bucket."""
+    with TempCloudDirectory(
+        tmp_dir_cloud, "write_catalog_to_cloud", example_cloud_storage_options
+    ) as temp_path:
+        args = ImportArguments(
+            output_artifact_name="small_sky_object_catalog",
+            input_path=small_sky_parts_dir_local,
+            output_storage_options=example_cloud_storage_options,
+            input_format="csv",
+            output_path=temp_path,
+            dask_tmp=tmp_path,
+            highest_healpix_order=1,
+            progress_bar=False,
+            overwrite=True,
+        )
+
+        runner.run(args, dask_client)
+
+        # Check that the catalog metadata file exists
+        catalog = Catalog.read_from_hipscat(args.catalog_path, storage_options=example_cloud_storage_options)
+        assert catalog.on_disk
+        assert catalog.catalog_path == args.catalog_path
+        assert catalog.catalog_info.ra_column == "ra"
+        assert catalog.catalog_info.dec_column == "dec"
+        assert catalog.catalog_info.total_rows == 131
+        assert len(catalog.get_healpix_pixels()) == 1
+
+        # Check that the catalog parquet file exists and contains correct object IDs
+        output_file = os.path.join(args.catalog_path, "Norder=0", "Dir=0", "Npix=11.parquet")
+
+        expected_ids = [*range(700, 831)]
+        assert_parquet_file_ids(
+            output_file, "id", expected_ids, storage_options=example_cloud_storage_options
+        )
+
+
+@pytest.mark.dask
+def test_catalog_import_read_from_cloud(
+    dask_client,
+    small_sky_parts_dir_cloud,
+    example_cloud_storage_options,
+    tmp_path,
+):
+    """Using cloud CSV files, write a new catalog to local disk."""
+    args = ImportArguments(
+        output_artifact_name="small_sky_object_catalog",
+        input_path=small_sky_parts_dir_cloud,
+        input_storage_options=example_cloud_storage_options,
+        input_format="csv",
+        file_reader=CsvReader(
+            storage_options=example_cloud_storage_options,
+        ),
+        output_path=tmp_path,
+        dask_tmp=tmp_path,
+        highest_healpix_order=1,
+        progress_bar=False,
+    )
+
+    runner.run(args, dask_client)
+
+    # Check that the catalog metadata file exists
+    catalog = Catalog.read_from_hipscat(args.catalog_path)
+    assert catalog.on_disk
+    assert catalog.catalog_path == args.catalog_path
+    assert catalog.catalog_info.ra_column == "ra"
+    assert catalog.catalog_info.dec_column == "dec"
+    assert catalog.catalog_info.total_rows == 131
+    assert len(catalog.get_healpix_pixels()) == 1
+
+    # Check that the catalog parquet file exists and contains correct object IDs
+    output_file = os.path.join(args.catalog_path, "Norder=0", "Dir=0", "Npix=11.parquet")
+
+    expected_ids = [*range(700, 831)]
+    assert_parquet_file_ids(output_file, "id", expected_ids)
+
+
+def test_read_csv_cloud(example_cloud_storage_options, small_sky_parts_dir_cloud):
+    """Verify we can read the csv file into a single data frame."""
+    single_file = os.path.join(small_sky_parts_dir_cloud, "catalog_00_of_05.csv")
+    total_chunks = 0
+    for frame in CsvReader(storage_options=example_cloud_storage_options).read(single_file):
+        total_chunks += 1
+        assert len(frame) == 25
+
+    assert total_chunks == 1
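
A side note on the single-chunk assertion in test_read_csv_cloud: CsvReader yields one frame per chunk, so a sketch like the following (assuming the reader accepts a chunksize keyword, whose default is far larger than this 25-row file) would yield several frames from the same file instead of one:

    # Assumes the chunksize keyword on hipscat-import's CsvReader.
    reader = CsvReader(chunksize=10, storage_options=example_cloud_storage_options)
    chunk_lengths = [len(frame) for frame in reader.read(single_file)]
    assert chunk_lengths == [10, 10, 5]
    assert sum(chunk_lengths) == 25
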