diff --git a/.gitignore b/.gitignore index 9b72af1a..a028b699 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,6 @@ venv # other misc ignore .DS_Store /docs/_build/ + +# sphinx build folder +docs/_build \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index 6617e758..38d252ae 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -42,6 +42,7 @@ "sphinx.ext.viewcode", "sphinx_panels", "sphinx_copybutton", + "sphinx.ext.todo", ] # Add any paths that contain templates here, relative to this directory. diff --git a/docs/reference/index.rst b/docs/reference/index.rst index 35bb1ca3..133bf219 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -8,4 +8,5 @@ API Reference .. toctree:: - tempo.tsdf \ No newline at end of file + tempo.tsdf + tempo.intervals \ No newline at end of file diff --git a/docs/reference/tempo.intervals.rst b/docs/reference/tempo.intervals.rst new file mode 100644 index 00000000..a5b11c64 --- /dev/null +++ b/docs/reference/tempo.intervals.rst @@ -0,0 +1,5 @@ +tempo.intervals +=============== + +.. autoclass:: tempo.intervals.IntervalsDF + :members: \ No newline at end of file diff --git a/python/tempo/intervals.py b/python/tempo/intervals.py new file mode 100644 index 00000000..12f7fe66 --- /dev/null +++ b/python/tempo/intervals.py @@ -0,0 +1,617 @@ +from __future__ import annotations + +from typing import Optional +from functools import cached_property + +from pyspark.sql.dataframe import DataFrame +from pyspark.sql.types import NumericType, BooleanType, StructField +import pyspark.sql.functions as f +from pyspark.sql.window import Window + + +def is_metric_col(col: StructField) -> bool: + return isinstance(col.dataType, NumericType) or isinstance( + col.dataType, BooleanType + ) + + +class IntervalsDF: + """ + This object is the main wrapper over a `Spark DataFrame`_ which allows a + user to parallelize computations over snapshots of metrics for intervals + of time defined by a start and end timestamp and various dimensions. + + The required dimensions are `start_ts` (timestamp column) and `end_ts` + (timestamp column), which can be epoch or TimestampType. `series_ids` + (list of columns by which to summarize) is optional; metric columns are + not passed explicitly but are inferred from the schema as its numeric + and boolean columns. + + .. _`Spark DataFrame`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html + + """ + + def __init__( + self, df: DataFrame, start_ts: str, end_ts: str, series_ids: Optional[list[str]] = None + ) -> None: + """ + Constructor for :class:`IntervalsDF`. + + :param df: `DataFrame`_ to wrap with :class:`IntervalsDF` + :type df: `DataFrame`_ + :param start_ts: Name of the column which denotes interval start + :type start_ts: str + :param end_ts: Name of the column which denotes interval end + :type end_ts: str + :param series_ids: Names of the columns by which to summarize + :type series_ids: list[str], optional + :rtype: None + + :Example: + + .. code-block:: + + df = spark.createDataFrame( + [["2020-08-01 00:00:09", "2020-08-01 00:00:14", "v1", 5, 0]], + "start_ts STRING, end_ts STRING, series_1 STRING, metric_1 INT, metric_2 INT", + ) + idf = IntervalsDF(df, "start_ts", "end_ts", ["series_1"]) + idf.df.collect() + [Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=0)] + + .. _`DataFrame`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html + + .. 
todo:: + - create IntervalsSchema class to validate data types and column + existence + - check elements of series and identifiers to ensure all are str + - check if start_ts, end_ts, and the elements of series and + identifiers can be of type col + + """ + + self.df = df + + self.start_ts = start_ts + self.end_ts = end_ts + + if not series_ids: + self.series_ids = [] + elif isinstance(series_ids, list): + self.series_ids = series_ids + else: + raise ValueError( + f"series_ids must be a list of column names, instead got {type(series_ids)}" + ) + + @cached_property + def interval_boundaries(self) -> list[str]: + return [self.start_ts, self.end_ts] + + @cached_property + def structural_columns(self) -> list[str]: + return self.interval_boundaries + self.series_ids + + @cached_property + def observational_columns(self) -> list[str]: + return list(set(self.df.columns) - set(self.structural_columns)) + + @cached_property + def metric_columns(self) -> list[str]: + return [col.name for col in self.df.schema.fields if is_metric_col(col)] + + @cached_property + def window(self): + return Window.partitionBy(*self.series_ids).orderBy(*self.interval_boundaries) + + @classmethod + def fromStackedMetrics( + cls, + df: DataFrame, + start_ts: str, + end_ts: str, + series: list[str], + metrics_name_col: str, + metrics_value_col: str, + metric_names: Optional[list[str]] = None, + ) -> "IntervalsDF": + """ + Returns a new :class:`IntervalsDF` with metrics of the current DataFrame + pivoted by start and end timestamp and series. + + There are two versions of `fromStackedMetrics`. One that requires the caller + to specify the list of distinct metric names to pivot on, and one that does + not. The latter is more concise but less efficient, because Spark needs to + first compute the list of distinct metric names internally. + + :param df: :class:`DataFrame` to wrap with :class:`IntervalsDF` + :type df: `DataFrame`_ + :param start_ts: Name of the column which denotes interval start + :type start_ts: str + :param end_ts: Name of the column which denotes interval end + :type end_ts: str + :param series: column names + :type series: list[str] + :param metrics_name_col: column name + :type metrics_name_col: str + :param metrics_value_col: column name + :type metrics_value_col: str + :param metric_names: List of metric names that will be translated to + columns in the output :class:`IntervalsDF`. + :type metric_names: list[str], optional + :return: A new :class:`IntervalsDF` with a column and respective + values per distinct metric in `metrics_name_col`. + + :Example: + + .. 
code-block:: + + df = spark.createDataFrame( + [["2020-08-01 00:00:09", "2020-08-01 00:00:14", "v1", "metric_1", 5], + ["2020-08-01 00:00:09", "2020-08-01 00:00:11", "v1", "metric_2", 0]], + "start_ts STRING, end_ts STRING, series_1 STRING, metric_name STRING, metric_value INT", + ) + + # With distinct metric names specified + + idf = IntervalsDF.fromStackedMetrics( + df, "start_ts", "end_ts", ["series_1"], "metric_name", "metric_value", ["metric_1", "metric_2"], + ) + idf.df.collect() + [Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None), + Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=None, metric_2=0)] + + # Or without specifying metric names (less efficient) + + idf = IntervalsDF.fromStackedMetrics(df, "start_ts", "end_ts", ["series_1"], "metric_name", "metric_value") + idf.df.collect() + [Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None), + Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=None, metric_2=0)] + + .. _`DataFrame`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html + + .. todo:: + - check elements of identifiers to ensure all are str + - check if start_ts, end_ts, and the elements of series and + identifiers can be of type col + + """ + + if not isinstance(series, list): + raise ValueError( + f"series must be a list of column names, instead got {type(series)}" + ) + + df = ( + df.groupBy(start_ts, end_ts, *series) + .pivot(metrics_name_col, values=metric_names) # type: ignore + .max(metrics_value_col) + ) + + return cls(df, start_ts, end_ts, series) + + def __get_adjacent_rows(self, df: DataFrame) -> DataFrame: + """ + Returns a new `Spark DataFrame`_ with additional columns produced by + applying the `lead`_ and `lag`_ window functions to the interval + boundaries and metric columns. + + .. _`Spark DataFrame`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html + .. _`lead`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.lead.html#pyspark.sql.functions.lead + .. _`lag`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.lag.html#pyspark.sql.functions.lag + + .. todo:: + - should column names generated here be created at class + initialization, saved as attributes then iterated on here? + this would allow easier reuse throughout code + + """ + for c in self.interval_boundaries + self.metric_columns: + df = df.withColumn( + f"_lead_1_{c}", + f.lead(c, 1).over(self.window), + ).withColumn( + f"_lag_1_{c}", + f.lag(c, 1).over(self.window), + ) + + return df + + def __identify_subset_intervals(self, df: DataFrame) -> tuple[DataFrame, str]: + """ + Returns a new `Spark DataFrame`_ with a boolean column indicating + whether the current interval is a subset of the previous interval, + along with the name of this column for future use. + + .. _`Spark DataFrame`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html + + .. todo:: + - should subset_indicator be defined here or as an attribute for + easier reuse across code? 
+ + """ + + subset_indicator = "_lag_1_is_subset" + + df = df.withColumn( + subset_indicator, + (f.col(f"_lag_1_{self.start_ts}") <= f.col(self.start_ts)) + & (f.col(f"_lag_1_{self.end_ts}") >= f.col(self.end_ts)), + ) + + # NB: the first record cannot be a subset of the previous and + # `lag` will return null for this record with no default set + df = df.fillna( + False, + subset=[subset_indicator], + ) + + return df, subset_indicator + + def __identify_overlaps(self, df: DataFrame) -> tuple[DataFrame, list[str]]: + """ + Returns a new `Spark DataFrame`_ containing boolean columns if the + current interval overlaps with the previous or next interval and a list + with the names of these columns for future use. + + .. _`Spark DataFrame`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html + + .. todo: + - should overlap_indicators be defined here or as an attribute for + easier reuse across code? + + """ + + overlap_indicators: list[str] = [] + + # identify overlaps for each interval boundary + # NB: between is inclusive so not used here, and + # matches on boundaries should be ignored + for ts in self.interval_boundaries: + df = df.withColumn( + f"_lead_1_{ts}_overlaps", + (f.col(f"_lead_1_{ts}") > f.col(self.start_ts)) + & (f.col(f"_lead_1_{ts}") < f.col(self.end_ts)), + ).withColumn( + f"_lag_1_{ts}_overlaps", + (f.col(f"_lag_1_{ts}") > f.col(self.start_ts)) + & (f.col(f"_lag_1_{ts}") < f.col(self.end_ts)), + ) + + overlap_indicators.extend( + ( + f"_lead_1_{ts}_overlaps", + f"_lag_1_{ts}_overlaps", + ) + ) + + # NB: the first and last record cannot be a subset of the previous and + # next respectively. `lag` will return null for this record with no + # default set. + df = df.fillna( + False, + subset=overlap_indicators, + ) + + return df, overlap_indicators + + def __merge_adjacent_subset_and_superset( + self, df: DataFrame, subset_indicator: str + ) -> DataFrame: + """ + Returns a new `Spark DataFrame`_ where a subset and it's adjacent + superset, identified by `subset_indicator` are merged together. + + We assume that a metric cannot simultaneously have two values in the + same interval (unless captured in a structure such as ArrayType, etc) + so `coalesce`_ is used to merge metrics when a subset exist. Priority + in the coalesce is given to the metrics of the current record, ie it + is listed as the first argument. + + .. _`Spark DataFrame`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html + .. _`coalesce`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.coalesce.html + + .. todo: + - should subset_indicator be defined here or as an attribute for + easier reuse across code? + + """ + + for c in self.metric_columns: + df = df.withColumn( + c, + f.when( + f.col(subset_indicator), f.coalesce(f.col(c), f"_lag_1_{c}") + ).otherwise(f.col(c)), + ) + + return df + + def __merge_adjacent_overlaps( + self, df: DataFrame, how: str, overlap_indicators: list[str] + ) -> DataFrame: + """ + Returns a new `Spark DataFrame`_ where adjacent intervals which overlap, + identified by `overlap_indicators` are merged together. + + We assume that a metric cannot simultaneously have two values in the + same interval (unless captured in a structure such as ArrayType, etc) + so `coalesce`_ is used to merge metrics when overlaps exist. Priority + in the `coalesce` is given to the metrics of the current record, ie it + is listed as the first argument. + + .. 
_`Spark DataFrame`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html + .. _`coalesce`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.coalesce.html + + .. todo: + - should overlap_indicators be defined here or as an attribute for + easier reuse across code? + + """ + + if how == "left": + + # new boundary for interval end will become the start of the next + # interval + new_boundary_col = self.end_ts + new_boundary_val = f"_lead_1_{self.start_ts}" + + else: + + # new boundary for interval start will become the end of the + # previous interval + new_boundary_col = self.start_ts + new_boundary_val = f"_lead_1_{self.end_ts}" + + # NB: supersets are split here + # subsets are filled with the superset metrics in + # `__merge_adjacent_subset_and_superset` + superset_interval_when_case = ( + f"WHEN _lead_1_{self.start_ts}_overlaps " + f"AND _lead_1_{self.end_ts}_overlaps " + f"THEN {new_boundary_val} " + ) + + overlap_interval_when_cases = tuple( + f"WHEN {c} THEN {c.replace('_overlaps', '')} " for c in overlap_indicators + ) + + # this logic will be used to create boundaries of disjoint intervals + new_interval_boundaries = ( + "CASE " + + superset_interval_when_case + + " ".join(overlap_interval_when_cases) + + "ELSE null END " + ) + + df = df.withColumn( + new_boundary_col, + f.expr(new_interval_boundaries), + ) + + if how == "left": + + for c in self.metric_columns: + df = df.withColumn( + c, + # needed when intervals have same start but different ends + # in this case, merge metrics since they overlap + f.when( + f.col(f"_lag_1_{self.end_ts}_overlaps"), + f.coalesce(f.col(c), f.col(f"_lag_1_{c}")), + ) + # general case when constructing left disjoint interval + # just want new boundary without merging metrics + .otherwise(f.col(c)), + ) + + return df + + def __merge_equal_intervals(self, df: DataFrame) -> DataFrame: + """ + Returns a new `Spark DataFrame`_ where intervals with the same start + and end timestamps are merged together. + + We assume that a metric cannot simultaneously have two values in the + same interval (unless captured in a structure such as ArrayType, etc.) + so `groupBy`_ with an arbitrary `aggregate function`_ is used to merge + metrics when a subset exists. In this implementation, `max`_ is used to + perform the merge. + + .. _`Spark DataFrame`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html + .. _`groupBy`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.groupBy.html + .. _`aggregate function`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html#aggregate-functions + .. _`max`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.max.html + + """ + + merge_expr = tuple(f.max(c).alias(c) for c in self.metric_columns) + + return df.groupBy(*self.interval_boundaries, *self.series_ids).agg(*merge_expr) + + def disjoint(self) -> "IntervalsDF": + """ + Returns a new :class:`IntervalsDF` where metrics of overlapping time intervals + are correlated and merged prior to constructing new time interval boundaries ( + start and end timestamp) so that all intervals are disjoint. + + The merge process assumes that two overlapping intervals cannot simultaneously + report two different values for the same metric unless recorded in a data type + which supports multiple elements (such as ArrayType, etc.). 
+ + This is often used after :meth:`fromStackedMetrics` to reduce the number of + metrics with `null` values and to simplify the filter predicates needed to + retrieve specific metric values across all instances. + + :return: A new :class:`IntervalsDF` containing disjoint time intervals + + :Example: + + .. code-block:: + + df = spark.createDataFrame( + [["2020-08-01 00:00:10", "2020-08-01 00:00:14", "v1", 5, None], + ["2020-08-01 00:00:09", "2020-08-01 00:00:11", "v1", None, 0]], + "start_ts STRING, end_ts STRING, series_1 STRING, metric_1 INT, metric_2 INT", + ) + idf = IntervalsDF(df, "start_ts", "end_ts", ["series_1"]) + idf.disjoint().df.collect() + [Row(start_ts='2020-08-01 00:00:09', end_ts='2020-08-01 00:00:10', series_1='v1', metric_1=None, metric_2=0), + Row(start_ts='2020-08-01 00:00:10', end_ts='2020-08-01 00:00:11', series_1='v1', metric_1=5, metric_2=0), + Row(start_ts='2020-08-01 00:00:11', end_ts='2020-08-01 00:00:14', series_1='v1', metric_1=5, metric_2=None)] + + .. todo:: + - check that when merging in helper functions, prior to coalescing, at least + one of the metrics is null + + """ + df = self.df + + df = self.__get_adjacent_rows(df) + + (df, subset_indicator) = self.__identify_subset_intervals(df) + + subset_df = df.filter(f.col(subset_indicator)) + + subset_df = self.__merge_adjacent_subset_and_superset( + subset_df, subset_indicator + ) + + subset_df = subset_df.select( + *self.interval_boundaries, *self.series_ids, *self.metric_columns + ) + + non_subset_df = df.filter(~f.col(subset_indicator)) + + (non_subset_df, overlap_indicators) = self.__identify_overlaps(non_subset_df) + + overlaps_predicate = " OR ".join(tuple(col for col in overlap_indicators)) + + overlaps_df = non_subset_df.filter(overlaps_predicate) + + disjoint_predicate = f"NOT({overlaps_predicate})" + + # filter for intervals that are already disjoint + disjoint_df = non_subset_df.filter(disjoint_predicate).select( + *self.interval_boundaries, *self.series_ids, *self.metric_columns + ) + + left_overlaps_df = self.__merge_adjacent_overlaps( + overlaps_df, "left", overlap_indicators + ) + + left_overlaps_df = left_overlaps_df.select( + *self.interval_boundaries, *self.series_ids, *self.metric_columns + ) + + right_overlaps_df = self.__merge_adjacent_overlaps( + overlaps_df, "right", overlap_indicators + ) + + right_overlaps_df = right_overlaps_df.select( + *self.interval_boundaries, *self.series_ids, *self.metric_columns + ) + + unioned_df = ( + subset_df.unionByName(disjoint_df) + .unionByName(left_overlaps_df) + .unionByName(right_overlaps_df) + ) + + disjoint_df = self.__merge_equal_intervals(unioned_df) + + return IntervalsDF(disjoint_df, self.start_ts, self.end_ts, self.series_ids) + + def union(self, other: "IntervalsDF") -> "IntervalsDF": + """ + Returns a new :class:`IntervalsDF` containing the union of rows in this and another + :class:`IntervalsDF`. + + This is equivalent to UNION ALL in SQL. To do a SQL-style set union + (that does deduplication of elements), use this function followed by + distinct(). + + Also, as standard in SQL, this function resolves columns by position + (not by name). + + Based on `pyspark.sql.DataFrame.union`_. + + :param other: :class:`IntervalsDF` to `union` + :type other: :class:`IntervalsDF` + :return: A new :class:`IntervalsDF` containing the union of rows in this + and `other` + + .. 
_`pyspark.sql.DataFrame.union`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.union.html + + """ + + if not isinstance(other, IntervalsDF): + raise TypeError + + return IntervalsDF( + self.df.union(other.df), self.start_ts, self.end_ts, self.series_ids + ) + + def unionByName(self, other: "IntervalsDF") -> "IntervalsDF": + """ + Returns a new :class:`IntervalsDF` containing union of rows in this + and another :class:`IntervalsDF`. + + This is different from both UNION ALL and UNION DISTINCT in SQL. To do + a SQL-style set union (that does deduplication of elements), use this + function followed by distinct(). + + Based on `pyspark.sql.DataFrame.unionByName`_; however, + `allowMissingColumns` is not supported. + + :param other: :class:`IntervalsDF` to `unionByName` + :type other: :class:`IntervalsDF` + :return: A new :class:`IntervalsDF` containing union of rows in this + and `other` + + .. _`pyspark.sql.DataFrame.unionByName`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.unionByName.html + + """ + + if not isinstance(other, IntervalsDF): + raise TypeError + + return IntervalsDF( + self.df.unionByName(other.df), self.start_ts, self.end_ts, self.series_ids + ) + + def toDF(self, stack: bool = False) -> DataFrame: + """ + Returns a new `Spark DataFrame`_ converted from :class:`IntervalsDF`. + + There are two versions of `toDF`. One that will output columns as they exist + in :class:`IntervalsDF` and, one that will stack metric columns into + `metric_names` and `metric_values` columns populated with their respective + values. The latter can be thought of as the inverse of + :meth:`fromStackedMetrics`. + + Based on `pyspark.sql.DataFrame.toDF`_. + + :param stack: How to handle metric columns in the conversion to a `DataFrame` + :type stack: bool, optional + :return: + + .. _`Spark DataFrame`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html + .. _`pyspark.sql.DataFrame.toDF`: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.toDF.html + .. 
_`STACK`: https://spark.apache.org/docs/latest/api/sql/index.html#stack + + """ + + if stack: + + n_cols = len(self.metric_columns) + metric_cols_expr = ",".join( + tuple(f"'{col}', {col}" for col in self.metric_columns) + ) + + stack_expr = ( + f"STACK({n_cols}, {metric_cols_expr}) AS (metric_name, metric_value)" + ) + + return self.df.select( + *self.interval_boundaries, *self.series_ids, f.expr(stack_expr) + ).dropna(subset="metric_value") + + else: + return self.df diff --git a/python/tests/base.py b/python/tests/base.py index 7fa3a60d..24a7f190 100644 --- a/python/tests/base.py +++ b/python/tests/base.py @@ -9,6 +9,7 @@ import pyspark.sql.functions as F from pyspark.sql import SparkSession from tempo.tsdf import TSDF +from tempo.intervals import IntervalsDF from chispa import assert_df_equality from pyspark.sql.dataframe import DataFrame @@ -84,6 +85,17 @@ def get_data_as_tsdf(self, name: str, convert_ts_col=True): ) return tsdf + def get_data_as_idf(self, name: str, convert_ts_col=True): + df = self.get_data_as_sdf(name, convert_ts_col) + td = self.test_data[name] + idf = IntervalsDF( + df, + start_ts=td["start_ts"], + end_ts=td["end_ts"], + series_ids=td.get("series", None), + ) + return idf + TEST_DATA_FOLDER = "unit_test_data" def __getTestDataFilePath(self, test_file_name: str) -> str: @@ -189,9 +201,10 @@ def assertSchemaContainsField(self, schema, field): @staticmethod def assertDataFrameEquality( - df1: Union[TSDF, DataFrame], - df2: Union[TSDF, DataFrame], + df1: Union[IntervalsDF, TSDF, DataFrame], + df2: Union[IntervalsDF, TSDF, DataFrame], from_tsdf: bool = False, + from_idf: bool = False, ignore_row_order: bool = False, ignore_column_order: bool = True, ignore_nullable: bool = True, @@ -201,7 +214,7 @@ def assertDataFrameEquality( That is, they have equivalent schemas, and both contain the same values """ - if from_tsdf: + if from_tsdf or from_idf: df1 = df1.df df2 = df2.df diff --git a/python/tests/intervals_tests.py b/python/tests/intervals_tests.py new file mode 100644 index 00000000..841b4dd2 --- /dev/null +++ b/python/tests/intervals_tests.py @@ -0,0 +1,344 @@ +from tempo.intervals import * +from tests.tsdf_tests import SparkTest +from pyspark.sql.utils import AnalysisException +import pyspark.sql.functions as f + + +class IntervalsDFTests(SparkTest): + union_tests_dict_input = [ + { + "start_ts": "2020-08-01 00:00:09", + "end_ts": "2020-08-01 00:00:14", + "series_1": "v1", + "metric_1": 5, + "metric_2": None, + }, + { + "start_ts": "2020-08-01 00:00:09", + "end_ts": "2020-08-01 00:00:11", + "series_1": "v1", + "metric_1": None, + "metric_2": 0, + }, + { + "start_ts": "2020-08-01 00:00:09", + "end_ts": "2020-08-01 00:00:12", + "series_1": "v1", + "metric_1": None, + "metric_2": 4, + }, + { + "start_ts": "2020-08-01 00:00:09", + "end_ts": "2020-08-01 00:00:14", + "series_1": "v1", + "metric_1": 5, + "metric_2": None, + }, + { + "start_ts": "2020-08-01 00:00:09", + "end_ts": "2020-08-01 00:00:11", + "series_1": "v1", + "metric_1": None, + "metric_2": 0, + }, + { + "start_ts": "2020-08-01 00:00:09", + "end_ts": "2020-08-01 00:00:12", + "series_1": "v1", + "metric_1": None, + "metric_2": 4, + }, + ] + + def test_init_series_str(self): + df_input = self.get_data_as_sdf("input") + + self.assertRaises( + ValueError, + IntervalsDF, + df_input, + "start_ts", + "end_ts", + "series_1", + ) + + def test_init_series_tuple(self): + df_input = self.get_data_as_sdf("input") + + self.assertRaises( + ValueError, + IntervalsDF, + df_input, + "start_ts", + "end_ts", + ("series_1",), + ) 
+ + def test_init_series_list(self): + df_input = self.get_data_as_sdf("input") + + idf = IntervalsDF(df_input, "start_ts", "end_ts", ["series_1"]) + + self.assertIsInstance(idf, IntervalsDF) + self.assertIsInstance(idf.df, DataFrame) + self.assertEqual(idf.start_ts, "start_ts") + self.assertEqual(idf.end_ts, "end_ts") + self.assertEqual(idf.interval_boundaries, ["start_ts", "end_ts"]) + self.assertCountEqual(idf.series_ids, ["series_1"]) + self.assertCountEqual( + idf.structural_columns, ["start_ts", "end_ts", "series_1"] + ) + self.assertCountEqual(idf.observational_columns, ["metric_1", "metric_2"]) + self.assertCountEqual(idf.metric_columns, ["metric_1", "metric_2"]) + + def test_init_series_none(self): + df_input = self.get_data_as_sdf("input") + + idf = IntervalsDF(df_input, "start_ts", "end_ts", None) + + self.assertIsInstance(idf, IntervalsDF) + self.assertIsInstance(idf.df, DataFrame) + self.assertEqual(idf.start_ts, "start_ts") + self.assertEqual(idf.end_ts, "end_ts") + self.assertEqual(idf.interval_boundaries, ["start_ts", "end_ts"]) + self.assertCountEqual(idf.series_ids, []) + self.assertCountEqual(idf.structural_columns, ["start_ts", "end_ts"]) + self.assertCountEqual( + idf.observational_columns, ["series_1", "metric_1", "metric_2"] + ) + self.assertCountEqual(idf.metric_columns, ["metric_1", "metric_2"]) + + def test_fromStackedMetrics_series_str(self): + df_input = self.get_data_as_sdf("input") + + self.assertRaises( + ValueError, + IntervalsDF.fromStackedMetrics, + df_input, + "start_ts", + "end_ts", + "series_1", + "metric_name", + "metric_value", + ) + + def test_fromStackedMetrics_series_tuple(self): + df_input = self.get_data_as_sdf("input") + + self.assertRaises( + ValueError, + IntervalsDF.fromStackedMetrics, + df_input, + "start_ts", + "end_ts", + ("series_1",), + "metric_name", + "metric_value", + ) + + def test_fromStackedMetrics_series_list(self): + df_input = self.get_data_as_sdf("input") + idf_expected = self.get_data_as_idf("expected") + + df_input = df_input.withColumn( + "start_ts", f.to_timestamp("start_ts") + ).withColumn("end_ts", f.to_timestamp("end_ts")) + + idf = IntervalsDF.fromStackedMetrics( + df_input, + "start_ts", + "end_ts", + [ + "series_1", + ], + "metric_name", + "metric_value", + ) + + self.assertDataFrameEquality(idf, idf_expected, from_idf=True) + + def test_fromStackedMetrics_metric_names(self): + df_input = self.get_data_as_sdf("input") + idf_expected = self.get_data_as_idf("expected") + + df_input = df_input.withColumn( + "start_ts", f.to_timestamp("start_ts") + ).withColumn("end_ts", f.to_timestamp("end_ts")) + + idf = IntervalsDF.fromStackedMetrics( + df_input, + "start_ts", + "end_ts", + [ + "series_1", + ], + "metric_name", + "metric_value", + ["metric_1", "metric_2"], + ) + + self.assertDataFrameEquality(idf, idf_expected, from_idf=True) + + def test_disjoint(self): + idf_input = self.get_data_as_idf("input") + idf_expected = self.get_data_as_idf("expected") + + idf_actual = idf_input.disjoint() + + self.assertDataFrameEquality( + idf_expected, idf_actual, from_idf=True, ignore_row_order=True + ) + + def test_disjoint_contains_interval_already_disjoint(self): + idf_input = self.get_data_as_idf("input") + idf_expected = self.get_data_as_idf("expected") + + idf_actual = idf_input.disjoint() + + self.assertDataFrameEquality( + idf_expected, idf_actual, from_idf=True, ignore_row_order=True + ) + + def test_disjoint_contains_intervals_equal(self): + idf_input = self.get_data_as_idf("input") + idf_expected = 
self.get_data_as_idf("expected") + + idf_actual = idf_input.disjoint() + + self.assertDataFrameEquality( + idf_expected, idf_actual, from_idf=True, ignore_row_order=True + ) + + def test_disjoint_intervals_same_start(self): + idf_input = self.get_data_as_idf("input") + idf_expected = self.get_data_as_idf("expected") + + idf_actual = idf_input.disjoint() + + self.assertDataFrameEquality( + idf_expected, idf_actual, from_idf=True, ignore_row_order=True + ) + + def test_disjoint_intervals_same_end(self): + idf_input = self.get_data_as_idf("input") + idf_expected = self.get_data_as_idf("expected") + + idf_actual = idf_input.disjoint() + + self.assertDataFrameEquality( + idf_expected, idf_actual, from_idf=True, ignore_row_order=True + ) + + def test_disjoint_multiple_series(self): + idf_input = self.get_data_as_idf("input") + idf_expected = self.get_data_as_idf("expected") + + idf_actual = idf_input.disjoint() + + self.assertDataFrameEquality( + idf_expected, idf_actual, from_idf=True, ignore_row_order=True + ) + + def test_disjoint_single_metric(self): + idf_input = self.get_data_as_idf("input") + idf_expected = self.get_data_as_idf("expected") + + idf_actual = idf_input.disjoint() + + self.assertDataFrameEquality( + idf_expected, idf_actual, from_idf=True, ignore_row_order=True + ) + + def test_disjoint_interval_is_subset(self): + idf_input = self.get_data_as_idf("input") + idf_expected = self.get_data_as_idf("expected") + + idf_actual = idf_input.disjoint() + + self.assertDataFrameEquality( + idf_expected, idf_actual, from_idf=True, ignore_row_order=True + ) + + def test_union_other_idf(self): + idf_input_1 = self.get_data_as_idf("input") + idf_input_2 = self.get_data_as_idf("input") + + count_idf_1 = idf_input_1.df.count() + count_idf_2 = idf_input_2.df.count() + + union_idf = idf_input_1.union(idf_input_2) + + count_union = union_idf.df.count() + + self.assertEqual(count_idf_1 + count_idf_2, count_union) + + def test_union_other_df(self): + idf_input = self.get_data_as_idf("input") + df_input = self.get_data_as_sdf("input") + + self.assertRaises(TypeError, idf_input.union, df_input) + + def test_union_other_list_dicts(self): + idf_input = self.get_data_as_idf("input") + + self.assertRaises( + TypeError, idf_input.union, IntervalsDFTests.union_tests_dict_input + ) + + def test_unionByName_other_idf(self): + idf_input_1 = self.get_data_as_idf("input") + idf_input_2 = self.get_data_as_idf("input") + + count_idf_1 = idf_input_1.df.count() + count_idf_2 = idf_input_2.df.count() + + union_idf = idf_input_1.unionByName(idf_input_2) + + count_union_by_name = union_idf.df.count() + + self.assertEqual(count_idf_1 + count_idf_2, count_union_by_name) + + def test_unionByName_other_df(self): + idf_input = self.get_data_as_idf("input") + df_input = self.get_data_as_sdf("input") + + self.assertRaises(TypeError, idf_input.unionByName, df_input) + + def test_unionByName_other_list_dicts(self): + idf_input = self.get_data_as_idf("input") + + self.assertRaises( + TypeError, idf_input.unionByName, IntervalsDFTests.union_tests_dict_input + ) + + def test_unionByName_extra_column(self): + idf_extra_col = self.get_data_as_idf("input_extra_col") + idf_input = self.get_data_as_idf("input") + + self.assertRaises(AnalysisException, idf_extra_col.unionByName, idf_input) + + def test_unionByName_other_extra_column(self): + idf_input = self.get_data_as_idf("input") + idf_extra_col = self.get_data_as_idf("input_extra_col") + + self.assertRaises(AnalysisException, idf_input.unionByName, idf_extra_col) + + def 
test_toDF(self): + idf_input = self.get_data_as_idf("input") + expected_df = self.get_data_as_sdf("input") + + actual_df = idf_input.toDF() + + self.assertDataFrameEquality(actual_df, expected_df) + + def test_toDF_stack(self): + idf_input = self.get_data_as_idf("input") + expected_df = self.get_data_as_sdf("expected") + + expected_df = expected_df.withColumn( + "start_ts", f.to_timestamp("start_ts") + ).withColumn("end_ts", f.to_timestamp("end_ts")) + + actual_df = idf_input.toDF(stack=True) + + self.assertDataFrameEquality(actual_df, expected_df) diff --git a/python/tests/unit_test_data/intervals_tests.json b/python/tests/unit_test_data/intervals_tests.json new file mode 100644 index 00000000..672ad198 --- /dev/null +++ b/python/tests/unit_test_data/intervals_tests.json @@ -0,0 +1,932 @@ +{ + "__SharedData": { + "init": { + "schema": "start_ts STRING NOT NULL, end_ts STRING NOT NULL, series_1 STRING NOT NULL, metric_1 INT, metric_2 INT", + "other_ts_cols": [ + "start_ts", + "end_ts" + ], + "start_ts": "start_ts", + "end_ts": "end_ts", + "series": [ + "series_1" + ], + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:14", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:11", + "v1", + null, + 0 + ], + [ + "2020-08-01 00:00:11", + "2020-08-01 00:00:12", + "v1", + null, + 4 + ] + ] + } + }, + "IntervalsDFTests": { + "test_init_series_str": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_init_series_tuple": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_init_series_list": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_init_series_none": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_init_series_truthiness": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_init_metric_none": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_init_metric_truthiness": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_fromStackedMetrics_series_str": { + "input": { + "schema": "start_ts STRING NOT NULL, end_ts STRING NOT NULL, series_1 STRING NOT NULL, metric_name STRING NOT NULL, metric_value INT NOT NULL", + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:14", + "v1", + "metric_1", + 5 + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:11", + "v1", + "metric_2", + 0 + ], + [ + "2020-08-01 00:00:11", + "2020-08-01 00:00:12", + "v1", + "metric_2", + 4 + ] + ] + } + }, + "test_fromStackedMetrics_series_tuple": { + "input": { + "$ref": "#/IntervalsDFTests/test_fromStackedMetrics_series_str/input" + } + }, + "test_fromStackedMetrics_series_list": { + "input": { + "$ref": "#/IntervalsDFTests/test_fromStackedMetrics_series_str/input" + }, + "expected": { + "schema": "start_ts STRING NOT NULL, end_ts STRING NOT NULL, series_1 STRING NOT NULL, metric_1 INT, metric_2 INT", + "other_ts_cols": [ + "start_ts", + "end_ts" + ], + "start_ts": "start_ts", + "end_ts": "end_ts", + "series": [ + "series_1" + ], + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:14", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:11", + "v1", + null, + 0 + ], + [ + "2020-08-01 00:00:11", + "2020-08-01 00:00:12", + "v1", + null, + 4 + ] + ] + } + }, + "test_fromStackedMetrics_metric_names": { + "input": { + "$ref": "#/IntervalsDFTests/test_fromStackedMetrics_series_list/input" + }, + "expected": { + "$ref": "#/IntervalsDFTests/test_fromStackedMetrics_series_list/expected" + } + }, + "test_disjoint": { + "input": { + "schema": { + "$ref": 
"#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:14", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:11", + "v1", + null, + 0 + ] + ] + }, + "expected": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:10", + "v1", + null, + 0 + ], + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:11", + "v1", + 5, + 0 + ], + [ + "2020-08-01 00:00:11", + "2020-08-01 00:00:14", + "v1", + 5, + null + ] + ] + } + }, + "test_disjoint_contains_interval_already_disjoint": { + "input": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:13", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:12", + "v1", + null, + 0 + ], + [ + "2020-08-01 00:00:13", + "2020-08-01 00:00:14", + "v1", + null, + 4 + ] + ] + }, + "expected": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:10", + "v1", + null, + 0 + ], + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:12", + "v1", + 5, + 0 + ], + [ + "2020-08-01 00:00:12", + "2020-08-01 00:00:13", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:13", + "2020-08-01 00:00:14", + "v1", + null, + 4 + ] + ] + } + }, + "test_disjoint_contains_intervals_equal": { + "input": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:13", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:12", + "v1", + null, + 0 + ], + [ + "2020-08-01 00:00:13", + "2020-08-01 00:00:14", + "v1", + null, + 4 + ], + [ + "2020-08-01 00:00:13", + "2020-08-01 00:00:14", + "v1", + 7, + null + ] + ] + }, + "expected": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:10", + "v1", + null, + 0 + ], + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:12", + "v1", + 5, + 0 + ], + [ + "2020-08-01 
00:00:12", + "2020-08-01 00:00:13", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:13", + "2020-08-01 00:00:14", + "v1", + 7, + 4 + ] + ] + } + }, + "test_disjoint_intervals_same_start": { + "input": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:14", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:11", + "v1", + null, + 0 + ] + ] + }, + "expected": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:11", + "v1", + 5, + 0 + ], + [ + "2020-08-01 00:00:11", + "2020-08-01 00:00:14", + "v1", + 5, + null + ] + ] + } + }, + "test_disjoint_intervals_same_end": { + "input": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:14", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:11", + "2020-08-01 00:00:14", + "v1", + null, + 0 + ] + ] + }, + "expected": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:11", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:11", + "2020-08-01 00:00:14", + "v1", + 5, + 0 + ] + ] + } + }, + "test_disjoint_multiple_series": { + "input": { + "schema": "start_ts STRING NOT NULL, end_ts STRING NOT NULL, series_1 STRING NOT NULL, series_2 STRING NOT NULL, metric_1 INT, metric_2 INT", + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": [ + "series_1", + "series_2" + ], + "data": [ + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:14", + "v1", + "foo", + 5, + null + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:14", + "v1", + "bar", + 3, + 2 + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:11", + "v1", + "foo", + null, + 0 + ], + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:13", + "v2", + "foo", + 5, + null + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:12", + "v2", + "foo", + null, + 0 + ], + [ + "2020-08-01 00:00:13", + "2020-08-01 00:00:14", + "v2", + "foo", + null, + 4 + ], + [ + "2020-08-01 00:00:13", + "2020-08-01 00:00:14", + "v2", + "foo", + 6, + 3 + ] + ] + }, + "expected": { + "schema": { + "$ref": "#/IntervalsDFTests/test_disjoint_multiple_series/input/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { 
+ "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/IntervalsDFTests/test_disjoint_multiple_series/input/series" + }, + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:14", + "v1", + "bar", + 3, + 2 + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:10", + "v1", + "foo", + null, + 0 + ], + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:11", + "v1", + "foo", + 5, + 0 + ], + [ + "2020-08-01 00:00:11", + "2020-08-01 00:00:14", + "v1", + "foo", + 5, + null + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:10", + "v2", + "foo", + null, + 0 + ], + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:12", + "v2", + "foo", + 5, + 0 + ], + [ + "2020-08-01 00:00:12", + "2020-08-01 00:00:13", + "v2", + "foo", + 5, + null + ], + [ + "2020-08-01 00:00:13", + "2020-08-01 00:00:14", + "v2", + "foo", + 6, + 4 + ] + ] + } + }, + "test_disjoint_single_metric": { + "input": { + "schema": "start_ts STRING NOT NULL, end_ts STRING NOT NULL, series_1 STRING NOT NULL, metric_1 INT", + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:11", + "2020-08-01 00:00:14", + "v1", + 5 + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:11", + "v1", + 4 + ] + ] + }, + "expected": { + "schema": { + "$ref": "#/IntervalsDFTests/test_disjoint_single_metric/input/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": { + "$ref": "#/IntervalsDFTests/test_disjoint_single_metric/input/data" + } + } + }, + "test_disjoint_interval_is_subset": { + "input": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:14", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:11", + "v1", + null, + 0 + ] + ] + }, + "expected": { + "schema": { + "$ref": "#/__SharedData/init/schema" + }, + "other_ts_cols": { + "$ref": "#/__SharedData/init/other_ts_cols" + }, + "start_ts": { + "$ref": "#/__SharedData/init/start_ts" + }, + "end_ts": { + "$ref": "#/__SharedData/init/end_ts" + }, + "series": { + "$ref": "#/__SharedData/init/series" + }, + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:10", + "v1", + 5, + null + ], + [ + "2020-08-01 00:00:10", + "2020-08-01 00:00:11", + "v1", + 5, + 0 + ], + [ + "2020-08-01 00:00:11", + "2020-08-01 00:00:14", + "v1", + 5, + null + ] + ] + } + }, + "test_union_other_idf": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_union_other_df": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_union_other_list_dicts": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_unionByName_other_idf": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_unionByName_other_df": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_unionByName_other_list_dicts": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_unionByName_extra_column": { + 
"input": { + "$ref": "#/__SharedData/init" + }, + "input_extra_col": { + "schema": "start_ts STRING NOT NULL, end_ts STRING NOT NULL, series_1 STRING NOT NULL, metric_1 INT, metric_2 INT, metric_3 INT", + "other_ts_cols": [ + "start_ts", + "end_ts" + ], + "start_ts": "start_ts", + "end_ts": "end_ts", + "series": [ + "series_1" + ], + "data": [ + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:14", + "v1", + 5, + null, + 1 + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:11", + "v1", + null, + 0, + 2 + ], + [ + "2020-08-01 00:00:09", + "2020-08-01 00:00:12", + "v1", + null, + 4, + 3 + ] + ] + } + }, + "test_unionByName_other_extra_column": { + "input": { + "$ref": "#/__SharedData/init" + }, + "input_extra_col": { + "$ref": "#/IntervalsDFTests/test_unionByName_extra_column/input_extra_col" + } + }, + "test_toDF": { + "input": { + "$ref": "#/__SharedData/init" + } + }, + "test_toDF_stack": { + "input": { + "$ref": "#/IntervalsDFTests/test_fromStackedMetrics_series_list/expected" + }, + "expected": { + "$ref": "#/IntervalsDFTests/test_fromStackedMetrics_series_list/input" + } + } + } +}