Build DataflowPlan for custom offset window #1584
Open
courtneyholcomb wants to merge 4 commits into court/custom-offset5 from court/custom-offset6
Changes from 1 commit

Commits (4)
1516096  Build DataflowPlan for custom offset window (courtneyholcomb)
02e9775  Update JoinToTimeSpineNode to handle custom offset windows (courtneyholcomb)
bdafba5  Update snapshots for JoinToTimeSpineNode changes (courtneyholcomb)
ef1dbdc  Changelog (courtneyholcomb)
@@ -54,6 +54,7 @@
 from metricflow_semantics.specs.where_filter.where_filter_spec import WhereFilterSpec
 from metricflow_semantics.specs.where_filter.where_filter_spec_set import WhereFilterSpecSet
 from metricflow_semantics.specs.where_filter.where_filter_transform import WhereSpecFactory
+from metricflow_semantics.sql.sql_exprs import SqlWindowFunction
 from metricflow_semantics.sql.sql_join_type import SqlJoinType
 from metricflow_semantics.sql.sql_table import SqlTable
 from metricflow_semantics.time.dateutil_adjuster import DateutilTimePeriodAdjuster

@@ -84,6 +85,7 @@
 from metricflow.dataflow.nodes.combine_aggregated_outputs import CombineAggregatedOutputsNode
 from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode
 from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode
+from metricflow.dataflow.nodes.custom_granularity_bounds import CustomGranularityBoundsNode
 from metricflow.dataflow.nodes.filter_elements import FilterElementsNode
 from metricflow.dataflow.nodes.join_conversion_events import JoinConversionEventsNode
 from metricflow.dataflow.nodes.join_over_time import JoinOverTimeRangeNode

@@ -92,6 +94,7 @@
 from metricflow.dataflow.nodes.join_to_time_spine import JoinToTimeSpineNode
 from metricflow.dataflow.nodes.metric_time_transform import MetricTimeDimensionTransformNode
 from metricflow.dataflow.nodes.min_max import MinMaxNode
+from metricflow.dataflow.nodes.offset_by_custom_granularity import OffsetByCustomGranularityNode
 from metricflow.dataflow.nodes.order_by_limit import OrderByLimitNode
 from metricflow.dataflow.nodes.read_sql_source import ReadSqlSourceNode
 from metricflow.dataflow.nodes.semi_additive_join import SemiAdditiveJoinNode
@@ -658,13 +661,18 @@ def _build_derived_metric_output_node(
             )
         if metric_spec.has_time_offset and queried_agg_time_dimension_specs:
             # TODO: move this to a helper method
-            time_spine_node = self._build_time_spine_node(queried_agg_time_dimension_specs)
+            time_spine_node = self._build_time_spine_node(
+                queried_time_spine_specs=queried_agg_time_dimension_specs,
+                offset_window=metric_spec.offset_window,
+            )
             output_node = JoinToTimeSpineNode.create(
                 metric_source_node=output_node,
                 time_spine_node=time_spine_node,
                 requested_agg_time_dimension_specs=queried_agg_time_dimension_specs,
                 join_on_time_dimension_spec=self._sort_by_base_granularity(queried_agg_time_dimension_specs)[0],
-                offset_window=metric_spec.offset_window,
+                offset_window=(
+                    metric_spec.offset_window if not self._offset_window_is_custom(metric_spec.offset_window) else None
+                ),
                 offset_to_grain=metric_spec.offset_to_grain,
                 join_type=SqlJoinType.INNER,
             )
@@ -1651,13 +1659,20 @@ def _build_aggregated_measure_from_measure_source_node(
         required_time_spine_specs = base_queried_agg_time_dimension_specs
         if join_on_time_dimension_spec not in required_time_spine_specs:
             required_time_spine_specs = (join_on_time_dimension_spec,) + required_time_spine_specs
-        time_spine_node = self._build_time_spine_node(required_time_spine_specs)
+        time_spine_node = self._build_time_spine_node(
+            queried_time_spine_specs=required_time_spine_specs,
+            offset_window=before_aggregation_time_spine_join_description.offset_window,
+        )
         unaggregated_measure_node = JoinToTimeSpineNode.create(
             metric_source_node=unaggregated_measure_node,
             time_spine_node=time_spine_node,
             requested_agg_time_dimension_specs=base_queried_agg_time_dimension_specs,
             join_on_time_dimension_spec=join_on_time_dimension_spec,
-            offset_window=before_aggregation_time_spine_join_description.offset_window,
+            offset_window=(
+                before_aggregation_time_spine_join_description.offset_window
+                if not self._offset_window_is_custom(before_aggregation_time_spine_join_description.offset_window)
+                else None
+            ),
             offset_to_grain=before_aggregation_time_spine_join_description.offset_to_grain,
             join_type=before_aggregation_time_spine_join_description.join_type,
         )
@@ -1864,6 +1879,7 @@ def _build_time_spine_node(
         queried_time_spine_specs: Sequence[TimeDimensionSpec],
         where_filter_specs: Sequence[WhereFilterSpec] = (),
         time_range_constraint: Optional[TimeRangeConstraint] = None,
+        offset_window: Optional[MetricTimeWindow] = None,
     ) -> DataflowPlanNode:
         """Return the time spine node needed to satisfy the specs."""
         required_time_spine_spec_set = self.__get_required_linkable_specs(
@@ -1872,39 +1888,85 @@ def _build_time_spine_node(
         )
         required_time_spine_specs = required_time_spine_spec_set.time_dimension_specs

-        # TODO: support multiple time spines here. Build node on the one with the smallest base grain.
-        # Then, pass custom_granularity_specs into _build_pre_aggregation_plan if they aren't satisfied by smallest time spine.
-        time_spine_source = self._choose_time_spine_source(required_time_spine_specs)
-        read_node = self._choose_time_spine_read_node(time_spine_source)
-        time_spine_data_set = self._node_data_set_resolver.get_output_data_set(read_node)
-
-        # Change the column aliases to match the specs that were requested in the query.
-        time_spine_node = AliasSpecsNode.create(
-            parent_node=read_node,
-            change_specs=tuple(
-                SpecToAlias(
-                    input_spec=time_spine_data_set.instance_from_time_dimension_grain_and_date_part(
-                        time_granularity_name=required_spec.time_granularity.name, date_part=required_spec.date_part
-                    ).spec,
-                    output_spec=required_spec,
-                )
-                for required_spec in required_time_spine_specs
-            ),
-        )
-
-        # If the base grain of the time spine isn't selected, it will have duplicate rows that need deduping.
-        should_dedupe = ExpandedTimeGranularity.from_time_granularity(time_spine_source.base_granularity) not in {
-            spec.time_granularity for spec in queried_time_spine_specs
-        }
+        should_dedupe = False
+        filter_to_specs = tuple(queried_time_spine_specs)
+        if offset_window and self._offset_window_is_custom(offset_window):
+            time_spine_node = self._build_custom_offset_time_spine_node(
+                offset_window=offset_window, required_time_spine_specs=required_time_spine_specs
+            )
+            filter_to_specs = self._node_data_set_resolver.get_output_data_set(
+                time_spine_node
+            ).instance_set.spec_set.time_dimension_specs
+        else:
+            # For simpler time spine queries, choose the appropriate time spine node and apply requested aliases.
+            time_spine_source = self._choose_time_spine_source(required_time_spine_specs)
+            # TODO: support multiple time spines here. Build node on the one with the smallest base grain.
+            # Then, pass custom_granularity_specs into _build_pre_aggregation_plan if they aren't satisfied by smallest time spine.
+            read_node = self._choose_time_spine_read_node(time_spine_source)
+            time_spine_data_set = self._node_data_set_resolver.get_output_data_set(read_node)
+            # Change the column aliases to match the specs that were requested in the query.
+            time_spine_node = AliasSpecsNode.create(
+                parent_node=read_node,
+                change_specs=tuple(
+                    SpecToAlias(
+                        input_spec=time_spine_data_set.instance_from_time_dimension_grain_and_date_part(
+                            time_granularity_name=required_spec.time_granularity.name, date_part=required_spec.date_part
+                        ).spec,
+                        output_spec=required_spec,
+                    )
+                    for required_spec in required_time_spine_specs
+                ),
+            )
+            # If the base grain of the time spine isn't selected, it will have duplicate rows that need deduping.
+            should_dedupe = ExpandedTimeGranularity.from_time_granularity(time_spine_source.base_granularity) not in {
+                spec.time_granularity for spec in queried_time_spine_specs
+            }

         return self._build_pre_aggregation_plan(
             source_node=time_spine_node,
-            filter_to_specs=InstanceSpecSet(time_dimension_specs=tuple(queried_time_spine_specs)),
+            filter_to_specs=InstanceSpecSet(time_dimension_specs=filter_to_specs),
             time_range_constraint=time_range_constraint,
             where_filter_specs=where_filter_specs,
             distinct=should_dedupe,
         )

+    def _build_custom_offset_time_spine_node(
+        self, offset_window: MetricTimeWindow, required_time_spine_specs: Tuple[TimeDimensionSpec, ...]
+    ) -> DataflowPlanNode:
+        # Build time spine node that offsets agg time dimensions by a custom grain.
+        custom_grain = self._semantic_model_lookup._custom_granularities[offset_window.granularity]
+        time_spine_source = self._choose_time_spine_source((DataSet.metric_time_dimension_spec(custom_grain),))
+        time_spine_read_node = self._choose_time_spine_read_node(time_spine_source)
+        if {spec.time_granularity for spec in required_time_spine_specs} == {custom_grain}:
+            # TODO: If querying with only the same grain as is used in the offset_window, can use a simpler plan.
+            pass
+        # For custom offset windows queried with other granularities, first, build CustomGranularityBoundsNode.
+        # This will be used twice in the output node, and ideally will be turned into a CTE.
+        bounds_node = CustomGranularityBoundsNode.create(
+            parent_node=time_spine_read_node, custom_granularity_name=custom_grain.name
+        )
+        # Build a FilterElementsNode from bounds node to get required unique rows.
+        bounds_data_set = self._node_data_set_resolver.get_output_data_set(bounds_node)
+        bounds_specs = tuple(
+            bounds_data_set.instance_from_window_function(window_func).spec
+            for window_func in (SqlWindowFunction.FIRST_VALUE, SqlWindowFunction.LAST_VALUE)
+        )
+        custom_grain_spec = bounds_data_set.instance_from_time_dimension_grain_and_date_part(
+            time_granularity_name=custom_grain.name, date_part=None
+        ).spec
+        filter_elements_node = FilterElementsNode.create(
+            parent_node=bounds_node,
+            include_specs=InstanceSpecSet(time_dimension_specs=(custom_grain_spec,) + bounds_specs),
+            distinct=True,
+        )
+        # Pass both the CustomGranularityBoundsNode and the FilterElementsNode into the OffsetByCustomGranularityNode.
+        return OffsetByCustomGranularityNode.create(
+            custom_granularity_bounds_node=bounds_node,
+            filter_elements_node=filter_elements_node,
+            offset_window=offset_window,
+            required_time_spine_specs=required_time_spine_specs,
+        )

     def _sort_by_base_granularity(self, time_dimension_specs: Sequence[TimeDimensionSpec]) -> List[TimeDimensionSpec]:
         """Sort the time dimensions by their base granularity.
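For intuition, here is a rough standalone sketch, not MetricFlow code, of the offset semantics the CustomGranularityBoundsNode / OffsetByCustomGranularityNode chain above appears to target under one reading of this PR: a date is shifted into the custom-grain period that is N periods earlier, keeping its offset from the period start and capping at that period's last day (the FIRST_VALUE / LAST_VALUE bounds). The period boundaries and function below are invented for illustration.

from datetime import date

# Hypothetical custom-granularity periods as (first_day, last_day) bounds, in order.
PERIODS = [
    (date(2024, 1, 1), date(2024, 4, 6)),
    (date(2024, 4, 7), date(2024, 6, 29)),
    (date(2024, 6, 30), date(2024, 10, 5)),
]

def offset_by_custom_grain(day: date, offset: int) -> date:
    """Shift `day` into the period `offset` periods earlier, preserving its position within the period."""
    idx = next(i for i, (start, end) in enumerate(PERIODS) if start <= day <= end)
    target_start, target_end = PERIODS[idx - offset]  # assumes idx - offset >= 0
    # Cap at the target period's last day, since periods can have different lengths.
    return min(target_start + (day - PERIODS[idx][0]), target_end)

offset_by_custom_grain(date(2024, 7, 15), 1)  # -> date(2024, 4, 22)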
@@ -1935,3 +1997,9 @@ def _determine_time_spine_join_spec(
             time_granularity=join_spec_grain, date_part=None
         )
         return join_on_time_dimension_spec
+
+    def _offset_window_is_custom(self, offset_window: Optional[MetricTimeWindow]) -> bool:
+        return (
+            offset_window is not None
+            and offset_window.granularity in self._semantic_model_lookup.custom_granularity_names
+        )

Review comment (on _offset_window_is_custom): What happens when the granularity is not in ...
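On the question above: as the diff stands, an offset window whose granularity is not in custom_granularity_names appears to be treated as a standard window and passed through to JoinToTimeSpineNode. If the intent is instead to fail loudly on unrecognized granularities, one option is sketched below as a standalone function; standard_granularity_names is a hypothetical stand-in for however standard grains are enumerated, and nothing here is the repo's actual API.

from typing import Optional, Set

def offset_window_is_custom(
    granularity: Optional[str],
    custom_granularity_names: Set[str],
    standard_granularity_names: Set[str],
) -> bool:
    """Standalone sketch: classify an offset window's granularity, raising on unknown names."""
    if granularity is None:
        return False
    if granularity in custom_granularity_names:
        return True
    if granularity not in standard_granularity_names:
        raise ValueError(f"Unknown granularity in offset window: {granularity!r}")
    return False

offset_window_is_custom("fiscal_quarter", {"fiscal_quarter"}, {"day", "month"})  # -> True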
Review comment: Are there cases where you can use a JoinToTimeSpineNode with a custom offset window? If not, an assertion in the class would help.
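A minimal sketch of the kind of guard suggested here, assuming the node (or the builder creating it) can see the set of custom granularity names; the class and field names below are illustrative stand-ins, not the real JoinToTimeSpineNode attributes.

from dataclasses import dataclass
from typing import FrozenSet, Optional

@dataclass(frozen=True)
class JoinToTimeSpineNodeSketch:
    """Illustration only; the real node has many more fields."""

    offset_window_granularity: Optional[str]
    custom_granularity_names: FrozenSet[str]

    def __post_init__(self) -> None:
        # Custom-granularity offsets are handled by the custom offset time spine path,
        # so reject them here to surface misuse early.
        assert (
            self.offset_window_granularity not in self.custom_granularity_names
        ), "JoinToTimeSpineNode does not support custom offset windows."

JoinToTimeSpineNodeSketch(offset_window_granularity="month", custom_granularity_names=frozenset({"fiscal_quarter"}))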