diff --git a/eastlake/config/Y6A1_v1_fitvd.yaml b/eastlake/config/Y6A1_v1_fitvd.yaml
new file mode 100644
index 0000000..c7b2acc
--- /dev/null
+++ b/eastlake/config/Y6A1_v1_fitvd.yaml
@@ -0,0 +1,98 @@
+# same config should work for cosmos and SN
+parspace: 'ngmix'
+hst_band: null
+
+# we don't have id yet
+match_field: 'number'
+
+# usually this is to skip the coadd
+skip_first_epoch: True
+
+weight_type: 'uberseg'
+
+# gaussian aperture weighted fluxes
+gap:
+    # in arcsec
+    weight_fwhm: 2.5
+
+# cosmic ray detection is pretty good now, and outlier rejection
+# tends to cause problems for brightish stars
+reject_outliers: False
+
+image_flagnames_to_mask: [
+    BPM,
+    SATURATE,
+    INTERP,
+    BADAMP,
+    CRAY,
+    TRAIL,
+    EDGEBLEED,
+    EDGE,
+    STREAK,
+    NEAREDGE,
+    TAPEBUMP,
+]
+
+
+# this is before adding additional masking, e.g. circular mask
+# does not apply for uberseg
+max_maskfrac: 0.45
+
+# minimum number of unmasked pixels to be used in an epoch. this is only
+# checked if using uberseg, or in _set_weight, if the weight map is being
+# modified, e.g. circular mask
+min_npix: 9
+
+use_mask: False
+
+radius_column: "iso_radius"
+
+max_fof_size: 25
+
+mof:
+
+    model: 'bdf'
+
+    subtract_neighbors: False
+
+    # number of times to try the fit if it fails
+    ntry: 2
+
+    # for guesses
+    detband: 2
+
+    priors:
+        cen:
+            type: 'normal2d'
+            sigma: 0.263
+
+        g:
+            type: 'ba'
+            sigma: 0.2
+
+        T:
+            type: 'flat'
+            pars: [-0.1, 1.e+05]
+
+        flux:
+            type: 'flat'
+            pars: [-1000.0, 1.0e+09]
+
+        fracdev:
+            type: 'normal'
+            mean: 0.5
+            sigma: 0.1
+            bounds: [0.0, 1.0]
+
+    psf:
+        ntry: 4
+
+        model: 'coellip3'
+
+        lm_pars:
+            ftol: 1.0e-5
+            xtol: 1.0e-5
+
+    lm_pars:
+        ftol: 1.0e-5
+        xtol: 1.0e-5
diff --git a/eastlake/config/Y6A1_v1_shredx.yaml b/eastlake/config/Y6A1_v1_shredx.yaml
new file mode 100644
index 0000000..54a2a08
--- /dev/null
+++ b/eastlake/config/Y6A1_v1_shredx.yaml
@@ -0,0 +1,37 @@
+fofs:
+    # minimum fof size to process
+    min_fofsize: 1
+
+data:
+    # zero weights for pixels with these bits set in the mask plane
+    zero_weight_badpix: 1
+
+guess:
+    # the guess for each object will be initially laid out with relative flux and
+    # sizes for each gaussian based on this model
+
+    # dev uses 10 gaussians
+    model: 'dev'
+
+shredding:
+    # number of gaussians to represent the psf
+    psf_ngauss: 5
+
+    # tolerance for coadd and flux fitting. For the main fit it is the
+    # relative change in the log likelihood. For the flux fit it is the
+    # relative change in the total flux
+    # tol: 0.001
+
+    # min and max number of iterations for the main fit on the coadd of all
+    # bands
+    # miniter: 40
+    # maxiter: 500
+    # miniter: 40
+
+    # min and max number of iterations for the flux only adjustments for each
+    # band
+    # flux_miniter: 20
+    # flux_maxiter: 500
+
+    # fill in zero weight pixels with the model on each iteration
+    # fill_zero_weight: True
diff --git a/eastlake/pipeline.py b/eastlake/pipeline.py
index e7e4d58..50c1393 100644
--- a/eastlake/pipeline.py
+++ b/eastlake/pipeline.py
@@ -27,6 +27,7 @@
     StashPrep,
     BalrogRunner,
     DESDMMEDSRunner,
+    FitvdRunner,
 )
 from .utils import get_logger, safe_mkdir, pushd
 from .stash import Stash
@@ -51,6 +52,7 @@
     ('balrog', BalrogRunner),
     ('delete_sources', DeleteSources),
     ('desdm_meds', DESDMMEDSRunner),
+    ('fitvd', FitvdRunner),
 ])
 
 STEP_IS_GALSIM = set(["galsim"])
diff --git a/eastlake/steps/__init__.py b/eastlake/steps/__init__.py
index 7fd4d7a..d73b9fa 100644
--- a/eastlake/steps/__init__.py
+++ b/eastlake/steps/__init__.py
@@ -13,3 +13,4 @@
 from .balrog import BalrogRunner  # noqa
 from .delete_sources import DeleteSources  # noqa
 from .desdm_meds import DESDMMEDSRunner  # noqa
+from .fitvd import FitvdRunner  # noqa
diff --git a/eastlake/steps/delete_sources.py b/eastlake/steps/delete_sources.py
index e78b623..bea6559 100644
--- a/eastlake/steps/delete_sources.py
+++ b/eastlake/steps/delete_sources.py
@@ -123,32 +123,33 @@ def execute(self, stash, new_params=None):
                 self.logger.debug("removing file %s" % t)
                 safe_rm(t)
 
-            self.logger.error("removing psf links for %s" % tilename)
+        self.logger.error("removing psf links for %s" % tilename)
+        psf_link = os.path.join(
+            base_dir, stash["desrun"], tilename, "psfs",
+            os.path.basename(pyml["psf_path"])
+        )
+        safe_rm(psf_link)
+
+        for sri in pyml["src_info"]:
             psf_link = os.path.join(
                 base_dir, stash["desrun"], tilename, "psfs",
-                os.path.basename(pyml["psf_path"])
+                os.path.basename(sri["psf_path"])
             )
             safe_rm(psf_link)
 
-            for sri in pyml["src_info"]:
-                psf_link = os.path.join(
-                    base_dir, stash["desrun"], tilename, "psfs",
-                    os.path.basename(sri["psf_path"])
-                )
-                safe_rm(psf_link)
-
-            psf_path = os.path.join(
-                base_dir, stash["desrun"], tilename, "psfs",
-            )
-            safe_rmdir(psf_path)
+        psf_path = os.path.join(
+            base_dir, stash["desrun"], tilename, "psfs",
+        )
+        safe_rmdir(psf_path)
 
-            self.logger.error("deleting empty dirs")
+        self.logger.error("deleting empty dirs")
 
-            for root, dirs, files in os.walk(base_dir, topdown=False):
-                for name in dirs:
-                    full_dir = os.path.join(root, name)
-                    if len(os.listdir(full_dir)) == 0:
-                        safe_rmdir(full_dir)
+        tile_path = os.path.join(base_dir, stash["desrun"], tilename)
+        for root, dirs, files in os.walk(tile_path, topdown=False):
+            for name in dirs:
+                full_dir = os.path.join(root, name)
+                if len(os.listdir(full_dir)) == 0:
+                    safe_rmdir(full_dir)
 
         return 0, stash
 
diff --git a/eastlake/steps/fitvd.py b/eastlake/steps/fitvd.py
new file mode 100644
index 0000000..562bb6e
--- /dev/null
+++ b/eastlake/steps/fitvd.py
@@ -0,0 +1,335 @@
+from __future__ import print_function, absolute_import
+import os
+import pkg_resources
+
+import fitsio
+import joblib
+import numpy as np
+
+from ..step import Step, run_and_check
+from ..utils import safe_mkdir, safe_rm
+
+
+def _get_default_config(nm):
+    return pkg_resources.resource_filename("eastlake", "config/%s" % nm)
+
+
+def get_fofs_chunks(*, n_chunks=None, fofs_path=None):
+
+    # Chunk up the fofs groups:
+    with fitsio.FITS(fofs_path) as fits:
+        fofs = fits[1].read()
+
+    n_fofs = len(np.unique(fofs['fof_id']))
+
+    print("fof groups: ", n_fofs)
+
+    n_objs_chunks = n_fofs // n_chunks
+
+    print("n_chunks : ", n_chunks)
+    print("n_objs_chunks : ", n_objs_chunks)
+
+    starts_ends = []
+    for i in range(n_chunks):
+        start = i * n_objs_chunks
+        end = i * n_objs_chunks + n_objs_chunks - 1
+        if i == n_chunks - 1:
+            end = n_fofs - 1
+        starts_ends.append((start, end))
+
+    return starts_ends
+
+
+def get_fitvd_chunks(*, n_chunks=None, shredx_fits_path=None):
+
+    with fitsio.FITS(shredx_fits_path) as fits:
+        shredx = fits[1].read()
+
+    n_objs = len(shredx)
+
+    print("fitvd objs: ", n_objs)
+    n_objs_chunks = n_objs // n_chunks
+
+    print("n_chunks : ", n_chunks)
+    print("n_objs_chunks : ", n_objs_chunks)
+
+    starts_ends = []
+    for i in range(n_chunks):
+        start = i * n_objs_chunks
+        end = i * n_objs_chunks + n_objs_chunks - 1
+        if i == n_chunks - 1:
+            end = n_objs - 1
+        starts_ends.append((start, end))
+
+    return starts_ends
+
+
+class FitvdRunner(Step):
+    """
+    Pipeline step for running shredx and fitvd
+    """
+    def __init__(self, config, base_dir, name="fitvd",
+                 logger=None, verbosity=0, log_file=None):
+
+        super().__init__(
+            config, base_dir, name=name, logger=logger, verbosity=verbosity,
+            log_file=log_file)
+
+        self.shredx_config_file = os.path.abspath(
+            os.path.expanduser(
+                os.path.expandvars(
+                    self.config.get(
+                        "shredx_config",
+                        _get_default_config("Y6A1_v1_shredx.yaml"),
+                    )
+                )
+            )
+        )
+        self.fitvd_config_file = os.path.abspath(
+            os.path.expanduser(
+                os.path.expandvars(
+                    self.config.get(
+                        "fitvd_config",
+                        _get_default_config("Y6A1_v1_fitvd.yaml"),
+                    )
+                )
+            )
+        )
+
+        # fitvd relies on an older version of ngmix; here, we use a separate conda
+        # environment to run these codes.
+        if (CONDA_PREFIX := self.config.get("conda_prefix", None)) is not None:
+            CMD_PREFIX = [
+                "conda", "run",
+                "--prefix", CONDA_PREFIX,
+            ]
+        else:
+            CMD_PREFIX = []
+        self.CMD_PREFIX = CMD_PREFIX
+
+    def execute(self, stash, new_params=None):
+        rng = np.random.RandomState(seed=stash["step_primary_seed"])
+        for tilename in stash["tilenames"]:
+            shredx_seed = rng.randint(1, 2**31)
+            fitvd_seed = rng.randint(1, 2**31)
+
+            fitvd_files = []
+
+            output_dir = os.path.join(
+                self.base_dir, stash["desrun"], tilename, "fitvd",
+            )
+            shredx_dir = os.path.join(
+                output_dir,
+                "shredx",
+            )
+            sof_dir = os.path.join(
+                output_dir,
+                "sof",
+            )
+            safe_mkdir(output_dir)
+            safe_mkdir(shredx_dir)
+            safe_mkdir(sof_dir)
+
+            meds_dict = dict(
+                zip(
+                    stash.get("bands"),
+                    stash.get_filepaths("meds_files", tilename)
+                )
+            )
+            pyml_dict = {
+                band: stash.get_output_pizza_cutter_yaml(
+                    tilename,
+                    band,
+                )
+                for band in self.config.get("bands")
+            }
+
+            bands = self.config.get("bands")
+            det_band = self.config.get("det_band")
+
+            image_paths = {}
+            segmap_paths = {}
+            catalog_paths = {}
+            psf_paths = {}
+            meds_paths = {}
+
+            for band in bands:
+                _image_path = pyml_dict[band].get("image_path")
+                _segmap_path = pyml_dict[band].get("seg_path")
+                _catalog_path = pyml_dict[band].get("cat_path")
+                _psf_path = pyml_dict[band].get("psf_path")
+                _meds_path = meds_dict[band]
+
+                image_paths[band] = _image_path
+                segmap_paths[band] = _segmap_path
+                catalog_paths[band] = _catalog_path
+                psf_paths[band] = _psf_path
+                meds_paths[band] = _meds_path
+
+            # Preserve this in case needed later
+            req_num = os.path.basename(segmap_paths["r"])[13:21]
+
+            # 1. Create shredx fofs
+            shredx_fofslist = os.path.join(
+                output_dir,
+                f"{tilename}_{req_num}_shredx-fofslist.fits",
+            )
+            cmd = [
+                "shredx-make-fofs",
+                "--seg", segmap_paths[det_band],
+                "--output", shredx_fofslist,
+            ]
+            cmd = self.CMD_PREFIX + cmd
+            run_and_check(
+                cmd,
+                "shredx-make-fofs",
+            )
+
+            # Prepare fof chunks
+            fofs_chunks = get_fofs_chunks(
+                n_chunks=self.config.get("n_jobs"),
+                fofs_path=shredx_fofslist,
+            )
+
+            shredx_chunklist = [
+                os.path.join(
+                    shredx_dir,
+                    f"{tilename}_shredx-chunk-{start}-{end}.fits",
+                )
+                for start, end in fofs_chunks
+            ]
+
+            # 2. Run shredx
+            cmd_list = [
+                [
+                    "shredx",
+                    "--start", str(start),
+                    "--end", str(end),
+                    "--seed", str(shredx_seed),
+                    "--images", *[image_paths[band] for band in bands],
+                    "--psf", *[psf_paths[band] for band in bands],
+                    "--cat", catalog_paths[det_band],
+                    "--seg", segmap_paths[det_band],
+                    "--config", self.shredx_config_file,
+                    "--fofs", shredx_fofslist,
+                    "--outfile", shredx_chunklist[i],
+                ]
+                for i, (start, end) in enumerate(fofs_chunks)
+            ]
+
+            jobs = []
+            for cmd in cmd_list:
+                cmd = self.CMD_PREFIX + cmd
+                jobs.append(joblib.delayed(
+                    run_and_check
+                )(cmd, "shredx"))
+
+            with joblib.Parallel(
+                n_jobs=self.config.get("n_jobs", -1),
+                backend="loky",
+                verbose=100,
+            ) as par:
+                par(jobs)
+
+            shredx_flist = os.path.join(
+                output_dir,
+                "shredx_flist.txt",
+            )
+            with open(shredx_flist, "w") as fobj:
+                for shredx_chunk in shredx_chunklist:
+                    fobj.write(f"{shredx_chunk}\n")
+
+            # 3. Collate shredx output
+            shredx_fits = os.path.join(
+                output_dir,
+                f"{tilename}_{req_num}_shredx.fits",
+            )
+            cmd = [
+                "shredx-collate",
+                "--tilename", tilename,
+                "--output", shredx_fits,
+                "--flist", shredx_flist,
+            ]
+            cmd = self.CMD_PREFIX + cmd
+            run_and_check(
+                cmd,
+                "shredx-collate"
+            )
+
+            safe_rm(shredx_flist)
+
+            fitvd_files.append(shredx_fits)
+
+            # Prepare fitvd chunks
+            fitvd_chunks = get_fitvd_chunks(
+                n_chunks=self.config.get("n_jobs"),
+                shredx_fits_path=shredx_fits,
+            )
+            sof_output = [
+                os.path.join(
+                    sof_dir,
+                    f"{tilename}_sof-chunk-{start}-{end}.fits",
+                )
+                for start, end in fitvd_chunks
+            ]
+
+            # 4. Run fitvd
+            cmd_list = [
+                [
+                    "fitvd",
+                    "--start", str(start),
+                    "--end", str(end),
+                    "--seed", str(fitvd_seed),
+                    "--config", self.fitvd_config_file,
+                    "--model-pars", shredx_fits,
+                    "--output", sof_output[i],
+                    *[meds_paths[band] for band in bands],
+                ]
+                for i, (start, end) in enumerate(fitvd_chunks)
+            ]
+
+            jobs = []
+            for cmd in cmd_list:
+                cmd = self.CMD_PREFIX + cmd
+                jobs.append(joblib.delayed(
+                    run_and_check
+                )(cmd, "fitvd"))
+
+            with joblib.Parallel(
+                n_jobs=self.config.get("n_jobs", -1),
+                backend="loky",
+                verbose=100,
+            ) as par:
+                par(jobs)
+
+            # 5. Collate fitvd output
+            sof_fits = os.path.join(
+                output_dir,
+                f"{tilename}_{req_num}_sof.fits",
+            )
+            cmd = [
+                "fitvd-collate",
+                "--meds", meds_paths[det_band],
+                "--output", sof_fits,
+                *sof_output,
+            ]
+            cmd = self.CMD_PREFIX + cmd
+            run_and_check(
+                cmd,
+                "fitvd-collate"
+            )
+            fitvd_files.append(sof_fits)
+
+            # Cleaning up
+
+            # Remove intermediate outputs
+            safe_rm(shredx_fofslist)
+            for outfile in shredx_chunklist:
+                safe_rm(outfile)
+            for outfile in sof_output:
+                safe_rm(outfile)
+
+            # Inform the stash of the final outputs
+            stash.set_filepaths("fitvd_files", fitvd_files, tilename)
+
+        return 0, stash