Simplify dataflow to SQL logic for JoinOverTimeRangeNode
#1540
@@ -468,70 +468,41 @@ def visit_source_node(self, node: ReadSqlSourceNode) -> SqlDataSet:
     def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDataSet:
         """Generate time range join SQL."""
         table_alias_to_instance_set: OrderedDict[str, InstanceSet] = OrderedDict()
-        input_data_set = node.parent_node.accept(self)
-        input_data_set_alias = self._next_unique_table_alias()
+        parent_data_set = node.parent_node.accept(self)
+        parent_data_set_alias = self._next_unique_table_alias()

-        # Find requested agg_time_dimensions in parent instance set.
-        # Will use instance with the smallest base granularity in time spine join.
-        agg_time_dimension_instance_for_join: Optional[TimeDimensionInstance] = None
-        requested_agg_time_dimension_instances: Tuple[TimeDimensionInstance, ...] = ()
-        for instance in input_data_set.instance_set.time_dimension_instances:
-            if instance.spec in node.queried_agg_time_dimension_specs:
-                requested_agg_time_dimension_instances += (instance,)
-                if not agg_time_dimension_instance_for_join or (
-                    instance.spec.time_granularity.base_granularity.to_int()
-                    < agg_time_dimension_instance_for_join.spec.time_granularity.base_granularity.to_int()
-                ):
-                    agg_time_dimension_instance_for_join = instance
-        assert (
-            agg_time_dimension_instance_for_join
-        ), "Specified metric time spec not found in parent data set. This should have been caught by validations."
+        # For the purposes of this node, use base grains. Custom grains will be joined later in the dataflow plan.
+        agg_time_dimension_specs = tuple({spec.with_base_grain() for spec in node.queried_agg_time_dimension_specs})

+        # Assemble time_spine dataset with a column for each agg_time_dimension requested.
+        agg_time_dimension_instances = parent_data_set.instances_for_time_dimensions(agg_time_dimension_specs)
         time_spine_data_set_alias = self._next_unique_table_alias()

-        # Assemble time_spine dataset with requested agg time dimension instances selected.
         time_spine_data_set = self._make_time_spine_data_set(
-            agg_time_dimension_instances=requested_agg_time_dimension_instances,
-            time_range_constraint=node.time_range_constraint,
+            agg_time_dimension_instances=agg_time_dimension_instances, time_range_constraint=node.time_range_constraint
         )
-        table_alias_to_instance_set[time_spine_data_set_alias] = time_spine_data_set.instance_set

         # Build the join description.
+        join_spec = self._choose_instance_for_time_spine_join(agg_time_dimension_instances).spec
+        annotated_parent = parent_data_set.annotate(alias=parent_data_set_alias, metric_time_spec=join_spec)
+        annotated_time_spine = time_spine_data_set.annotate(alias=time_spine_data_set_alias, metric_time_spec=join_spec)
         join_desc = SqlQueryPlanJoinBuilder.make_cumulative_metric_time_range_join_description(
-            node=node,
-            metric_data_set=AnnotatedSqlDataSet(
-                data_set=input_data_set,
-                alias=input_data_set_alias,
-                _metric_time_column_name=input_data_set.column_association_for_time_dimension(
-                    agg_time_dimension_instance_for_join.spec
-                ).column_name,
-            ),
-            time_spine_data_set=AnnotatedSqlDataSet(
-                data_set=time_spine_data_set,
-                alias=time_spine_data_set_alias,
-                _metric_time_column_name=time_spine_data_set.column_association_for_time_dimension(
-                    agg_time_dimension_instance_for_join.spec
-                ).column_name,
-            ),
+            node=node, metric_data_set=annotated_parent, time_spine_data_set=annotated_time_spine
         )

-        # Remove instances of agg_time_dimension from input data set. They'll be replaced with time spine instances.
-        agg_time_dimension_specs = tuple(dim.spec for dim in requested_agg_time_dimension_instances)
-        modified_input_instance_set = input_data_set.instance_set.transform(
+        # Build select columns, replacing agg_time_dimensions from the parent node with columns from the time spine.
+        table_alias_to_instance_set[time_spine_data_set_alias] = time_spine_data_set.instance_set
+        table_alias_to_instance_set[parent_data_set_alias] = parent_data_set.instance_set.transform(
             FilterElements(exclude_specs=InstanceSpecSet(time_dimension_specs=agg_time_dimension_specs))
         )
-        table_alias_to_instance_set[input_data_set_alias] = modified_input_instance_set

-        # The output instances are the same as the input instances.
-        output_instance_set = ChangeAssociatedColumns(self._column_association_resolver).transform(
-            input_data_set.instance_set
+        select_columns = create_simple_select_columns_for_instance_sets(
+            column_resolver=self._column_association_resolver, table_alias_to_instance_set=table_alias_to_instance_set
         )

         return SqlDataSet(
-            instance_set=output_instance_set,
+            instance_set=parent_data_set.instance_set,  # The output instances are the same as the input instances.
             sql_select_node=SqlSelectStatementNode.create(
                 description=node.description,
-                select_columns=create_simple_select_columns_for_instance_sets(
-                    self._column_association_resolver, table_alias_to_instance_set
-                ),
+                select_columns=select_columns,
                 from_source=time_spine_data_set.checked_sql_select_node,
                 from_source_alias=time_spine_data_set_alias,
                 join_descs=(join_desc,),
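The biggest readability win in this hunk is replacing the two inline `AnnotatedSqlDataSet(...)` constructions, each of which repeated the metric-time column lookup, with `parent_data_set.annotate(...)` and `time_spine_data_set.annotate(...)`. The diff does not show the body of `annotate`, so the following is only a hedged sketch of what such a helper plausibly does, assembled from the fields the removed code passed explicitly; `ToySqlDataSet`, its column map, and the use of a plain spec-name string instead of a real spec object are illustrative stand-ins, not MetricFlow's API.

```python
# Hedged sketch, not MetricFlow's implementation: what an annotate() helper that
# returns an AnnotatedSqlDataSet could look like, based on the fields the removed
# inline AnnotatedSqlDataSet(...) constructions populated by hand.
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class ColumnAssociation:  # stand-in for the real column association object
    column_name: str


@dataclass(frozen=True)
class AnnotatedSqlDataSet:  # stand-in: data set + alias + metric-time column used for the join
    data_set: ToySqlDataSet
    alias: str
    _metric_time_column_name: str


@dataclass(frozen=True)
class ToySqlDataSet:
    # Maps a time dimension spec's name to the column it resolves to in this data set.
    columns_by_spec_name: Dict[str, str]

    def column_association_for_time_dimension(self, spec_name: str) -> ColumnAssociation:
        return ColumnAssociation(column_name=self.columns_by_spec_name[spec_name])

    def annotate(self, alias: str, metric_time_spec_name: str) -> AnnotatedSqlDataSet:
        # Resolve the join column once here, instead of repeating the lookup at every call site.
        column = self.column_association_for_time_dimension(metric_time_spec_name)
        return AnnotatedSqlDataSet(data_set=self, alias=alias, _metric_time_column_name=column.column_name)


parent = ToySqlDataSet(columns_by_spec_name={"metric_time__day": "metric_time__day"})
print(parent.annotate(alias="subq_1", metric_time_spec_name="metric_time__day"))
```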
@@ -1392,6 +1363,21 @@ def visit_semi_additive_join_node(self, node: SemiAdditiveJoinNode) -> SqlDataSet:
             ),
         )

+    def _choose_instance_for_time_spine_join(
+        self, agg_time_dimension_instances: Sequence[TimeDimensionInstance]
+    ) -> TimeDimensionInstance:
+        """Find the agg_time_dimension instance with the smallest grain to use for the time spine join."""
+        # We can't use a date part spec to join to the time spine, so filter those out.
+        agg_time_dimension_instances = [
+            instance for instance in agg_time_dimension_instances if not instance.spec.date_part
+        ]
+        assert len(agg_time_dimension_instances) > 0, (
+            "No appropriate agg_time_dimension was found to join to the time spine. "
+            "This indicates that the dataflow plan was configured incorrectly."
+        )
+        agg_time_dimension_instances.sort(key=lambda instance: instance.spec.time_granularity.base_granularity.to_int())
+        return agg_time_dimension_instances[0]
+
     def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet:  # noqa: D102
         parent_data_set = node.parent_node.accept(self)
         parent_alias = self._next_unique_table_alias()

Comment (on _choose_instance_for_time_spine_join): This helper function is only used once at this point, but it will be used again for the …

Comment (on the sort call): Not related to this PR, but we probably should just make the …
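For context on the helper added above: it discards date-part instances (which cannot be joined to the time spine), then sorts the remaining instances by the integer value of their base granularity and takes the first element. A self-contained toy version of the same selection rule follows; the types and granularity ranks are made up for illustration, and the `min(...)` form is equivalent to sort-then-take-first.

```python
# Toy illustration of the selection rule in _choose_instance_for_time_spine_join.
# ToyInstance and the rank values are stand-ins, not MetricFlow's real types.
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class ToyInstance:
    name: str
    base_granularity_rank: int  # smaller value = finer grain (illustrative ranks only)
    date_part: Optional[str] = None


def choose_instance_for_time_spine_join(instances: Sequence[ToyInstance]) -> ToyInstance:
    # Date-part specs (e.g. "day of week") can't be used to join to the time spine.
    candidates = [instance for instance in instances if instance.date_part is None]
    assert candidates, "No appropriate instance found to join to the time spine."
    # Equivalent to sorting by base granularity and taking the first element.
    return min(candidates, key=lambda instance: instance.base_granularity_rank)


print(
    choose_instance_for_time_spine_join(
        [
            ToyInstance("metric_time__month", 5),
            ToyInstance("metric_time__day", 3),
            ToyInstance("metric_time__dow", 3, date_part="dow"),
        ]
    ).name
)  # -> metric_time__day
```

Whether production code keeps sort-and-index or switches to `min` is mostly a style question; both pick the finest base grain.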
@@ -803,6 +803,7 @@ def transform(self, instance_set: InstanceSet) -> SelectColumnSet:  # noqa: D102
         )


+# TODO: delete this class & all uses. It doesn't do anything.
 class ChangeAssociatedColumns(InstanceSetTransform[InstanceSet]):
     """Change the columns associated with instances to the one specified by the resolver.

Comment (on the TODO): This class is supposed to change the column names, but if the specs didn't change, then the column names shouldn't either, so it seems like it doesn't do anything. LMK if I'm overlooking something here!

Reply: This used to be needed, but maybe things have changed. There were cases where nodes output data sets whose column names in the generated SQL did not match the defined format. Trying to remember what that was, but there was a bug fix that required this transform.
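To make the discussion above concrete: if the column association resolver derives a column name purely from the spec, then re-resolving unchanged specs returns the same names and the transform is an identity; it only does real work if an upstream node emitted a column name that diverged from the resolver's format, which is the scenario the reply recalls. A toy sketch of that reasoning, using stand-in types rather than MetricFlow's resolver, is below.

```python
# Toy sketch of the argument above, not MetricFlow code: re-resolving column names
# from unchanged specs is a no-op unless an upstream name diverged from the resolver's format.
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class ToyInstance:
    spec_name: str
    column_name: str


def resolve_column_name(spec_name: str) -> str:
    # Stand-in for the column association resolver: the name depends only on the spec.
    return spec_name.lower()


def change_associated_columns(instances: Tuple[ToyInstance, ...]) -> Tuple[ToyInstance, ...]:
    # Mirrors what ChangeAssociatedColumns is described as doing: keep specs, re-resolve columns.
    return tuple(ToyInstance(i.spec_name, resolve_column_name(i.spec_name)) for i in instances)


consistent = (ToyInstance("metric_time__day", "metric_time__day"),)
assert change_associated_columns(consistent) == consistent  # identity when names already match

diverged = (ToyInstance("metric_time__day", "METRIC_TIME_DAY"),)
assert change_associated_columns(diverged) != diverged  # only does work if a name diverged
```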
Comment (on ChangeAssociatedColumns): This class should be unchanged. I needed to move it to resolve circular imports.