diff --git a/nuztf/base_scanner.py b/nuztf/base_scanner.py index 4b8cac1..11efa49 100644 --- a/nuztf/base_scanner.py +++ b/nuztf/base_scanner.py @@ -58,6 +58,7 @@ def __init__( ): self.cone_nside = cone_nside self.t_min = t_min + ( self.map_coords, self.pixel_nos, @@ -906,17 +907,6 @@ def calculate_overlap_with_observations( self.logger.info("Unpacking observations") - if self.nside < 1024: - ( - self.map_coords, - self.pixel_nos, - self.nside, - self.map_probs, - self.data, - self.total_pixel_area, - self.key, - ) = self.unpack_skymap(output_nside=1024) - pix_map = dict() pix_obs_times = dict() diff --git a/nuztf/neutrino_scanner.py b/nuztf/neutrino_scanner.py index 3498660..5dfa7fd 100644 --- a/nuztf/neutrino_scanner.py +++ b/nuztf/neutrino_scanner.py @@ -152,7 +152,8 @@ def filter_f_no_prv(self, res: dict): # Require 2 detections separated by 15 mins if (endhist - starthist) < 0.01: self.logger.debug( - f"❌ {ztf_id}: Does have 2 detections, but these are not separated by >15 mins (delta t = {(endhist-starthist)*1440:.0f} min)" + f"❌ {ztf_id}: Does have 2 detections, but these are not separated by " + f">15 mins (delta t = {(endhist-starthist)*1440:.0f} min)" ) return False @@ -160,7 +161,10 @@ def filter_f_no_prv(self, res: dict): return True def filter_f_history(self, res: dict): - """Filter based on 2 detection requirement and probability contour requirement""" + """ + Filter based on 2 detection requirement + and probability contour requirement + """ ztf_id = res["objectId"] @@ -215,8 +219,14 @@ def in_contour(self, ra_deg, dec_deg): return np.logical_and(in_ra, in_dec) - def unpack_skymap(self, skymap=None, output_nside: None | int = None): - """ """ + def unpack_skymap(self, output_nside: None | int = None): + """ + Unpack the skymap and return the pixel coordinates and probabilities + + :param output_nside: Nside of the output map + :return: Map coordinates, pixel numbers, output nside, + map probabilities, data, total pixel area, key + """ output_nside = 2048 if output_nside is None else output_nside map_coords = [] @@ -225,7 +235,8 @@ def unpack_skymap(self, skymap=None, output_nside: None | int = None): center_ra = np.radians(np.mean([self.ra_max, self.ra_min])) center_dec = np.radians(np.mean([self.dec_max, self.dec_min])) # Take the larger of the two sides and convert to radians - # To make sure to include all pixels until the edge of the rectangle, we have to devide by sqrt(2) + # To make sure to include all pixels until the edge of the rectangle, + # we have to devide by sqrt(2) # (not 2 as previously done here!) rad = np.radians( max(self.ra_max - self.ra_min, self.dec_max - self.dec_min) diff --git a/nuztf/observations.py b/nuztf/observations.py deleted file mode 100644 index c45677d..0000000 --- a/nuztf/observations.py +++ /dev/null @@ -1,451 +0,0 @@ -import json -import logging -import os -import time -from glob import glob -from pathlib import Path - -import backoff -import numpy as np -import pandas as pd -import pyvo.dal -import requests -from astropy import units as u -from astropy.time import Time -from pyvo.auth import authsession, securitymethods -from requests.auth import HTTPBasicAuth -from requests.exceptions import HTTPError -from tqdm import tqdm -from ztfquery import skyvision -from ztfquery.io import LOCALSOURCE - -from nuztf import credentials - -coverage_dir = Path(LOCALSOURCE).joinpath("all_obs") -coverage_dir.mkdir(exist_ok=True) - -partial_flag = "_PARTIAL" - -logger = logging.getLogger(__name__) - -# IPAC TAP login -username, password = credentials.load_credentials("ipacdepot") -data = {"username": username, "password": password} -headers = {"Content-Type": "application/x-www-form-urlencoded", "Accept": "text/plain"} -IPAC_TAP_URL = "https://irsa.ipac.caltech.edu" - - -class MNS: - def __init__(self, df): - self.data = df - - -class NoDepotEntry(Exception): - """ - No entry in the depot for a given date - """ - - -def get_date(jd: float) -> str: - """ - Get the date in YYYYMMDD format from a JD - - :param jd: JD - :return: String date - """ - return str(Time(jd, format="jd").isot).split("T")[0].replace("-", "") - - -def download_depot_log(date): - """ - Download the depot log for a given date - - :param date: date in YYYYMMDD format - :return: json log - """ - url = f"https://ztfweb.ipac.caltech.edu/ztf/depot/{date}/ztf_recentproc_{date}.json" - response = requests.get(url, auth=HTTPBasicAuth(username, password)) - if response.status_code == 404: - raise NoDepotEntry(f"No depot entry for {date} at url {url}") - response.raise_for_status() - return response.json() - - -# The following functions are used to find the paths to the cached coverage files - - -def coverage_depot_path(jd: float) -> Path: - """ - Return the path to the cached coverage file for a given JD - - :param jd: JD - :return: path to cached coverage file - """ - - date = get_date(jd) - - if (Time.now().jd - jd) < 1: - partial_ext = partial_flag - else: - partial_ext = "" - - return coverage_dir.joinpath(f"{date}{partial_ext}.json") - - -def coverage_skyvision_path(jd: float) -> Path: - """ - Find the path to the Skyvision coverage file for a given JD - - :param jd: JD - :return: Output path - """ - if (Time.now().jd - jd) < 1: - partial_ext = partial_flag - else: - partial_ext = "" - - output_path = coverage_dir.joinpath(f"{get_date(jd)}{partial_ext}_skyvision.json") - - return output_path - - -def coverage_tap_path(jd: float) -> Path: - """ - Find the path to the TAP coverage file for a given JD - - :param jd: JD - :return: Output path - """ - if (Time.now().jd - jd) < 1: - partial_ext = partial_flag - else: - partial_ext = "" - - output_path = coverage_dir.joinpath(f"{get_date(jd)}{partial_ext}_tap.json") - - return output_path - - -# The following functions are used to write the coverage to the cache - - -def write_coverage_depot(jds: list[float]): - """ - Write the depot coverage for a list of JDs to the cache - - Requires a valid IPAC depot login - - :param jds: JDs - :return: None - """ - for jd in tqdm(jds): - date = get_date(jd) - try: - log = download_depot_log(date) - except NoDepotEntry as exc: - log = {} - logger.warning(f"No depot entry for {date}: {exc}") - - path = coverage_depot_path(jd) - with open(path, "w") as f: - json.dump(log, f) - - -@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=60) -def write_coverage_tap(jds: [float]): - """ - Write the coverage for a list of JDs to the cache using TAP - - (JDs must be half-integer) - - :param jds: Half-integer JDs - :return: None - """ - with requests.Session() as session: - # Create a session and do the login. - # The cookie will end up in the cookie jar of the session. - response = session.post(IPAC_TAP_URL, data=data, headers=headers) - response.raise_for_status() - auth = authsession.AuthSession() - auth.credentials.set(securitymethods.ANONYMOUS, session) - client = pyvo.dal.TAPService("https://irsa.ipac.caltech.edu/TAP", auth) - - for jd in jds: - assert jd - int(jd) == 0.5, "JD must be a half-integer" - - obstable = client.search( - f""" - SELECT expid,obsjd,fid,field,exptime,rcid,maglimit,infobits,ipac_gid - FROM ztf.ztf_current_meta_sci WHERE (obsjd BETWEEN {jd} AND {jd + 1}) - """ - ).to_table() - names = ( - "expid", - "exptime", - "field", - "fid", - "rcid", - "maglimit", - "infobits", - "ipac_gid", - ) - renames = ( - "exposure_id", - "exposure_time", - "field_id", - "filter_id", - "qid", - "maglim", - "status", - "programid", - ) - obstable.rename_columns(names, renames) - - obs = obstable.to_pandas() - obs["nalertpackets"] = np.nan - - output_path = coverage_tap_path(jd) - - obs.to_json(output_path) - time.sleep(1) - - logger.warning( - f"Coverage for JD {jd} written to {output_path} using TAP, " - f"but this will only include programid=1" - ) - - -def write_coverage_skyvision(jds: list[float]): - """ - Write the Skyvision coverage for a list of JDs to the cache - - :param jds: JDs - :return: None - """ - for jd in tqdm(jds): - date = get_date(jd) - - assert jd - int(jd) == 0.5, "JD must be a half-integer" - - res = skyvision.get_log(f"{date[:4]}-{date[4:6]}-{date[6:8]}", verbose=False) - - path = coverage_skyvision_path(jd) - - if res is not None: - # The skyvision log has a bug where some entries have a FieldID of "NONE" - mask = ( - pd.notnull(res["FieldID"]) & res["FieldID"].astype(str).str.isnumeric() - ) - res = res[mask] - jds = [ - Time(f'{row["UT Date"]}T{row["UT Time"]}').jd - for _, row in res.iterrows() - ] - res["obsjd"] = jds - res["status"] = (res["Observation Status"] == "FAILED").astype(int) - res["filter_id"] = res["Filter"].apply( - lambda x: 1 if x == "FILTER_ZTF_G" else 2 if x == "FILTER_ZTF_R" else 3 - ) - res["maglim"] = 20.5 - res["field_id"] = res["FieldID"].astype(int) - res["exposure_time"] = res["Exptime"] - res = res[ - ["obsjd", "filter_id", "field_id", "exposure_time", "maglim", "status"] - ] - - new_res = [] - for _, row in res.iterrows(): - new = row.to_dict() - new_res += [dict(qid=int(i), **new) for i in range(64)] - pd.DataFrame(new_res).to_json(path) - - else: - with open(path, "w") as f: - json.dump({}, f) - - -# The following functions are used to get the coverage from the cache - - -def get_coverage(jds: [int], backend="best") -> pd.DataFrame | None: - """ - Get a dataframe of the coverage for a list of JDs - - Will use the cache if available, otherwise will query the depot, and lastly TAP - - :param jds: JDs - :param backend: "best" or "depot" or "tap" or "skyvision" - :return: Coverage dataframe - """ - - assert backend in [ - "best", - "depot", - "tap", - "skyvision", - ], f"Invalid backend '{backend}'" - - # Clear any logs flagged as partial/incomplete - - cache_files = coverage_dir.glob("*.json") - partial_logs = [x for x in cache_files if partial_flag in str(x)] - - if len(partial_logs) > 0: - logger.debug(f"Removing the following partial logs: {partial_logs}") - for partial_log in partial_logs: - partial_log.unlink() - - # Only write missing logs - - missing_logs = [x for x in jds] - - if backend in ["best", "depot"]: - - for jd in missing_logs: - depot_path = coverage_depot_path(jd) - if depot_path.exists(): - df = pd.read_json(coverage_depot_path(jd)) - if len(df) > 0: - missing_logs.remove(jd) - - if len(missing_logs) > 0: - logger.info( - f"Some logs were missing from the cache. " - f"Querying for the following JDs in depot: {missing_logs}" - ) - write_coverage_depot(missing_logs) - - if backend in ["best", "skyvision"]: - for jd in missing_logs: - skyvision_path = coverage_skyvision_path(jd) - if skyvision_path.exists(): - df = pd.read_json(coverage_skyvision_path(jd)) - if len(df) > 0: - missing_logs.remove(jd) - - if len(missing_logs) > 0: - logger.info( - f"Some logs were still missing from the cache. " - f"Querying for the following JDs from skyvision: {missing_logs}" - ) - write_coverage_skyvision(missing_logs) - - # Try TAP for missing logs - - if backend in ["best", "tap"]: - for jd in missing_logs: - tap_path = coverage_tap_path(jd) - if tap_path.exists(): - df = pd.read_json(coverage_tap_path(jd)) - if len(df) > 0: - missing_logs.remove(jd) - - if len(missing_logs) > 0: - logger.info( - f"Some logs were still missing from the cache. " - f"Querying for the following JDs from TAP: {missing_logs}" - ) - write_coverage_tap(missing_logs) - - # Load logs from cache - - results = [] - - for jd in tqdm(jds): - - res = None - - if backend in ["best", "depot"]: - df = pd.read_json(coverage_depot_path(jd)) - if len(df) > 0: - res = df - - if backend in ["best", "skyvision"]: - if res is None: - df = pd.read_json(coverage_skyvision_path(jd)) - if len(df) > 0: - res = df - - if backend in ["best", "tap"]: - if res is None: - df = pd.read_json(coverage_tap_path(jd)) - if len(df) > 0: - res = df - - if res is not None: - results.append(res) - - if results: - result_df = pd.concat(results, ignore_index=True) - return result_df - else: - return None - - -def get_obs_summary_depot(t_min: Time, t_max: Time, backend="best") -> MNS | None: - """ - Get observation summary from depot - - :param t_min: Start time - :param t_max: End time - :param backend: "best" or "depot" or "tap" or "skyvision" - :return: MNS object - """ - - jds = np.arange(int(t_min.jd) - 0.5, int(t_max.jd) + 1.5) - - res = get_coverage(jds, backend=backend) - - if len(res) == 0: - return None - - res["date"] = Time(res["obsjd"].to_numpy(), format="jd").isot - - mns = MNS(df=res) - - mns.data.query(f"obsjd >= {t_min.jd} and obsjd <= {t_max.jd}", inplace=True) - - mns.data.reset_index(inplace=True) - mns.data.drop(columns=["index"], inplace=True) - - return mns - - -def get_obs_summary( - t_min, - t_max=None, - max_days: float = None, - backend="best", -) -> MNS | None: - """ - Get observation summary from IPAC depot - - :param t_min: Start time - :param t_max: End time - :param max_days: Maximum number of days - :param backend: "best" or "depot" or "tap" or "skyvision" - :return: MNS object - """ - now = Time.now() - - if t_max and max_days: - raise ValueError("Choose either t_max or max_days, not both") - - if t_max is None: - if max_days is None: - t_max = now - else: - t_max = t_min + (max_days * u.day) - - if t_max > now: - t_max = now - - logger.info(f"Getting observation logs using backend {backend}.") - mns = get_obs_summary_depot(t_min=t_min, t_max=t_max, backend=backend) - - if mns is not None: - logger.info(f"Found {len(set(mns.data))} observations in total.") - else: - logger.warning(f"Found no observations using backend {backend}.") - - return mns diff --git a/nuztf/observations/__init__.py b/nuztf/observations/__init__.py new file mode 100644 index 0000000..acbdb59 --- /dev/null +++ b/nuztf/observations/__init__.py @@ -0,0 +1,5 @@ +""" +This module contains functions to download and summarize observations from ZTF +""" + +from nuztf.observations.core import get_obs_summary diff --git a/nuztf/observations/core.py b/nuztf/observations/core.py new file mode 100644 index 0000000..dc77dd4 --- /dev/null +++ b/nuztf/observations/core.py @@ -0,0 +1,205 @@ +import logging + +import numpy as np +import pandas as pd +from astropy import units as u +from astropy.time import Time +from tqdm import tqdm + +from nuztf.observations.depot import coverage_depot_path, write_coverage_depot +from nuztf.observations.mns import MNS +from nuztf.observations.shared import coverage_dir, partial_flag +from nuztf.observations.skyvision import ( + coverage_skyvision_path, + write_coverage_skyvision, +) +from nuztf.observations.tap import coverage_tap_path, write_coverage_tap + +logger = logging.getLogger(__name__) + + +# The following functions are used to get the coverage from the cache + + +def get_coverage(jds: [int], backend="best") -> pd.DataFrame | None: + """ + Get a dataframe of the coverage for a list of JDs + + Will use the cache if available, otherwise will query the depot, and lastly TAP + + :param jds: JDs + :param backend: "best" or "depot" or "tap" or "skyvision" + :return: Coverage dataframe + """ + + assert backend in [ + "best", + "depot", + "tap", + "skyvision", + ], f"Invalid backend '{backend}'" + + # Clear any logs flagged as partial/incomplete + + cache_files = coverage_dir.glob("*.json") + partial_logs = [x for x in cache_files if partial_flag in str(x)] + + if len(partial_logs) > 0: + logger.debug(f"Removing the following partial logs: {partial_logs}") + for partial_log in partial_logs: + partial_log.unlink() + + # Only write missing logs + + missing_logs = [x for x in jds] + + if backend in ["best", "depot"]: + + for jd in missing_logs: + depot_path = coverage_depot_path(jd) + if depot_path.exists(): + df = pd.read_json(coverage_depot_path(jd)) + if len(df) > 0: + missing_logs.remove(jd) + + if len(missing_logs) > 0: + logger.info( + f"Some logs were missing from the cache. " + f"Querying for the following JDs in depot: {missing_logs}" + ) + write_coverage_depot(missing_logs) + + if backend in ["best", "skyvision"]: + for jd in missing_logs: + skyvision_path = coverage_skyvision_path(jd) + if skyvision_path.exists(): + df = pd.read_json(coverage_skyvision_path(jd)) + if len(df) > 0: + missing_logs.remove(jd) + + if len(missing_logs) > 0: + logger.info( + f"Some logs were still missing from the cache. " + f"Querying for the following JDs from skyvision: {missing_logs}" + ) + write_coverage_skyvision(missing_logs) + + # Try TAP for missing logs + + if backend in ["best", "tap"]: + for jd in missing_logs: + tap_path = coverage_tap_path(jd) + if tap_path.exists(): + df = pd.read_json(coverage_tap_path(jd)) + if len(df) > 0: + missing_logs.remove(jd) + + if len(missing_logs) > 0: + logger.info( + f"Some logs were still missing from the cache. " + f"Querying for the following JDs from TAP: {missing_logs}" + ) + write_coverage_tap(missing_logs) + + # Load logs from cache + + results = [] + + for jd in tqdm(jds): + + res = None + + if backend in ["best", "depot"]: + df = pd.read_json(coverage_depot_path(jd)) + if len(df) > 0: + res = df + + if backend in ["best", "skyvision"]: + if res is None: + df = pd.read_json(coverage_skyvision_path(jd)) + if len(df) > 0: + res = df + + if backend in ["best", "tap"]: + if res is None: + df = pd.read_json(coverage_tap_path(jd)) + if len(df) > 0: + res = df + + if res is not None: + results.append(res) + + if results: + result_df = pd.concat(results, ignore_index=True) + return result_df + else: + return None + + +def get_obs_summary_depot(t_min: Time, t_max: Time, backend="best") -> MNS | None: + """ + Get observation summary from depot + + :param t_min: Start time + :param t_max: End time + :param backend: "best" or "depot" or "tap" or "skyvision" + :return: MNS object + """ + + jds = np.arange(int(t_min.jd) - 0.5, int(t_max.jd) + 1.5) + + res = get_coverage(jds, backend=backend) + + if len(res) == 0: + return None + + res["date"] = Time(res["obsjd"].to_numpy(), format="jd").isot + + mns = MNS(df=res) + + mns.data.query(f"obsjd >= {t_min.jd} and obsjd <= {t_max.jd}", inplace=True) + + mns.data.reset_index(inplace=True) + mns.data.drop(columns=["index"], inplace=True) + + return mns + + +def get_obs_summary( + t_min, + t_max=None, + max_days: float = None, + backend="best", +) -> MNS | None: + """ + Get observation summary from IPAC depot + + :param t_min: Start time + :param t_max: End time + :param max_days: Maximum number of days + :param backend: "best" or "depot" or "tap" or "skyvision" + :return: MNS object + """ + now = Time.now() + + if t_max and max_days: + raise ValueError("Choose either t_max or max_days, not both") + + if t_max is None: + if max_days is None: + t_max = now + else: + t_max = t_min + (max_days * u.day) + + if t_max > now: + t_max = now + + logger.info(f"Getting observation logs using backend {backend}.") + mns = get_obs_summary_depot(t_min=t_min, t_max=t_max, backend=backend) + + if mns is not None: + logger.info(f"Found {len(set(mns.data))} observations in total.") + else: + logger.warning(f"Found no observations using backend {backend}.") + + return mns diff --git a/nuztf/observations/depot.py b/nuztf/observations/depot.py new file mode 100644 index 0000000..e61407f --- /dev/null +++ b/nuztf/observations/depot.py @@ -0,0 +1,71 @@ +import json +import logging +from pathlib import Path + +import requests +from astropy.time import Time +from requests.auth import HTTPBasicAuth +from tqdm import tqdm + +from nuztf import credentials +from nuztf.observations.shared import NoDepotEntry, coverage_dir, get_date, partial_flag + +logger = logging.getLogger(__name__) + +username, password = credentials.load_credentials("ipacdepot") +data = {"username": username, "password": password} + + +def download_depot_log(date): + """ + Download the depot log for a given date + + :param date: date in YYYYMMDD format + :return: json log + """ + url = f"https://ztfweb.ipac.caltech.edu/ztf/depot/{date}/ztf_recentproc_{date}.json" + response = requests.get(url, auth=HTTPBasicAuth(username, password)) + if response.status_code == 404: + raise NoDepotEntry(f"No depot entry for {date} at url {url}") + response.raise_for_status() + return response.json() + + +def coverage_depot_path(jd: float) -> Path: + """ + Return the path to the cached coverage file for a given JD + + :param jd: JD + :return: path to cached coverage file + """ + + date = get_date(jd) + + if (Time.now().jd - jd) < 1: + partial_ext = partial_flag + else: + partial_ext = "" + + return coverage_dir.joinpath(f"{date}{partial_ext}.json") + + +def write_coverage_depot(jds: list[float]): + """ + Write the depot coverage for a list of JDs to the cache + + Requires a valid IPAC depot login + + :param jds: JDs + :return: None + """ + for jd in tqdm(jds): + date = get_date(jd) + try: + log = download_depot_log(date) + except NoDepotEntry as exc: + log = {} + logger.warning(f"No depot entry for {date}: {exc}") + + path = coverage_depot_path(jd) + with open(path, "w") as f: + json.dump(log, f) diff --git a/nuztf/observations/mns.py b/nuztf/observations/mns.py new file mode 100644 index 0000000..043401b --- /dev/null +++ b/nuztf/observations/mns.py @@ -0,0 +1,10 @@ +""" +MNS class +""" + +import pandas as pd + + +class MNS: + def __init__(self, df: pd.DataFrame): + self.data = df diff --git a/nuztf/observations/shared.py b/nuztf/observations/shared.py new file mode 100644 index 0000000..e5809fe --- /dev/null +++ b/nuztf/observations/shared.py @@ -0,0 +1,29 @@ +""" +Shared variables for the observations module +""" + +from pathlib import Path + +from astropy.time import Time +from ztfquery.io import LOCALSOURCE + +coverage_dir = Path(LOCALSOURCE).joinpath("all_obs") +coverage_dir.mkdir(exist_ok=True) + +partial_flag = "_PARTIAL" + + +class NoDepotEntry(Exception): + """ + No entry in the depot for a given date + """ + + +def get_date(jd: float) -> str: + """ + Get the date in YYYYMMDD format from a JD + + :param jd: JD + :return: String date + """ + return str(Time(jd, format="jd").isot).split("T")[0].replace("-", "") diff --git a/nuztf/observations/skyvision.py b/nuztf/observations/skyvision.py new file mode 100644 index 0000000..236e7ba --- /dev/null +++ b/nuztf/observations/skyvision.py @@ -0,0 +1,79 @@ +""" +Skyvision coverage +""" + +import json +from pathlib import Path + +import pandas as pd +from astropy.time import Time +from tqdm import tqdm +from ztfquery import skyvision + +from nuztf.observations.shared import coverage_dir, get_date, partial_flag + + +def coverage_skyvision_path(jd: float) -> Path: + """ + Find the path to the Skyvision coverage file for a given JD + + :param jd: JD + :return: Output path + """ + if (Time.now().jd - jd) < 1: + partial_ext = partial_flag + else: + partial_ext = "" + + output_path = coverage_dir.joinpath(f"{get_date(jd)}{partial_ext}_skyvision.json") + + return output_path + + +def write_coverage_skyvision(jds: list[float]): + """ + Write the Skyvision coverage for a list of JDs to the cache + + :param jds: JDs + :return: None + """ + for jd in tqdm(jds): + date = get_date(jd) + + assert jd - int(jd) == 0.5, "JD must be a half-integer" + + res = skyvision.get_log(f"{date[:4]}-{date[4:6]}-{date[6:8]}", verbose=False) + + path = coverage_skyvision_path(jd) + + if res is not None: + # The skyvision log has a bug where some entries have a FieldID of "NONE" + mask = ( + pd.notnull(res["FieldID"]) & res["FieldID"].astype(str).str.isnumeric() + ) + res = res[mask] + jds = [ + Time(f'{row["UT Date"]}T{row["UT Time"]}').jd + for _, row in res.iterrows() + ] + res["obsjd"] = jds + res["status"] = (res["Observation Status"] == "FAILED").astype(int) + res["filter_id"] = res["Filter"].apply( + lambda x: 1 if x == "FILTER_ZTF_G" else 2 if x == "FILTER_ZTF_R" else 3 + ) + res["maglim"] = 20.5 + res["field_id"] = res["FieldID"].astype(int) + res["exposure_time"] = res["Exptime"] + res = res[ + ["obsjd", "filter_id", "field_id", "exposure_time", "maglim", "status"] + ] + + new_res = [] + for _, row in res.iterrows(): + new = row.to_dict() + new_res += [dict(qid=int(i), **new) for i in range(64)] + pd.DataFrame(new_res).to_json(path) + + else: + with open(path, "w") as f: + json.dump({}, f) diff --git a/nuztf/observations/tap.py b/nuztf/observations/tap.py new file mode 100644 index 0000000..fdd7dbf --- /dev/null +++ b/nuztf/observations/tap.py @@ -0,0 +1,106 @@ +import logging +import time +from pathlib import Path + +import backoff +import numpy as np +import pyvo.dal +import requests +from astropy.time import Time +from pyvo.auth import authsession, securitymethods + +from nuztf.observations.depot import data +from nuztf.observations.shared import coverage_dir, get_date, partial_flag + +logger = logging.getLogger(__name__) + + +# IPAC TAP login +headers = {"Content-Type": "application/x-www-form-urlencoded", "Accept": "text/plain"} +IPAC_TAP_URL = "https://irsa.ipac.caltech.edu" + +# The following functions are used to find the paths to the cached coverage files + + +def coverage_tap_path(jd: float) -> Path: + """ + Find the path to the TAP coverage file for a given JD + + :param jd: JD + :return: Output path + """ + if (Time.now().jd - jd) < 1: + partial_ext = partial_flag + else: + partial_ext = "" + + output_path = coverage_dir.joinpath(f"{get_date(jd)}{partial_ext}_tap.json") + + return output_path + + +# The following functions are used to write the coverage to the cache + + +@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=60) +def write_coverage_tap(jds: [float]): + """ + Write the coverage for a list of JDs to the cache using TAP + + (JDs must be half-integer) + + :param jds: Half-integer JDs + :return: None + """ + with requests.Session() as session: + # Create a session and do the login. + # The cookie will end up in the cookie jar of the session. + response = session.post(IPAC_TAP_URL, data=data, headers=headers) + response.raise_for_status() + auth = authsession.AuthSession() + auth.credentials.set(securitymethods.ANONYMOUS, session) + client = pyvo.dal.TAPService("https://irsa.ipac.caltech.edu/TAP", auth) + + for jd in jds: + assert jd - int(jd) == 0.5, "JD must be a half-integer" + + obstable = client.search( + f""" + SELECT expid,obsjd,fid,field,exptime,rcid,maglimit,infobits,ipac_gid + FROM ztf.ztf_current_meta_sci WHERE (obsjd BETWEEN {jd} AND {jd + 1}) + """ + ).to_table() + names = ( + "expid", + "exptime", + "field", + "fid", + "rcid", + "maglimit", + "infobits", + "ipac_gid", + ) + renames = ( + "exposure_id", + "exposure_time", + "field_id", + "filter_id", + "qid", + "maglim", + "status", + "programid", + ) + obstable.rename_columns(names, renames) + + obs = obstable.to_pandas() + obs["nalertpackets"] = np.nan + + output_path = coverage_tap_path(jd) + + obs.to_json(output_path) + time.sleep(1) + + logger.warning( + f"Coverage for JD {jd} written to {output_path} using TAP, " + f"but this will only include programid=1" + ) diff --git a/nuztf/paths.py b/nuztf/paths.py index 1ff8a3f..8ea47e0 100644 --- a/nuztf/paths.py +++ b/nuztf/paths.py @@ -28,3 +28,5 @@ PREPROCESSED_CACHE_DIR = CACHE_DIR / "preprocessed" PREPROCESSED_CACHE_DIR.mkdir(exist_ok=True, parents=True) + +ZTF_LOG_PATH = CACHE_DIR / "allexp.tbl"