Skip to content

Commit

Permalink
Refactor JoinOverTimeRangeNode SQL rendering for readability & simplicity
Browse files Browse the repository at this point in the history

Also supports an upcoming change to allow multiple agg_time_dimensions in this node.
  • Loading branch information
courtneyholcomb committed Jun 12, 2024
1 parent 7fc74c0 commit ad7c2bc
Showing 1 changed file with 19 additions and 39 deletions.
58 changes: 19 additions & 39 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ def visit_source_node(self, node: ReadSqlSourceNode) -> SqlDataSet:
def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDataSet:
"""Generate time range join SQL."""
table_alias_to_instance_set: OrderedDict[str, InstanceSet] = OrderedDict()

input_data_set = node.parent_node.accept(self)
input_data_set_alias = self._next_unique_table_alias()

Expand All @@ -302,61 +301,42 @@ def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDat

time_spine_data_set_alias = self._next_unique_table_alias()

# Assemble time_spine dataset with metric_time_dimension to join.
# Granularity of time_spine column should match granularity of metric_time column from parent dataset.
# Assemble time_spine dataset with agg_time_dimension_instance selected.
time_spine_data_set = self._make_time_spine_data_set(
agg_time_dimension_instance=agg_time_dimension_instance,
time_spine_source=self._time_spine_source,
time_range_constraint=node.time_range_constraint,
)
table_alias_to_instance_set[time_spine_data_set_alias] = time_spine_data_set.instance_set

# Figure out which columns correspond to the time dimension that we want to join on.
input_data_set_metric_time_column_association = input_data_set.column_association_for_time_dimension(
agg_time_dimension_instance.spec
)
input_data_set_metric_time_col = input_data_set_metric_time_column_association.column_name

time_spine_data_set_column_associations = time_spine_data_set.column_association_for_time_dimension(
agg_time_dimension_instance.spec
)
time_spine_data_set_time_dimension_col = time_spine_data_set_column_associations.column_name

annotated_input_data_set = AnnotatedSqlDataSet(
data_set=input_data_set, alias=input_data_set_alias, _metric_time_column_name=input_data_set_metric_time_col
)
annotated_time_spine_data_set = AnnotatedSqlDataSet(
data_set=time_spine_data_set,
alias=time_spine_data_set_alias,
_metric_time_column_name=time_spine_data_set_time_dimension_col,
)

join_desc = SqlQueryPlanJoinBuilder.make_cumulative_metric_time_range_join_description(
node=node, metric_data_set=annotated_input_data_set, time_spine_data_set=annotated_time_spine_data_set
node=node,
metric_data_set=AnnotatedSqlDataSet(
data_set=input_data_set,
alias=input_data_set_alias,
_metric_time_column_name=input_data_set.column_association_for_time_dimension(
agg_time_dimension_instance.spec
).column_name,
),
time_spine_data_set=AnnotatedSqlDataSet(
data_set=time_spine_data_set,
alias=time_spine_data_set_alias,
_metric_time_column_name=time_spine_data_set.column_association_for_time_dimension(
agg_time_dimension_instance.spec
).column_name,
),
)

modified_input_instance_set = InstanceSet(
measure_instances=input_data_set.instance_set.measure_instances,
dimension_instances=input_data_set.instance_set.dimension_instances,
entity_instances=input_data_set.instance_set.entity_instances,
metric_instances=input_data_set.instance_set.metric_instances,
# we omit the metric time dimension from the right side of the self-join because we need to use
# the metric time dimension from the right side
time_dimension_instances=tuple(
[
time_dimension_instance
for time_dimension_instance in input_data_set.instance_set.time_dimension_instances
if time_dimension_instance != agg_time_dimension_instance
]
),
# Remove agg_time_dimension from input data set. It will be replaced with the time spine instance.
modified_input_instance_set = input_data_set.instance_set.transform(
FilterElements(exclude_specs=InstanceSpecSet(time_dimension_specs=(agg_time_dimension_instance.spec,)))
)
table_alias_to_instance_set[input_data_set_alias] = modified_input_instance_set

# The output instances are the same as the input instances.
output_instance_set = ChangeAssociatedColumns(self._column_association_resolver).transform(
input_data_set.instance_set
)

return SqlDataSet(
instance_set=output_instance_set,
sql_select_node=SqlSelectStatementNode(
Expand Down

0 comments on commit ad7c2bc

Please sign in to comment.