From d6efe2d3b0202df6f15d86f94fbe39ff1866f9ae Mon Sep 17 00:00:00 2001 From: Glauber Costa Vila Verde Date: Mon, 13 May 2024 12:47:56 -0700 Subject: [PATCH] All necessary methods to apply a transformation function have been implemented. --- .tmp/config.yaml | 90 +++++-- .../lsst/consdb/efd_transform/butler_dao.py | 28 ++- python/lsst/consdb/efd_transform/transform.py | 26 ++ .../consdb/efd_transform/transform_main.py | 231 ++++++++++++------ 4 files changed, 267 insertions(+), 108 deletions(-) diff --git a/.tmp/config.yaml b/.tmp/config.yaml index 14ba4a43..643a41c1 100644 --- a/.tmp/config.yaml +++ b/.tmp/config.yaml @@ -6,34 +6,76 @@ columns: - name: lsst.sal.ATAOS.logevent_focusOffsetSummary fields: ["total"] - window: 0.0 - # - name: ATAOS_focusOffsetSummary_userApplied - # function: mean - # topics: - # - - # topic: lsst.sal.ATAOS.logevent_focusOffsetSummary - # fields: ["userApplied"] - # window: 0.0 + - name: ATPtg_warning + function: mean + topics: + - + name: lsst.sal.ATPtg.logevent_airmassWarning + fields: ["level"] + - + name: lsst.sal.ATPtg.logevent_moonProximityWarning + fields: ["level"] + - + name: lsst.sal.ATPtg.logevent_mountDataWarning + fields: ["level"] + # - + # name: lsst.sal.ATPtg.logevent_azWrapWarning + # fields: ["level"] + # - + # name: lsst.sal.ATPtg.logevent_blindSpotWarning + # fields: ["level"] + # - + # name: lsst.sal.ATPtg.logevent_elLimitWarning + # fields: ["level"] + # - + # name: lsst.sal.ATPtg.logevent_focusNameInconsistentWarning + # fields: ["level"] + # - + # name: lsst.sal.ATPtg.logevent_objectSetWarning + # fields: ["level"] + # - + # name: lsst.sal.ATPtg.logevent_rotWrapWarning + # fields: ["level"] + # - + # name: lsst.sal.ATPtg.logevent_sunProximityWarning + # fields: ["level"] + # - + # name: lsst.sal.ATPtg.logevent_objectSetWarning + # fields: ["level"] - # - name: ATAOS_focusOffsetSummary_filter - # function: mean - # topics: - # - - # topic: lsst.sal.ATAOS.logevent_focusOffsetSummary - # fields: ["filter"] - # window: 0.0 + - name: ATAOS_focusOffsetSummary_userApplied + function: min + topics: + - + name: lsst.sal.ATAOS.logevent_focusOffsetSummary + fields: ["userApplied"] + - name: ATAOS_focusOffsetSummary_filter + function: max + topics: + - + name: lsst.sal.ATAOS.logevent_focusOffsetSummary + fields: ["filter"] -# Exemplo retorna 0 Rows - # - name: ESS_pressure - # type: double - # function: mean - # window: 0.0 - # topics: - # - - # topic: lsst.sal.ESS.pressure - # fields: ["pressureItem"] + - name: ATAOS_focusOffsetSummary_userApplied_min + function: min + topics: + - + name: lsst.sal.ATAOS.logevent_focusOffsetSummary + fields: ["userApplied"] + - name: ATAOS_focusOffsetSummary_userApplied_mean + function: mean + topics: + - + name: lsst.sal.ATAOS.logevent_focusOffsetSummary + fields: ["userApplied"] + - name: ATAOS_focusOffsetSummary_userApplied_max + function: max + topics: + - + name: lsst.sal.ATAOS.logevent_focusOffsetSummary + fields: ["userApplied"] # topics: # # other optional information diff --git a/python/lsst/consdb/efd_transform/butler_dao.py b/python/lsst/consdb/efd_transform/butler_dao.py index 544638cb..b4907478 100644 --- a/python/lsst/consdb/efd_transform/butler_dao.py +++ b/python/lsst/consdb/efd_transform/butler_dao.py @@ -1,6 +1,7 @@ +import astropy.time +import pandas from lsst.daf.butler import Butler -import astropy.time class ButlerDao: @@ -10,23 +11,32 @@ def __init__(self, butler: Butler): def query_dimension_to_list(self, resultset): return [r.toDict() for r in resultset] - def exposures_by_period(self, + def 
query_dimension_to_dataframe(self, resultset): + return pandas.DataFrame([q.toDict() for q in resultset]) + + def exposures_by_period( + self, instrument: str, start_time: astropy.time.Time, end_time: astropy.time.Time, ) -> list: where_clause = f"instrument=instr and exposure.timespan OVERLAPS (T'{start_time}', T'{end_time}')" - return self.butler.registry.queryDimensionRecords( - "exposure", where=where_clause, bind=dict(instr=instrument)) - - def visits_by_period(self, + resultset = self.butler.registry.queryDimensionRecords( + "exposure", where=where_clause, bind=dict(instr=instrument) + ) + return self.query_dimension_to_list(resultset) + + def visits_by_period( + self, instrument: str, start_time: astropy.time.Time, end_time: astropy.time.Time, ) -> list: where_clause = f"instrument=instr and visit.timespan OVERLAPS (T'{start_time}', T'{end_time}')" - return self.butler.registry.queryDimensionRecords( - "visit", where=where_clause, bind=dict(instr=instrument)) - + resultset = self.butler.registry.queryDimensionRecords( + "visit", where=where_clause, bind=dict(instr=instrument) + ) + + return self.query_dimension_to_list(resultset) diff --git a/python/lsst/consdb/efd_transform/transform.py b/python/lsst/consdb/efd_transform/transform.py index 47ddea8a..03c7f07e 100644 --- a/python/lsst/consdb/efd_transform/transform.py +++ b/python/lsst/consdb/efd_transform/transform.py @@ -2,6 +2,7 @@ import numpy + class Transform: """ A class that performs various transformations on a list or numpy @@ -20,6 +21,22 @@ def __init__(self, values: Union[List[Union[float, int, bool]], numpy.ndarray]): """ self.values = numpy.array(values) + def apply(self, method_name: str) -> Union[float, None]: + """ + Apply a transformation method specified by method_name. + + Args: + method_name: Name of the method to apply. + + Returns: + The result of the transformation method or None if the method is not found. + """ + method = getattr(self, method_name, None) + if method: + return method() + else: + return None + def mean(self) -> float: """ Calculate the mean of the values. @@ -27,6 +44,9 @@ def mean(self) -> float: Returns: The mean value as a float. """ + if numpy.size(self.values) == 0: + return numpy.nan + return numpy.nanmean(self.values) def std(self, ddof: Optional[int] = 1) -> float: @@ -48,6 +68,9 @@ def max(self) -> Union[float, int, bool]: Returns: The maximum value as a float, int, or bool. """ + if numpy.size(self.values) == 0: + return numpy.nan + return numpy.nanmax(self.values) def min(self) -> Union[float, int, bool]: @@ -57,6 +80,9 @@ def min(self) -> Union[float, int, bool]: Returns: The minimum value as a float, int, or bool. 
""" + if numpy.size(self.values) == 0: + return numpy.nan + return numpy.nanmin(self.values) def logical_and(self) -> Union[bool, numpy.ndarray]: diff --git a/python/lsst/consdb/efd_transform/transform_main.py b/python/lsst/consdb/efd_transform/transform_main.py index b9eda158..e5ca0df5 100644 --- a/python/lsst/consdb/efd_transform/transform_main.py +++ b/python/lsst/consdb/efd_transform/transform_main.py @@ -1,28 +1,31 @@ import argparse import asyncio -from typing import Any, Union, List +import logging +import sys +from pathlib import Path +from typing import Any, List, Union +import astropy.time import lsst_efd_client +import numpy +import pandas import yaml +from efd_transform.butler_dao import ButlerDao +from efd_transform.transform import Transform from lsst.daf.butler import Butler from sqlalchemy import Engine -import logging -import sys -from pathlib import Path -import astropy.time -from efd_transform.butler_dao import ButlerDao -import pandas -import numpy class TransformMain: - def __init__(self, - butler: Butler, - db: Engine, - efd: lsst_efd_client.EfdClient, - config: dict[str, Any], - logger: logging.Logger): + def __init__( + self, + butler: Butler, + db: Engine, + efd: lsst_efd_client.EfdClient, + config: dict[str, Any], + logger: logging.Logger, + ): self.log = logger @@ -36,50 +39,127 @@ def __init__(self, self.log.debug(f"EFD: {self.efd}") self.log.debug(f"Configs: {self.config}") - - async def process_interval(self, - instrument: str, - start_time: astropy.time.Time, - end_time: astropy.time.Time, - ): + async def process_interval( + self, + instrument: str, + start_time: astropy.time.Time, + end_time: astropy.time.Time, + ): self.log.info(f"Proccessing interval {start_time} - {end_time}") # Retrieves all exposures for the period - exposures = self.butler_dao.exposures_by_period( - instrument, start_time, end_time) - - # self.log.info(f"Exposures: {len(exposures)}") + exposures = self.butler_dao.exposures_by_period(instrument, start_time, end_time) + + self.log.info(f"Exposures: {len(exposures)}") # Retrieves all visits for the period - visits = self.butler_dao.visits_by_period( - instrument, start_time, end_time) + visits = self.butler_dao.visits_by_period(instrument, start_time, end_time) - # self.log.info(f"Visits: {len(visits)}") + self.log.info(f"Visits: {len(visits)}") # Identifies the period that will be used to consult the topics - topic_interval = self.get_topic_interval(start_time, end_time, exposures, visits) - self.log.info(f"Topic interval: {topic_interval[0]} - {topic_interval[1]}") + topic_interval = self.get_topic_interval(start_time, end_time, exposures, visits) + self.log.info(f"Topic interval: {topic_interval[0]} - {topic_interval[1]}") + + result_exp = {} + for exposure in exposures: + result_exp[exposure["id"]] = { + "exposure_id": exposure["id"], + "instrument": instrument, + # "timespan": exposure['timespan'], + } + + # self.log.info(result_exp) # Iterates over the columns defined in the config. # for each column retrieves EFD topic information - for column in self.config['columns']: + for column in self.config["columns"]: # self.log.debug(column) self.log.info(f"Proccessing Column: {column['name']}") - # Array com todos os topicos necessarios para esta coluna. 
+ # Array with all topics needed for this column # topics = [{'name': topic name, series: pandas.DataFrame}] topics = await self.topics_by_column(column, topic_interval) - # self.log.debug(topics) - # print(topics[0]["series"]) - # print(df.info(verbose=True)) + for exposure in exposures: + column_value = self.proccess_column_value( + start_time=exposure["timespan"].begin, + end_time=exposure["timespan"].end, + topics=topics, + transform_function=column["function"], + ) + + result_exp[exposure["id"]][column["name"]] = column_value + + # for visit in visits: + # column_value = self.proccess_column_value( + # start_time=visit['timespan'].begin, + # end_time=visit['timespan'].end, + # topics = topics, + # transform_function=column['function'] + # ) + + # result_exp[visit['id']][column['name']] = column_value - # TODO: Parei aqui acredito que aqui deve entrar a get_topic_correponding_indexes - # correponding_indexes = self.get_topic_correponding_indexes( - # exposures, - # topic_time_array= NAO SEI COMO CRIAR ESTE PARAMETRO. - # ) + results = [] + for result_row in result_exp: + results.append(result_exp[result_row]) + + df = pandas.DataFrame(results[35:45]) + # self.log.info(df) + print(df) + + def proccess_column_value( + self, start_time: astropy.time.Time, end_time: astropy.time.Time, topics, transform_function + ): + + topics_values = self.topic_values_by_exposure(start_time, end_time, topics) + + values = self.concatenate_arrays(topics_values) + + column_value = Transform(values).apply(transform_function) + + return column_value + + def topic_values_by_exposure(self, start_time: astropy.time.Time, end_time: astropy.time.Time, topics): + + topics_values = [] + + for topic in topics: + topic_table = topic["series"] + values = topic_table.loc[ + (topic_table.index > str(start_time)) & (topic_table.index < str(end_time)) + ] + topics_values.append(values) + + return topics_values + + def concatenate_arrays( + self, input_data: Union[List[float], List[List[float]], numpy.ndarray, List[numpy.ndarray]] + ) -> numpy.ndarray: + """ + Concatenates values from a list or list of lists or a numpy array or list of numpy arrays, + returning a single flat numpy array. + + Args: + input_data: Input data, can be a list of floats or list of lists of floats or a numpy array or list of numpy arrays. + + Returns: + np.ndarray: Concatenated flat numpy array. + """ + if isinstance(input_data, numpy.ndarray): + return numpy.concatenate(input_data.flat) + elif isinstance(input_data, list): + flat_arrays = [ + numpy.array(arr).flat if isinstance(arr, numpy.ndarray) else numpy.array(arr).flatten() + for arr in input_data + ] + return numpy.concatenate(flat_arrays) + else: + raise TypeError( + "Input data must be a list or list of lists or a numpy array or list of numpy arrays." + ) def get_topic_correponding_indexes( self, @@ -87,19 +167,19 @@ def get_topic_correponding_indexes( topic_time_array: Union[List, numpy.ndarray], ) -> List: """ - Returns a list of topic indexes corresponding to each row in the + Returns a list of topic indexes corresponding to each row in the butler_table. - It is assumed that the butler_table and the topic_time_array are + It is assumed that the butler_table and the topic_time_array are sorted in time ascending order. Parameters: - - butler_table (pandas.DataFrame): The input DataFrame containing + - butler_table (pandas.DataFrame): The input DataFrame containing the butler table. 
- - topic_time_array (Callable[[list],numpy.ndarray]): A callable + - topic_time_array (Callable[[list],numpy.ndarray]): A callable function that returns an array of topic timespans. Returns: - - indexes (List): A list of lists, where each inner list contains the + - indexes (List): A list of lists, where each inner list contains the topic indexes corresponding to a row in the butler_table. """ indexes = [] @@ -118,34 +198,34 @@ def get_topic_correponding_indexes( indexes.append(topic_indexes) return indexes - async def topics_by_column(self, column, topic_interval) -> list[dict]: data = [] - for topic in column['topics']: + for topic in column["topics"]: + # self.log.debug(topic) topic_series = await self.get_efd_values(topic, topic_interval) - data.append({"topic": topic['name'], "series": topic_series}) + data.append({"topic": topic["name"], "series": topic_series}) self.log.debug(f"EFD Topic {topic['name']} return {len(topic_series)} rows") return data - - async def get_efd_values(self, - topic: dict[str, Any], - topic_interval: list[astropy.time.Time], - ) -> pandas.DataFrame: + async def get_efd_values( + self, + topic: dict[str, Any], + topic_interval: list[astropy.time.Time], + ) -> pandas.DataFrame: start = topic_interval[0].utc - end = topic_interval[1].utc - window = astropy.time.TimeDelta(topic.get("window", 0.0), format="sec") + end = topic_interval[1].utc + window = astropy.time.TimeDelta(topic.get("window", 0.0), format="sec") series = await self.efd.select_time_series( - topic["name"], - topic["fields"], - start - window, - end + window, - topic.get("index", None), - ) + topic["name"], + topic["fields"], + start - window, + end + window, + topic.get("index", None), + ) # TODO: Fazendo um resample e interpolate Provisório. # Somente para simular que existe mais de uma mensagem @@ -159,24 +239,25 @@ async def get_efd_values(self, return series - def get_topic_interval(self, - start_time: astropy.time.Time, - end_time: astropy.time.Time, - exposures: list[dict], - visits: list[dict] - ) -> list[astropy.time.Time]: + def get_topic_interval( + self, + start_time: astropy.time.Time, + end_time: astropy.time.Time, + exposures: list[dict], + visits: list[dict], + ) -> list[astropy.time.Time]: min_topic_time = start_time max_topic_time = end_time for exposure in exposures: - if exposure.timespan.end < end_time: - min_topic_time = min(exposure.timespan.begin, min_topic_time) - max_topic_time = max(exposure.timespan.begin, max_topic_time) - + if exposure["timespan"].end < end_time: + min_topic_time = min(exposure["timespan"].begin, min_topic_time) + max_topic_time = max(exposure["timespan"].begin, max_topic_time) + for visit in visits: - if visit.timespan.end < end_time: - min_topic_time = min(visit.timespan.begin, min_topic_time) - max_topic_time = max(visit.timespan.begin, max_topic_time) - - return [min_topic_time, max_topic_time] \ No newline at end of file + if visit["timespan"].end < end_time: + min_topic_time = min(visit["timespan"].begin, min_topic_time) + max_topic_time = max(visit["timespan"].begin, max_topic_time) + + return [min_topic_time, max_topic_time]
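
For reference, the snippet below sketches the flow this patch wires together: select the rows of an EFD topic series that fall inside an exposure timespan, flatten them into a single array, and apply the transformation named in the config column via Transform.apply(). It is an illustrative sketch only, not part of the change set: the timestamps, field values, and exposure window are invented, and the import assumes the efd_transform package layout used in the patch.

import numpy
import pandas

from efd_transform.transform import Transform  # Transform.apply() is added by this patch

# Stand-in for an EFD topic series as returned by select_time_series(),
# indexed by time. Values are made up for illustration.
topic_series = pandas.DataFrame(
    {"userApplied": [0.10, 0.12, 0.15, 0.11]},
    index=pandas.to_datetime(
        [
            "2024-05-13T10:00:00",
            "2024-05-13T10:00:10",
            "2024-05-13T10:00:20",
            "2024-05-13T10:05:00",
        ]
    ),
)

# Hypothetical exposure timespan (begin/end).
begin = "2024-05-13T10:00:05"
end = "2024-05-13T10:00:30"

# Same row selection performed by topic_values_by_exposure().
selected = topic_series.loc[(topic_series.index > begin) & (topic_series.index < end)]

# Flatten to one 1-D array, as concatenate_arrays() does across topics.
values = numpy.concatenate([selected.to_numpy().flatten()])

# Apply the function named in the config column ("mean", "min", "max", ...).
column_value = Transform(values).apply("mean")
print(column_value)  # 0.135; an empty selection yields numpy.nan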