diff --git a/docs/tutorials.rst b/docs/tutorials.rst
index d97a276a..9fafffee 100644
--- a/docs/tutorials.rst
+++ b/docs/tutorials.rst
@@ -8,6 +8,7 @@ functionality.
 
     Working with the TAPE Ensemble object
     Working with the TAPE Timeseries object
+    Working with HiPSCat and LSDB data
     Scaling to Large Data Volume
     Working with Structure Function
     Binning Sources in the Ensemble
diff --git a/docs/tutorials/working_with_hipscat_and_lsdb.ipynb b/docs/tutorials/working_with_hipscat_and_lsdb.ipynb
new file mode 100644
index 00000000..2f130656
--- /dev/null
+++ b/docs/tutorials/working_with_hipscat_and_lsdb.ipynb
@@ -0,0 +1,255 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "rel_path = \"../../tests/tape_tests/data/small_sky_hipscat\""
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Using TAPE with LSDB and HiPSCat Data"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The [Hierarchical Progressive Survey Catalog (HiPSCat)](https://hipscat.readthedocs.io/en/latest/) format is a partitioning of objects on a sphere. It is designed for storing data from large astronomy surveys, with the main feature being the adaptive sizing of partitions based on the number of objects in a given region of the sky, using [HEALPix](https://healpix.jpl.nasa.gov/).\n",
+    "\n",
+    "The [Large Survey Database (LSDB)](https://lsdb.readthedocs.io/en/latest/) is a framework that facilitates and enables spatial analysis for extremely large astronomical databases (e.g. querying and crossmatching O(1B) sources). This package uses Dask to parallelize operations across multiple HiPSCat-partitioned surveys.\n",
+    "\n",
+    "Both HiPSCat and LSDB are strong tools in the arsenal of a TAPE user. HiPSCat provides a scalable data format for working at the scale of LSST, while LSDB provides tooling to prepare more complex datasets for TAPE analysis, including operations like cross-matching multiple surveys and cone searches to select data from specific regions of the sky. In this notebook, we'll walk through the process by which these can be used with TAPE."
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Loading from HiPSCat data"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "TAPE offers a built-in HiPSCat loader function, which can be used to quickly load in a dataset that is in the HiPSCat format. We'll use a small dummy dataset for this example. Before loading, let's just peek at the data we'll be working with."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import pyarrow.parquet as pq\n",
+    "import os\n",
+    "\n",
+    "object_path = os.path.join(rel_path, \"small_sky_object_catalog\")\n",
+    "source_path = os.path.join(rel_path, \"small_sky_source_catalog\")\n",
+    "\n",
+    "# Object Schema\n",
+    "pq.read_metadata(os.path.join(object_path, \"_common_metadata\")).schema"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Source Schema\n",
+    "pq.read_metadata(os.path.join(source_path, \"_common_metadata\")).schema"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The schema indicates which fields are available in each catalog. 
Notice the `_hipscat_index` in both; this is a specially constructed index that the data is sorted on, which enables efficient use of the HiPSCat format. It's recommended to use this as the ID column in TAPE when loading from HiPSCat object and source catalogs. With this established, let's load this data into TAPE."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from tape import Ensemble, ColumnMapper\n",
+    "\n",
+    "ens = Ensemble(client=False)\n",
+    "\n",
+    "# Set up a ColumnMapper\n",
+    "colmap = ColumnMapper(\n",
+    "    id_col=\"_hipscat_index\",  # using _hipscat_index is recommended\n",
+    "    time_col=\"mjd\",  # pulling these from the source schema list above\n",
+    "    flux_col=\"mag\",\n",
+    "    err_col=\"Norder\",  # we don't have an error column, using a stand-in column for this toy example\n",
+    "    band_col=\"band\",\n",
+    ")\n",
+    "\n",
+    "ens.from_hipscat(source_path, object_path, column_mapper=colmap, object_index=\"id\", source_index=\"object_id\")\n",
+    "\n",
+    "ens.object.head(5)"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "In the `from_hipscat` call, we additionally needed to specify `object_index` and `source_index`. These are the columns in each table that map to the same object-level identifier. They are used to join object and source, and to convert the source `_hipscat_index` (which is unique per source) to the object `_hipscat_index` (unique per object). From here, the `_hipscat_index` will serve as an object ID that ties sources together for TAPE operations."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# We're now free to work with our TAPE Ensemble as normal\n",
+    "import matplotlib.pyplot as plt\n",
+    "\n",
+    "ts = ens.to_timeseries(12751184493818150912)  # select a lightcurve using the _hipscat_index\n",
+    "\n",
+    "# Let's plot this, though it's toy data so it won't look like anything...\n",
+    "plt.plot(ts.data[\"mjd\"], ts.data[\"mag\"], \".\")\n",
+    "plt.title(ts.meta[\"id\"])"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Loading from LSDB Catalogs\n",
+    "\n"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "`Ensemble.from_hipscat` is used to directly ingest HiPSCat data into TAPE. In many cases, you may prefer to do a few operations on your HiPSCat data first using LSDB. Let's walk through how this would look."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Loading into LSDB\n",
+    "import lsdb\n",
+    "\n",
+    "# Load the dataset into LSDB catalog objects\n",
+    "object_cat = lsdb.read_hipscat(object_path)\n",
+    "source_cat = lsdb.read_hipscat(source_path)"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We've now loaded our catalogs into LSDB `Catalog` objects. From here, we can apply LSDB operations to the catalogs. For example, let's perform a cone search to narrow down our list of objects."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "object_cat_cone = object_cat.cone_search(\n",
+    "    ra=315.0,\n",
+    "    dec=-69.5,\n",
+    "    radius=10.0,\n",
+    ")\n",
+    "\n",
+    "print(f\"Original Number of Objects: {len(object_cat._ddf)}\")\n",
+    "print(f\"New Number of Objects: {len(object_cat_cone._ddf)}\")"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "With our cone search performed, we can now move into TAPE. We'll first need to create a new source catalog, `joined_source_cat`, which incorporates the result of the cone search and also reindexes onto the object `_hipscat_index`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# We do this to get the source catalog indexed by the object's _hipscat_index\n",
+    "joined_source_cat = object_cat_cone.join(\n",
+    "    source_cat, left_on=\"id\", right_on=\"object_id\", suffixes=(\"_object\", \"\")\n",
+    ")\n",
+    "\n",
+    "colmap = ColumnMapper(\n",
+    "    id_col=\"_hipscat_index\",\n",
+    "    time_col=\"mjd\",\n",
+    "    flux_col=\"mag\",\n",
+    "    err_col=\"Norder\",  # no error column...\n",
+    "    band_col=\"band\",\n",
+    ")\n",
+    "\n",
+    "ens = Ensemble(client=False)\n",
+    "\n",
+    "# We just pass in the catalog objects\n",
+    "ens.from_lsdb(joined_source_cat, object_cat_cone, column_mapper=colmap)\n",
+    "\n",
+    "ens.object.compute()"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "And from here, we're once again able to work with our TAPE Ensemble as normal."
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.11"
+  },
+  "vscode": {
+   "interpreter": {
+    "hash": "83afbb17b435d9bf8b0d0042367da76f26510da1c5781f0ff6e6c518eab621ec"
+   }
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/pyproject.toml b/pyproject.toml
index 849a862b..633f9e39 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,6 +28,7 @@ dependencies = [
     "deprecated",
     "ipykernel", # Support for Jupyter notebooks
     "light-curve>=0.7.3,<0.8.0",
+    "lsdb"
 ]
 
 # On a mac, install optional dependencies with `pip install '.[dev]'` (include the single quotes)
diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py
index 302712de..848ab4cb 100644
--- a/src/tape/ensemble.py
+++ b/src/tape/ensemble.py
@@ -4,6 +4,7 @@ import shutil
 import warnings
 
 import requests
+import lsdb
 import dask.dataframe as dd
 import numpy as np
 import pandas as pd
@@ -1533,9 +1534,7 @@ def from_dask_dataframe(
             information mapped from the input dataset.
         sync_tables: 'bool', optional
             In the case where an `object_frame`is provided, determines whether an
-            initial sync is performed between the object and source tables. If
-            not performed, dynamic information like the number of observations
-            may be out of date until a sync is performed internally.
+            initial sync is performed between the object and source tables.
        npartitions: `int`, optional
            If specified, attempts to repartition the ensemble to the specified
            number of partitions
@@ -1564,14 +1563,18 @@
             source_frame = source_frame.repartition(partition_size=partition_size)
 
         # Set the index of the source frame and save the resulting table
-        self.update_frame(source_frame.set_index(self._id_col, drop=True, sorted=sorted, sort=sort))
+        if source_frame.index.name != self._id_col:  # skip a redundant set_index
+            self.update_frame(source_frame.set_index(self._id_col, drop=True, sorted=sorted, sort=sort))
+        else:
+            self.update_frame(source_frame)  # the index is already set
 
         if object_frame is None:  # generate an indexed object table from source
             self.update_frame(self._generate_object_table())
         else:
             self.update_frame(ObjectFrame.from_dask_dataframe(object_frame, ensemble=self))
-            self.update_frame(self.object.set_index(self._id_col, sorted=sorted, sort=sort))
+            if object_frame.index.name != self._id_col:  # skip a redundant set_index
+                self.update_frame(self.object.set_index(self._id_col, sorted=sorted, sort=sort))
 
         # Optionally sync the tables, recalculates nobs columns
         if sync_tables:
@@ -1587,47 +1590,183 @@
         )
         return self
 
-    def from_hipscat(self, dir, source_subdir="source", object_subdir="object", column_mapper=None, **kwargs):
-        """Read in parquet files from a hipscat-formatted directory structure
+    def from_lsdb(
+        self,
+        source_catalog,
+        object_catalog=None,
+        column_mapper=None,
+        sync_tables=False,
+        sorted=True,
+        sort=False,
+    ):
+        """Read in from LSDB catalog objects.
 
         Parameters
         ----------
-        dir: 'str'
-            Path to the directory structure
-        source_subdir: 'str'
-            Path to the subdirectory which contains source files
-        object_subdir: 'str'
-            Path to the subdirectory which contains object files, if None then
-            files will only be read from the source_subdir
+        source_catalog: 'lsdb.Catalog'
+            An LSDB catalog that contains source information to be read into
+            the ensemble.
+        object_catalog: 'lsdb.Catalog', optional
+            An LSDB catalog containing object information. If not specified,
+            a minimal ObjectFrame is generated from the source catalog.
         column_mapper: 'ColumnMapper' object
            If provided, the ColumnMapper is used to populate relevant column
            information mapped from the input dataset.
-        **kwargs:
-            keyword arguments passed along to
-            `tape.ensemble.Ensemble.from_parquet`
+        sync_tables: 'bool', optional
+            In the case where an `object_catalog` is provided, determines
+            whether an initial sync is performed between the object and source
+            tables. Defaults to False.
+        sorted: bool, optional
+            If the index column is already sorted in increasing order.
+            Defaults to True.
+        sort: `bool`, optional
+            If True, sorts the DataFrame by the id column. Otherwise set the
+            index on the individual existing partitions. Defaults to False.
 
         Returns
         ----------
         ensemble: `tape.ensemble.Ensemble`
-            The ensemble object with parquet data loaded
+            The ensemble object with the LSDB catalog data loaded.
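+
+        Examples
+        ----------
+        A minimal usage sketch; the catalog paths below are illustrative,
+        and the column names mirror the small_sky test data in this
+        repository (which uses "Norder" as a stand-in error column):
+
+        >>> import lsdb
+        >>> from tape import Ensemble, ColumnMapper
+        >>> object_cat = lsdb.read_hipscat("path/to/object_catalog")  # doctest: +SKIP
+        >>> source_cat = lsdb.read_hipscat("path/to/source_catalog")  # doctest: +SKIP
+        >>> joined_source_cat = object_cat.join(
+        ...     source_cat, left_on="id", right_on="object_id", suffixes=("_object", "")
+        ... )  # doctest: +SKIP
+        >>> colmap = ColumnMapper(id_col="_hipscat_index", time_col="mjd",
+        ...                       flux_col="mag", err_col="Norder", band_col="band")
+        >>> ens = Ensemble(client=False)
+        >>> ens.from_lsdb(joined_source_cat, object_cat, column_mapper=colmap)  # doctest: +SKIP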
""" - source_path = os.path.join(dir, source_subdir) - source_files = glob.glob(os.path.join(source_path, "**", "*.parquet"), recursive=True) + # Support for just source catalog is somewhat involved + # The code below mainly tries to catch a few common pitfalls + if object_catalog is None: + # This is tricky, so just raise an error + if column_mapper.map["id_col"] == "_hipscat_index": + raise ValueError( + "Using the _hipscat_index as the id column is not advised without a specified object catalog, as the _hipscat_index is unique per source in this case. Use an object-level id.", + ) + # And if they didn't choose _hipscat_index, it's almost certainly not sorted + # Let's try to catch a bad sorted set, and reroute to sort for better user experience + else: + if sorted is True: + warnings.warn( + f" The sorted flag was set true with a non _hipscat_index id column ({column_mapper.map['id_col']}). This dataset is sorted by _hipscat_index, so the sorted flag has been turned off and sort has been turned on." + ) + sorted = False + sort = True - if object_subdir is not None: - object_path = os.path.join(dir, object_subdir) - object_files = glob.glob(os.path.join(object_path, "**", "*.parquet"), recursive=True) + self.from_dask_dataframe( + source_catalog._ddf, + None, + column_mapper=column_mapper, + sync_tables=sync_tables, + sorted=sorted, + sort=sort, + npartitions=None, + partition_size=None, + ) + + # When we have both object and source, it's much simpler else: - object_files = None + # We are still vulnerable to users choosing a non-_hipscat_index + # Just warn them, though it's likely the function call will fail + if column_mapper.map["id_col"] != "_hipscat_index": + warnings.warn( + f"With hipscat data, it's advised to use the _hipscat_index as the id_col (instead of {column_mapper.map['id_col']}), as the data is sorted using this column. If you'd like to use your chosen id column, make sure it's in both catalogs and use sort=True and sorted=False (these have been auto-set for this call)", + UserWarning, + ) + sorted = False + sort = True - return self.from_parquet( - source_files, - object_files, + self.from_dask_dataframe( + source_catalog._ddf, + object_catalog._ddf, + column_mapper=column_mapper, + sync_tables=sync_tables, + sorted=sorted, + sort=sort, + npartitions=None, + partition_size=None, + ) + + return self + + def from_hipscat( + self, + source_path, + object_path=None, + column_mapper=None, + source_index=None, + object_index=None, + sorted=True, + sort=False, + ): + """Use LSDB to read from a hipscat directory. + + This function utilizes LSDB for reading a hipscat directory into TAPE. + In cases where a user would like to do operations on the LSDB catalog + objects, it's best to use LSDB itself first, and then load the result + into TAPE using `tape.Ensemble.from_lsdb`. A join is performed between + the two tables to modify the source table to use the object index, + using `object_index` and `source_index`. + + Parameters + ---------- + source_path: str or Path + A hipscat directory that contains source information to be read + into the ensemble. + object_path: str or Path, optional + A hipscat directory containing object information. If not + specified, a minimal ObjectFrame is generated from the sources. + column_mapper: 'ColumnMapper' object + If provided, the ColumnMapper is used to populate relevant column + information mapped from the input dataset. 
+        object_index: 'str', optional
+            The join index of the object table; should be the label of the
+            object ID column contained in the object table.
+        source_index: 'str', optional
+            The join index of the source table; should be the label of the
+            object ID column contained in the source table.
+        sorted: bool, optional
+            If the index column is already sorted in increasing order.
+            Defaults to True.
+        sort: `bool`, optional
+            If True, sorts the DataFrame by the id column. Otherwise set the
+            index on the individual existing partitions. Defaults to False.
+
+        Returns
+        ----------
+        ensemble: `tape.ensemble.Ensemble`
+            The ensemble object with the hipscat data loaded.
+        """
+
+        # After margin/associated caches are implemented in LSDB, we should use them here
+        source_catalog = lsdb.read_hipscat(source_path)
+
+        if object_path is not None:
+            object_catalog = lsdb.read_hipscat(object_path)
+
+            # We do this to get the source catalog indexed by the object's _hipscat_index
+            # Very specifically need object.join(source), not the reverse
+            joined_source_catalog = object_catalog.join(
+                source_catalog,
+                left_on=object_index,
+                right_on=source_index,
+                suffixes=("_drop_these_cols", ""),
+            )
+
+        else:
+            object_catalog = None
+            joined_source_catalog = source_catalog
+
+        # from_lsdb will set the index column to the object's _hipscat_index
+        self.from_lsdb(
+            joined_source_catalog,
+            object_catalog,
             column_mapper=column_mapper,
-            **kwargs,
+            sync_tables=False,  # should never need to sync, the join does it for us
+            sorted=sorted,
+            sort=sort,
         )
 
+        # drop the extra object columns from source
+        if object_path is not None:
+            cols_to_drop = [col for col in self.source.columns if col.endswith("_drop_these_cols")]
+            self.source.drop(columns=cols_to_drop).update_ensemble()
+
+        return self
+
     def make_column_map(self):
         """Returns the current column mapping.
 
diff --git a/src/tape/ensemble_readers.py b/src/tape/ensemble_readers.py
index 623effa0..22d0b61a 100644
--- a/src/tape/ensemble_readers.py
+++ b/src/tape/ensemble_readers.py
@@ -245,51 +245,130 @@
     return new_ens
 
 
+def read_lsdb(
+    source_catalog,
+    object_catalog=None,
+    column_mapper=None,
+    sync_tables=False,
+    sorted=True,
+    sort=False,
+    dask_client=True,
+    **kwargs,
+):
+    """Read in from LSDB catalog objects.
+
+    Parameters
+    ----------
+    source_catalog: 'lsdb.Catalog'
+        An LSDB catalog that contains source information to be read into
+        the ensemble.
+    object_catalog: 'lsdb.Catalog', optional
+        An LSDB catalog containing object information. If not specified,
+        a minimal ObjectFrame is generated from the source catalog.
+    column_mapper: 'ColumnMapper' object
+        If provided, the ColumnMapper is used to populate relevant column
+        information mapped from the input dataset.
+    sync_tables: 'bool', optional
+        In the case where an `object_catalog` is provided, determines
+        whether an initial sync is performed between the object and source
+        tables. Defaults to False.
+    sorted: bool, optional
+        If the index column is already sorted in increasing order.
+        Defaults to True.
+    sort: `bool`, optional
+        If True, sorts the DataFrame by the id column. Otherwise set the
+        index on the individual existing partitions. Defaults to False.
+    dask_client: `dask.distributed.client` or `bool`, optional
+        Accepts an existing `dask.distributed.Client`, or creates one if
+        `client=True`, passing any additional kwargs to a
+        dask.distributed.Client constructor call. If `client=False`, the
+        Ensemble is created without a distributed client.
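+    **kwargs:
+        Additional keyword arguments passed along to the
+        `tape.ensemble.Ensemble` constructor.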
+
+    Returns
+    ----------
+    ensemble: `tape.ensemble.Ensemble`
+        The ensemble object with the LSDB catalog data loaded.
+    """
+
+    new_ens = Ensemble(dask_client, **kwargs)
+
+    new_ens.from_lsdb(
+        source_catalog=source_catalog,
+        object_catalog=object_catalog,
+        column_mapper=column_mapper,
+        sync_tables=sync_tables,
+        sorted=sorted,
+        sort=sort,
+    )
+
+    return new_ens
+
+
 def read_hipscat(
-    dir,
-    source_subdir="source",
-    object_subdir="object",
+    source_path,
+    object_path=None,
     column_mapper=None,
+    source_index=None,
+    object_index=None,
+    sorted=True,
+    sort=False,
     dask_client=True,
     **kwargs,
 ):
-    """Read in parquet files from a hipscat-formatted directory structure
+    """Use LSDB to read from a hipscat directory.
+
+    This function utilizes LSDB for reading a hipscat directory into TAPE.
+    In cases where a user would like to do operations on the LSDB catalog
+    objects, it's best to use LSDB itself first, and then load the result
+    into TAPE using `tape.Ensemble.from_lsdb`. A join is performed between
+    the two tables, using `object_index` and `source_index`, to reindex
+    the source table onto the object index.
 
     Parameters
     ----------
-    dir: 'str'
-        Path to the directory structure
-    source_subdir: 'str'
-        Path to the subdirectory which contains source files
-    object_subdir: 'str'
-        Path to the subdirectory which contains object files, if None then
-        files will only be read from the source_subdir
+    source_path: str or Path
+        A hipscat directory that contains source information to be read
+        into the ensemble.
+    object_path: str or Path, optional
+        A hipscat directory containing object information. If not
+        specified, a minimal ObjectFrame is generated from the sources.
     column_mapper: 'ColumnMapper' object
         If provided, the ColumnMapper is used to populate relevant column
         information mapped from the input dataset.
+    object_index: 'str', optional
+        The join index of the object table; should be the label of the
+        object ID column contained in the object table.
+    source_index: 'str', optional
+        The join index of the source table; should be the label of the
+        object ID column contained in the source table.
+    sorted: bool, optional
+        If the index column is already sorted in increasing order.
+        Defaults to True.
+    sort: `bool`, optional
+        If True, sorts the DataFrame by the id column. Otherwise set the
+        index on the individual existing partitions. Defaults to False.
     dask_client: `dask.distributed.client` or `bool`, optional
        Accepts an existing `dask.distributed.Client`, or creates one if
        `client=True`, passing any additional kwargs to a
        dask.distributed.Client constructor call. If `client=False`, the
        Ensemble is created without a distributed client.
-    **kwargs:
-        keyword arguments passed along to
-        `tape.ensemble.Ensemble.from_parquet`
+    **kwargs:
+        Additional keyword arguments passed along to the
+        `tape.ensemble.Ensemble` constructor.
 
     Returns
     ----------
     ensemble: `tape.ensemble.Ensemble`
-        The ensemble object with parquet data loaded
+        The ensemble object with the hipscat data loaded.
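+
+    Examples
+    ----------
+    A minimal usage sketch; the paths below are illustrative, and the
+    column names mirror the small_sky test data in this repository
+    (which uses "Norder" as a stand-in error column):
+
+    >>> import tape
+    >>> from tape import ColumnMapper
+    >>> colmap = ColumnMapper(id_col="_hipscat_index", time_col="mjd",
+    ...                       flux_col="mag", err_col="Norder", band_col="band")
+    >>> ens = tape.read_hipscat(
+    ...     "path/to/small_sky_source_catalog",
+    ...     "path/to/small_sky_object_catalog",
+    ...     column_mapper=colmap,
+    ...     object_index="id",
+    ...     source_index="object_id",
+    ...     dask_client=False,
+    ... )  # doctest: +SKIP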
""" new_ens = Ensemble(dask_client, **kwargs) new_ens.from_hipscat( - dir=dir, - source_subdir=source_subdir, - object_subdir=object_subdir, + source_path, + object_path=object_path, column_mapper=column_mapper, - **kwargs, + source_index=source_index, + object_index=object_index, + sorted=sorted, + sort=sort, ) return new_ens diff --git a/tests/tape_tests/conftest.py b/tests/tape_tests/conftest.py index ec6b4521..73430784 100644 --- a/tests/tape_tests/conftest.py +++ b/tests/tape_tests/conftest.py @@ -1,9 +1,11 @@ """Test fixtures for Ensemble manipulations""" + import numpy as np import pandas as pd import dask.dataframe as dd import pytest import tape +import lsdb from dask.distributed import Client @@ -197,13 +199,21 @@ def read_parquet_ensemble_with_known_column_mapper(dask_client): @pytest.fixture def read_parquet_ensemble_from_hipscat(dask_client): """Create an Ensemble from a hipscat/hive-style directory.""" + + colmap = ColumnMapper( + id_col="_hipscat_index", + time_col="mjd", + flux_col="mag", + err_col="Norder", # no error column... + band_col="band", + ) + return tape.read_hipscat( - "tests/tape_tests/data", - id_col="ps1_objid", - time_col="midPointTai", - band_col="filterName", - flux_col="psFlux", - err_col="psFluxErr", + "tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog", + "tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog", + column_mapper=colmap, + object_index="id", + source_index="object_id", dask_client=dask_client, ) @@ -362,21 +372,85 @@ def parquet_ensemble_with_known_column_mapper(dask_client): # pylint: disable=redefined-outer-name @pytest.fixture -def parquet_ensemble_from_hipscat(dask_client): +def parquet_ensemble_from_hipscat(): """Create an Ensemble from a hipscat/hive-style directory.""" - ens = Ensemble(client=dask_client) + ens = Ensemble(client=False) + + colmap = ColumnMapper( + id_col="_hipscat_index", + time_col="mjd", + flux_col="mag", + err_col="Norder", # no error column... + band_col="band", + ) + ens.from_hipscat( - "tests/tape_tests/data", - id_col="ps1_objid", - time_col="midPointTai", - band_col="filterName", - flux_col="psFlux", - err_col="psFluxErr", + "tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog", + "tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog", + column_mapper=colmap, + object_index="id", + source_index="object_id", ) return ens +# pylint: disable=redefined-outer-name +@pytest.fixture +def ensemble_from_lsdb(): + """Create a dask dataframe from LSDB catalogs""" + object_cat = lsdb.read_hipscat("tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog") + source_cat = lsdb.read_hipscat("tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog") + + # Pain points: Suffixes here are a bit annoying, and I'd ideally want just the source columns (especially at scale) + # We do this to get the source catalog indexed by the objects hipscat index + joined_source_cat = object_cat.join( + source_cat, left_on="id", right_on="object_id", suffixes=("_object", "") + ) + + colmap = ColumnMapper( + id_col="_hipscat_index", + time_col="mjd", + flux_col="mag", + err_col="Norder", # no error column... 
+        band_col="band",
+    )
+
+    ens = Ensemble(False)
+
+    # Passing the catalog objects directly avoids needing to invoke their ._ddf property
+    ens.from_lsdb(joined_source_cat, object_cat, column_mapper=colmap)
+
+    return ens
+
+
+# pylint: disable=redefined-outer-name
+@pytest.fixture
+def read_ensemble_from_lsdb():
+    """Create an Ensemble from LSDB catalogs"""
+    object_cat = lsdb.read_hipscat("tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog")
+    source_cat = lsdb.read_hipscat("tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog")
+
+    # Pain point: the suffixes here are a bit awkward; ideally we'd keep just the source columns (especially at scale)
+    # We do this to get the source catalog indexed by the object's _hipscat_index
+    joined_source_cat = object_cat.join(
+        source_cat, left_on="id", right_on="object_id", suffixes=("_object", "")
+    )
+
+    colmap = ColumnMapper(
+        id_col="_hipscat_index",
+        time_col="mjd",
+        flux_col="mag",
+        err_col="Norder",  # no error column...
+        band_col="band",
+    )
+
+    # Passing the catalog objects directly avoids needing to invoke their ._ddf property
+    ens = tape.read_lsdb(joined_source_cat, object_cat, column_mapper=colmap, dask_client=False)
+
+    return ens
+
+
 # pylint: disable=redefined-outer-name
 @pytest.fixture
 def dask_dataframe_ensemble(dask_client):
diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/Norder=0/Dir=0/Npix=11.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/Norder=0/Dir=0/Npix=11.parquet
new file mode 100644
index 00000000..e0cb8d94
Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/Norder=0/Dir=0/Npix=11.parquet differ
diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/_common_metadata b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/_common_metadata
new file mode 100644
index 00000000..4cf7a744
Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/_common_metadata differ
diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/_metadata b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/_metadata
new file mode 100644
index 00000000..26df207b
Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/_metadata differ
diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/catalog_info.json b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/catalog_info.json
new file mode 100644
index 00000000..ef9a8b6d
--- /dev/null
+++ b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/catalog_info.json
@@ -0,0 +1,8 @@
+{
+    "catalog_name": "small_sky_object_catalog",
+    "catalog_type": "object",
+    "total_rows": 131,
+    "epoch": "J2000",
+    "ra_column": "ra",
+    "dec_column": "dec"
+}
diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/partition_info.csv b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/partition_info.csv
new file mode 100644
index 00000000..ed015721
--- /dev/null
+++ b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/partition_info.csv
@@ -0,0 +1,2 @@
+Norder,Dir,Npix,num_rows
+0,0,11,131
diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/point_map.fits b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/point_map.fits
new file mode 100644
index 00000000..1971966f
Binary files /dev/null and 
b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/point_map.fits differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/provenance_info.json b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/provenance_info.json new file mode 100644 index 00000000..08ef05fe --- /dev/null +++ b/tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog/provenance_info.json @@ -0,0 +1,52 @@ +{ + "catalog_name": "small_sky_object_catalog", + "catalog_type": "object", + "total_rows": 131, + "epoch": "J2000", + "ra_column": "ra", + "dec_column": "dec", + "version": "0.2.1", + "generation_date": "2024.01.09", + "tool_args": { + "tool_name": "hipscat_import", + "version": "0.2.1", + "runtime_args": { + "catalog_name": "small_sky_object_catalog", + "output_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/", + "output_artifact_name": "small_sky_object_catalog", + "tmp_dir": "", + "overwrite": true, + "dask_tmp": "", + "dask_n_workers": 1, + "dask_threads_per_worker": 1, + "catalog_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky_object_catalog", + "tmp_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky_object_catalog/intermediate", + "epoch": "J2000", + "catalog_type": "object", + "input_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky", + "input_paths": [ + "file:///home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky/catalog.csv" + ], + "input_format": "csv", + "input_file_list": [], + "ra_column": "ra", + "dec_column": "dec", + "use_hipscat_index": false, + "sort_columns": null, + "constant_healpix_order": -1, + "highest_healpix_order": 7, + "pixel_threshold": 1000000, + "mapping_healpix_order": 7, + "debug_stats_only": false, + "file_reader_info": { + "input_reader_type": "CsvReader", + "chunksize": 500000, + "header": "infer", + "schema_file": null, + "separator": ",", + "column_names": null, + "type_map": {} + } + } + } +} diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=0/Dir=0/Npix=4.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=0/Dir=0/Npix=4.parquet new file mode 100644 index 00000000..d5c65653 Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=0/Dir=0/Npix=4.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=1/Dir=0/Npix=47.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=1/Dir=0/Npix=47.parquet new file mode 100644 index 00000000..cb0d36f7 Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=1/Dir=0/Npix=47.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=176.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=176.parquet new file mode 100644 index 00000000..2c4bbe6d Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=176.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=177.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=177.parquet new file mode 100644 index 00000000..00ad8b93 Binary files /dev/null and 
b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=177.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=178.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=178.parquet new file mode 100644 index 00000000..6cd593a1 Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=178.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=179.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=179.parquet new file mode 100644 index 00000000..07b1b065 Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=179.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=180.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=180.parquet new file mode 100644 index 00000000..90b514cb Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=180.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=181.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=181.parquet new file mode 100644 index 00000000..b17baf4b Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=181.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=182.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=182.parquet new file mode 100644 index 00000000..48e81ff8 Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=182.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=183.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=183.parquet new file mode 100644 index 00000000..13263678 Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=183.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=184.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=184.parquet new file mode 100644 index 00000000..6f56792f Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=184.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=185.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=185.parquet new file mode 100644 index 00000000..a96dfbef Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=185.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=186.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=186.parquet new file mode 100644 index 00000000..3531aa05 Binary files /dev/null and 
b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=186.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=187.parquet b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=187.parquet new file mode 100644 index 00000000..ec824363 Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/Norder=2/Dir=0/Npix=187.parquet differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/_common_metadata b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/_common_metadata new file mode 100644 index 00000000..982281c9 Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/_common_metadata differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/_metadata b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/_metadata new file mode 100644 index 00000000..bfd79a20 Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/_metadata differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/catalog_info.json b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/catalog_info.json new file mode 100644 index 00000000..0491b5c5 --- /dev/null +++ b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/catalog_info.json @@ -0,0 +1,8 @@ +{ + "catalog_name": "small_sky_source_catalog", + "catalog_type": "source", + "total_rows": 17161, + "epoch": "J2000", + "ra_column": "source_ra", + "dec_column": "source_dec" +} \ No newline at end of file diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/partition_info.csv b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/partition_info.csv new file mode 100644 index 00000000..7a5f4e9f --- /dev/null +++ b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/partition_info.csv @@ -0,0 +1,15 @@ +Norder,Dir,Npix,num_rows +0,0,4,50 +1,0,47,2395 +2,0,176,385 +2,0,177,1510 +2,0,178,1634 +2,0,179,1773 +2,0,180,655 +2,0,181,903 +2,0,182,1246 +2,0,183,1143 +2,0,184,1390 +2,0,185,2942 +2,0,186,452 +2,0,187,683 diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/point_map.fits b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/point_map.fits new file mode 100644 index 00000000..e0ac82b9 Binary files /dev/null and b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/point_map.fits differ diff --git a/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/provenance_info.json b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/provenance_info.json new file mode 100644 index 00000000..c004e267 --- /dev/null +++ b/tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog/provenance_info.json @@ -0,0 +1,49 @@ +{ + "catalog_name": "small_sky_source_catalog", + "catalog_type": "source", + "version": "0.0.10.dev7+g0a79f90.d20230418", + "generation_date": "2023.04.20", + "epoch": "J2000", + "ra_kw": "source_ra", + "dec_kw": "source_dec", + "total_rows": 17161, + "tool_args": { + "tool_name": "hipscat_import", + "version": "0.0.4.dev28+g2e31821.d20230420", + "runtime_args": { + "catalog_name": "small_sky_source_catalog", + "output_path": "/home/data", + "output_artifact_name": "small_sky_source_catalog", + "tmp_dir": "", + "overwrite": true, + "dask_tmp": 
"/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner_source_table0", + "dask_n_workers": 1, + "dask_threads_per_worker": 1, + "catalog_path": "/home/data/small_sky_source_catalog", + "tmp_path": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner_source_table0/small_sky_source_catalog/intermediate", + "epoch": "J2000", + "catalog_type": "source", + "input_path": "/home/data/small_sky_source", + "input_paths": [ + "/home/data/small_sky_source/small_sky_source.csv" + ], + "input_format": "csv", + "input_file_list": [], + "ra_column": "source_ra", + "dec_column": "source_dec", + "sort_columns": "source_id", + "highest_healpix_order": 2, + "pixel_threshold": 3000, + "debug_stats_only": false, + "file_reader_info": { + "input_reader_type": "CsvReader", + "chunksize": 500000, + "header": "infer", + "schema_file": null, + "separator": ",", + "column_names": null, + "type_map": {} + } + } + } +} \ No newline at end of file diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index fe92ecac..5fcb2900 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -1,6 +1,8 @@ """Test ensemble manipulations""" + import copy import os +import lsdb import dask.dataframe as dd import numpy as np @@ -47,20 +49,17 @@ def test_with_client(): "parquet_ensemble_with_divisions", "parquet_ensemble_without_client", "parquet_ensemble_from_source", - "parquet_ensemble_from_hipscat", "parquet_ensemble_with_column_mapper", "parquet_ensemble_with_known_column_mapper", "parquet_ensemble_partition_size", "read_parquet_ensemble", "read_parquet_ensemble_without_client", "read_parquet_ensemble_from_source", - "read_parquet_ensemble_from_hipscat", "read_parquet_ensemble_with_column_mapper", "read_parquet_ensemble_with_known_column_mapper", "read_parquet_ensemble", "read_parquet_ensemble_without_client", "read_parquet_ensemble_from_source", - "read_parquet_ensemble_from_hipscat", "read_parquet_ensemble_with_column_mapper", "read_parquet_ensemble_with_known_column_mapper", ], @@ -146,6 +145,49 @@ def test_dataframe_constructors(data_fixture, request): assert len(amplitude) == 5 +@pytest.mark.parametrize( + "data_fixture", + [ + "parquet_ensemble_from_hipscat", + "read_parquet_ensemble_from_hipscat", + "read_parquet_ensemble_from_hipscat", + "ensemble_from_lsdb", + "read_ensemble_from_lsdb", + ], +) +def test_hipscat_constructors(data_fixture, request): + """ + Tests constructing an ensemble from LSDB and hipscat + """ + parquet_ensemble = request.getfixturevalue(data_fixture) + + # Check to make sure the source and object tables were created + assert parquet_ensemble.source is not None + assert parquet_ensemble.object is not None + + # Make sure divisions are set + assert parquet_ensemble.source.known_divisions + assert parquet_ensemble.object.known_divisions + + # Check that the data is not empty. + obj, source = parquet_ensemble.compute() + assert len(source) == 16135 # full source is 17161, but we drop some in the join with object + assert len(obj) == 131 + + # Check that source and object both have the same ids present + assert sorted(np.unique(list(source.index))) == sorted(np.array(obj.index)) + + # Check the we loaded the correct columns. 
+    for col in [
+        parquet_ensemble._time_col,
+        parquet_ensemble._flux_col,
+        parquet_ensemble._err_col,
+        parquet_ensemble._band_col,
+    ]:
+        # Check to make sure the critical quantity labels are bound to real columns
+        assert parquet_ensemble.source[col] is not None
+
+
 @pytest.mark.parametrize(
     "data_fixture",
     [
@@ -440,6 +482,153 @@ def test_from_source_dict(dask_client):
     assert 8002 in obj_table.index
 
 
+@pytest.mark.parametrize("bad_sort_kwargs", [True, False])
+@pytest.mark.parametrize("use_object", [True, False])
+@pytest.mark.parametrize("id_col", ["object_id", "_hipscat_index"])
+def test_from_lsdb_warnings_errors(bad_sort_kwargs, use_object, id_col):
+    """Test warnings and errors in from_lsdb"""
+    object_cat = lsdb.read_hipscat("tests/tape_tests/data/small_sky_hipscat/small_sky_object_catalog")
+    source_cat = lsdb.read_hipscat("tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog")
+
+    # Pain point: the suffixes here are a bit awkward; ideally we'd keep just the source columns (especially at scale)
+    # We do this to get the source catalog indexed by the object's _hipscat_index
+    joined_source_cat = object_cat.join(
+        source_cat, left_on="id", right_on="object_id", suffixes=("_object", "")
+    )
+
+    colmap = ColumnMapper(
+        id_col=id_col,
+        time_col="mjd",
+        flux_col="mag",
+        err_col="Norder",  # no error column...
+        band_col="band",
+    )
+
+    ens = Ensemble(client=False)
+
+    # from_lsdb accepts the catalog objects directly (no need to invoke ._ddf)
+
+    # When object and source are used with an id_col that is not _hipscat_index,
+    # check that this gives the user the expected warning
+    if id_col != "_hipscat_index" and use_object:
+        # need to first rename
+        object_cat._ddf = object_cat._ddf.rename(columns={"id": id_col})
+        with pytest.warns(UserWarning):
+            ens.from_lsdb(joined_source_cat, object_cat, column_mapper=colmap, sorted=False, sort=True)
+
+    # When using just source and the _hipscat_index is chosen as the id_col,
+    # check that this raises the expected error; do not test further,
+    # as this ensemble is incorrect (source _hipscat_index is unique per source)
+    elif id_col == "_hipscat_index" and not use_object:
+        with pytest.raises(ValueError):
+            ens.from_lsdb(joined_source_cat, None, column_mapper=colmap, sorted=True, sort=False)
+
+    # When using just source with bad sort kwargs, check that a warning is
+    # raised, but this should still yield a valid result
+    elif bad_sort_kwargs and not use_object:
+        with pytest.warns(UserWarning):
+            ens.from_lsdb(joined_source_cat, None, column_mapper=colmap, sorted=True, sort=False)
+
+    else:
+        return
+
+
+@pytest.mark.parametrize("id_col", ["object_id", "_hipscat_index"])
+def test_from_lsdb_no_object(id_col):
+    """Ensemble from a hipscat directory, with just the source given"""
+    source_cat = lsdb.read_hipscat("tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog")
+
+    colmap = ColumnMapper(
+        id_col=id_col,  # note: _hipscat_index is unique per source here
+        time_col="mjd",
+        flux_col="mag",
+        err_col="Norder",  # no error column...
+        band_col="band",
+    )
+
+    ens = Ensemble(client=False)
+
+    # Just check to make sure users trying to use the _hipscat_index get an error;
+    # that ensemble would be incorrect (one id per source)
+    if id_col == "_hipscat_index":
+        with pytest.raises(ValueError):
+            ens.from_lsdb(source_cat, object_catalog=None, column_mapper=colmap, sorted=True, sort=False)
+        return
+    else:
+        ens.from_lsdb(source_cat, object_catalog=None, column_mapper=colmap, sorted=False, sort=True)
+
+    # Check to make sure the source and object tables were created
+    assert ens.source is not None
+    assert ens.object is not None
+
+    # Make sure divisions are set
+    assert ens.source.known_divisions
+    assert ens.object.known_divisions
+
+    # Check that the data is not empty.
+    obj, source = ens.compute()
+    assert len(source) == 17161
+    assert len(obj) == 131
+
+    # Check that source and object both have the same ids present
+    assert sorted(np.unique(list(source.index))) == sorted(np.array(obj.index))
+
+    # Check that we loaded the correct columns.
+    for col in [
+        ens._time_col,
+        ens._flux_col,
+        ens._err_col,
+        ens._band_col,
+    ]:
+        # Check to make sure the critical quantity labels are bound to real columns
+        assert ens.source[col] is not None
+
+
+def test_from_hipscat_no_object():
+    """Ensemble from a hipscat directory, with just the source given"""
+    ens = Ensemble(client=False)
+
+    colmap = ColumnMapper(
+        id_col="object_id",  # don't use _hipscat_index here, it's unique per source
+        time_col="mjd",
+        flux_col="mag",
+        err_col="Norder",  # no error column...
+        band_col="band",
+    )
+
+    ens.from_hipscat(
+        "tests/tape_tests/data/small_sky_hipscat/small_sky_source_catalog",
+        object_path=None,
+        column_mapper=colmap,
+    )
+
+    # Check to make sure the source and object tables were created
+    assert ens.source is not None
+    assert ens.object is not None
+
+    # Make sure divisions are set
+    assert ens.source.known_divisions
+    assert ens.object.known_divisions
+
+    # Check that the data is not empty.
+    obj, source = ens.compute()
+    assert len(source) == 17161
+    assert len(obj) == 131
+
+    # Check that source and object both have the same ids present
+    assert sorted(np.unique(list(source.index))) == sorted(np.array(obj.index))
+
+    # Check that we loaded the correct columns.
+    for col in [
+        ens._time_col,
+        ens._flux_col,
+        ens._err_col,
+        ens._band_col,
+    ]:
+        # Check to make sure the critical quantity labels are bound to real columns
+        assert ens.source[col] is not None
+
+
 def test_read_source_dict(dask_client):
     """
     Test that tape.read_source_dict() successfully creates data from a dictionary.