Skip to content

Commit

Permalink
Pipeline to compute angular correlation function (#8)
Browse files Browse the repository at this point in the history
* Add skeleton for the autocorrelation workflow

* Perform some code cleanup

* Perform alignment accordingly

* First acf calculation with lsdb

* Add the true values for the natural estimator

* Refactor code and add more tests

* Split upper triangle and diagonal

* Pushing files back in

* Update coordinate projection

* Add catalogs with float64 types

* Counts closer to same.

* Be more tolerant!

* Improve docstrings and namings

* Improve code organization

* Fix formatting

---------

Co-authored-by: Melissa DeLucchi <[email protected]>
  • Loading branch information
camposandro and delucchi-cmu authored Jun 28, 2024
1 parent 9ff6cbf commit 83224aa
Show file tree
Hide file tree
Showing 18 changed files with 565 additions and 18 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,7 @@ _html/
# Project initialization script
.initialize_new_project.sh

.idea/
.idea/

# Log files generated by gundam
tests/**/*_log
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
git+https://github.com/astronomy-commons/hipscat.git@main
git+https://github.com/astronomy-commons/lsdb.git@main
git+https://github.com/lincc-frameworks-mask-incubator/gundam.git@main
git+https://github.com/lincc-frameworks-mask-incubator/gundam.git@acf-without-grid
24 changes: 9 additions & 15 deletions src/corrgi/alignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,20 @@ def autocorrelation_alignment(catalog: Catalog) -> PixelAlignment:
"""Determine all pairs of partitions that should be correlated within the same catalog.
This considers all combinations, without duplicates between the "primary" and "join"
pixels in the alignment.
pixels in the alignment. It does not include the combinations of partitions with themselves.
Args:
catalog (Catalog): catalog for autocorrelation
catalog (Catalog): catalog for auto-correlation.
Returns:
alignment object where the `aligned` columns simply match the left pixel.
The alignment object where the `aligned` columns simply match the left pixel.
"""
upper_triangle = [
[left.order, left.pixel, right.order, right.pixel, left.order, left.pixel]
for (left, right) in itertools.combinations(catalog.get_healpix_pixels(), 2)
]
upper_triangle = pd.DataFrame(upper_triangle, columns=column_names)
diagonal = pd.DataFrame(
[
[pix.order, pix.pixel, pix.order, pix.pixel, pix.order, pix.pixel]
for pix in catalog.get_healpix_pixels()
],
columns=column_names,
)
result_mapping = pd.concat([upper_triangle, diagonal])
return PixelAlignment(catalog.pixel_tree, result_mapping, PixelAlignmentType.OUTER)
return PixelAlignment(catalog.pixel_tree, upper_triangle, PixelAlignmentType.OUTER)


def crosscorrelation_alignment(catalog_left: Catalog, catalog_right: Catalog) -> PixelAlignment:
Expand All @@ -48,10 +41,11 @@ def crosscorrelation_alignment(catalog_left: Catalog, catalog_right: Catalog) ->
This considers the full cross-product of pixels.
Args:
catalog_left (Catalog): left side of the cross-correlation
catalog_right (Catalog): right side of the cross-correlation
catalog_left (Catalog): left side of the cross-correlation.
catalog_right (Catalog): right side of the cross-correlation.
Returns:
alignment object where the `aligned` columns simply match the left pixel.
The alignment object where the `aligned` columns simply match the left pixel.
"""
full_product = [
[left.order, left.pixel, right.order, right.pixel, left.order, left.pixel]
Expand Down
38 changes: 38 additions & 0 deletions src/corrgi/corrgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import numpy as np
from lsdb import Catalog
from munch import Munch

from corrgi.dask import compute_autocorrelation_counts
from corrgi.estimators import calculate_natural_estimate


def compute_autocorrelation(catalog: Catalog, random: Catalog, params: Munch) -> np.ndarray:
    """Calculates the auto-correlation for a catalog.

    Args:
        catalog (Catalog): The catalog of galaxy samples.
        random (Catalog): A catalog of random samples.
        params (Munch): The parameters dictionary to run gundam with.

    Returns:
        A numpy array with the result of the auto-correlation, using the natural estimator.
    """
    # DD/RR histograms are computed in a single dask pass over both catalogs
    counts_dd, counts_rr = compute_autocorrelation_counts(catalog, random, params)
    return calculate_natural_estimate(
        counts_dd,
        counts_rr,
        catalog.hc_structure.catalog_info.total_rows,
        random.hc_structure.catalog_info.total_rows,
    )


def compute_crosscorrelation(left: Catalog, right: Catalog, random: Catalog, params: Munch) -> np.ndarray:
    """Computes the cross-correlation between two catalogs.

    Args:
        left (Catalog): Left catalog for the cross-correlation.
        right (Catalog): Right catalog for the cross-correlation.
        random (Catalog): A catalog of random samples.
        params (Munch): The parameters dictionary to run gundam with.

    Returns:
        A numpy array with the result of the cross-correlation, using the natural estimator.

    Raises:
        NotImplementedError: cross-correlation is not implemented yet.
    """
    raise NotImplementedError()
209 changes: 209 additions & 0 deletions src/corrgi/dask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import dask
import gundam.cflibfor as cff
import numpy as np
import pandas as pd
from dask.distributed import print as dask_print
from gundam import gundam
from hipscat.catalog.catalog_info import CatalogInfo
from hipscat.pixel_math import HealpixPixel
from lsdb import Catalog
from lsdb.dask.merge_catalog_functions import align_and_apply, get_healpix_pixels_from_alignment
from munch import Munch

from corrgi.alignment import autocorrelation_alignment, crosscorrelation_alignment
from corrgi.parameters import generate_dd_rr_params
from corrgi.utils import join_count_histograms, project_coordinates


def compute_autocorrelation_counts(catalog: Catalog, random: Catalog, params: Munch) -> np.ndarray:
    """Computes the auto-correlation counts for a catalog.

    Args:
        catalog (Catalog): The catalog with galaxy samples.
        random (Catalog): The catalog with random samples.
        params (Munch): The gundam parameters for the Fortran subroutine.

    Returns:
        The histogram counts to calculate the auto-correlation.
    """
    # Angular separation bins, shared by the DD and RR counts
    bins, _ = gundam.makebins(params.nsept, params.septmin, params.dsept, params.logsept)
    params_dd, params_rr = generate_dd_rr_params(params)
    # Build the (lazy) count graphs for each catalog, then trigger a
    # single dask computation that materializes both histograms at once.
    lazy_counts = [
        perform_auto_counts(catalog, bins, params_dd),
        perform_auto_counts(random, bins, params_rr),
    ]
    return dask.compute(*lazy_counts)


def perform_auto_counts(catalog: Catalog, *args) -> np.ndarray:
    """Aligns the pixels of a single catalog and performs the pair counting.

    Args:
        catalog (Catalog): The catalog.
        *args: The arguments to pass to the counting methods.

    Returns:
        The histogram with the sample distance counts.
    """
    # Pairs whose points live in two distinct partitions of the catalog
    alignment = autocorrelation_alignment(catalog.hc_structure)
    left_pixels, right_pixels = get_healpix_pixels_from_alignment(alignment)
    cross_partials = align_and_apply(
        [(catalog, left_pixels), (catalog, right_pixels)], count_cross_pairs, *args
    )
    # Pairs whose points belong to the same partition
    catalog_info = catalog.hc_structure.catalog_info
    auto_partials = [
        count_auto_pairs(partition, catalog_info, *args) for partition in catalog._ddf.to_delayed()
    ]
    return join_count_histograms([*cross_partials, *auto_partials])


def perform_cross_counts(left: Catalog, right: Catalog, *args) -> np.ndarray:
    """Aligns the pixels of two catalogs and performs the pair counting.

    Args:
        left (Catalog): The left catalog.
        right (Catalog): The right catalog.
        *args: The arguments to pass to the count_pairs method.

    Returns:
        The histogram with the sample distance counts.
    """
    # The cross-correlation pairs every left partition with every right partition
    alignment = crosscorrelation_alignment(left.hc_structure, right.hc_structure)
    left_pixels, right_pixels = get_healpix_pixels_from_alignment(alignment)
    partials = align_and_apply([(left, left_pixels), (right, right_pixels)], count_cross_pairs, *args)
    return join_count_histograms(partials)


@dask.delayed
def count_auto_pairs(
    df: pd.DataFrame,
    catalog_info: CatalogInfo,
    bins: np.ndarray,
    params: Munch,
) -> np.ndarray:
    """Calls the fortran routine to compute the counts for pairs of
    partitions belonging to the same catalog.

    Args:
        df (pd.DataFrame): The partition dataframe.
        catalog_info (CatalogInfo): The catalog metadata.
        bins (np.ndarray): The separation bins, in angular space.
        params (Munch): The gundam subroutine parameters.

    Returns:
        The count histogram for the partition pair.

    Raises:
        Exception: any error from the counting routine is printed on the
            dask worker and then re-raised.
    """
    try:
        return _count_auto_pairs(df, catalog_info, bins, params)
    except Exception as exception:
        # Surface the error on the worker's stdout for debugging, but do NOT
        # swallow it: returning None here would violate the return annotation
        # and silently corrupt the downstream histogram join.
        dask_print(exception)
        raise


@dask.delayed
def count_cross_pairs(
    left_df: pd.DataFrame,
    right_df: pd.DataFrame,
    left_pix: HealpixPixel,
    right_pix: HealpixPixel,
    left_catalog_info: CatalogInfo,
    right_catalog_info: CatalogInfo,
    bins: np.ndarray,
    params: Munch,
) -> np.ndarray:
    """Calls the fortran routine to compute the counts for pairs of
    partitions belonging to two different catalogs.

    Args:
        left_df (pd.DataFrame): The left partition dataframe.
        right_df (pd.DataFrame): The right partition dataframe.
        left_pix (HealpixPixel): The pixel corresponding to `left_df`.
        right_pix (HealpixPixel): The pixel corresponding to `right_df`.
        left_catalog_info (CatalogInfo): The left catalog metadata.
        right_catalog_info (CatalogInfo): The right catalog metadata.
        bins (np.ndarray): The separation bins, in angular space.
        params (Munch): The gundam subroutine parameters.

    Returns:
        The count histogram for the partition pair.

    Raises:
        Exception: any error from the counting routine is printed on the
            dask worker and then re-raised.
    """
    try:
        return _count_cross_pairs(
            left_df,
            right_df,
            left_catalog_info,
            right_catalog_info,
            bins,
            params,
        )
    except Exception as exception:
        # Surface the error on the worker's stdout for debugging, but do NOT
        # swallow it: returning None here would violate the return annotation
        # and silently corrupt the downstream histogram join.
        dask_print(exception)
        raise


def _count_auto_pairs(
    df: pd.DataFrame,
    catalog_info: CatalogInfo,
    bins: np.ndarray,
    params: Munch,
) -> np.ndarray:
    """Invoke the gundam Fortran routine that counts pairs within one partition."""
    # Project the (ra, dec) coordinates of the partition samples
    x, y, z = project_coordinates(
        ra=df[catalog_info.ra_column].to_numpy(),
        dec=df[catalog_info.dec_column].to_numpy(),
    )
    # fast unweighted counting
    return cff.mod.th_A_naiveway(
        len(df),  # number of particles
        x,
        y,
        z,  # X,Y,Z coordinates of particles
        params.nsept,  # number of angular separation bins
        bins,  # bins in angular separation [deg]
    )


def _count_cross_pairs(
    left_df: pd.DataFrame,
    right_df: pd.DataFrame,
    left_catalog_info: CatalogInfo,
    right_catalog_info: CatalogInfo,
    bins: np.ndarray,
    params: Munch,
) -> np.ndarray:
    """Invoke the gundam Fortran routine that counts pairs across two partitions."""
    # Extract the left coordinates once; the raw RA/DEC arrays are also
    # passed to the Fortran routine alongside their projections.
    left_ra = left_df[left_catalog_info.ra_column].to_numpy()
    left_dec = left_df[left_catalog_info.dec_column].to_numpy()
    x_left, y_left, z_left = project_coordinates(ra=left_ra, dec=left_dec)
    x_right, y_right, z_right = project_coordinates(
        ra=right_df[right_catalog_info.ra_column].to_numpy(),
        dec=right_df[right_catalog_info.dec_column].to_numpy(),
    )
    # TODO: Create gundam th_C_naive_way that accepts only the necessary arguments
    return cff.mod.th_C(  # fast unweighted counting
        1,  # number of threads OpenMP
        len(left_df),  # number of particles of the left partition
        left_ra,  # RA of particles [deg]
        left_dec,  # DEC of particles [deg]
        x_left,
        y_left,
        z_left,  # X,Y,Z coordinates of particles
        len(right_df),  # number of particles of the right partition
        x_right,
        y_right,
        z_right,  # X,Y,Z coordinates of particles
        params.nsept,  # number of angular separation bins
        bins,  # bins in angular separation [deg]
        params.sbound,
        params.mxh1,
        params.mxh2,
        params.cntid,
        params.logf,
        params.sk1,
        np.zeros(len(right_df)),
        params.grid,
    )
25 changes: 25 additions & 0 deletions src/corrgi/estimators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import numpy as np
from gundam import tpcf


def calculate_natural_estimate(
    counts_dd: np.ndarray,
    counts_rr: np.ndarray,
    num_galaxies: int,
    num_random: int,
) -> np.ndarray:
    """Calculates the auto-correlation value for the natural estimator.

    Args:
        counts_dd (np.ndarray): The counts for the galaxy samples.
        counts_rr (np.ndarray): The counts for the random samples.
        num_galaxies (int): The number of galaxy samples.
        num_random (int): The number of random samples.

    Returns:
        The natural correlation function estimate.
    """
    # The natural ("NAT") estimator does not use DR counts
    counts_dr = 0
    # No bootstrap counts are computed, hence the empty second axis
    empty_bootstrap = np.zeros([len(counts_dd), 0])
    wth, _ = tpcf(
        num_galaxies,
        num_random,
        counts_dd,
        empty_bootstrap,
        counts_rr,
        counts_dr,
        estimator="NAT",
    )
    return wth
53 changes: 53 additions & 0 deletions src/corrgi/parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from copy import deepcopy

from gundam import gundam
from munch import Munch


def create_gundam_params(kind: str, **kwargs) -> Munch:
    """Generates the Gundam parameters for a specific kind of correlation function.

    Args:
        kind (str): The type of correlation function (e.g. acf).
        **kwargs: Additional gundam parameters to set/override.

    Returns:
        The dictionary of gundam parameters.
    """
    params = gundam.packpars(kind=kind, write=False)
    # The grid is disabled, but the subroutines still expect its related
    # parameters to be present, so fill them with placeholder values.
    params.update(
        grid=0,
        autogrid=False,
        sbound=[1, 2, 1, 2],
        mxh1=2,
        mxh2=2,
        sk1=[[1, 2], [1, 2]],
    )
    # Caller-provided values take precedence over the defaults
    return Munch({**params, **kwargs})


def generate_dd_rr_params(params: Munch) -> tuple[Munch, Munch]:
    """Generate the DD and RR parameters.

    Args:
        params (Munch): The base gundam parameters to derive from.

    Returns:
        A tuple with the parameters for the DD (galaxy) and RR (random) counts.
    """
    par_dd = deepcopy(params)
    par_dd.update(kind="thC", cntid="DD", logf="DD_log")
    par_rr = deepcopy(params)
    # Fiber corrections and bootstrap counts are disabled for the randoms
    par_rr.update(kind="thC", cntid="RR", logf="RR_log", wfib=False, doboot=False)
    return par_dd, par_rr


def generate_dr_params(params: Munch) -> Munch:
    """Generate the DR parameters to be used in cross-correlation.

    Args:
        params (Munch): The base gundam parameters to derive from.

    Returns:
        The parameters for the DR (galaxy-random) counts.
    """
    par_dr = deepcopy(params)
    # Fiber corrections and bootstrap counts are disabled for DR counts
    par_dr.update(kind="thC", cntid="DR", logf="DR_log", wfib=False, doboot=False)
    return par_dr
Loading

0 comments on commit 83224aa

Please sign in to comment.