Skip to content

Commit

Permalink
WIP - incorporate JoinToCustomGranularityNode in to DataflowPlanBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Aug 22, 2024
1 parent fc7b3be commit b055757
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode
from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode
from metricflow.dataflow.nodes.join_to_base import JoinDescription, JoinOnEntitiesNode
from metricflow.dataflow.nodes.join_to_custom_granularity import JoinToCustomGranularityNode
from metricflow.dataflow.nodes.join_to_time_spine import JoinToTimeSpineNode
from metricflow.dataflow.nodes.min_max import MinMaxNode
from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode
Expand Down Expand Up @@ -1517,6 +1518,14 @@ def _build_aggregated_measure_from_measure_source_node(
else:
unaggregated_measure_node = filtered_measure_source_node

# TODO: duplicate this logic for no-metric queries
for time_dimension_spec in queried_linkable_specs.time_dimension_specs:
# TODO: Add conditional when avail - if the time dimension spec has a custom granularity
unaggregated_measure_node = JoinToCustomGranularityNode.create(
parent_node=unaggregated_measure_node,
time_dimension_spec=time_dimension_spec,
)

# If time constraint was previously adjusted for cumulative window or grain, apply original time constraint
# here. Can skip if metric is being aggregated over all time.
cumulative_metric_constrained_node: Optional[ConstrainTimeRangeNode] = None
Expand All @@ -1543,12 +1552,12 @@ def _build_aggregated_measure_from_measure_source_node(
if non_additive_dimension_spec is not None:
# Apply semi additive join on the node
agg_time_dimension = measure_properties.agg_time_dimension
queried_time_dimension_spec: Optional[
TimeDimensionSpec
] = self._find_non_additive_dimension_in_linkable_specs(
agg_time_dimension=agg_time_dimension,
linkable_specs=queried_linkable_specs.as_tuple,
non_additive_dimension_spec=non_additive_dimension_spec,
queried_time_dimension_spec: Optional[TimeDimensionSpec] = (
self._find_non_additive_dimension_in_linkable_specs(
agg_time_dimension=agg_time_dimension,
linkable_specs=queried_linkable_specs.as_tuple,
non_additive_dimension_spec=non_additive_dimension_spec,
)
)
time_dimension_spec = TimeDimensionSpec.from_name(non_additive_dimension_spec.name)
window_groupings = tuple(
Expand Down

0 comments on commit b055757

Please sign in to comment.