diff --git a/setup.cfg b/.flake8 similarity index 100% rename from setup.cfg rename to .flake8 diff --git a/bin/run-eastlake-sim b/bin/run-eastlake-sim index 859fa0e6..2f45bca8 100755 --- a/bin/run-eastlake-sim +++ b/bin/run-eastlake-sim @@ -65,8 +65,9 @@ def parse_args(): parser.add_argument( '--resume', action='store_true', default=False, help=( - "If passed, implies --skip_completed_steps and " - "the restart file must be in the default location " + "If passed and --step_names is not given, " + "implies --skip_completed_steps and " + "The restart file must be in the default location " "(i.e., 'BASE_DIR/job_record.pkl')." ), ) @@ -115,7 +116,10 @@ def main(): ) ) if args.resume: - args.skip_completed_steps = True + if args.step_names is None: + args.skip_completed_steps = True + else: + args.skip_completed_steps = False # Setup the logger # If we're resuming a pipeline (ie. if args.job_record_file is not None), diff --git a/eastlake/config/Y6A1_v1_meds-desdm-Y6A1v11.yaml b/eastlake/config/Y6A1_v1_meds-desdm-Y6A1v11.yaml new file mode 100644 index 00000000..7310ccb2 --- /dev/null +++ b/eastlake/config/Y6A1_v1_meds-desdm-Y6A1v11.yaml @@ -0,0 +1,81 @@ +# Originally https://github.com/beckermr/des-y6-analysis/blob/main/2022_09_20_color_dep_meds/Y6A1_test_piff.yaml +# Color ranges (gi_color_range and iz_color_range have been updated) +# MRB - changed to use astrorefine WCS for use in sims +medsconf: "Y6A1_v1_meds-desdm-Y6A1v11" +campaign: "Y6A2_COADD" +piff_campaign: "Y6A2_PIFF_V3" + +joblib: + max_workers: 8 + backend: loky + +psf: + se: + type: piff + stamp_size: 25 + use_color: True + gi_color_range: [0.0, 3.5] + iz_color_range: [0.0, 0.65] + + coadd: + type: psfex + use_color: False + +# extensions for single-epoch images +# the ngwint files +se_image_ext: "sci" +se_weight_ext: "wgt" +se_bmask_ext: "msk" + +se_astrom: + use_color: False + +# separate bkg files +se_bkg_ext: "sci" + +# separate seg files +se_seg_ext: "sci" + +# coadd images +coadd_image_ext: "sci" +coadd_weight_ext: "wgt" +coadd_bmask_ext: "msk" + +# separate segmap file +coadd_seg_ext: "sci" + +# coadds are already background subtracted +coadd_bkg_ext: "none" + +# DESDM prefers to have the compression keywords instead +# of fpack command line arguments + +fpack_pars: + FZQVALUE: 4 + FZTILE: "(10240,1)" + FZALGOR: "RICE_1" + FZQMETHD: "SUBTRACTIVE_DITHER_2" + +source_type: "finalcut" + +use_astro_refine: True +coadd_astrom: + use_color: False + +# allowed_box_sizes: [32, 48, 64, 96, 128, 192, 256] +allowed_box_sizes: [40, 56, 72, 104, 136, 200, 264] +min_box_size: 40 +max_box_size: 264 + +# special bmask flags that we zero out in the weight map. This should be +# restricted to BADAMP (8) currently, which is designed to deal with the fact +# that the good half of ccd 31 was used for coadds but not the bad half. The +# bad part will be included in the MEDS files and potentially misused. Of +# course users should check the bitmask but in the previous MEDS files ccd 31 +# was not included so codes may not be checking for BADAMP + + +unusable_bmask: 8 + +comment: | + Updated for piff color dep diff --git a/eastlake/pipeline.py b/eastlake/pipeline.py index 42274a37..e7e4d58e 100644 --- a/eastlake/pipeline.py +++ b/eastlake/pipeline.py @@ -26,6 +26,7 @@ MetadetectRunner, StashPrep, BalrogRunner, + DESDMMEDSRunner, ) from .utils import get_logger, safe_mkdir, pushd from .stash import Stash @@ -49,6 +50,7 @@ ('stash_prep', StashPrep), ('balrog', BalrogRunner), ('delete_sources', DeleteSources), + ('desdm_meds', DESDMMEDSRunner), ]) STEP_IS_GALSIM = set(["galsim"]) diff --git a/eastlake/stash.py b/eastlake/stash.py index 68d329e8..6bb93e0d 100644 --- a/eastlake/stash.py +++ b/eastlake/stash.py @@ -414,39 +414,110 @@ def get_input_pizza_cutter_yaml(self, tilename, band): def _make_lists_psfmaps_symlinks( self, base_dir_or_imsim_data, tilename, band, pyml, skip_existing=False, + ): + # retry here for race conditions + import random + import time + + ntry = 10 + for i in range(ntry): + stime = 1.25**i * random.uniform(0.9, 1.0) + try: + self._make_lists_psfmaps_symlinks_impl( + base_dir_or_imsim_data, tilename, band, pyml, skip_existing=skip_existing, + ) + except Exception as e: + if i == ntry-1: + raise e + else: + time.sleep(stime) + + def _make_lists_psfmaps_symlinks_impl( + self, base_dir_or_imsim_data, tilename, band, pyml, skip_existing=False, ): odir = os.path.join(base_dir_or_imsim_data, self["desrun"], tilename) ############################################## - # make bkg and nullwt flist files - os.makedirs(os.path.join(odir, "lists"), exist_ok=True) - bkg_file = os.path.join( - odir, "lists", f"{tilename}_{band}_bkg-flist-{self['desrun']}.dat", + # make lists and file conf stuff + meds_path = os.path.join( + odir, + "%s_%s_meds-%s.fits.fz" % (tilename, band, self["desrun"]), ) - if not (skip_existing and os.path.exists(bkg_file)): - with open(bkg_file, "w") as fp_bkg: - for i in range(len(pyml["src_info"])): - fp_bkg.write(pyml["src_info"][i]["bkg_path"] + "\n") - - nw_file = os.path.join( - odir, "lists", f"{tilename}_{band}_nullwt-flist-{self['desrun']}.dat", + fileconf_data = { + "band": band, + "tilename": tilename, + "coadd_image_url": pyml["image_path"], + "coadd_cat_url": pyml["cat_path"], + "coadd_seg_url": pyml["seg_path"], + "coadd_psf_url": pyml["psf_path"], + "coadd_magzp": 30, + "coadd_object_map": self.get_filepaths( + "coadd_object_map", tilename, band=band, keyerror=False + ), + "meds_url": meds_path, + "coaddinfo": get_pizza_cutter_yaml_path( + self["base_dir"], + self["desrun"], + tilename, + band, + ), + } + + # there are two versions of the lists files, one in a single list dir and one per band + for ldir in ["lists", f"lists-{band}"]: + for flist_name, key in [ + ("bkg_flist", "bkg_path"), + ("piff_flist", "piff_path"), + ("psf_flist", "psf_path"), + ("seg_flist", "seg_path"), + ("nullwt_flist", "coadd_nwgint_path"), + ("nwgint_flist", "coadd_nwgint_path"), + ("finalcut_flist", "image_path"), + ]: + fname = os.path.join( + odir, + ldir, + f"{tilename}_{band}_{flist_name.replace('_', '-')}-{self['desrun']}.dat", + ) + if ( + (not (skip_existing and os.path.exists(fname))) + and any( + key in pyml["src_info"][i] + for i in range(len(pyml["src_info"])) + ) + ): + os.makedirs(os.path.dirname(fname), exist_ok=True) + with open(fname, "w") as fp: + if flist_name in ["finalcut_flist", "nullwt_flist", "nwgint_flist"]: + for i in range(len(pyml["src_info"])): + if ldir == "lists": + fp.write( + "%s %r\n" % ( + pyml["src_info"][i][key], + pyml["src_info"][i]["magzp"], + ) + ) + else: + fp.write( + "%s %s %r\n" % ( + pyml["src_info"][i][key], + pyml["src_info"][i]["head_path"], + pyml["src_info"][i]["magzp"], + ) + ) + else: + for i in range(len(pyml["src_info"])): + fp.write("%s\n" % (pyml["src_info"][i][key],)) + + if ldir != "lists": + fileconf_data[flist_name] = fname + + fname = os.path.join( + odir, f"lists-{band}", f"{tilename}_{band}_fileconf-{self['desrun']}.yaml", ) - if ( - (not (skip_existing and os.path.exists(nw_file))) - and any( - "coadd_nwgint_path" in pyml["src_info"][i] - for i in range(len(pyml["src_info"])) - ) - ): - with open(nw_file, "w") as fp_nw: - for i in range(len(pyml["src_info"])): - if "coadd_nwgint_path" in pyml["src_info"][i]: - fp_nw.write( - "%s %r\n" % ( - pyml["src_info"][i]["coadd_nwgint_path"], - pyml["src_info"][i]["magzp"], - ) - ) + with open(fname, "w") as fp: + yaml.dump(fileconf_data, fp) + self.set_filepaths("desdm-fileconf", fname, tilename, band=band) # make psf map files pmap_file = os.path.join(odir, f"{tilename}_{band}_psfmap-{self['desrun']}.dat") diff --git a/eastlake/steps/__init__.py b/eastlake/steps/__init__.py index 5dd21cda..7fd4d7a7 100644 --- a/eastlake/steps/__init__.py +++ b/eastlake/steps/__init__.py @@ -12,3 +12,4 @@ from .stash_prep import StashPrep # noqa from .balrog import BalrogRunner # noqa from .delete_sources import DeleteSources # noqa +from .desdm_meds import DESDMMEDSRunner # noqa diff --git a/eastlake/steps/balrog.py b/eastlake/steps/balrog.py index 745586ef..deda7840 100644 --- a/eastlake/steps/balrog.py +++ b/eastlake/steps/balrog.py @@ -150,7 +150,7 @@ def execute(self, stash, new_params=None): ) # update the stash with PSF info for downstream w/ MEDS - stash["psf_config"] = {"type": "DES_PSFEx"} + stash["psf_config"] = {"type": "DES_Piff"} stash["draw_method"] = "no_pixel" return 0, stash diff --git a/eastlake/steps/desdm_meds.py b/eastlake/steps/desdm_meds.py new file mode 100644 index 00000000..3920e193 --- /dev/null +++ b/eastlake/steps/desdm_meds.py @@ -0,0 +1,227 @@ +from __future__ import print_function, absolute_import +import fitsio +import numpy as np +import os +import multiprocessing +import shutil +import subprocess + +import joblib +import pkg_resources +import yaml +from datetime import timedelta +from timeit import default_timer as timer + + +from ..utils import safe_mkdir, safe_rm, copy_ifnotexists, safe_copy +from ..step import Step, run_and_check + + +def _get_default_config(nm): + return pkg_resources.resource_filename("eastlake", "config/%s" % nm) + + +class DESDMMEDSRunner(Step): + """ + Pipeline step for generating MEDS files as DESDM does it + """ + def __init__(self, config, base_dir, name="desdm_meds", logger=None, + verbosity=0, log_file=None): + super().__init__( + config, base_dir, name=name, + logger=logger, verbosity=verbosity, log_file=log_file) + + self.meds_config = os.path.abspath( + os.path.expanduser( + os.path.expandvars( + self.config.get( + "config_file", + _get_default_config("Y6A1_v1_meds-desdm-Y6A1v11.yaml"), + ) + ) + ) + ) + self.config["n_jobs"] = int( + self.config.get( + "n_jobs", + multiprocessing.cpu_count()//2, + ) + ) + self.config["use_nwgint"] = self.config.get("use_nwgint", True) + + def clear_stash(self, stash): + # If we continued the pipeline from a previous job record file, + # mof_file entries can mess things up, so clear them + if "tile_info" in stash: + for tilename, tile_file_info in stash["tile_info"].items(): + tile_file_info.pop("meds_files", None) + + def execute(self, stash, new_params=None, boxsize=64): + + self.clear_stash(stash) + + os.environ["MEDS_DIR"] = self.base_dir + meds_run = stash["desrun"] + + # Loop through tiles + tilenames = stash["tilenames"] + + for tilename in tilenames: + # meds files + meds_files = [] + + # image data + for band in stash["bands"]: + t0 = timer() + + self._copy_inputs(stash, tilename, band) + meds_config, fileconf = self._prep_config_and_flist( + stash, tilename, band, meds_run, + ) + + meds_file = os.path.join( + self.base_dir, meds_run, tilename, + "%s_%s_meds-%s.fits.fz" % (tilename, band, meds_run)) + final_meds_file = os.path.join( + self.base_dir, stash["desrun"], tilename, + "%s_%s_meds-%s.fits.fz" % (tilename, band, meds_run)) + meds_files.append(final_meds_file) + for _mpth in [meds_file, final_meds_file]: + safe_mkdir(os.path.dirname(os.path.normpath(_mpth))) + safe_rm(_mpth[:-len(".fz")]) + safe_rm(_mpth) + + tmpdir = os.environ.get("TMPDIR", None) + cmd = [ + "desmeds-make-meds-desdm", + f"{meds_config}", + f"{fileconf}", + f"--tmpdir={tmpdir}", + ] + run_and_check( + cmd, + "desmeds-make-meds-desdm", + logger=self.logger, + verbose=True, + ) + + # move to final spot + shutil.move(meds_file, final_meds_file) + + t1 = timer() + self.logger.error( + "Time to write meds file for tile %s, band %s: %s" % ( + tilename, band, str(timedelta(seconds=t1-t0)))) + + stash.set_filepaths("meds_files", meds_files, tilename) + + return 0, stash + + def _copy_inputs(self, stash, tilename, band): + # copy input files + in_pyml = stash.get_input_pizza_cutter_yaml(tilename, band) + pyml = stash.get_output_pizza_cutter_yaml(tilename, band) + + def _process_entry(i): + # we don't overwrite these since we could have estimated them + copy_ifnotexists( + in_pyml["src_info"][i]["head_path"], + pyml["src_info"][i]["head_path"], + ) + copy_ifnotexists( + in_pyml["src_info"][i]["piff_path"], + pyml["src_info"][i]["piff_path"], + ) + copy_ifnotexists( + in_pyml["src_info"][i]["psf_path"], + pyml["src_info"][i]["psf_path"], + ) + copy_ifnotexists( + in_pyml["src_info"][i]["seg_path"], + pyml["src_info"][i]["seg_path"], + ) + + seg_pth = pyml["src_info"][i]["seg_path"] + if seg_pth.endswith(".fz"): + nofz_seg_pth = seg_pth[:-len(".fz")] + safe_rm(nofz_seg_pth) + subprocess.run( + f"funpack {seg_pth}", + shell=True, + check=True, + ) + else: + nofz_seg_pth = seg_pth + + with fitsio.FITS(nofz_seg_pth, mode='rw') as fp: + fp[0].write(np.zeros(tuple(fp[0].get_dims()))) + + if seg_pth.endswith(".fz"): + safe_rm(seg_pth) + subprocess.run( + f"fpack {nofz_seg_pth}", + shell=True, + check=True, + ) + safe_rm(nofz_seg_pth) + + print("prepping data for MEDS making", flush=True) + jobs = [ + joblib.delayed(_process_entry)(i) + for i in range(len(pyml["src_info"])) + ] + with joblib.Parallel( + n_jobs=self.config["n_jobs"], + backend="loky", + verbose=100, + ) as par: + par(jobs) + + def _prep_config_and_flist(self, stash, tilename, band, meds_run): + # set # of workers in config + meds_yml_pth = os.path.join( + self.base_dir, + stash["desrun"], + tilename, + "meds-config.yaml", + ) + safe_copy( + self.meds_config, + meds_yml_pth, + ) + with open(meds_yml_pth, "r") as fp: + meds_yml = yaml.safe_load(fp.read()) + if "joblib" in meds_yml: + meds_yml["joblib"]["max_workers"] = self.config["n_jobs"] + with open(meds_yml_pth, "w") as fp: + yaml.dump(meds_yml, fp) + + # copy in null weight images if needed + orig_fileconf = stash.get_filepaths( + "desdm-fileconf", tilename, band=band + ) + if self.config["use_nwgint"]: + fileconf = os.path.join( + os.path.dirname(orig_fileconf), + f"{tilename}_{band}_nullwtfileconf-{meds_run}.yaml", + ) + safe_copy( + orig_fileconf, + fileconf, + ) + with open(fileconf, "r") as fp: + fconf = yaml.safe_load(fp.read()) + fconf["finalcut_flist"] = fconf["nullwt_flist"] + with open(fileconf, "w") as fp: + yaml.dump(fconf, fp) + else: + fileconf = orig_fileconf + + return meds_yml_pth, fileconf + + @classmethod + def from_config_file(cls, config_file, base_dir=None, logger=None, + name="meds"): + with open(config_file, "rb") as f: + config = yaml.safe_load(f) + return cls(config, base_dir=base_dir, logger=logger, name=name) diff --git a/eastlake/steps/source_extractor.py b/eastlake/steps/source_extractor.py index 984d4497..43a9874a 100644 --- a/eastlake/steps/source_extractor.py +++ b/eastlake/steps/source_extractor.py @@ -8,7 +8,8 @@ from ..step import Step, run_and_check from ..stash import Stash -from ..utils import get_relpath, pushd +from ..utils import get_relpath, pushd, safe_copy +from ..des_piff import PSF_KWARGS from .swarp import FITSEXTMAP @@ -187,4 +188,57 @@ def execute(self, stash, new_params=None): stash.set_filepaths( "bkg_rms_file", bkg_rms_name, tilename, band=band) + # make coadd object map + last_coadd_cat = fitsio.read(catalog_name, lower=True) + dtype = [ + ('id', 'i8'), + ('object_number', 'i8'), + ('gi_color', 'f8'), + ('iz_color', 'f8'), + ] + obj_cat = np.zeros(len(last_coadd_cat), dtype=dtype) + obj_cat["id"] = last_coadd_cat['number'] + obj_cat["object_number"] = last_coadd_cat['number'] + mags = {} + for band in stash["bands"]: + pyml = stash.get_output_pizza_cutter_yaml(tilename, band) + coadd_cat = fitsio.read(pyml["cat_path"], lower=True) + mags[band] = coadd_cat["mag_auto"] + assert len(coadd_cat) == len(obj_cat) + + if "g" in mags and "i" in mags: + obj_cat["gi_color"] = mags["g"] - mags["i"] + else: + obj_cat["gi_color"] = PSF_KWARGS["r"]["GI_COLOR"] + + if "i" in mags and "z" in mags: + obj_cat["iz_color"] = mags["i"] - mags["z"] + else: + obj_cat["iz_color"] = PSF_KWARGS["z"]["IZ_COLOR"] + + for band in stash["bands"]: + coadd_file, _ = stash.get_filepaths( + "coadd_file", tilename, band=band, with_fits_ext=True, funpack=True, + ) + obj_cat_name = coadd_file.replace(".fits", "_objmap.fits") + fitsio.write(obj_cat_name, obj_cat, clobber=True) + with stash.update_output_pizza_cutter_yaml(tilename, band): + stash.set_filepaths( + "coadd_object_map", obj_cat_name, tilename, band=band, + ) + + for band in stash["bands"]: + # copy the PSF model even though it is not the actual PSF + orig_psf_path = stash.get_input_pizza_cutter_yaml( + tilename, band + )["psf_path"] + psf_path_from_imsim_data = os.path.relpath( + orig_psf_path, stash["imsim_data"]) + psf_file = os.path.join( + stash["base_dir"], psf_path_from_imsim_data) + safe_copy( + orig_psf_path, + psf_file, + ) + return 0, stash diff --git a/eastlake/steps/true_detection.py b/eastlake/steps/true_detection.py index 78771b95..62ec9f47 100644 --- a/eastlake/steps/true_detection.py +++ b/eastlake/steps/true_detection.py @@ -6,6 +6,8 @@ from ..step import Step from ..utils import safe_copy, safe_rm +from ..des_piff import PSF_KWARGS +from ..des_files import MAGZP_REF class TrueDetectionRunner(Step): @@ -68,12 +70,22 @@ def _reformat_catalog(self, stash, tilename, band): ('ymax_image', 'i4'), ('x_image', 'f4'), ('y_image', 'f4'), + ('x2_image', 'f4'), + ('y2_image', 'f4'), + ('errx2_image', 'f4'), + ('erry2_image', 'f4'), ('alpha_j2000', 'f8'), ('delta_j2000', 'f8'), ('a_world', 'f4'), ('b_world', 'f4'), ('flags', 'i2'), - ('flux_radius', 'f4')] + ('flux_radius', 'f4'), + ('isoarea_image', 'f4'), + ('flux_auto', 'f4'), + ("mag_auto", 'f4'), + ('fluxerr_auto', 'f4'), + ('magerr_auto', 'f4'), + ] srcext_cat = np.zeros(len(tdet), dtype=dtype) srcext_cat['number'] = np.arange(len(tdet)) + 1 srcext_cat['x_image'] = tdet['x'] @@ -84,6 +96,11 @@ def _reformat_catalog(self, stash, tilename, band): srcext_cat['b_world'] = 0 srcext_cat['flags'] = 0 srcext_cat['flux_radius'] = 0 + if f"mag_{band}" in tdet.dtype.names: + srcext_cat["mag_auto"] = tdet[f"mag_{band}"] + srcext_cat["flux_auto"] = 10**( + 0.4*(MAGZP_REF - tdet[f"mag_{band}"]) + ) half = int(self.box_size / 2) xint = (tdet['x'] + 0.5).astype(np.int32) @@ -102,9 +119,44 @@ def _reformat_catalog(self, stash, tilename, band): pyml["cat_path"] = srcext_cat_name fitsio.write(srcext_cat_name, srcext_cat, clobber=True) + # make the coadd object map file + dtype = [ + ('id', 'i8'), + ('object_number', 'i8'), + ('gi_color', 'f8'), + ('iz_color', 'f8'), + ] + obj_cat = np.zeros(len(tdet), dtype=dtype) + obj_cat_name = coadd_file.replace(".fits", "_objmap.fits") + obj_cat["id"] = srcext_cat['number'] + obj_cat["object_number"] = srcext_cat['number'] + if "mag_g" in tdet.dtype.names and "mag_i" in tdet.dtype.names: + obj_cat["gi_color"] = tdet["mag_g"] - tdet["mag_i"] + bad = ~np.isfinite(obj_cat["gi_color"]) + if np.any(bad): + obj_cat[bad] = PSF_KWARGS["r"]["GI_COLOR"] + else: + obj_cat["gi_color"] = PSF_KWARGS["r"]["GI_COLOR"] + + if "mag_i" in tdet.dtype.names and "mag_z" in tdet.dtype.names: + obj_cat["iz_color"] = tdet["mag_i"] - tdet["mag_z"] + bad = ~np.isfinite(obj_cat["iz_color"]) + if np.any(bad): + obj_cat[bad] = PSF_KWARGS["r"]["iz_COLOR"] + else: + obj_cat["iz_color"] = PSF_KWARGS["z"]["IZ_COLOR"] + + fitsio.write(obj_cat_name, obj_cat, clobber=True) + with stash.update_output_pizza_cutter_yaml(tilename, band): + stash.set_filepaths( + "coadd_object_map", obj_cat_name, tilename, band=band, + ) + def _copy_and_munge_coadd_data(self, stash, tilename, band): # we need to set the coadd path and img ext for downstrem code - orig_coadd_path = stash.get_input_pizza_cutter_yaml(tilename, band)["image_path"] + orig_coadd_path = stash.get_input_pizza_cutter_yaml( + tilename, band + )["image_path"] coadd_path_from_imsim_data = os.path.relpath( orig_coadd_path, stash["imsim_data"]) coadd_file = os.path.join( @@ -146,4 +198,17 @@ def _copy_and_munge_coadd_data(self, stash, tilename, band): pyml["seg_path"] = "" pyml["seg_ext"] = -1 + # copy the PSF model even though it is not the actual PSF + orig_psf_path = stash.get_input_pizza_cutter_yaml( + tilename, band + )["psf_path"] + psf_path_from_imsim_data = os.path.relpath( + orig_psf_path, stash["imsim_data"]) + psf_file = os.path.join( + stash["base_dir"], psf_path_from_imsim_data) + safe_copy( + orig_psf_path, + psf_file, + ) + return coadd_file diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 00000000..f11cf635 --- /dev/null +++ b/ruff.toml @@ -0,0 +1 @@ +line-length = 120