diff --git a/python/lsst/pipe/tasks/finalizeCharacterization.py b/python/lsst/pipe/tasks/finalizeCharacterization.py index f77be3bdc..a36089007 100644 --- a/python/lsst/pipe/tasks/finalizeCharacterization.py +++ b/python/lsst/pipe/tasks/finalizeCharacterization.py @@ -26,9 +26,10 @@ 'FinalizeCharacterizationConfig', 'FinalizeCharacterizationTask'] +import astropy.table import numpy as np import esutil -import pandas as pd + import lsst.pex.config as pexConfig import lsst.pipe.base as pipeBase @@ -71,7 +72,7 @@ class FinalizeCharacterizationConnections(pipeBase.PipelineTaskConnections, doc=('Catalog of isolated stars with average positions, number of associated ' 'sources, and indexes to the isolated_star_sources catalogs.'), name='isolated_star_presource_associations', - storageClass='DataFrame', + storageClass='ArrowAstropy', dimensions=('instrument', 'tract', 'skymap'), deferLoad=True, multiple=True, @@ -80,7 +81,7 @@ class FinalizeCharacterizationConnections(pipeBase.PipelineTaskConnections, doc=('Catalog of isolated star sources with sourceIds, and indexes to the ' 'isolated_star_cats catalogs.'), name='isolated_star_presources', - storageClass='DataFrame', + storageClass='ArrowAstropy', dimensions=('instrument', 'tract', 'skymap'), deferLoad=True, multiple=True, @@ -96,7 +97,7 @@ class FinalizeCharacterizationConnections(pipeBase.PipelineTaskConnections, finalized_src_table = pipeBase.connectionTypes.Output( doc=('Per-visit catalog of measurements for psf/flag/etc.'), name='finalized_src_table', - storageClass='DataFrame', + storageClass='ArrowAstropy', dimensions=('instrument', 'visit'), ) @@ -293,7 +294,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): butlerQC.put(struct.psf_ap_corr_cat, outputRefs.finalized_psf_ap_corr_cat) - butlerQC.put(pd.DataFrame(struct.output_table), + butlerQC.put(astropy.table.Table(struct.output_table), outputRefs.finalized_src_table) def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict): @@ -538,14 +539,13 @@ def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_ merge_source_counter = 0 for tract in isolated_star_cat_dict: - df_cat = isolated_star_cat_dict[tract].get() - table_cat = df_cat.to_records() + astropy_cat = isolated_star_cat_dict[tract].get() + table_cat = np.asarray(astropy_cat) - df_source = isolated_star_source_dict[tract].get( - parameters={'columns': [self.config.id_column, - 'obj_index']} + astropy_source = isolated_star_source_dict[tract].get( + parameters={'columns': [self.config.id_column, 'obj_index']} ) - table_source = df_source.to_records() + table_source = np.asarray(astropy_source) # Cut isolated star table to those observed in this band, and adjust indexes (use_band,) = (table_cat[f'nsource_{band}'] > 0).nonzero() diff --git a/python/lsst/pipe/tasks/isolatedStarAssociation.py b/python/lsst/pipe/tasks/isolatedStarAssociation.py index 0189544ae..12b513ea1 100644 --- a/python/lsst/pipe/tasks/isolatedStarAssociation.py +++ b/python/lsst/pipe/tasks/isolatedStarAssociation.py @@ -23,10 +23,10 @@ 'IsolatedStarAssociationConfig', 'IsolatedStarAssociationTask'] +import astropy.table import esutil import hpgeom as hpg import numpy as np -import pandas as pd from smatch.matcher import Matcher import lsst.pex.config as pexConfig @@ -43,7 +43,7 @@ class IsolatedStarAssociationConnections(pipeBase.PipelineTaskConnections, source_table_visit = pipeBase.connectionTypes.Input( doc='Source table in parquet format, per visit', name='preSourceTable_visit', - 
storageClass='DataFrame', + storageClass='ArrowAstropy', dimensions=('instrument', 'visit'), deferLoad=True, multiple=True, @@ -57,13 +57,13 @@ class IsolatedStarAssociationConnections(pipeBase.PipelineTaskConnections, isolated_star_sources = pipeBase.connectionTypes.Output( doc='Catalog of individual sources for the isolated stars', name='isolated_star_presources', - storageClass='DataFrame', + storageClass='ArrowAstropy', dimensions=('instrument', 'tract', 'skymap'), ) isolated_star_cat = pipeBase.connectionTypes.Output( doc='Catalog of isolated star positions', name='isolated_star_presource_associations', - storageClass='DataFrame', + storageClass='ArrowAstropy', dimensions=('instrument', 'tract', 'skymap'), ) @@ -206,19 +206,19 @@ def __init__(self, **kwargs): self.source_selector.log.setLevel(self.source_selector.log.WARN) def runQuantum(self, butlerQC, inputRefs, outputRefs): - input_ref_dict = butlerQC.get(inputRefs) + input_handle_dict = butlerQC.get(inputRefs) tract = butlerQC.quantum.dataId['tract'] - source_table_refs = input_ref_dict['source_table_visit'] + source_table_handles = input_handle_dict['source_table_visit'] - self.log.info('Running with %d source_table_visit dataRefs', - len(source_table_refs)) + self.log.info('Running with %d source_table_visit datasets', + len(source_table_handles)) - source_table_ref_dict_temp = {source_table_ref.dataId['visit']: source_table_ref for - source_table_ref in source_table_refs} + source_table_handle_dict_temp = {source_table_handle.dataId['visit']: source_table_handle for + source_table_handle in source_table_handles} - bands = {source_table_ref.dataId['band'] for source_table_ref in source_table_refs} + bands = {source_table_handle.dataId['band'] for source_table_handle in source_table_handles} for band in bands: if band not in self.config.band_order: self.log.warning('Input data has data from band %s but that band is not ' @@ -226,17 +226,17 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): # TODO: Sort by visit until DM-31701 is done and we have deterministic # dataset ordering. - source_table_ref_dict = {visit: source_table_ref_dict_temp[visit] for - visit in sorted(source_table_ref_dict_temp.keys())} + source_table_handle_dict = {visit: source_table_handle_dict_temp[visit] for + visit in sorted(source_table_handle_dict_temp.keys())} - struct = self.run(input_ref_dict['skymap'], tract, source_table_ref_dict) + struct = self.run(input_handle_dict['skymap'], tract, source_table_handle_dict) - butlerQC.put(pd.DataFrame(struct.star_source_cat), + butlerQC.put(astropy.table.Table(struct.star_source_cat), outputRefs.isolated_star_sources) - butlerQC.put(pd.DataFrame(struct.star_cat), + butlerQC.put(astropy.table.Table(struct.star_cat), outputRefs.isolated_star_cat) - def run(self, skymap, tract, source_table_ref_dict): + def run(self, skymap, tract, source_table_handle_dict): """Run the isolated star association task. Parameters @@ -245,15 +245,16 @@ def run(self, skymap, tract, source_table_ref_dict): Skymap object. tract : `int` Tract number. - source_table_ref_dict : `dict` - Dictionary of source_table refs. Key is visit, value is dataref. + source_table_handle_dict : `dict` + Dictionary of source_table handles. Key is visit, value is + a `lsst.daf.butler.DeferredDatasetHandle`. Returns ------- struct : `lsst.pipe.base.struct` Struct with outputs for persistence. 
""" - star_source_cat = self._make_all_star_sources(skymap[tract], source_table_ref_dict) + star_source_cat = self._make_all_star_sources(skymap[tract], source_table_handle_dict) primary_bands = self.config.band_order @@ -297,15 +298,16 @@ def run(self, skymap, tract, source_table_ref_dict): return pipeBase.Struct(star_source_cat=star_source_cat, star_cat=primary_star_cat) - def _make_all_star_sources(self, tract_info, source_table_ref_dict): + def _make_all_star_sources(self, tract_info, source_table_handle_dict): """Make a catalog of all the star sources. Parameters ---------- tract_info : `lsst.skymap.TractInfo` Information about the tract. - source_table_ref_dict : `dict` - Dictionary of source_table refs. Key is visit, value is dataref. + source_table_handle_dict : `dict` + Dictionary of source_table handles. Key is visit, value is + a `lsst.daf.butler.DeferredDatasetHandle`. Returns ------- @@ -319,14 +321,13 @@ def _make_all_star_sources(self, tract_info, source_table_ref_dict): poly = tract_info.outer_sky_polygon tables = [] - for visit in source_table_ref_dict: - source_table_ref = source_table_ref_dict[visit] - df = source_table_ref.get(parameters={'columns': all_columns}) - df.reset_index(inplace=True) + for visit in source_table_handle_dict: + source_table_ref = source_table_handle_dict[visit] + tbl = source_table_ref.get(parameters={'columns': all_columns}) - goodSrc = self.source_selector.selectSources(df) + goodSrc = self.source_selector.selectSources(tbl) - table = df[persist_columns][goodSrc.selected].to_records() + table = np.asarray(tbl[persist_columns][goodSrc.selected]) # Append columns that include the row in the source table # and the matched object index (to be filled later). diff --git a/python/lsst/pipe/tasks/multiBand.py b/python/lsst/pipe/tasks/multiBand.py index a575219b6..0cfb09e24 100644 --- a/python/lsst/pipe/tasks/multiBand.py +++ b/python/lsst/pipe/tasks/multiBand.py @@ -280,7 +280,7 @@ class MeasureMergedCoaddSourcesConnections( "These tables contain astrometry and photometry flags, and optionally " "PSF flags."), name="sourceTable_visit", - storageClass="DataFrame", + storageClass="ArrowAstropy", dimensions=("instrument", "visit"), multiple=True, deferLoad=True, @@ -289,7 +289,7 @@ class MeasureMergedCoaddSourcesConnections( doc=("Finalized source tables from ``FinalizeCalibrationTask``. 
These " "tables contain PSF flags from the finalized PSF estimation."), name="finalized_src_table", - storageClass="DataFrame", + storageClass="ArrowAstropy", dimensions=("instrument", "visit"), multiple=True, deferLoad=True, diff --git a/python/lsst/pipe/tasks/postprocess.py b/python/lsst/pipe/tasks/postprocess.py index 2418056fa..0dd96fc03 100644 --- a/python/lsst/pipe/tasks/postprocess.py +++ b/python/lsst/pipe/tasks/postprocess.py @@ -35,17 +35,22 @@ "TransformForcedSourceTableConfig", "TransformForcedSourceTableTask", "ConsolidateTractConfig", "ConsolidateTractTask"] +import dataclasses import functools -import pandas as pd import logging -import numpy as np import numbers import os +import numpy as np +import pandas as pd +import astropy.table +import astropy.utils.metadata + import lsst.geom import lsst.pex.config as pexConfig import lsst.pipe.base as pipeBase import lsst.daf.base as dafBase +from lsst.daf.butler.formatters.parquet import pandas_to_astropy from lsst.pipe.base import connectionTypes import lsst.afw.table as afwTable from lsst.afw.image import ExposureSummaryStats, ExposureF @@ -78,6 +83,95 @@ def flattenFilters(df, noDupCols=["coord_ra", "coord_dec"], camelCase=False, inp return newDf +class TableVStack: + """A helper class for stacking astropy tables without having them all in + memory at once. + + Parameters + ---------- + capacity : `int` + Full size of the final table. + + Notes + ----- + Unlike `astropy.table.vstack`, this class requires all tables to have the + exact same columns (it's slightly more strict than even the + ``join_type="exact"`` argument to `astropy.table.vstack`). + """ + + def __init__(self, capacity): + self.index = 0 + self.capacity = capacity + self.result = None + + @classmethod + def from_handles(cls, handles): + """Construct from an iterable of + `lsst.daf.butler.DeferredDatasetHandle`. + + Parameters + ---------- + handles : `~collections.abc.Iterable` [ \ + `lsst.daf.butler.DeferredDatasetHandle` ] + Iterable of handles. Must have a storage class that supports the + "rowcount" component, which is all that will be fetched. + + Returns + ------- + vstack : `TableVStack` + An instance of this class, initialized with capacity equal to the + sum of the rowcounts of all the given table handles. + """ + capacity = sum(handle.get(component="rowcount") for handle in handles) + return cls(capacity=capacity) + + def extend(self, table): + """Add a single table to the stack. + + Parameters + ---------- + table : `astropy.table.Table` + An astropy table instance. + """ + if self.result is None: + self.result = astropy.table.Table() + for name in table.colnames: + column = table[name] + column_cls = type(column) + self.result[name] = column_cls.info.new_like([column], self.capacity, name=name) + self.index = len(table) + self.result.meta = table.meta.copy() + else: + next_index = self.index + len(table) + for name in table.colnames: + self.result[name][self.index:next_index] = table[name] + self.index = next_index + self.result.meta = astropy.utils.metadata.merge(self.result.meta, table.meta) + + @classmethod + def vstack_handles(cls, handles): + """Vertically stack tables represented by deferred dataset handles. + + Parameters + ---------- + handles : `~collections.abc.Iterable` [ \ + `lsst.daf.butler.DeferredDatasetHandle` ] + Iterable of handles. Must have the "ArrowAstropy" storage class + and identical columns. 
+
+
 class WriteObjectTableConnections(pipeBase.PipelineTaskConnections,
                                   defaultTemplates={"coaddName": "deep"},
                                   dimensions=("tract", "patch", "skymap")):
@@ -199,10 +293,9 @@ class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
         dimensions=("instrument", "visit", "detector")
     )
     outputCatalog = connectionTypes.Output(
-        doc="Catalog of sources, `src` in DataFrame/Parquet format.  The 'id' column is "
-            "replaced with an index; all other columns are unchanged.",
+        doc="Catalog of sources, `src` in Astropy/Parquet format.  Columns are unchanged.",
         name="{catalogType}source",
-        storageClass="DataFrame",
+        storageClass="ArrowAstropy",
         dimensions=("instrument", "visit", "detector")
     )
 
@@ -227,7 +320,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
         butlerQC.put(outputs, outputRefs)
 
     def run(self, catalog, visit, detector, **kwargs):
-        """Convert `src` catalog to DataFrame
+        """Convert `src` catalog to an Astropy table.
 
         Parameters
         ----------
@@ -244,15 +337,15 @@
         -------
         result : `~lsst.pipe.base.Struct`
             ``table``
-                `DataFrame` version of the input catalog
+                `astropy.table.Table` version of the input catalog
         """
-        self.log.info("Generating DataFrame from src catalog visit,detector=%i,%i", visit, detector)
-        df = catalog.asAstropy().to_pandas().set_index("id", drop=True)
-        df["visit"] = visit
+        self.log.info("Generating astropy table from src catalog visit,detector=%i,%i", visit, detector)
+        tbl = catalog.asAstropy()
+        tbl["visit"] = visit
         # int16 instead of uint8 because databases don't like unsigned bytes.
-        df["detector"] = np.int16(detector)
+        tbl["detector"] = np.int16(detector)
 
-        return pipeBase.Struct(table=df)
+        return pipeBase.Struct(table=tbl)
 
 
 class WriteRecalibratedSourceTableConnections(WriteSourceTableConnections,
@@ -558,7 +651,7 @@ class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
     )
     outputCatalog = connectionTypes.Output(
         name="",
-        storageClass="DataFrame",
+        storageClass="ArrowAstropy",
     )
 
 
@@ -685,8 +778,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
                 "Must be a valid path to yaml in order to run Task as a PipelineTask.")
         result = self.run(handle=inputs["inputCatalog"], funcs=self.funcs,
                           dataId=dict(outputRefs.outputCatalog.dataId.mapping))
-        outputs = pipeBase.Struct(outputCatalog=result)
-        butlerQC.put(outputs, outputRefs)
+        butlerQC.put(result, outputRefs)
 
     def run(self, handle, funcs=None, dataId=None, band=None):
         """Do postprocessing calculations
@@ -710,13 +802,16 @@
         Returns
         -------
-        df : `pandas.DataFrame`
+        result : `lsst.pipe.base.Struct`
+            Result struct, with a single ``outputCatalog`` attribute holding
+            the transformed catalog.
""" self.log.info("Transforming/standardizing the source table dataId: %s", dataId) df = self.transform(band, handle, funcs, dataId).df self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df)) - return df + result = pipeBase.Struct(outputCatalog=pandas_to_astropy(df)) + return result def getFunctors(self): return self.funcs @@ -767,10 +862,15 @@ class TransformObjectCatalogConnections(pipeBase.PipelineTaskConnections, doc="Per-Patch Object Table of columns transformed from the deepCoadd_obj table per the standard " "data model.", dimensions=("tract", "patch", "skymap"), - storageClass="DataFrame", + storageClass="ArrowAstropy", name="objectTable" ) + def __init__(self, *, config=None): + super().__init__(config=config) + if config.multilevelOutput: + self.outputCatalog = dataclasses.replace(self.outputCatalog, storageClass="DataFrame") + class TransformObjectCatalogConfig(TransformCatalogBaseConfig, pipelineConnections=TransformObjectCatalogConnections): @@ -797,7 +897,9 @@ class TransformObjectCatalogConfig(TransformCatalogBaseConfig, dtype=bool, default=False, doc=("Whether results dataframe should have a multilevel column index (True) or be flat " - "and name-munged (False).") + "and name-munged (False). If True, the output storage class will be " + "set to DataFrame, since astropy tables do not support multi-level indexing."), + deprecated="Support for multi-level outputs is deprecated and will be removed after v29.", ) goodFlags = pexConfig.ListField( dtype=str, @@ -903,10 +1005,13 @@ def run(self, handle, funcs=None, dataId=None, band=None): noDupCols += self.config.columnsFromDataId df = flattenFilters(df, noDupCols=noDupCols, camelCase=self.config.camelCase, inputBands=inputBands) + tbl = pandas_to_astropy(df) + else: + tbl = df - self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df)) + self.log.info("Made a table of %d columns and %d rows", len(tbl.columns), len(tbl)) - return df + return pipeBase.Struct(outputCatalog=tbl) class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections, @@ -914,14 +1019,15 @@ class ConsolidateObjectTableConnections(pipeBase.PipelineTaskConnections, inputCatalogs = connectionTypes.Input( doc="Per-Patch objectTables conforming to the standard data model.", name="objectTable", - storageClass="DataFrame", + storageClass="ArrowAstropy", dimensions=("tract", "patch", "skymap"), multiple=True, + deferLoad=True, ) outputCatalog = connectionTypes.Output( doc="Pre-tract horizontal concatenation of the input objectTables", name="objectTable_tract", - storageClass="DataFrame", + storageClass="ArrowAstropy", dimensions=("tract", "skymap"), ) @@ -950,8 +1056,8 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) self.log.info("Concatenating %s per-patch Object Tables", len(inputs["inputCatalogs"])) - df = pd.concat(inputs["inputCatalogs"]) - butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs) + table = TableVStack.vstack_handles(inputs["inputCatalogs"]) + butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs) class TransformSourceTableConnections(pipeBase.PipelineTaskConnections, @@ -969,7 +1075,7 @@ class TransformSourceTableConnections(pipeBase.PipelineTaskConnections, doc="Narrower, per-detector Source Table transformed and converted per a " "specified set of functors", name="{catalogType}sourceTable", - storageClass="DataFrame", + storageClass="ArrowAstropy", dimensions=("instrument", "visit", "detector") ) @@ -1125,14 +1231,15 @@ class 
ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections, inputCatalogs = connectionTypes.Input( doc="Input per-detector Source Tables", name="{catalogType}sourceTable", - storageClass="DataFrame", + storageClass="ArrowAstropy", dimensions=("instrument", "visit", "detector"), - multiple=True + multiple=True, + deferLoad=True, ) outputCatalog = connectionTypes.Output( doc="Per-visit concatenation of Source Table", name="{catalogType}sourceTable_visit", - storageClass="DataFrame", + storageClass="ArrowAstropy", dimensions=("instrument", "visit") ) @@ -1160,8 +1267,8 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) self.log.info("Concatenating %s per-detector Source Tables", len(inputs["inputCatalogs"])) - df = pd.concat(inputs["inputCatalogs"]) - butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs) + table = TableVStack.vstack_handles(inputs["inputCatalogs"]) + butlerQC.put(pipeBase.Struct(outputCatalog=table), outputRefs) class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections, @@ -1178,7 +1285,7 @@ class MakeCcdVisitTableConnections(pipeBase.PipelineTaskConnections, outputCatalog = connectionTypes.Output( doc="CCD and Visit metadata table", name="ccdVisitTable", - storageClass="DataFrame", + storageClass="ArrowAstropy", dimensions=("instrument",) ) @@ -1231,16 +1338,17 @@ def run(self, visitSummaryRefs): "effTime", "effTimePsfSigmaScale", "effTimeSkyBgScale", "effTimeZeroPointScale", "magLim"] - ccdEntry = summaryTable[selectColumns].to_pandas().set_index("id") + ccdEntry = summaryTable[selectColumns] # 'visit' is the human readable visit number. # 'visitId' is the key to the visitId table. They are the same. # Technically you should join to get the visit from the visit # table. - ccdEntry = ccdEntry.rename(columns={"visit": "visitId"}) + ccdEntry.rename_column("visit", "visitId") + ccdEntry.rename_column("id", "detectorId") # RFC-924: Temporarily keep a duplicate "decl" entry for backwards # compatibility. To be removed after September 2023. - ccdEntry["decl"] = ccdEntry.loc[:, "dec"] + ccdEntry["decl"] = ccdEntry["dec"] ccdEntry["ccdVisitId"] = [ self.config.idGenerator.apply( @@ -1259,12 +1367,13 @@ def run(self, visitSummaryRefs): visitSummary["psfSigma"] * visitSummary["pixelScale"] * np.sqrt(8 * np.log(2)) ) ccdEntry["skyRotation"] = visitInfo.getBoresightRotAngle().asDegrees() - ccdEntry["expMidpt"] = visitInfo.getDate().toPython() + ccdEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns") ccdEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD) + expTime = visitInfo.getExposureTime() ccdEntry["obsStart"] = ( - ccdEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=ccdEntry["expTime"].values[0]) + ccdEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns") ) - expTime_days = ccdEntry["expTime"] / (60*60*24) + expTime_days = expTime / (60*60*24) ccdEntry["obsStartMJD"] = ccdEntry["expMidptMJD"] - 0.5 * expTime_days ccdEntry["darkTime"] = visitInfo.getDarkTime() ccdEntry["xSize"] = summaryTable["bbox_max_x"] - summaryTable["bbox_min_x"] @@ -1282,8 +1391,7 @@ def run(self, visitSummaryRefs): # values are actually wanted. 
ccdEntries.append(ccdEntry) - outputCatalog = pd.concat(ccdEntries) - outputCatalog.set_index("ccdVisitId", inplace=True, verify_integrity=True) + outputCatalog = astropy.table.vstack(ccdEntries, join_type="exact") return pipeBase.Struct(outputCatalog=outputCatalog) @@ -1301,7 +1409,7 @@ class MakeVisitTableConnections(pipeBase.PipelineTaskConnections, outputCatalog = connectionTypes.Output( doc="Visit metadata table", name="visitTable", - storageClass="DataFrame", + storageClass="ArrowAstropy", dimensions=("instrument",) ) @@ -1359,9 +1467,9 @@ def run(self, visitSummaries): visitEntry["airmass"] = visitInfo.getBoresightAirmass() expTime = visitInfo.getExposureTime() visitEntry["expTime"] = expTime - visitEntry["expMidpt"] = visitInfo.getDate().toPython() + visitEntry["expMidpt"] = np.datetime64(visitInfo.getDate().toPython(), "ns") visitEntry["expMidptMJD"] = visitInfo.getDate().get(dafBase.DateTime.MJD) - visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * pd.Timedelta(seconds=expTime) + visitEntry["obsStart"] = visitEntry["expMidpt"] - 0.5 * np.timedelta64(int(expTime * 1E9), "ns") expTime_days = expTime / (60*60*24) visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days visitEntries.append(visitEntry) @@ -1370,8 +1478,7 @@ def run(self, visitSummaries): # mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp, # dimmSeeing, pwvGPS, pwvMW, flags, nExposures. - outputCatalog = pd.DataFrame(data=visitEntries) - outputCatalog.set_index("visitId", inplace=True, verify_integrity=True) + outputCatalog = astropy.table.Table(rows=visitEntries) return pipeBase.Struct(outputCatalog=outputCatalog) diff --git a/python/lsst/pipe/tasks/propagateSourceFlags.py b/python/lsst/pipe/tasks/propagateSourceFlags.py index 76b5bcd44..4c0c6e302 100644 --- a/python/lsst/pipe/tasks/propagateSourceFlags.py +++ b/python/lsst/pipe/tasks/propagateSourceFlags.py @@ -184,7 +184,7 @@ def run(self, coadd_object_cat, ccd_inputs, self.log.info("Visit %d not in input handle dict for %s", visit, name) continue handle = handle_dict[visit] - df = handle.get(parameters={"columns": columns}) + tbl = handle.get(parameters={"columns": columns}) # Loop over all ccd_inputs rows for this visit. for row in ccd_inputs[ccd_inputs["visit"] == visit]: @@ -195,13 +195,13 @@ def run(self, coadd_object_cat, ccd_inputs, "propagate flags. Skipping...", visit, detector) continue - df_det = df[df["detector"] == detector] + tbl_det = tbl[tbl["detector"] == detector] - if len(df_det) == 0: + if len(tbl_det) == 0: continue - ra, dec = wcs.pixelToSkyArray(df_det[x_col].values, - df_det[y_col].values, + ra, dec = wcs.pixelToSkyArray(np.asarray(tbl_det[x_col]), + np.asarray(tbl_det[y_col]), degrees=True) try: @@ -225,7 +225,7 @@ def run(self, coadd_object_cat, ccd_inputs, continue for flag in flag_counts: - flag_values = df_det[flag].values + flag_values = np.asarray(tbl_det[flag]) flag_counts[flag][i1] += flag_values[i2].astype(np.int32) for flag in source_flag_counts: diff --git a/schemas/PreSource.yaml b/schemas/PreSource.yaml deleted file mode 100644 index 80b78a304..000000000 --- a/schemas/PreSource.yaml +++ /dev/null @@ -1,456 +0,0 @@ -# This file defines the mapping between the columns in a single visit+detector -# source table and their respective DPDD-style column names, as used by -# `lsst.pipe.tasks.postprocess.TransformSourceTableTask`. 
-# See the DPDD for more information about the output: https://lse-163.lsst.io -funcs: - sourceId: - functor: Column - args: id - coord_ra: - # reference position required by db. Not in DPDD - functor: CoordColumn - args: coord_ra - coord_dec: - # Reference position required by db. Not in DPDD - functor: CoordColumn - args: coord_dec - # objectId: not avaliable - # ssObjectId: not avaliable - parentSourceId: - functor: Column - args: parent - # htmId20: not avaliable - x: - functor: Column - args: slot_Centroid_x - y: - functor: Column - args: slot_Centroid_y - xErr: - functor: Column - args: slot_Centroid_xErr - yErr: - functor: Column - args: slot_Centroid_yErr - # x_y_Cov: not available - ra: - functor: RAColumn - dec: - functor: DecColumn - - # RFC-924: Temporarily keep a duplicate "decl" entry for backwards - # compatibility. To be removed after September 2023. - decl: - functor: DecColumn - - raErr: - functor: RAErrColumn - decErr: - functor: DecErrColumn - ra_dec_Cov: - functor: RADecCovColumn - # One calibrated Calib flux is important: - calibFlux: - functor: LocalNanojansky - args: - - slot_CalibFlux_instFlux - - slot_CalibFlux_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - calibFluxErr: - functor: LocalNanojanskyErr - args: - - slot_CalibFlux_instFlux - - slot_CalibFlux_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - # Not in DPDD. Used for QA - ap03Flux: - functor: LocalNanojansky - args: - - base_CircularApertureFlux_3_0_instFlux - - base_CircularApertureFlux_3_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap03FluxErr: - functor: LocalNanojanskyErr - args: - - base_CircularApertureFlux_3_0_instFlux - - base_CircularApertureFlux_3_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap03Flux_flag: - functor: Column - args: base_CircularApertureFlux_3_0_flag - # if we need to add decimal apertures call them e.g. 
ap04p5Flux - ap06Flux: - functor: LocalNanojansky - args: - - base_CircularApertureFlux_6_0_instFlux - - base_CircularApertureFlux_6_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap06FluxErr: - functor: LocalNanojanskyErr - args: - - base_CircularApertureFlux_6_0_instFlux - - base_CircularApertureFlux_6_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap06Flux_flag: - functor: Column - args: base_CircularApertureFlux_6_0_flag - ap09Flux: - functor: LocalNanojansky - args: - - base_CircularApertureFlux_9_0_instFlux - - base_CircularApertureFlux_9_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap09FluxErr: - functor: LocalNanojanskyErr - args: - - base_CircularApertureFlux_9_0_instFlux - - base_CircularApertureFlux_9_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap09Flux_flag: - functor: Column - args: base_CircularApertureFlux_9_0_flag - ap12Flux: - functor: LocalNanojansky - args: - - base_CircularApertureFlux_12_0_instFlux - - base_CircularApertureFlux_12_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap12FluxErr: - functor: LocalNanojanskyErr - args: - - base_CircularApertureFlux_12_0_instFlux - - base_CircularApertureFlux_12_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap12Flux_flag: - functor: Column - args: base_CircularApertureFlux_12_0_flag - ap17Flux: - functor: LocalNanojansky - args: - - base_CircularApertureFlux_17_0_instFlux - - base_CircularApertureFlux_17_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap17FluxErr: - functor: LocalNanojanskyErr - args: - - base_CircularApertureFlux_17_0_instFlux - - base_CircularApertureFlux_17_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap17Flux_flag: - functor: Column - args: base_CircularApertureFlux_17_0_flag - ap25Flux: - functor: LocalNanojansky - args: - - base_CircularApertureFlux_25_0_instFlux - - base_CircularApertureFlux_25_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap25FluxErr: - functor: LocalNanojanskyErr - args: - - base_CircularApertureFlux_25_0_instFlux - - base_CircularApertureFlux_25_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap25Flux_flag: - functor: Column - args: base_CircularApertureFlux_25_0_flag - ap35Flux: - functor: LocalNanojansky - args: - - base_CircularApertureFlux_35_0_instFlux - - base_CircularApertureFlux_35_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap35FluxErr: - functor: LocalNanojanskyErr - args: - - base_CircularApertureFlux_35_0_instFlux - - base_CircularApertureFlux_35_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap35Flux_flag: - functor: Column - args: base_CircularApertureFlux_35_0_flag - ap50Flux: - functor: LocalNanojansky - args: - - base_CircularApertureFlux_50_0_instFlux - - base_CircularApertureFlux_50_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap50FluxErr: - functor: LocalNanojanskyErr - args: - - base_CircularApertureFlux_50_0_instFlux - - base_CircularApertureFlux_50_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap50Flux_flag: - functor: Column - args: base_CircularApertureFlux_50_0_flag - ap70Flux: - functor: LocalNanojansky - args: - - base_CircularApertureFlux_70_0_instFlux - - base_CircularApertureFlux_70_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap70FluxErr: - functor: LocalNanojanskyErr - args: - - 
base_CircularApertureFlux_70_0_instFlux - - base_CircularApertureFlux_70_0_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - ap70Flux_flag: - functor: Column - args: base_CircularApertureFlux_70_0_flag - # TODO: When DM-25019 is complete, these should be - # changed to use the local value of the background - # model, rather than the residual of the background - sky: - functor: LocalNanojansky - args: - - base_LocalBackground_instFlux - - base_LocalBackground_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - skyErr: - functor: LocalNanojanskyErr - args: - - base_LocalBackground_instFlux - - base_LocalBackground_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - psfFlux: - functor: LocalNanojansky - args: - - slot_PsfFlux_instFlux - - slot_PsfFlux_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - psfFluxErr: - functor: LocalNanojanskyErr - args: - - slot_PsfFlux_instFlux - - slot_PsfFlux_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - - # These PS columns do not make sense anymore as named - # psX - # psXSigma - # psY - # psYSigma - # psFlux_psX_Cov - # psFlux_psY_Cov - # psX_Y_Cov - # psLnL - # psChi2 - # psN - # psRa - # psRaSigma - # psDecl - # psDeclSigma - # psFlux_psRa_Cov - # psFlux_psDecl_Cov - - ixx: - functor: Column - args: slot_Shape_xx - iyy: - functor: Column - args: slot_Shape_yy - ixy: - functor: Column - args: slot_Shape_xy - # DPDD should include Psf Shape - ixxPSF: - functor: Column - args: slot_PsfShape_xx - iyyPSF: - functor: Column - args: slot_PsfShape_yy - ixyPSF: - functor: Column - args: slot_PsfShape_xy - ixxDebiasedPSF: - functor: Column - args: ext_shapeHSM_HsmPsfMomentsDebiased_xx - iyyDebiasedPSF: - functor: Column - args: ext_shapeHSM_HsmPsfMomentsDebiased_yy - ixyDebiasedPSF: - functor: Column - args: ext_shapeHSM_HsmPsfMomentsDebiased_xy - # apNann: Replaced by raw Aperture instFluxes in flags section below - # apMeanSb: Replaced by raw Aperture instFluxes in flags section below - # apMeanSbErr: Replaced by raw Aperture instFluxes in flags section below - - # DPDD does not include gaussianFluxes, however they are used for - # the single frame extendedness column which is used for QA. 
- gaussianFlux: - functor: LocalNanojansky - args: - - base_GaussianFlux_instFlux - - base_GaussianFlux_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - gaussianFluxErr: - functor: LocalNanojanskyErr - args: - - base_GaussianFlux_instFlux - - base_GaussianFlux_instFluxErr - - base_LocalPhotoCalib - - base_LocalPhotoCalibErr - extendedness: - functor: Column - args: base_ClassificationExtendedness_value - sizeExtendedness: - functor: Column - args: base_ClassificationSizeExtendedness_value -flags: - - base_LocalPhotoCalib - - base_LocalPhotoCalib_flag - - base_LocalPhotoCalibErr - - base_LocalWcs_flag - - base_LocalWcs_CDMatrix_2_1 - - base_LocalWcs_CDMatrix_1_1 - - base_LocalWcs_CDMatrix_1_2 - - base_LocalWcs_CDMatrix_2_2 - - base_Blendedness_abs - - base_Blendedness_flag - - base_Blendedness_flag_noCentroid - - base_Blendedness_flag_noShape - - base_CircularApertureFlux_12_0_flag - - base_CircularApertureFlux_12_0_flag_apertureTruncated - - base_CircularApertureFlux_12_0_flag_sincCoeffsTruncated - - base_CircularApertureFlux_12_0_instFlux - - base_CircularApertureFlux_12_0_instFluxErr - - base_CircularApertureFlux_17_0_flag - - base_CircularApertureFlux_17_0_instFlux - - base_CircularApertureFlux_17_0_instFluxErr - - base_CircularApertureFlux_35_0_flag - - base_CircularApertureFlux_35_0_instFlux - - base_CircularApertureFlux_35_0_instFluxErr - - base_CircularApertureFlux_50_0_flag - - base_CircularApertureFlux_50_0_instFlux - - base_CircularApertureFlux_50_0_instFluxErr - - base_NormalizedCompensatedTophatFlux_flag - - base_NormalizedCompensatedTophatFlux_instFlux - - base_NormalizedCompensatedTophatFlux_instFluxErr - - base_ClassificationExtendedness_flag - - base_ClassificationSizeExtendedness_flag - - base_FootprintArea_value - - base_InvalidPsf_flag - - base_Jacobian_flag - - base_Jacobian_value - - base_LocalBackground_instFlux - - base_LocalBackground_instFluxErr - - base_LocalBackground_flag - - base_LocalBackground_flag_noGoodPixels - - base_LocalBackground_flag_noPsf - - base_PixelFlags_flag_bad - - base_PixelFlags_flag_cr - - base_PixelFlags_flag_crCenter - - base_PixelFlags_flag_edge - - base_PixelFlags_flag_interpolated - - base_PixelFlags_flag_interpolatedCenter - - base_PixelFlags_flag_offimage - - base_PixelFlags_flag_saturated - - base_PixelFlags_flag_saturatedCenter - - base_PixelFlags_flag_suspect - - base_PixelFlags_flag_suspectCenter - # Streak flags not yet propagated from compareWarp (DM-46947, DM-43077). 
- # - base_PixelFlags_flag_streak - # - base_PixelFlags_flag_streakCenter - - base_PsfFlux_apCorr - - base_PsfFlux_apCorrErr - - base_PsfFlux_area - - base_PsfFlux_flag - - base_PsfFlux_flag_apCorr - - base_PsfFlux_flag_edge - - base_PsfFlux_flag_noGoodPixels - - base_GaussianFlux_flag - - base_SdssCentroid_flag - - base_SdssCentroid_flag_almostNoSecondDerivative - - base_SdssCentroid_flag_badError - - base_SdssCentroid_flag_edge - - base_SdssCentroid_flag_noSecondDerivative - - base_SdssCentroid_flag_notAtMaximum - - base_SdssCentroid_flag_resetToPeak - - base_Variance_flag - - base_Variance_flag_emptyFootprint - - base_Variance_value - - calib_astrometry_used - - calib_photometry_reserved - - calib_photometry_used - - calib_psf_candidate - - calib_psf_reserved - - calib_psf_used - - deblend_deblendedAsPsf - - deblend_hasStrayFlux - - deblend_masked - - deblend_nChild - - deblend_parentTooBig - - deblend_patchedTemplate - - deblend_rampedTemplate - - deblend_skipped - - deblend_tooManyPeaks - - ext_shapeHSM_HsmPsfMoments_flag - - ext_shapeHSM_HsmPsfMoments_flag_no_pixels - - ext_shapeHSM_HsmPsfMoments_flag_not_contained - - ext_shapeHSM_HsmPsfMoments_flag_parent_source - - ext_shapeHSM_HsmPsfMomentsDebiased_flag - - ext_shapeHSM_HsmPsfMomentsDebiased_flag_no_pixels - - ext_shapeHSM_HsmPsfMomentsDebiased_flag_not_contained - - ext_shapeHSM_HsmPsfMomentsDebiased_flag_parent_source - - ext_shapeHSM_HsmPsfMomentsDebiased_flag_galsim - - ext_shapeHSM_HsmPsfMomentsDebiased_flag_edge - - ext_shapeHSM_HsmShapeRegauss_flag - - ext_shapeHSM_HsmShapeRegauss_flag_galsim - - ext_shapeHSM_HsmShapeRegauss_flag_no_pixels - - ext_shapeHSM_HsmShapeRegauss_flag_not_contained - - ext_shapeHSM_HsmShapeRegauss_flag_parent_source - - sky_source - - detect_isPrimary - -flag_rename_rules: - # Taken from db-meas-forced - - ['ext_photometryKron_', ''] - - ['base_Blendedness', 'base_blendedness'] - - ['base_Local', 'local'] - - ['base_PixelFlags_flag', 'pixelFlags'] - - ['base_ClassificationE', 'e'] - - ['base_ClassificationS', 's'] - - ['base_SdssCentroid', 'centroid'] - - ['base_Variance', 'variance'] - - ['base_Psf', 'psf'] - - ['base_InvalidPsf_flag', 'invalidPsfFlag'] - - ['base_GaussianFlux', 'gaussianFlux'] - - ['base_CircularApertureFlux', 'apFlux'] - - ['base_NormalizedCompensatedTophatFlux', 'normCompTophatFlux'] - - ['base_FootprintArea', 'footprintArea'] - - ['base_Jacobian', 'jacobian'] - - ['ext_shapeHSM_HsmPsfMomentsDebiased', 'iDebiasedPSF'] - - ['ext_shapeHSM_Hsm', 'hsm'] - - ['ext_convolved_', ''] - - ['undeblended_base', 'undeblended'] - - ['undeblended_ext_photometryKron', 'undeblended'] - - ['ext_photometryKron_', ''] - - ['base_', ''] - - ['slot_', ''] diff --git a/schemas/Source.yaml b/schemas/Source.yaml index dc3ee9a50..493d09680 100644 --- a/schemas/Source.yaml +++ b/schemas/Source.yaml @@ -4,7 +4,8 @@ # See the DPDD for more information about the output: https://lse-163.lsst.io funcs: sourceId: - functor: Index + functor: Column + args: id coord_ra: # reference position required by db. Not in DPDD functor: CoordColumn @@ -13,18 +14,12 @@ funcs: # Reference position required by db. 
Not in DPDD functor: CoordColumn args: coord_dec - visit: - functor: Column - args: visit - detector: - functor: Column - args: detector - # objectId: not avaliable - # ssObjectId: not avaliable + # objectId: not available + # ssObjectId: not available parentSourceId: functor: Column args: parent - # htmId20: not avaliable + # htmId20: not available x: functor: Column args: slot_Centroid_x diff --git a/tests/test_finalizeCharacterization.py b/tests/test_finalizeCharacterization.py index 174b4383d..3e65a0c66 100644 --- a/tests/test_finalizeCharacterization.py +++ b/tests/test_finalizeCharacterization.py @@ -23,8 +23,9 @@ """ import logging import unittest + +import astropy.table.table import numpy as np -import pandas as pd import lsst.utils.tests import lsst.afw.table as afwTable @@ -150,10 +151,10 @@ def _make_isocats(self): source_cat = np.concatenate(source_cats) - isolated_star_cat_dict[tract] = pipeBase.InMemoryDatasetHandle(pd.DataFrame(cat), - storageClass="DataFrame") - isolated_star_source_dict[tract] = pipeBase.InMemoryDatasetHandle(pd.DataFrame(source_cat), - storageClass="DataFrame") + isolated_star_cat_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(cat), + storageClass="ArrowAstropy") + isolated_star_source_dict[tract] = pipeBase.InMemoryDatasetHandle(astropy.table.Table(source_cat), + storageClass="ArrowAstropy") return isolated_star_cat_dict, isolated_star_source_dict diff --git a/tests/test_isolatedStarAssociation.py b/tests/test_isolatedStarAssociation.py index 3cc0ac211..5be01991b 100644 --- a/tests/test_isolatedStarAssociation.py +++ b/tests/test_isolatedStarAssociation.py @@ -22,8 +22,9 @@ """Test IsolatedStarAssociationTask. """ import unittest + +import astropy.table import numpy as np -import pandas as pd import lsst.utils.tests import lsst.pipe.base @@ -43,11 +44,10 @@ class IsolatedStarAssociationTestCase(lsst.utils.tests.TestCase): def setUp(self): self.skymap = self._make_skymap() self.tract = 9813 - self.data_refs = self._make_simdata(self.tract) - self.visits = np.arange(len(self.data_refs)) + 1 + self.handles = self._make_simdata(self.tract) + self.visits = np.arange(len(self.handles)) + 1 - self.data_ref_dict = {visit: data_ref for visit, data_ref in zip(self.visits, - self.data_refs)} + self.handle_dict = {visit: handle for visit, handle in zip(self.visits, self.handles)} config = IsolatedStarAssociationConfig() config.band_order = ['i', 'r'] @@ -87,7 +87,7 @@ def _make_simdata(self, Returns ------- - data_refs : `list` [`InMemoryDatasetHandle`] + handles : `list` [`InMemoryDatasetHandle`] List of mock references. 
""" np.random.seed(12345) @@ -131,7 +131,7 @@ def _make_simdata(self, id_counter = 0 visit_counter = 1 - data_refs = [] + handles = [] for band in ['r', 'i']: if band == 'r': filtername = 'R FILTER' @@ -186,9 +186,8 @@ def _make_simdata(self, # Make one star have low s/n table['normCompTophatFlux_instFlux'][0] = 1.0 - df = pd.DataFrame(table) - df.set_index('sourceId', inplace=True) - data_refs.append(lsst.pipe.base.InMemoryDatasetHandle(df, storageClass="DataFrame")) + tbl = astropy.table.Table(table) + handles.append(lsst.pipe.base.InMemoryDatasetHandle(tbl, storageClass="ArrowAstropy")) id_counter += nstar visit_counter += 1 @@ -203,7 +202,7 @@ def _make_simdata(self, self.star_ras = np.concatenate((ra_both, ra_just_r, ra_just_i, ra_neighbor)) self.star_decs = np.concatenate((dec_both, dec_just_r, dec_just_i, dec_neighbor)) - return data_refs + return handles def test_compute_unique_ids(self): """Test computation of unique ids.""" @@ -234,9 +233,9 @@ def test_match_primary_stars(self): """Test matching primary stars.""" # Stack all the sources; we do not want any cutting here. tables = [] - for data_ref in self.data_refs: - df = data_ref.get() - tables.append(df.to_records()) + for handle in self.handles: + tbl = handle.get() + tables.append(np.asarray(tbl)) source_cat = np.concatenate(tables) primary_star_cat = self.isolatedStarAssociationTask._match_primary_stars(['i', 'r'], @@ -271,9 +270,9 @@ def test_match_sources(self): """Test _match_sources source to primary matching.""" # Stack all the sources; we do not want any cutting here. tables = [] - for data_ref in self.data_refs: - df = data_ref.get() - tables.append(df.to_records()) + for handle in self.handles: + tbl = handle.get() + tables.append(np.asarray(tbl)) source_cat = np.concatenate(tables) source_cat = np.lib.recfunctions.append_fields(source_cat, @@ -300,7 +299,7 @@ def test_match_sources(self): def test_make_all_star_sources(self): """Test appending all the star sources.""" source_cat = self.isolatedStarAssociationTask._make_all_star_sources(self.skymap[self.tract], - self.data_ref_dict) + self.handle_dict) # Make sure we don't have any low s/n sources. sn_min = np.min(source_cat['normCompTophatFlux_instFlux'] @@ -316,7 +315,7 @@ def test_run_isolated_star_association_task(self): """Test running the full task.""" struct = self.isolatedStarAssociationTask.run(self.skymap, self.tract, - self.data_ref_dict) + self.handle_dict) star_source_cat = struct.star_source_cat star_cat = struct.star_cat @@ -363,13 +362,12 @@ def test_run_isolated_star_association_task(self): def test_run_task_all_neighbors(self): """Test running the task when all the stars are rejected as neighbors.""" - data_refs = self._make_simdata(self.tract, only_neighbors=True) - data_ref_dict = {visit: data_ref for visit, data_ref in zip(self.visits, - data_refs)} + handles = self._make_simdata(self.tract, only_neighbors=True) + handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)} struct = self.isolatedStarAssociationTask.run(self.skymap, self.tract, - data_ref_dict) + handle_dict) # These should ber zero length. 
        self.assertEqual(len(struct.star_source_cat), 0)
@@ -380,13 +378,12 @@ def test_run_task_all_out_of_tract(self):
        """Test running the task when all the sources are out of the tract."""
-        data_refs = self._make_simdata(self.tract, only_out_of_tract=True)
-        data_ref_dict = {visit: data_ref for visit, data_ref in zip(self.visits,
-                                                                    data_refs)}
+        handles = self._make_simdata(self.tract, only_out_of_tract=True)
+        handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}
 
        struct = self.isolatedStarAssociationTask.run(self.skymap,
                                                      self.tract,
-                                                     data_ref_dict)
+                                                     handle_dict)
 
-        # These should ber zero length.
+        # These should be zero length.
        self.assertEqual(len(struct.star_source_cat), 0)
@@ -397,13 +394,12 @@ def test_run_task_all_out_of_inner_tract(self):
        """Test running the task when all the sources are out of the inner
        tract."""
-        data_refs = self._make_simdata(self.tract, only_out_of_inner_tract=True)
-        data_ref_dict = {visit: data_ref for visit, data_ref in zip(self.visits,
-                                                                    data_refs)}
+        handles = self._make_simdata(self.tract, only_out_of_inner_tract=True)
+        handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}
 
        struct = self.isolatedStarAssociationTask.run(self.skymap,
                                                      self.tract,
-                                                     data_ref_dict)
+                                                     handle_dict)
 
-        # These should ber zero length.
+        # These should be zero length.
        self.assertEqual(len(struct.star_source_cat), 0)
@@ -417,13 +413,12 @@ def test_run_task_secondary_no_overlap(self):
 
        This tests DM-34834.
        """
-        data_refs = self._make_simdata(self.tract, no_secondary_overlap=True)
-        data_ref_dict = {visit: data_ref for visit, data_ref in zip(self.visits,
-                                                                    data_refs)}
+        handles = self._make_simdata(self.tract, no_secondary_overlap=True)
+        handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}
 
        struct = self.isolatedStarAssociationTask.run(self.skymap,
                                                      self.tract,
-                                                     data_ref_dict)
+                                                     handle_dict)
 
        # Add a sanity check that we got a catalog out.
        self.assertGreater(len(struct.star_source_cat), 0)
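Before the `test_transformObject.py` changes below: pandas null checks (`isnull`/`notnull`) translate to masked-column checks in astropy, and the distinction between "any value masked" and "all values masked" is easy to get wrong. `np.ma.is_masked` answers the former, so the all-masked assertion in the updated test uses the column's element-wise `.mask` array instead. A small sketch of the equivalence, on toy data:

```python
import numpy as np
import astropy.table

# A fully masked column stands in for a pandas column that is all-null.
tbl = astropy.table.Table(
    {'flux': np.ma.MaskedArray([1.0, 2.0, 3.0], mask=[True, True, True])}
)

# pandas: df['flux'].isnull().all()  ->  astropy: every entry is masked.
assert np.all(tbl['flux'].mask)

# np.ma.is_masked() only answers "is at least one entry masked?", so it
# is a weaker check than the pandas .isnull().all() it would replace.
assert np.ma.is_masked(tbl['flux'])
```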
diff --git a/tests/test_transformObject.py b/tests/test_transformObject.py
index 5a19bcb45..63b367216 100644
--- a/tests/test_transformObject.py
+++ b/tests/test_transformObject.py
@@ -21,6 +21,7 @@
 import os
 import unittest
 
+import astropy.table
 import pandas as pd
 import numpy as np
 
@@ -65,33 +66,33 @@ def testNullFilter(self):
                  'IntColumn': Column('base_InputCount_value', dataset='meas'),
                  'GoodFlagColumn': Column('slot_GaussianFlux_flag', dataset='meas'),
                  'BadFlagColumn': Column('slot_Centroid_flag', dataset='meas')}
-        df = task.run(self.handle, funcs=funcs, dataId=self.dataId)
-        self.assertIsInstance(df, pd.DataFrame)
+        tbl = task.run(self.handle, funcs=funcs, dataId=self.dataId).outputCatalog
+        self.assertIsInstance(tbl, astropy.table.Table)
         for filt in config.outputBands:
-            self.assertIn(filt + 'FloatColumn', df.columns)
-            self.assertIn(filt + 'IntColumn', df.columns)
-            self.assertIn(filt + 'BadFlagColumn', df.columns)
-            self.assertIn(filt + 'GoodFlagColumn', df.columns)
+            self.assertIn(filt + 'FloatColumn', tbl.columns)
+            self.assertIn(filt + 'IntColumn', tbl.columns)
+            self.assertIn(filt + 'BadFlagColumn', tbl.columns)
+            self.assertIn(filt + 'GoodFlagColumn', tbl.columns)
 
         # Check that the default filling has worked.
-        self.assertNotIn('gFloatColumn', df.columns)
-        self.assertTrue(df['yFloatColumn'].isnull().all())
-        self.assertTrue(df['iFloatColumn'].notnull().all())
-        self.assertTrue(np.all(df['iIntColumn'].values >= 0))
-        self.assertTrue(np.all(df['yIntColumn'].values < 0))
-        self.assertTrue(np.all(~df['yGoodFlagColumn'].values))
-        self.assertTrue(np.all(df['yBadFlagColumn'].values))
+        self.assertNotIn('gFloatColumn', tbl.columns)
+        self.assertTrue(np.all(tbl['yFloatColumn'].mask))
+        self.assertFalse(np.any(np.ma.is_masked(tbl['iFloatColumn'])))
+        self.assertTrue(np.all(tbl['iIntColumn'] >= 0))
+        self.assertTrue(np.all(tbl['yIntColumn'] < 0))
+        self.assertTrue(np.all(~tbl['yGoodFlagColumn']))
+        self.assertTrue(np.all(tbl['yBadFlagColumn']))
 
         # Check that the datatypes are preserved.
-        self.assertEqual(df['iFloatColumn'].dtype, np.dtype(np.float64))
-        self.assertEqual(df['yFloatColumn'].dtype, np.dtype(np.float64))
-        self.assertEqual(df['iIntColumn'].dtype, np.dtype(np.int64))
-        self.assertEqual(df['yIntColumn'].dtype, np.dtype(np.int64))
-        self.assertEqual(df['iGoodFlagColumn'].dtype, np.dtype(np.bool_))
-        self.assertEqual(df['yGoodFlagColumn'].dtype, np.dtype(np.bool_))
-        self.assertEqual(df['iBadFlagColumn'].dtype, np.dtype(np.bool_))
-        self.assertEqual(df['yBadFlagColumn'].dtype, np.dtype(np.bool_))
+        self.assertEqual(tbl['iFloatColumn'].dtype, np.dtype(np.float64))
+        self.assertEqual(tbl['yFloatColumn'].dtype, np.dtype(np.float64))
+        self.assertEqual(tbl['iIntColumn'].dtype, np.dtype(np.int64))
+        self.assertEqual(tbl['yIntColumn'].dtype, np.dtype(np.int64))
+        self.assertEqual(tbl['iGoodFlagColumn'].dtype, np.dtype(np.bool_))
+        self.assertEqual(tbl['yGoodFlagColumn'].dtype, np.dtype(np.bool_))
+        self.assertEqual(tbl['iBadFlagColumn'].dtype, np.dtype(np.bool_))
+        self.assertEqual(tbl['yBadFlagColumn'].dtype, np.dtype(np.bool_))
 
     def testUnderscoreColumnFormat(self):
         """Test the per-filter column format with an underscore"""
@@ -100,10 +101,10 @@
         config.camelCase = False
         task = TransformObjectCatalogTask(config=config)
         funcs = {'Fwhm': HsmFwhm(dataset='meas')}
-        df = task.run(self.handle, funcs=funcs, dataId=self.dataId)
-        self.assertIsInstance(df, pd.DataFrame)
+        tbl = task.run(self.handle, funcs=funcs, dataId=self.dataId).outputCatalog
+        self.assertIsInstance(tbl, astropy.table.Table)
         for filt in config.outputBands:
-            self.assertIn(filt + '_Fwhm', df.columns)
+            self.assertIn(filt + '_Fwhm', tbl.columns)
 
     def testMultilevelOutput(self):
         """Test the non-flattened result dataframe with a multilevel column index"""
@@ -112,7 +113,7 @@
         config.multilevelOutput = True
         task = TransformObjectCatalogTask(config=config)
         funcs = {'Fwhm': HsmFwhm(dataset='meas')}
-        df = task.run(self.handle, funcs=funcs, dataId=self.dataId)
+        df = task.run(self.handle, funcs=funcs, dataId=self.dataId).outputCatalog
         self.assertIsInstance(df, pd.DataFrame)
         self.assertNotIn('g', df)
         for filt in config.outputBands:
@@ -123,15 +124,13 @@
     def testNoOutputBands(self):
         """All the input bands should go into the output, and nothing else.
""" config = TransformObjectCatalogConfig() - config.multilevelOutput = True task = TransformObjectCatalogTask(config=config) funcs = {'Fwhm': HsmFwhm(dataset='meas')} - df = task.run(self.handle, funcs=funcs, dataId=self.dataId) - self.assertIsInstance(df, pd.DataFrame) - self.assertNotIn('HSC-G', df) + tbl = task.run(self.handle, funcs=funcs, dataId=self.dataId).outputCatalog + self.assertIsInstance(tbl, astropy.table.Table) + self.assertNotIn('HSC-G_Fwhm', tbl.columns) for filt in ['g', 'r', 'i']: - self.assertIsInstance(df[filt], pd.DataFrame) - self.assertIn('Fwhm', df[filt].columns) + self.assertIn(f'{filt}_Fwhm', tbl.columns) if __name__ == "__main__":