Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-34875: switch most DataFrame connections to ArrowAstropy #1010

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Drop Gen2 terminology in isolatedStarAssociation.
Use "handle" as an abbreviation for DeferredDatasetHandle (or
InMemoryDatasetHandle) rather than "ref", which used to mean DataRef
in Gen2 to (analogous) but suggests DatasetRef in Gen3 (not analogous).
  • Loading branch information
TallJimbo committed Dec 13, 2024
commit bb34e454b27d0b7360423ca5b0b3c32f94f06b7c
42 changes: 22 additions & 20 deletions python/lsst/pipe/tasks/isolatedStarAssociation.py
Original file line number Diff line number Diff line change
@@ -206,37 +206,37 @@ def __init__(self, **kwargs):
self.source_selector.log.setLevel(self.source_selector.log.WARN)

def runQuantum(self, butlerQC, inputRefs, outputRefs):
input_ref_dict = butlerQC.get(inputRefs)
input_handle_dict = butlerQC.get(inputRefs)

tract = butlerQC.quantum.dataId['tract']

source_table_refs = input_ref_dict['source_table_visit']
source_table_handles = input_handle_dict['source_table_visit']

self.log.info('Running with %d source_table_visit dataRefs',
len(source_table_refs))
self.log.info('Running with %d source_table_visit datasets',
len(source_table_handles))

source_table_ref_dict_temp = {source_table_ref.dataId['visit']: source_table_ref for
source_table_ref in source_table_refs}
source_table_handle_dict_temp = {source_table_handle.dataId['visit']: source_table_handle for
source_table_handle in source_table_handles}

bands = {source_table_ref.dataId['band'] for source_table_ref in source_table_refs}
bands = {source_table_handle.dataId['band'] for source_table_handle in source_table_handles}
for band in bands:
if band not in self.config.band_order:
self.log.warning('Input data has data from band %s but that band is not '
'configured for matching', band)

# TODO: Sort by visit until DM-31701 is done and we have deterministic
# dataset ordering.
source_table_ref_dict = {visit: source_table_ref_dict_temp[visit] for
visit in sorted(source_table_ref_dict_temp.keys())}
source_table_handle_dict = {visit: source_table_handle_dict_temp[visit] for
visit in sorted(source_table_handle_dict_temp.keys())}

struct = self.run(input_ref_dict['skymap'], tract, source_table_ref_dict)
struct = self.run(input_handle_dict['skymap'], tract, source_table_handle_dict)

butlerQC.put(pd.DataFrame(struct.star_source_cat),
outputRefs.isolated_star_sources)
butlerQC.put(pd.DataFrame(struct.star_cat),
outputRefs.isolated_star_cat)

def run(self, skymap, tract, source_table_ref_dict):
def run(self, skymap, tract, source_table_handle_dict):
"""Run the isolated star association task.

Parameters
@@ -245,15 +245,16 @@ def run(self, skymap, tract, source_table_ref_dict):
Skymap object.
tract : `int`
Tract number.
source_table_ref_dict : `dict`
Dictionary of source_table refs. Key is visit, value is dataref.
source_table_handle_dict : `dict`
Dictionary of source_table handles. Key is visit, value is
a `lsst.daf.butler.DeferredDatasetHandle`.

Returns
-------
struct : `lsst.pipe.base.struct`
Struct with outputs for persistence.
"""
star_source_cat = self._make_all_star_sources(skymap[tract], source_table_ref_dict)
star_source_cat = self._make_all_star_sources(skymap[tract], source_table_handle_dict)

primary_bands = self.config.band_order

@@ -297,15 +298,16 @@ def run(self, skymap, tract, source_table_ref_dict):
return pipeBase.Struct(star_source_cat=star_source_cat,
star_cat=primary_star_cat)

def _make_all_star_sources(self, tract_info, source_table_ref_dict):
def _make_all_star_sources(self, tract_info, source_table_handle_dict):
"""Make a catalog of all the star sources.

Parameters
----------
tract_info : `lsst.skymap.TractInfo`
Information about the tract.
source_table_ref_dict : `dict`
Dictionary of source_table refs. Key is visit, value is dataref.
source_table_handle_dict : `dict`
Dictionary of source_table handles. Key is visit, value is
a `lsst.daf.butler.DeferredDatasetHandle`.

Returns
-------
@@ -319,9 +321,9 @@ def _make_all_star_sources(self, tract_info, source_table_ref_dict):
poly = tract_info.outer_sky_polygon

tables = []
for visit in source_table_ref_dict:
source_table_ref = source_table_ref_dict[visit]
df = source_table_ref.get(parameters={'columns': all_columns})
for visit in source_table_handle_dict:
source_table_handle = source_table_handle_dict[visit]
df = source_table_handle.get(parameters={'columns': all_columns})
df.reset_index(inplace=True)

goodSrc = self.source_selector.selectSources(df)
55 changes: 25 additions & 30 deletions tests/test_isolatedStarAssociation.py
Original file line number Diff line number Diff line change
@@ -43,11 +43,10 @@ class IsolatedStarAssociationTestCase(lsst.utils.tests.TestCase):
def setUp(self):
self.skymap = self._make_skymap()
self.tract = 9813
self.data_refs = self._make_simdata(self.tract)
self.visits = np.arange(len(self.data_refs)) + 1
self.handles = self._make_simdata(self.tract)
self.visits = np.arange(len(self.handles)) + 1

self.data_ref_dict = {visit: data_ref for visit, data_ref in zip(self.visits,
self.data_refs)}
self.handle_dict = {visit: handle for visit, handle in zip(self.visits, self.handles)}

config = IsolatedStarAssociationConfig()
config.band_order = ['i', 'r']
@@ -87,7 +86,7 @@ def _make_simdata(self,

Returns
-------
data_refs : `list` [`InMemoryDatasetHandle`]
handles : `list` [`InMemoryDatasetHandle`]
List of mock references.
"""
np.random.seed(12345)
@@ -131,7 +130,7 @@ def _make_simdata(self,
id_counter = 0
visit_counter = 1

data_refs = []
handles = []
for band in ['r', 'i']:
if band == 'r':
filtername = 'R FILTER'
@@ -188,7 +187,7 @@ def _make_simdata(self,

df = pd.DataFrame(table)
df.set_index('sourceId', inplace=True)
data_refs.append(lsst.pipe.base.InMemoryDatasetHandle(df, storageClass="DataFrame"))
handles.append(lsst.pipe.base.InMemoryDatasetHandle(df, storageClass="DataFrame"))

id_counter += nstar
visit_counter += 1
@@ -203,7 +202,7 @@ def _make_simdata(self,
self.star_ras = np.concatenate((ra_both, ra_just_r, ra_just_i, ra_neighbor))
self.star_decs = np.concatenate((dec_both, dec_just_r, dec_just_i, dec_neighbor))

return data_refs
return handles

def test_compute_unique_ids(self):
"""Test computation of unique ids."""
@@ -234,8 +233,8 @@ def test_match_primary_stars(self):
"""Test matching primary stars."""
# Stack all the sources; we do not want any cutting here.
tables = []
for data_ref in self.data_refs:
df = data_ref.get()
for handle in self.handles:
df = handle.get()
tables.append(df.to_records())
source_cat = np.concatenate(tables)

@@ -271,8 +270,8 @@ def test_match_sources(self):
"""Test _match_sources source to primary matching."""
# Stack all the sources; we do not want any cutting here.
tables = []
for data_ref in self.data_refs:
df = data_ref.get()
for handle in self.handles:
df = handle.get()
tables.append(df.to_records())
source_cat = np.concatenate(tables)

@@ -300,7 +299,7 @@ def test_match_sources(self):
def test_make_all_star_sources(self):
"""Test appending all the star sources."""
source_cat = self.isolatedStarAssociationTask._make_all_star_sources(self.skymap[self.tract],
self.data_ref_dict)
self.handle_dict)

# Make sure we don't have any low s/n sources.
sn_min = np.min(source_cat['normCompTophatFlux_instFlux']
@@ -316,7 +315,7 @@ def test_run_isolated_star_association_task(self):
"""Test running the full task."""
struct = self.isolatedStarAssociationTask.run(self.skymap,
self.tract,
self.data_ref_dict)
self.handle_dict)

star_source_cat = struct.star_source_cat
star_cat = struct.star_cat
@@ -363,13 +362,12 @@ def test_run_isolated_star_association_task(self):

def test_run_task_all_neighbors(self):
"""Test running the task when all the stars are rejected as neighbors."""
data_refs = self._make_simdata(self.tract, only_neighbors=True)
data_ref_dict = {visit: data_ref for visit, data_ref in zip(self.visits,
data_refs)}
handles = self._make_simdata(self.tract, only_neighbors=True)
handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}

struct = self.isolatedStarAssociationTask.run(self.skymap,
self.tract,
data_ref_dict)
handle_dict)

# These should ber zero length.
self.assertEqual(len(struct.star_source_cat), 0)
@@ -380,13 +378,12 @@ def test_run_task_all_neighbors(self):

def test_run_task_all_out_of_tract(self):
"""Test running the task when all the sources are out of the tract."""
data_refs = self._make_simdata(self.tract, only_out_of_tract=True)
data_ref_dict = {visit: data_ref for visit, data_ref in zip(self.visits,
data_refs)}
handles = self._make_simdata(self.tract, only_out_of_tract=True)
handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}

struct = self.isolatedStarAssociationTask.run(self.skymap,
self.tract,
data_ref_dict)
handle_dict)

# These should ber zero length.
self.assertEqual(len(struct.star_source_cat), 0)
@@ -397,13 +394,12 @@ def test_run_task_all_out_of_tract(self):

def test_run_task_all_out_of_inner_tract(self):
"""Test running the task when all the sources are out of the inner tract."""
data_refs = self._make_simdata(self.tract, only_out_of_inner_tract=True)
data_ref_dict = {visit: data_ref for visit, data_ref in zip(self.visits,
data_refs)}
handles = self._make_simdata(self.tract, only_out_of_inner_tract=True)
handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}

struct = self.isolatedStarAssociationTask.run(self.skymap,
self.tract,
data_ref_dict)
handle_dict)

# These should ber zero length.
self.assertEqual(len(struct.star_source_cat), 0)
@@ -417,13 +413,12 @@ def test_run_task_secondary_no_overlap(self):

This tests DM-34834.
"""
data_refs = self._make_simdata(self.tract, no_secondary_overlap=True)
data_ref_dict = {visit: data_ref for visit, data_ref in zip(self.visits,
data_refs)}
handles = self._make_simdata(self.tract, no_secondary_overlap=True)
handle_dict = {visit: handle for visit, handle in zip(self.visits, handles)}

struct = self.isolatedStarAssociationTask.run(self.skymap,
self.tract,
data_ref_dict)
handle_dict)

# Add a sanity check that we got a catalog out.
self.assertGreater(len(struct.star_source_cat), 0)