Add option for different backends
robertdstein committed Aug 1, 2024
1 parent 6580f5a commit 2774a79
Showing 3 changed files with 118 additions and 170 deletions.
27 changes: 21 additions & 6 deletions nuztf/base_scanner.py
@@ -846,8 +846,21 @@ def text_summary(self):
return text

def calculate_overlap_with_observations(
self, first_det_window_days=3.0, min_sep=0.01, fields=None
self,
first_det_window_days: float = 3.0,
min_sep: float = 0.01,
fields: list[int] | None = None,
backend: str = "best",
):
"""
Calculate the overlap of the skymap with observations
:param first_det_window_days: First detection window in days
:param min_sep: Minimum separation between detections in days
:param fields: Fields to consider (if None, all fields are considered)
:param backend: Backend to use for coverage calculation ("best", "depot", "tap" or "skyvision")
:return:
"""

if fields is not None:
new = []
@@ -874,14 +887,17 @@ def calculate_overlap_with_observations(
data = pd.concat(new)

else:
mns = get_obs_summary(t_min=self.t_min, max_days=first_det_window_days)
mns = get_obs_summary(
t_min=self.t_min, max_days=first_det_window_days, backend=backend
)

if mns is None:
return None, None, None

data = mns.data.copy()

mask = data["status"] == 0

self.logger.info(
f"Found {mask.sum()} successful observations in the depot, "
f"corresponding to {np.mean(mask)*100:.2f}% of the total."
@@ -1014,17 +1030,15 @@ def calculate_overlap_with_observations(
)

def plot_overlap_with_observations(
self,
first_det_window_days=None,
min_sep=0.01,
fields=None,
self, first_det_window_days=None, min_sep=0.01, fields=None, backend="best"
):
"""
Function to plot the overlap of the skymap with observations.
:param first_det_window_days: Window of time in days to consider for the first detection.
:param min_sep: Minimum separation between observations to consider them as separate.
:param fields: Fields to consider.
:param backend: Backend to use for coverage calculation ("best", "depot", "tap" or "skyvision").
"""

@@ -1036,6 +1050,7 @@ def plot_overlap_with_observations(
first_det_window_days=first_det_window_days,
min_sep=min_sep,
fields=fields,
backend=backend,
)

if coverage_df is None:
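For orientation, a minimal usage sketch of the updated scanner API follows. It is a sketch only: the event name and constructor arguments are illustrative assumptions, not taken from this commit.

    from nuztf.skymap_scanner import SkymapScanner

    # Hypothetical event; any skymap event supported by nuztf would do.
    scanner = SkymapScanner(event="S190814bv", n_days=3)

    # Restrict the coverage lookup to a single backend instead of the
    # default "best", which falls back from depot to skyvision to TAP.
    scanner.plot_overlap_with_observations(
        first_det_window_days=3.0,
        backend="skyvision",
    )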
201 changes: 97 additions & 104 deletions nuztf/observations.py
@@ -267,15 +267,24 @@ def write_coverage_skyvision(jds: list[float]):
# The following functions are used to get the coverage from the cache


def get_coverage(jds: [int]) -> pd.DataFrame | None:
def get_coverage(jds: list[int], backend: str = "best") -> pd.DataFrame | None:
"""
Get a dataframe of the coverage for a list of JDs.
Uses the cache where available; otherwise queries the depot, then Skyvision, and lastly TAP, restricted to the chosen backend.
:param jds: JDs
:param backend: "best" or "depot" or "tap" or "skyvision"
:return: Coverage dataframe
"""

assert backend in [
"best",
"depot",
"tap",
"skyvision",
], f"Invalid backend '{backend}'"

# Clear any logs flagged as partial/incomplete

cache_files = coverage_dir.glob("*.json")
@@ -288,77 +297,83 @@ def get_coverage(jds: [int]) -> pd.DataFrame | None:

# Only write missing logs

missing_logs = []
missing_logs = list(jds)  # start by assuming every JD is missing

for jd in jds:
depot_path = coverage_depot_path(jd)
if not depot_path.exists():
missing_logs.append(jd)
else:
df = pd.read_json(coverage_depot_path(jd))
if len(df) == 0:
missing_logs.append(jd)

if len(missing_logs) > 0:
logger.info(
f"Some logs were missing from the cache. "
f"Querying for the following JDs in depot: {missing_logs}"
)
write_coverage_depot(missing_logs)

# Try skyvision for missing logs
still_missing_logs = []
for jd in missing_logs:
skyvision_path = coverage_skyvision_path(jd)
if not skyvision_path.exists():
still_missing_logs.append(jd)
else:
df = pd.read_json(coverage_skyvision_path(jd))
if len(df) == 0:
still_missing_logs.append(jd)
if backend in ["best", "depot"]:

if len(still_missing_logs) > 0:
logger.info(
f"Some logs were still missing from the cache. "
f"Querying for the following JDs from skyvision: {still_missing_logs}"
)
write_coverage_skyvision(still_missing_logs)
for jd in list(missing_logs):  # iterate over a copy, since entries are removed below
depot_path = coverage_depot_path(jd)
if depot_path.exists():
df = pd.read_json(coverage_depot_path(jd))
if len(df) > 0:
missing_logs.remove(jd)

# Try TAP for missing logs

completely_missing_logs = []
if len(missing_logs) > 0:
logger.info(
f"Some logs were missing from the cache. "
f"Querying for the following JDs in depot: {missing_logs}"
)
write_coverage_depot(missing_logs)

if backend in ["best", "skyvision"]:
for jd in list(missing_logs):  # iterate over a copy, since entries are removed below
skyvision_path = coverage_skyvision_path(jd)
if skyvision_path.exists():
df = pd.read_json(coverage_skyvision_path(jd))
if len(df) > 0:
missing_logs.remove(jd)

if len(missing_logs) > 0:
logger.info(
f"Some logs were still missing from the cache. "
f"Querying for the following JDs from skyvision: {missing_logs}"
)
write_coverage_skyvision(missing_logs)

for jd in still_missing_logs:
depot_path = coverage_depot_path(jd)
if not depot_path.exists():
completely_missing_logs.append(jd)
else:
df = pd.read_json(coverage_depot_path(jd))
if len(df) == 0:
completely_missing_logs.append(jd)
# Try TAP for missing logs

if len(completely_missing_logs) > 0:
logger.info(
f"Some logs were still missing from the cache. "
f"Querying for the following JDs from TAP: {completely_missing_logs}"
)
write_coverage_tap(completely_missing_logs)
if backend in ["best", "tap"]:
for jd in list(missing_logs):  # iterate over a copy, since entries are removed below
tap_path = coverage_tap_path(jd)
if tap_path.exists():
df = pd.read_json(coverage_tap_path(jd))
if len(df) > 0:
missing_logs.remove(jd)

if len(missing_logs) > 0:
logger.info(
f"Some logs were still missing from the cache. "
f"Querying for the following JDs from TAP: {missing_logs}"
)
write_coverage_tap(missing_logs)

# Load logs from cache

results = []

for jd in tqdm(jds):
res = pd.read_json(coverage_depot_path(jd))
if len(res) > 0:

res = None

if backend in ["best", "depot"]:
df = pd.read_json(coverage_depot_path(jd))
if len(df) > 0:
res = df

if backend in ["best", "skyvision"]:
if res is None:
df = pd.read_json(coverage_skyvision_path(jd))
if len(df) > 0:
res = df

if backend in ["best", "tap"]:
if res is None:
df = pd.read_json(coverage_tap_path(jd))
if len(df) > 0:
res = df

if res is not None:
results.append(res)
else:
res = pd.read_json(coverage_skyvision_path(jd))
if len(res) > 0:
results.append(res)
else:
res = pd.read_json(coverage_tap_path(jd))
results.append(res)

if results:
result_df = pd.concat(results, ignore_index=True)
@@ -367,14 +382,19 @@ def get_coverage(jds: [int], backend="best") -> pd.DataFrame | None:
return None
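The pattern implemented above is a cache-first cascade restricted by the requested backend. Below is a condensed, self-contained sketch of the same idea; the helper name and cache-file layout are placeholders, not the module's real API.

    import pandas as pd
    from pathlib import Path

    def cached_coverage(jd: float, cache_dir: Path, backend: str = "best") -> pd.DataFrame | None:
        """Return cached coverage for one JD, trying each allowed source in order."""
        # "best" may fall through every source; otherwise only the requested one is tried.
        order = ["depot", "skyvision", "tap"] if backend == "best" else [backend]
        for name in order:
            path = cache_dir / f"{jd}_{name}.json"  # assumed naming scheme
            if path.exists():
                df = pd.read_json(path)
                if len(df) > 0:
                    return df
        return None  # nothing usable cached for this JD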


def get_obs_summary_depot(t_min: Time, t_max: Time) -> MNS | None:
def get_obs_summary_depot(t_min: Time, t_max: Time, backend="best") -> MNS | None:
"""
Get observation summary from the coverage cache, using the chosen backend
:param t_min: Start time
:param t_max: End time
:param backend: "best" or "depot" or "tap" or "skyvision"
:return: MNS object
"""

jds = np.arange(int(t_min.jd) - 0.5, int(t_max.jd) + 1.5)

res = get_coverage(jds)
res = get_coverage(jds, backend=backend)

if res is None or len(res) == 0:
return None
@@ -391,47 +411,20 @@ def get_obs_summary_depot(t_min: Time, t_max: Time) -> MNS | None:
return mns
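A side note on the JD grid above: it is built on half-integer boundaries so that the whole nights between t_min and t_max are enclosed. A worked example with assumed times:

    import numpy as np
    from astropy.time import Time

    t_min = Time(2460523.2, format="jd")
    t_max = Time(2460525.8, format="jd")

    jds = np.arange(int(t_min.jd) - 0.5, int(t_max.jd) + 1.5)
    # -> array([2460522.5, 2460523.5, 2460524.5, 2460525.5])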


def get_obs_summary_skyvision(t_min, t_max):
"""
Get observation summary from Skyvision
"""
t_min_date = t_min.to_value("iso", subfmt="date")
t_max_date = t_max.to_value("iso", subfmt="date")

# ztfquery saves nightly observations in a cache, and does not redownload them.
# If the nightly log was not complete, it will never be updated.
# Here we simply clear the cache and cleanly re-download everything.

logger.debug(
f"Skyvision: Obtaining nightly observation logs from {t_min_date} to {t_max_date}"
)

skyvision_log = os.path.join(LOCALSOURCE, "skyvision")

for filename in os.listdir(skyvision_log):
if ".csv" in filename:
path = os.path.join(skyvision_log, filename)
os.remove(path)

mns = skyvision.CompletedLog.from_daterange(
t_min_date, end=t_max_date, verbose=False
)

mns.data["obsjd"] = Time(list(mns.data.datetime.values), format="isot").jd

mns.data.query(f"obsjd >= {t_min.jd} and obsjd <= {t_max.jd}", inplace=True)

mns.data.reset_index(inplace=True)
mns.data.drop(columns=["index"], inplace=True)

logger.debug(f"Found {len(mns.data)} observations in total.")

return mns


def get_obs_summary(t_min, t_max=None, max_days: float = None) -> MNS | None:
def get_obs_summary(
t_min,
t_max=None,
max_days: float | None = None,
backend="best",
) -> MNS | None:
"""
Get observation summary, using the chosen backend
:param t_min: Start time
:param t_max: End time
:param max_days: Maximum number of days after t_min to consider (used if t_max is not given)
:param backend: "best" or "depot" or "tap" or "skyvision"
:return: MNS object
"""
now = Time.now()

@@ -447,12 +440,12 @@ def get_obs_summary(t_min, t_max=None, max_days: float = None) -> MNS | None:
if t_max > now:
t_max = now

logger.info("Getting observation logs from IPAC depot.")
mns = get_obs_summary_depot(t_min=t_min, t_max=t_max)
logger.info(f"Getting observation logs using backend {backend}.")
mns = get_obs_summary_depot(t_min=t_min, t_max=t_max, backend=backend)

if mns is not None:
logger.info(f"Found {len(set(mns.data))} observations in total.")
else:
logger.warning("Found no observations on IPAC depot or TAP.")
logger.warning(f"Found no observations using backend {backend}.")

return mns
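Finally, a hedged example of calling the updated entry point directly; the date is arbitrary.

    from astropy.time import Time
    from nuztf.observations import get_obs_summary

    t_min = Time("2024-07-01T00:00:00", format="isot")

    # Default backend "best" cascades depot -> skyvision -> TAP.
    mns = get_obs_summary(t_min, max_days=3.0)

    # Force a single source; any name outside the four allowed values
    # trips the assert in get_coverage.
    mns_tap = get_obs_summary(t_min, max_days=3.0, backend="tap")

    if mns is not None:
        print(mns.data.head())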
60 changes: 0 additions & 60 deletions tests/test_observations.py

This file was deleted.
