DM-34875: switch most DataFrame connections to ArrowAstropy #1010

Open. Wants to merge 18 commits into main.
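The change is mechanical: each affected connection's `storageClass` moves from `DataFrame` to `ArrowAstropy`, and the task code that built or consumed `pandas.DataFrame` objects is updated to work with `astropy.table.Table` instead. A minimal sketch of the pattern, using a made-up connection and dataset name rather than the real ones in the diffs below:

```python
import astropy.table
import lsst.pipe.base as pipeBase


class ExampleConnections(pipeBase.PipelineTaskConnections,
                         dimensions=("instrument", "visit")):
    # Previously: storageClass="DataFrame" (round-tripped through pandas).
    # Now: storageClass="ArrowAstropy" (persisted via Arrow/parquet and read
    # back as an astropy Table).
    catalog = pipeBase.connectionTypes.Output(
        doc="Hypothetical per-visit catalog used only for illustration.",
        name="example_catalog",
        storageClass="ArrowAstropy",
        dimensions=("instrument", "visit"),
    )


# On the producer side, outputs are wrapped in an astropy Table built from a
# numpy structured array instead of a pandas DataFrame, e.g.:
#     butlerQC.put(astropy.table.Table(struct.output_table), outputRefs.catalog)
```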
22 changes: 11 additions & 11 deletions python/lsst/pipe/tasks/finalizeCharacterization.py
@@ -26,9 +26,10 @@
'FinalizeCharacterizationConfig',
'FinalizeCharacterizationTask']

import astropy.table
import numpy as np
import esutil
import pandas as pd


import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
@@ -71,7 +72,7 @@ class FinalizeCharacterizationConnections(pipeBase.PipelineTaskConnections,
doc=('Catalog of isolated stars with average positions, number of associated '
'sources, and indexes to the isolated_star_sources catalogs.'),
name='isolated_star_presource_associations',
storageClass='DataFrame',
storageClass='ArrowAstropy',
dimensions=('instrument', 'tract', 'skymap'),
deferLoad=True,
multiple=True,
@@ -80,7 +81,7 @@ class FinalizeCharacterizationConnections(pipeBase.PipelineTaskConnections,
doc=('Catalog of isolated star sources with sourceIds, and indexes to the '
'isolated_star_cats catalogs.'),
name='isolated_star_presources',
storageClass='DataFrame',
storageClass='ArrowAstropy',
dimensions=('instrument', 'tract', 'skymap'),
deferLoad=True,
multiple=True,
@@ -96,7 +97,7 @@ class FinalizeCharacterizationConnections(pipeBase.PipelineTaskConnections,
finalized_src_table = pipeBase.connectionTypes.Output(
doc=('Per-visit catalog of measurements for psf/flag/etc.'),
name='finalized_src_table',
storageClass='DataFrame',
storageClass='ArrowAstropy',
dimensions=('instrument', 'visit'),
)

@@ -293,7 +294,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):

butlerQC.put(struct.psf_ap_corr_cat,
outputRefs.finalized_psf_ap_corr_cat)
butlerQC.put(pd.DataFrame(struct.output_table),
butlerQC.put(astropy.table.Table(struct.output_table),
outputRefs.finalized_src_table)

def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict):
@@ -538,14 +539,13 @@ def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_
merge_source_counter = 0

for tract in isolated_star_cat_dict:
df_cat = isolated_star_cat_dict[tract].get()
table_cat = df_cat.to_records()
astropy_cat = isolated_star_cat_dict[tract].get()
table_cat = np.asarray(astropy_cat)

df_source = isolated_star_source_dict[tract].get(
parameters={'columns': [self.config.id_column,
'obj_index']}
astropy_source = isolated_star_source_dict[tract].get(
parameters={'columns': [self.config.id_column, 'obj_index']}
)
table_source = df_source.to_records()
table_source = np.asarray(astropy_source)

# Cut isolated star table to those observed in this band, and adjust indexes
(use_band,) = (table_cat[f'nsource_{band}'] > 0).nonzero()
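Where the old code materialized a numpy record array from pandas with `DataFrame.to_records()`, the new code converts the loaded astropy Table with `np.asarray()`, so downstream indexing by field name is unchanged. A small hedged illustration of the equivalence (the two columns are made up; the real catalogs carry many more):

```python
import astropy.table
import numpy as np
import pandas as pd

# Hypothetical catalog contents, for illustration only.
data = {"sourceId": np.arange(3, dtype=np.int64),
        "obj_index": np.array([0, 0, 1])}

# Old path: pandas DataFrame -> numpy record array.
old_table = pd.DataFrame(data).to_records(index=False)

# New path: astropy Table -> numpy structured array.
new_table = np.asarray(astropy.table.Table(data))

# Field access by name works the same way on both.
assert list(old_table["obj_index"]) == list(new_table["obj_index"])
```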
59 changes: 30 additions & 29 deletions python/lsst/pipe/tasks/isolatedStarAssociation.py
@@ -23,10 +23,10 @@
'IsolatedStarAssociationConfig',
'IsolatedStarAssociationTask']

import astropy.table
import esutil
import hpgeom as hpg
import numpy as np
import pandas as pd
Contributor comment: This warms my heart.

from smatch.matcher import Matcher

import lsst.pex.config as pexConfig
@@ -43,7 +43,7 @@ class IsolatedStarAssociationConnections(pipeBase.PipelineTaskConnections,
source_table_visit = pipeBase.connectionTypes.Input(
doc='Source table in parquet format, per visit',
name='preSourceTable_visit',
storageClass='DataFrame',
storageClass='ArrowAstropy',
dimensions=('instrument', 'visit'),
deferLoad=True,
multiple=True,
@@ -57,13 +57,13 @@ class IsolatedStarAssociationConnections(pipeBase.PipelineTaskConnections,
isolated_star_sources = pipeBase.connectionTypes.Output(
doc='Catalog of individual sources for the isolated stars',
name='isolated_star_presources',
storageClass='DataFrame',
storageClass='ArrowAstropy',
dimensions=('instrument', 'tract', 'skymap'),
)
isolated_star_cat = pipeBase.connectionTypes.Output(
doc='Catalog of isolated star positions',
name='isolated_star_presource_associations',
storageClass='DataFrame',
storageClass='ArrowAstropy',
dimensions=('instrument', 'tract', 'skymap'),
)

@@ -206,37 +206,37 @@ def __init__(self, **kwargs):
self.source_selector.log.setLevel(self.source_selector.log.WARN)

def runQuantum(self, butlerQC, inputRefs, outputRefs):
input_ref_dict = butlerQC.get(inputRefs)
input_handle_dict = butlerQC.get(inputRefs)

tract = butlerQC.quantum.dataId['tract']

source_table_refs = input_ref_dict['source_table_visit']
source_table_handles = input_handle_dict['source_table_visit']

self.log.info('Running with %d source_table_visit dataRefs',
len(source_table_refs))
self.log.info('Running with %d source_table_visit datasets',
len(source_table_handles))

source_table_ref_dict_temp = {source_table_ref.dataId['visit']: source_table_ref for
source_table_ref in source_table_refs}
source_table_handle_dict_temp = {source_table_handle.dataId['visit']: source_table_handle for
source_table_handle in source_table_handles}

bands = {source_table_ref.dataId['band'] for source_table_ref in source_table_refs}
bands = {source_table_handle.dataId['band'] for source_table_handle in source_table_handles}
for band in bands:
if band not in self.config.band_order:
self.log.warning('Input data has data from band %s but that band is not '
'configured for matching', band)

# TODO: Sort by visit until DM-31701 is done and we have deterministic
# dataset ordering.
source_table_ref_dict = {visit: source_table_ref_dict_temp[visit] for
visit in sorted(source_table_ref_dict_temp.keys())}
source_table_handle_dict = {visit: source_table_handle_dict_temp[visit] for
visit in sorted(source_table_handle_dict_temp.keys())}

struct = self.run(input_ref_dict['skymap'], tract, source_table_ref_dict)
struct = self.run(input_handle_dict['skymap'], tract, source_table_handle_dict)

butlerQC.put(pd.DataFrame(struct.star_source_cat),
butlerQC.put(astropy.table.Table(struct.star_source_cat),
outputRefs.isolated_star_sources)
butlerQC.put(pd.DataFrame(struct.star_cat),
butlerQC.put(astropy.table.Table(struct.star_cat),
outputRefs.isolated_star_cat)

def run(self, skymap, tract, source_table_ref_dict):
def run(self, skymap, tract, source_table_handle_dict):
"""Run the isolated star association task.

Parameters
@@ -245,15 +245,16 @@ def run(self, skymap, tract, source_table_ref_dict):
Skymap object.
tract : `int`
Tract number.
source_table_ref_dict : `dict`
Dictionary of source_table refs. Key is visit, value is dataref.
source_table_handle_dict : `dict`
Dictionary of source_table handles. Key is visit, value is
a `lsst.daf.butler.DeferredDatasetHandle`.

Returns
-------
struct : `lsst.pipe.base.struct`
Struct with outputs for persistence.
"""
star_source_cat = self._make_all_star_sources(skymap[tract], source_table_ref_dict)
star_source_cat = self._make_all_star_sources(skymap[tract], source_table_handle_dict)

primary_bands = self.config.band_order

@@ -297,15 +298,16 @@ def run(self, skymap, tract, source_table_ref_dict):
return pipeBase.Struct(star_source_cat=star_source_cat,
star_cat=primary_star_cat)

def _make_all_star_sources(self, tract_info, source_table_ref_dict):
def _make_all_star_sources(self, tract_info, source_table_handle_dict):
"""Make a catalog of all the star sources.

Parameters
----------
tract_info : `lsst.skymap.TractInfo`
Information about the tract.
source_table_ref_dict : `dict`
Dictionary of source_table refs. Key is visit, value is dataref.
source_table_handle_dict : `dict`
Dictionary of source_table handles. Key is visit, value is
a `lsst.daf.butler.DeferredDatasetHandle`.

Returns
-------
@@ -319,14 +321,13 @@ def _make_all_star_sources(self, tract_info, source_table_ref_dict):
poly = tract_info.outer_sky_polygon

tables = []
for visit in source_table_ref_dict:
source_table_ref = source_table_ref_dict[visit]
df = source_table_ref.get(parameters={'columns': all_columns})
df.reset_index(inplace=True)
for visit in source_table_handle_dict:
source_table_ref = source_table_handle_dict[visit]
tbl = source_table_ref.get(parameters={'columns': all_columns})

goodSrc = self.source_selector.selectSources(df)
goodSrc = self.source_selector.selectSources(tbl)

table = df[persist_columns][goodSrc.selected].to_records()
table = np.asarray(tbl[persist_columns][goodSrc.selected])

# Append columns that include the row in the source table
# and the matched object index (to be filled later).
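The renamed `source_table_handle_dict` plumbing keeps the same deferred-load pattern as before: handles keyed by visit, a column-subset `get()`, row selection, then conversion to a numpy structured array for concatenation. A condensed hedged sketch of that loop (the helper name, columns, and selector are illustrative, not the task's actual API):

```python
import numpy as np


def collect_sources(source_table_handle_dict, columns, selector):
    """Read a column subset per visit, select rows, and stack the results.

    ``source_table_handle_dict`` maps visit -> deferred dataset handle,
    ``columns`` is the list of columns to read, and ``selector`` returns a
    boolean mask for an astropy Table.  All names here are illustrative.
    """
    tables = []
    # Sort by visit for deterministic ordering (cf. the DM-31701 TODO above).
    for visit in sorted(source_table_handle_dict):
        handle = source_table_handle_dict[visit]
        # ArrowAstropy-backed handles support reading only the requested columns.
        tbl = handle.get(parameters={"columns": columns})
        good = selector(tbl)
        tables.append(np.asarray(tbl[good]))
    return np.concatenate(tables)
```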
4 changes: 2 additions & 2 deletions python/lsst/pipe/tasks/multiBand.py
@@ -280,7 +280,7 @@ class MeasureMergedCoaddSourcesConnections(
"These tables contain astrometry and photometry flags, and optionally "
"PSF flags."),
name="sourceTable_visit",
storageClass="DataFrame",
storageClass="ArrowAstropy",
dimensions=("instrument", "visit"),
multiple=True,
deferLoad=True,
@@ -289,7 +289,7 @@
doc=("Finalized source tables from ``FinalizeCalibrationTask``. These "
"tables contain PSF flags from the finalized PSF estimation."),
name="finalized_src_table",
storageClass="DataFrame",
storageClass="ArrowAstropy",
dimensions=("instrument", "visit"),
multiple=True,
deferLoad=True,
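For the `MeasureMergedCoaddSourcesConnections` inputs above only the declared `storageClass` changes; the deferred handles are loaded the same way and now return astropy Tables. If any downstream code still wants a pandas view, the Table converts explicitly (a hedged sketch; `sourceId` is an illustrative column name):

```python
import astropy.table
import numpy as np

tbl = astropy.table.Table({"sourceId": np.arange(5, dtype=np.int64)})

# astropy Tables convert to pandas directly where a DataFrame is still needed.
df = tbl.to_pandas()
df = df.set_index("sourceId")  # restore index-based lookups older code may expect
```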