Skip to content

Commit

Permalink
Add DateutilTimePeriodAdjuster (#1233)
Browse files Browse the repository at this point in the history
### Description
This adds an implementation for adjusting time periods using `dateutil`
instead
of `pandas`.

<!--- 
  Before requesting review, please make sure you have:
1. read [the contributing
guide](https://github.com/dbt-labs/metricflow/blob/main/CONTRIBUTING.md),
2. signed the
[CLA](https://docs.getdbt.com/docs/contributor-license-agreements)
3. run `changie new` to [create a changelog
entry](https://github.com/dbt-labs/metricflow/blob/main/CONTRIBUTING.md#adding-a-changelog-entry)
-->
  • Loading branch information
plypaul authored May 31, 2024
1 parent 853e079 commit 4bbccce
Show file tree
Hide file tree
Showing 5 changed files with 5,754 additions and 27 deletions.
30 changes: 6 additions & 24 deletions metricflow-semantics/metricflow_semantics/query/query_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple, Union

import pandas as pd
from dbt_semantic_interfaces.implementations.filters.where_filter import (
PydanticWhereFilter,
PydanticWhereFilterIntersection,
Expand Down Expand Up @@ -64,12 +63,7 @@
TimeDimensionSpec,
)
from metricflow_semantics.specs.spec_set import group_specs_by_type
from metricflow_semantics.time.pandas_adjuster import (
adjust_to_end_of_period,
adjust_to_start_of_period,
is_period_end,
is_period_start,
)
from metricflow_semantics.time.dateutil_adjuster import DateutilTimePeriodAdjuster

logger = logging.getLogger(__name__)

Expand All @@ -95,6 +89,7 @@ def __init__( # noqa: D107
DunderNamingScheme(),
)
self._where_filter_pattern_factory = where_filter_pattern_factory
self._time_period_adjuster = DateutilTimePeriodAdjuster()

def parse_and_validate_saved_query(
self,
Expand Down Expand Up @@ -191,23 +186,10 @@ def _adjust_time_constraint(
e.g. [2020-01-15, 2020-2-15] with MONTH granularity -> [2020-01-01, 2020-02-29]
"""
constraint_start = time_constraint.start_time
constraint_end = time_constraint.end_time

start_ts = pd.Timestamp(time_constraint.start_time)
if not is_period_start(metric_time_granularity, start_ts):
constraint_start = adjust_to_start_of_period(metric_time_granularity, start_ts).to_pydatetime()

end_ts = pd.Timestamp(time_constraint.end_time)
if not is_period_end(metric_time_granularity, end_ts):
constraint_end = adjust_to_end_of_period(metric_time_granularity, end_ts).to_pydatetime()

if constraint_start < TimeRangeConstraint.ALL_TIME_BEGIN():
constraint_start = TimeRangeConstraint.ALL_TIME_BEGIN()
if constraint_end > TimeRangeConstraint.ALL_TIME_END():
constraint_end = TimeRangeConstraint.ALL_TIME_END()

return TimeRangeConstraint(start_time=constraint_start, end_time=constraint_end)
return self._time_period_adjuster.expand_time_constraint_to_fill_granularity(
time_constraint=time_constraint,
granularity=metric_time_granularity,
)

def _parse_order_by_names(
self,
Expand Down
114 changes: 114 additions & 0 deletions metricflow-semantics/metricflow_semantics/time/dateutil_adjuster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from __future__ import annotations

import datetime
from typing import Optional

import dateutil.relativedelta
from dateutil.relativedelta import relativedelta
from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from dbt_semantic_interfaces.type_enums import TimeGranularity
from typing_extensions import override

from metricflow_semantics.filters.time_constraint import TimeRangeConstraint
from metricflow_semantics.time.time_period import TimePeriodAdjuster


class DateutilTimePeriodAdjuster(TimePeriodAdjuster):
"""Implementation of time period adjustments using `dateutil`.
* `relativedelta` will not change weekday if already at the given weekday, even with a Nth parameter.
* `relativedelta` will automatically handle day values that exceed the number of days in months with < 31 days.
"""

def _relative_delta_for_window(self, time_granularity: TimeGranularity, count: int) -> relativedelta:
"""Relative-delta to cover time windows specified at different grains."""
if time_granularity is TimeGranularity.DAY:
return relativedelta(days=count)
elif time_granularity is TimeGranularity.WEEK:
return relativedelta(weeks=count)
elif time_granularity is TimeGranularity.MONTH:
return relativedelta(months=count)
elif time_granularity is TimeGranularity.QUARTER:
return relativedelta(months=count * 3)
elif time_granularity is TimeGranularity.YEAR:
return relativedelta(years=count)
else:
assert_values_exhausted(time_granularity)

@override
def expand_time_constraint_to_fill_granularity(
self, time_constraint: TimeRangeConstraint, granularity: TimeGranularity
) -> TimeRangeConstraint:
adjusted_start = self.adjust_to_start_of_period(granularity, time_constraint.start_time)
adjusted_end = self.adjust_to_end_of_period(granularity, time_constraint.end_time)

if adjusted_start < TimeRangeConstraint.ALL_TIME_BEGIN():
adjusted_start = TimeRangeConstraint.ALL_TIME_BEGIN()
if adjusted_end > TimeRangeConstraint.ALL_TIME_END():
adjusted_end = TimeRangeConstraint.ALL_TIME_END()

return TimeRangeConstraint(start_time=adjusted_start, end_time=adjusted_end)

@override
def adjust_to_start_of_period(
self, time_granularity: TimeGranularity, date_to_adjust: datetime.datetime
) -> datetime.datetime:
if time_granularity is TimeGranularity.DAY:
return date_to_adjust
elif time_granularity is TimeGranularity.WEEK:
return date_to_adjust + relativedelta(weekday=dateutil.relativedelta.MO(-1))
elif time_granularity is TimeGranularity.MONTH:
return date_to_adjust + relativedelta(day=1)
elif time_granularity is TimeGranularity.QUARTER:
if date_to_adjust.month <= 3:
return date_to_adjust + relativedelta(month=1, day=1)
elif date_to_adjust.month <= 6:
return date_to_adjust + relativedelta(month=4, day=1)
elif date_to_adjust.month <= 9:
return date_to_adjust + relativedelta(month=7, day=1)
else:
return date_to_adjust + relativedelta(month=10, day=1)
elif time_granularity is TimeGranularity.YEAR:
return date_to_adjust + relativedelta(month=1, day=1)
else:
assert_values_exhausted(time_granularity)

@override
def adjust_to_end_of_period(
self, time_granularity: TimeGranularity, date_to_adjust: datetime.datetime
) -> datetime.datetime:
if time_granularity is TimeGranularity.DAY:
return date_to_adjust
elif time_granularity is TimeGranularity.WEEK:
return date_to_adjust + relativedelta(weekday=dateutil.relativedelta.SU(1))
elif time_granularity is TimeGranularity.MONTH:
return date_to_adjust + relativedelta(day=31)
elif time_granularity is TimeGranularity.QUARTER:
if date_to_adjust.month <= 3:
return date_to_adjust + relativedelta(month=3, day=31)
elif date_to_adjust.month <= 6:
return date_to_adjust + relativedelta(month=6, day=31)
elif date_to_adjust.month <= 9:
return date_to_adjust + relativedelta(month=9, day=31)
else:
return date_to_adjust + relativedelta(month=12, day=31)
elif time_granularity is TimeGranularity.YEAR:
return date_to_adjust + relativedelta(month=12, day=31)
else:
assert_values_exhausted(time_granularity)

@override
def expand_time_constraint_for_cumulative_metric(
self, time_constraint: TimeRangeConstraint, granularity: Optional[TimeGranularity], count: int
) -> TimeRangeConstraint:
if granularity is not None:
return TimeRangeConstraint(
start_time=time_constraint.start_time - self._relative_delta_for_window(granularity, count),
end_time=time_constraint.end_time,
)

# if no window is specified we want to accumulate from the beginning of time
return TimeRangeConstraint(
start_time=TimeRangeConstraint.ALL_TIME_BEGIN(),
end_time=time_constraint.end_time,
)
Loading

0 comments on commit 4bbccce

Please sign in to comment.