Use ExpandedTimeGranularity in TimeDimensionSpec
In order to support custom granularities we need a way to propagate
them through the dataflow plan building and plan conversion processes.
We typically use the TimeDimensionSpec classes for carrying that
information, since the specs indicate the grain of the specific
variation of the TimeDimension element we're operating on. We
decided to use an ExpandedTimeGranularity construct to encapsulate
the custom granularity metadata in a manner interchangeable with the
default granularity enumeration values, with the idea that this would
make it more obvious when and how to perform comparisons and other
granularity-dependent operations.

This change updates the TimeDimensionSpec itself to use the new construct.
The changes here are meant to be as mechanical as possible, so there are
places where restructuring or follow-up will be needed. It was not possible
to keep the changes purely mechanical in every case.
tlento committed Sep 3, 2024
1 parent e0f58ae commit 3d5bdbe
Showing 256 changed files with 2,918 additions and 1,053 deletions.
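For context, the ExpandedTimeGranularity construct that the diff below relies on can be sketched roughly as follows. This is inferred from the members used in this commit (name, base_granularity, from_time_granularity) rather than copied from its actual definition, so treat it as illustrative only.

from dataclasses import dataclass

from dbt_semantic_interfaces.type_enums import TimeGranularity


@dataclass(frozen=True)
class ExpandedTimeGranularity:
    """Illustrative sketch: a granularity name paired with the standard grain it resolves to.

    A wrapped standard grain keeps the enum's value as its name; a custom granularity
    carries its own name plus the standard base grain it is defined against.
    """

    name: str
    base_granularity: TimeGranularity

    @classmethod
    def from_time_granularity(cls, granularity: TimeGranularity) -> "ExpandedTimeGranularity":
        # Wrap a standard TimeGranularity so it is interchangeable with custom grains.
        return cls(name=granularity.value, base_granularity=granularity)


# A wrapped standard grain and a hypothetical custom grain defined on top of MONTH.
day = ExpandedTimeGranularity.from_time_granularity(TimeGranularity.DAY)
fiscal_quarter = ExpandedTimeGranularity(name="fiscal_quarter", base_granularity=TimeGranularity.MONTH)
assert day.name == "day" and day.base_granularity is TimeGranularity.DAY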
@@ -360,7 +360,7 @@ def _path_key_to_spec(path_key: ElementPathKey) -> LinkableInstanceSpec:
return TimeDimensionSpec(
element_name=path_key.element_name,
entity_links=path_key.entity_links,
time_granularity=path_key.time_granularity.base_granularity,
time_granularity=path_key.time_granularity,
date_part=path_key.date_part,
)
elif path_key.element_type is LinkableElementType.ENTITY:
@@ -154,7 +154,7 @@ def transform(self, spec_set: InstanceSpecSet) -> Sequence[str]:
if time_dimension_spec.date_part is not None:
items.append(DunderNamingScheme.date_part_suffix(date_part=time_dimension_spec.date_part))
else:
items.append(time_dimension_spec.time_granularity.value)
items.append(time_dimension_spec.time_granularity.name)
names_to_return.append(DUNDER.join(items))

for other_group_by_item_specs in (
@@ -10,7 +10,6 @@
)
from dbt_semantic_interfaces.naming.keywords import DUNDER
from dbt_semantic_interfaces.references import EntityReference
from dbt_semantic_interfaces.type_enums import TimeGranularity
from dbt_semantic_interfaces.type_enums.date_part import DatePart
from typing_extensions import override

@@ -32,7 +31,7 @@ def input_str_from_entity_call_parameter_set(parameter_set: EntityCallParameterS
element_name=parameter_set.entity_reference.element_name,
entity_links=parameter_set.entity_path,
group_by=(),
time_granularity=None,
time_granularity_name=None,
date_part=None,
)
return f"Entity({initializer_parameter_str})"
@@ -45,7 +44,7 @@ def input_str_from_metric_call_parameter_set(parameter_set: MetricCallParameterS
group_by=tuple(
EntityReference(element_name=group_by_ref.element_name) for group_by_ref in parameter_set.group_by
),
time_granularity=None,
time_granularity_name=None,
date_part=None,
)
return f"Metric({initializer_parameter_str})"
@@ -55,7 +54,7 @@ def initializer_parameter_str(
element_name: str,
entity_links: Sequence[EntityReference],
group_by: Sequence[EntityReference],
time_granularity: Optional[TimeGranularity],
time_granularity_name: Optional[str],
date_part: Optional[DatePart],
) -> str:
"""Return the parameters that should go in the initializer.
@@ -68,9 +67,9 @@ def initializer_parameter_str(
initializer_parameters.append(repr(entity_link_names[-1] + DUNDER + element_name))
else:
initializer_parameters.append(repr(element_name))
if time_granularity is not None:
if time_granularity_name is not None:
initializer_parameters.append(
f"'{time_granularity.value}'",
f"'{time_granularity_name}'",
)
if date_part is not None:
initializer_parameters.append(f"date_part_name={repr(date_part.value)}")
@@ -101,7 +100,7 @@ def transform(self, spec_set: InstanceSpecSet) -> Sequence[str]:
element_name=entity_spec.element_name,
entity_links=entity_spec.entity_links,
group_by=(),
time_granularity=None,
time_granularity_name=None,
date_part=None,
)
names_to_return.append(f"Entity({initializer_parameter_str})")
@@ -111,7 +110,7 @@ def transform(self, spec_set: InstanceSpecSet) -> Sequence[str]:
element_name=dimension_spec.element_name,
entity_links=dimension_spec.entity_links,
group_by=(),
time_granularity=None,
time_granularity_name=None,
date_part=None,
)
names_to_return.append(f"Dimension({initializer_parameter_str})")
@@ -121,7 +120,7 @@ def transform(self, spec_set: InstanceSpecSet) -> Sequence[str]:
element_name=time_dimension_spec.element_name,
entity_links=time_dimension_spec.entity_links,
group_by=(),
time_granularity=time_dimension_spec.time_granularity,
time_granularity_name=time_dimension_spec.time_granularity.name,
date_part=time_dimension_spec.date_part,
)
names_to_return.append(f"TimeDimension({initializer_parameter_str})")
@@ -131,7 +130,7 @@ def transform(self, spec_set: InstanceSpecSet) -> Sequence[str]:
element_name=group_by_metric_spec.element_name,
entity_links=(),
group_by=group_by_metric_spec.entity_links,
time_granularity=None,
time_granularity_name=None,
date_part=None,
)
names_to_return.append(f"Metric({initializer_parameter_str})")
@@ -153,7 +152,7 @@ def input_str_from_dimension_call_parameter_set(parameter_set: DimensionCallPara
element_name=parameter_set.dimension_reference.element_name,
entity_links=parameter_set.entity_path,
group_by=(),
time_granularity=None,
time_granularity_name=None,
date_part=None,
)
return f"Dimension({initializer_parameter_str})"
@@ -166,7 +165,7 @@ def input_str_from_time_dimension_call_parameter_set( # noqa: D102
element_name=parameter_set.time_dimension_reference.element_name,
entity_links=parameter_set.entity_path,
group_by=(),
time_granularity=None,
time_granularity_name=None,
date_part=None,
)
return f"TimeDimension({initializer_parameter_str})"
@@ -67,6 +67,8 @@ class PushDownResult:
# The issues seen so far while pushing down the result / resolving the ambiguity.
issue_set: MetricFlowQueryResolutionIssueSet
# The largest default time granularity of the metrics seen in the DAG so far. Used to resolve metric_time.
# TODO: [custom granularity] decide whether or not to support custom granularities as metric_time defaults and
# update accordingly
max_metric_default_time_granularity: Optional[TimeGranularity] = None

def __post_init__(self) -> None: # noqa: D105
@@ -226,7 +226,7 @@ def resolve_available_items(
)

def resolve_min_metric_time_grain(self) -> TimeGranularity:
"""Returns the finest time grain of metric_time for querying."""
"""Returns the finest base time grain of metric_time for querying."""
metric_time_grain_resolution = self.resolve_matching_item_for_querying(
spec_pattern=TimeDimensionPattern.from_call_parameter_set(
TimeDimensionCallParameterSet(
@@ -247,4 +247,4 @@ def resolve_min_metric_time_grain(self) -> TimeGranularity:
f"{metric_time_grain_resolution.spec} and issues:\n\n"
f"{indent(mf_pformat(metric_time_grain_resolution.issue_set))}"
)
return metric_time_spec_set.time_dimension_specs[0].time_granularity
return metric_time_spec_set.time_dimension_specs[0].time_granularity.base_granularity
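A minimal illustration of the base-grain behavior described in the docstring change above, using a hypothetical custom grain built from the fields this diff relies on.

from dbt_semantic_interfaces.type_enums import TimeGranularity
from metricflow_semantics.time.granularity import ExpandedTimeGranularity

# Hypothetical custom grain defined on top of MONTH.
fiscal_quarter = ExpandedTimeGranularity(name="fiscal_quarter", base_granularity=TimeGranularity.MONTH)

# resolve_min_metric_time_grain now reports the underlying standard grain, not the custom name.
assert fiscal_quarter.base_granularity is TimeGranularity.MONTH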
@@ -166,7 +166,7 @@ def _get_smallest_requested_metric_time_granularity(
len(time_dimension_specs) == 1
), f"Bug with MinimumTimeGrainPattern - should have returned exactly 1 spec but got {time_dimension_specs}"

return time_dimension_specs[0].time_granularity
return time_dimension_specs[0].time_granularity.base_granularity

def _adjust_time_constraint(
self,
@@ -12,6 +12,7 @@
from metricflow_semantics.specs.instance_spec import LinkableInstanceSpec
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.sql.sql_column_type import SqlColumnType
from metricflow_semantics.time.granularity import ExpandedTimeGranularity


def hash_items(items: Sequence[SqlColumnType]) -> str:
@@ -48,11 +49,19 @@ def bucket_hash(self) -> str:
values.extend(sorted(self.window_groupings))
return hash_items(values)

def linkable_specs( # noqa: D102
self, non_additive_dimension_grain: TimeGranularity
) -> Sequence[LinkableInstanceSpec]:
def linkable_specs(self, non_additive_dimension_grain: TimeGranularity) -> Sequence[LinkableInstanceSpec]:
"""Return the set of linkable specs referenced by the NonAdditiveDimensionSpec.
In practice, the name will always point to a time dimension. This method requires the time granularity
provided in the model Dimension definition, which is why the input is typed as an enum value rather than
an expanded granularity object - custom granularities are not eligible for consideration here.
"""
return (
TimeDimensionSpec(element_name=self.name, entity_links=(), time_granularity=non_additive_dimension_grain),
TimeDimensionSpec(
element_name=self.name,
entity_links=(),
time_granularity=ExpandedTimeGranularity.from_time_granularity(non_additive_dimension_grain),
),
) + tuple(LinklessEntitySpec.from_element_name(entity_name) for entity_name in self.window_groupings)

def __eq__(self, other: Any) -> bool: # type: ignore[misc] # noqa: D105
@@ -112,6 +112,22 @@ def _match_entity_links(self, candidate_specs: Sequence[LinkableInstanceSpec]) -
shortest_entity_link_length = min(len(matching_spec.entity_links) for matching_spec in matching_specs)
return tuple(spec for spec in matching_specs if len(spec.entity_links) == shortest_entity_link_length)

def _match_time_granularities(
self, candidate_specs: Sequence[LinkableInstanceSpec]
) -> Sequence[LinkableInstanceSpec]:
"""Do a partial match on time granularities.
TODO: [custom granularity] Support custom granularities properly. This requires us to allow these pattern classes
to take in ExpandedTimeGranularity types, which should be viable. Once that is done, this method can be removed.
"""
matching_specs: Sequence[LinkableInstanceSpec] = tuple(
candidate_spec
for candidate_spec in group_specs_by_type(candidate_specs).time_dimension_specs
if candidate_spec.time_granularity.base_granularity == self.parameter_set.time_granularity
)

return matching_specs

@override
def match(self, candidate_specs: Sequence[InstanceSpec]) -> Sequence[LinkableInstanceSpec]:
filtered_candidate_specs = group_specs_by_type(candidate_specs).linkable_specs
@@ -120,10 +136,13 @@ def match(self, candidate_specs: Sequence[InstanceSpec]) -> Sequence[LinkableIns
# Entity links could be a partial match, so it's handled separately.
if ParameterSetField.ENTITY_LINKS in self.parameter_set.fields_to_compare:
filtered_candidate_specs = self._match_entity_links(filtered_candidate_specs)
# Time granularities are special, so they are also handled separately.
if ParameterSetField.TIME_GRANULARITY in self.parameter_set.fields_to_compare:
filtered_candidate_specs = self._match_time_granularities(filtered_candidate_specs)

other_keys_to_check = set(
field_to_compare.value for field_to_compare in self.parameter_set.fields_to_compare
).difference({ParameterSetField.ENTITY_LINKS.value})
).difference({ParameterSetField.ENTITY_LINKS.value, ParameterSetField.TIME_GRANULARITY.value})

matching_specs: List[LinkableInstanceSpec] = []
parameter_set_values = tuple(getattr(self.parameter_set, key_to_check) for key_to_check in other_keys_to_check)
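A hedged sketch of the partial match added in _match_time_granularities above: the call parameter set still carries a plain TimeGranularity, so candidates are filtered on the base grain of their expanded granularity rather than on full equality. The values below are illustrative, not from this commit.

from dbt_semantic_interfaces.type_enums import TimeGranularity
from metricflow_semantics.time.granularity import ExpandedTimeGranularity

requested_grain = TimeGranularity.DAY  # what the pattern's parameter set carries

candidate_grains = [
    ExpandedTimeGranularity.from_time_granularity(TimeGranularity.DAY),
    ExpandedTimeGranularity.from_time_granularity(TimeGranularity.MONTH),
]

# Mirrors the comparison in _match_time_granularities: only base_granularity is checked.
matches = [grain for grain in candidate_grains if grain.base_granularity == requested_grain]
# matches contains only the DAY-based grain.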
@@ -15,6 +15,7 @@
TimeDimensionSpecComparisonKey,
TimeDimensionSpecField,
)
from metricflow_semantics.time.granularity import ExpandedTimeGranularity


class MetricTimeDefaultGranularityPattern(SpecPattern):
@@ -53,9 +54,12 @@ def match(self, candidate_specs: Sequence[InstanceSpec]) -> Sequence[InstanceSpe
return candidate_specs

# If there are metrics in the query, use max metric default. For no-metric queries, use standard default.
default_granularity = self._max_metric_default_time_granularity or DEFAULT_TIME_GRANULARITY
# TODO: [custom granularity] allow custom granularities to be used as defaults if appropriate
default_granularity = ExpandedTimeGranularity.from_time_granularity(
self._max_metric_default_time_granularity or DEFAULT_TIME_GRANULARITY
)

spec_key_to_grains: Dict[TimeDimensionSpecComparisonKey, Set[TimeGranularity]] = defaultdict(set)
spec_key_to_grains: Dict[TimeDimensionSpecComparisonKey, Set[ExpandedTimeGranularity]] = defaultdict(set)
spec_key_to_specs: Dict[TimeDimensionSpecComparisonKey, Tuple[TimeDimensionSpec, ...]] = defaultdict(tuple)
for metric_time_spec in spec_set.metric_time_specs:
spec_key = metric_time_spec.comparison_key(exclude_fields=(TimeDimensionSpecField.TIME_GRANULARITY,))
@@ -14,10 +14,11 @@
TimeDimensionSpecComparisonKey,
TimeDimensionSpecField,
)
from metricflow_semantics.time.granularity import ExpandedTimeGranularity


class MinimumTimeGrainPattern(SpecPattern):
"""A pattern that matches linkable specs, but for time dimension specs, only the one with the finest grain.
"""A pattern that matches linkable specs, but for time dimension specs, only the one with the finest base grain.
e.g.
@@ -35,7 +36,7 @@ class MinimumTimeGrainPattern(SpecPattern):
]
The finest grain represents the defined grain of the time dimension in the semantic model when evaluating specs
of the source.
of the source. For custom granularities, this means the base grain associated with the time dimension spec.
This pattern helps to implement matching of group-by-items for where filters - in those cases, an ambiguously
specified group-by-item can only match to time dimension spec with the base grain.
@@ -49,12 +50,16 @@ def match(self, candidate_specs: Sequence[InstanceSpec]) -> Sequence[InstanceSpe
spec_key_to_specs: Dict[TimeDimensionSpecComparisonKey, List[TimeDimensionSpec]] = defaultdict(list)
for time_dimension_spec in spec_set.time_dimension_specs:
spec_key = time_dimension_spec.comparison_key(exclude_fields=(TimeDimensionSpecField.TIME_GRANULARITY,))
spec_key_to_grains[spec_key].add(time_dimension_spec.time_granularity)
spec_key_to_grains[spec_key].add(time_dimension_spec.time_granularity.base_granularity)
spec_key_to_specs[spec_key].append(time_dimension_spec)

matched_time_dimension_specs: List[TimeDimensionSpec] = []
for spec_key, time_grains in spec_key_to_grains.items():
matched_time_dimension_specs.append(spec_key_to_specs[spec_key][0].with_grain(min(time_grains)))
matched_time_dimension_specs.append(
spec_key_to_specs[spec_key][0].with_grain(
ExpandedTimeGranularity.from_time_granularity(min(time_grains))
)
)

matching_specs: Sequence[LinkableInstanceSpec] = (
spec_set.dimension_specs
@@ -50,7 +50,7 @@ def __init__(self, source_spec: TimeDimensionSpec, exclude_fields: Sequence[Time

# This is a list of field values of TimeDimensionSpec that we should use for comparison.
spec_field_values_for_comparison: List[
Union[str, Tuple[EntityReference, ...], TimeGranularity, Optional[DatePart]]
Union[str, Tuple[EntityReference, ...], ExpandedTimeGranularity, Optional[DatePart]]
] = [self._source_spec.element_name, self._source_spec.entity_links]

if TimeDimensionSpecField.TIME_GRANULARITY not in self._excluded_fields:
@@ -84,7 +84,7 @@ def __hash__(self) -> int:

@dataclass(frozen=True)
class TimeDimensionSpec(DimensionSpec): # noqa: D101
time_granularity: TimeGranularity = DEFAULT_TIME_GRANULARITY
time_granularity: ExpandedTimeGranularity = ExpandedTimeGranularity.from_time_granularity(DEFAULT_TIME_GRANULARITY)
date_part: Optional[DatePart] = None

# Used for semi-additive joins. Some more thought is needed, but this may be useful in InstanceSpec.
@@ -122,7 +122,7 @@ def qualified_name(self) -> str: # noqa: D102
return StructuredLinkableSpecName(
entity_link_names=tuple(x.element_name for x in self.entity_links),
element_name=self.element_name,
time_granularity_name=self.time_granularity.value,
time_granularity_name=self.time_granularity.name,
date_part=self.date_part,
).qualified_name

@@ -133,7 +133,7 @@ def element_path_key(self) -> ElementPathKey:
element_name=self.element_name,
element_type=LinkableElementType.TIME_DIMENSION,
entity_links=self.entity_links,
time_granularity=ExpandedTimeGranularity.from_time_granularity(self.time_granularity),
time_granularity=self.time_granularity,
date_part=self.date_part,
)

@@ -145,7 +145,7 @@ def from_reference(reference: TimeDimensionReference) -> TimeDimensionSpec:
def accept(self, visitor: InstanceSpecVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D102
return visitor.visit_time_dimension_spec(self)

def with_grain(self, time_granularity: TimeGranularity) -> TimeDimensionSpec: # noqa: D102
def with_grain(self, time_granularity: ExpandedTimeGranularity) -> TimeDimensionSpec: # noqa: D102
return TimeDimensionSpec(
element_name=self.element_name,
entity_links=self.entity_links,
@@ -174,14 +174,18 @@ def comparison_key(self, exclude_fields: Sequence[TimeDimensionSpecField] = ())
def generate_possible_specs_for_time_dimension(
time_dimension_reference: TimeDimensionReference, entity_links: Tuple[EntityReference, ...]
) -> List[TimeDimensionSpec]:
"""Generate a list of time dimension specs with all combinations of granularity & date part."""
"""Generate a list of time dimension specs with all combinations of granularity & date part.
TODO: [custom calendar] decide whether to add support for custom granularities or rename this to indicate that
it only includes standard granularities.
"""
time_dimension_specs: List[TimeDimensionSpec] = []
for time_granularity in TimeGranularity:
time_dimension_specs.append(
TimeDimensionSpec(
element_name=time_dimension_reference.element_name,
entity_links=entity_links,
time_granularity=time_granularity,
time_granularity=ExpandedTimeGranularity.from_time_granularity(time_granularity),
date_part=None,
)
)
@@ -191,7 +195,7 @@ def generate_possible_specs_for_time_dimension(
TimeDimensionSpec(
element_name=time_dimension_reference.element_name,
entity_links=entity_links,
time_granularity=time_granularity,
time_granularity=ExpandedTimeGranularity.from_time_granularity(time_granularity),
date_part=date_part,
)
)
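To tie the field-type change together, a brief usage sketch with the constructor and with_grain arguments as they now appear in this diff; the specific grains chosen are illustrative assumptions, not code from this commit.

from dbt_semantic_interfaces.type_enums import TimeGranularity
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.time.granularity import ExpandedTimeGranularity

# time_granularity is now an ExpandedTimeGranularity, so standard grains are wrapped explicitly.
spec = TimeDimensionSpec(
    element_name="metric_time",
    entity_links=(),
    time_granularity=ExpandedTimeGranularity.from_time_granularity(TimeGranularity.DAY),
)

# with_grain() takes the expanded type as well; qualified_name picks up the grain's name.
monthly = spec.with_grain(ExpandedTimeGranularity.from_time_granularity(TimeGranularity.MONTH))
# spec.qualified_name -> "metric_time__day"; monthly.qualified_name -> "metric_time__month"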