Skip to content

Commit

Permalink
Clean up dataflow to SQL for JoinToTimeSpineNode
Browse files Browse the repository at this point in the history
Should not actually change behavior since we don't pass custom grains into this node
  • Loading branch information
courtneyholcomb committed Oct 18, 2024
1 parent fe83362 commit 3b6fa94
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,15 @@ def _make_time_spine_data_set(
from_source=SqlTableNode.create(sql_table=time_spine_source.spine_table),
from_source_alias=time_spine_table_alias,
group_bys=select_columns if apply_group_by else (),
where=_make_time_range_comparison_expr(
table_alias=time_spine_table_alias,
column_alias=time_spine_source.base_column,
time_range_constraint=time_range_constraint,
)
if time_range_constraint
else None,
where=(
_make_time_range_comparison_expr(
table_alias=time_spine_table_alias,
column_alias=time_spine_source.base_column,
time_range_constraint=time_range_constraint,
)
if time_range_constraint
else None
),
)

# Where constraints must be applied in an outer query since they are using an alias (e.g., 'metric_time__day'),
Expand Down Expand Up @@ -380,7 +382,6 @@ def visit_source_node(self, node: ReadSqlSourceNode) -> SqlDataSet:
instance_set=node.data_set.instance_set,
)

# TODO: write tests for custom granularities that hit this node
def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDataSet:
"""Generate time range join SQL."""
table_alias_to_instance_set: OrderedDict[str, InstanceSet] = OrderedDict()
Expand Down Expand Up @@ -1307,7 +1308,6 @@ def visit_semi_additive_join_node(self, node: SemiAdditiveJoinNode) -> SqlDataSe
),
)

# TODO: write tests for custom granularities that hit this node
def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet: # noqa: D102
parent_data_set = node.parent_node.accept(self)
parent_alias = self._next_unique_table_alias()
Expand All @@ -1330,16 +1330,14 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
):
agg_time_dimension_instances.append(instance)

# Choose the instance with the smallest standard granularity available.
assert all(
[not instance.spec.time_granularity.is_custom_granularity for instance in agg_time_dimension_instances]
), "Custom granularities are not yet supported for all queries."
# Choose the instance with the smallest base granularity available.
agg_time_dimension_instances.sort(key=lambda instance: instance.spec.time_granularity.base_granularity.to_int())
assert len(agg_time_dimension_instances) > 0, (
"Couldn't find requested agg_time_dimension in parent data set. The dataflow plan may have been "
"configured incorrectly."
)
agg_time_dimension_instance_for_join = agg_time_dimension_instances[0]
agg_time_dim_for_join_with_base_grain = agg_time_dimension_instance_for_join.spec.with_base_grain()

# Build time spine data set using the requested agg_time_dimension name.
time_spine_alias = self._next_unique_table_alias()
Expand All @@ -1354,7 +1352,7 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
node=node,
time_spine_alias=time_spine_alias,
agg_time_dimension_column_name=self.column_association_resolver.resolve_spec(
agg_time_dimension_instance_for_join.spec
agg_time_dim_for_join_with_base_grain
).column_name,
parent_sql_select_node=parent_data_set.checked_sql_select_node,
parent_alias=parent_alias,
Expand Down Expand Up @@ -1391,15 +1389,15 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet
self._column_association_resolver, OrderedDict({parent_alias: parent_instance_set})
)

# Select instance from time spine data set that matches the join instance in the parent dataset.
# Select matching instance from time spine data set (using base grain - custom grain will be joined in a later node).
original_time_spine_dim_instance: Optional[TimeDimensionInstance] = None
for time_dimension_instance in time_spine_dataset.instance_set.time_dimension_instances:
if time_dimension_instance.spec == agg_time_dimension_instance_for_join.spec:
if time_dimension_instance.spec == agg_time_dim_for_join_with_base_grain:
original_time_spine_dim_instance = time_dimension_instance
break
assert original_time_spine_dim_instance, (
"Couldn't find requested agg_time_dimension_instance_for_join in time spine data set, which "
f"indicates it may have been configured incorrectly. Expected: {agg_time_dimension_instance_for_join.spec};"
f"indicates it may have been configured incorrectly. Expected: {agg_time_dim_for_join_with_base_grain};"
f" Got: {[instance.spec for instance in time_spine_dataset.instance_set.time_dimension_instances]}"
)
time_spine_column_select_expr: Union[
Expand Down

0 comments on commit 3b6fa94

Please sign in to comment.