JoinToTimeSpineNode
bug fix
#1541
Merged
6 commits
ada07f3 Pass base grain specs into dataflow plan nodes before joining custom … (courtneyholcomb)
66971f1 Bug fix: treat metric_time and agg_time the same when both are includ… (courtneyholcomb)
ae8281b Update DuckDB snapshots (courtneyholcomb)
9f2853c Update snapshots for other SQL engines (courtneyholcomb)
da476c8 Changelog (courtneyholcomb)
3c7b851 Feedback: use Sequence type (courtneyholcomb)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
kind: Fixes | ||
body: Always treat metric_time and the agg_time_dimension the same in the JoinToTimeSpineNode. | ||
time: 2024-11-21T07:39:23.698194-08:00 | ||
custom: | ||
Author: courtneyholcomb | ||
Issue: "1541" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,9 +6,8 @@ | |
from typing import Callable, Dict, FrozenSet, List, Optional, Sequence, Set, Tuple, TypeVar, Union | ||
|
||
from dbt_semantic_interfaces.enum_extension import assert_values_exhausted | ||
from dbt_semantic_interfaces.naming.keywords import METRIC_TIME_ELEMENT_NAME | ||
from dbt_semantic_interfaces.protocols.metric import MetricInputMeasure, MetricType | ||
from dbt_semantic_interfaces.references import EntityReference, MetricModelReference, SemanticModelElementReference | ||
from dbt_semantic_interfaces.references import MetricModelReference, SemanticModelElementReference | ||
from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType | ||
from dbt_semantic_interfaces.type_enums.conversion_calculation_type import ConversionCalculationType | ||
from dbt_semantic_interfaces.type_enums.period_agg import PeriodAggregation | ||
|
@@ -471,11 +470,10 @@ def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDat | |
parent_data_set = node.parent_node.accept(self) | ||
parent_data_set_alias = self._next_unique_table_alias() | ||
|
||
# For the purposes of this node, use base grains. Custom grains will be joined later in the dataflow plan. | ||
agg_time_dimension_specs = tuple({spec.with_base_grain() for spec in node.queried_agg_time_dimension_specs}) | ||
|
||
# Assemble time_spine dataset with a column for each agg_time_dimension requested. | ||
agg_time_dimension_instances = parent_data_set.instances_for_time_dimensions(agg_time_dimension_specs) | ||
agg_time_dimension_instances = parent_data_set.instances_for_time_dimensions( | ||
node.queried_agg_time_dimension_specs | ||
) | ||
time_spine_data_set_alias = self._next_unique_table_alias() | ||
time_spine_data_set = self._make_time_spine_data_set( | ||
agg_time_dimension_instances=agg_time_dimension_instances, time_range_constraint=node.time_range_constraint | ||
|
@@ -492,7 +490,7 @@ def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> SqlDat | |
# Build select columns, replacing agg_time_dimensions from the parent node with columns from the time spine. | ||
table_alias_to_instance_set[time_spine_data_set_alias] = time_spine_data_set.instance_set | ||
table_alias_to_instance_set[parent_data_set_alias] = parent_data_set.instance_set.transform( | ||
FilterElements(exclude_specs=InstanceSpecSet(time_dimension_specs=agg_time_dimension_specs)) | ||
FilterElements(exclude_specs=InstanceSpecSet(time_dimension_specs=node.queried_agg_time_dimension_specs)) | ||
) | ||
select_columns = create_simple_select_columns_for_instance_sets( | ||
column_resolver=self._column_association_resolver, table_alias_to_instance_set=table_alias_to_instance_set | ||
|
@@ -1382,33 +1380,30 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet | |
parent_data_set = node.parent_node.accept(self) | ||
parent_alias = self._next_unique_table_alias() | ||
|
||
if node.use_custom_agg_time_dimension: | ||
agg_time_dimension = node.requested_agg_time_dimension_specs[0] | ||
agg_time_element_name = agg_time_dimension.element_name | ||
agg_time_entity_links: Tuple[EntityReference, ...] = agg_time_dimension.entity_links | ||
else: | ||
agg_time_element_name = METRIC_TIME_ELEMENT_NAME | ||
agg_time_entity_links = () | ||
agg_time_dimension_instances = parent_data_set.instances_for_time_dimensions( | ||
node.requested_agg_time_dimension_specs | ||
) | ||
|
||
# Find the time dimension instances in the parent data set that match the one we want to join with. | ||
agg_time_dimension_instances: List[TimeDimensionInstance] = [] | ||
for instance in parent_data_set.instance_set.time_dimension_instances: | ||
if ( | ||
instance.spec.date_part is None # Ensure we don't join using an instance with date part | ||
and instance.spec.element_name == agg_time_element_name | ||
and instance.spec.entity_links == agg_time_entity_links | ||
): | ||
agg_time_dimension_instances.append(instance) | ||
# Select the dimension for the join from the parent node because it may not have been included in the request. | ||
# Default to using metric_time for the join if it was requested, otherwise use the agg_time_dimension. | ||
[Review comment] This is the one place we still have some differentiation between |
||
included_metric_time_instances = [ | ||
instance for instance in agg_time_dimension_instances if instance.spec.is_metric_time | ||
] | ||
if included_metric_time_instances: | ||
join_on_time_dimension_sample = included_metric_time_instances[0].spec | ||
else: | ||
join_on_time_dimension_sample = agg_time_dimension_instances[0].spec | ||
|
||
# Choose the instance with the smallest base granularity available. | ||
agg_time_dimension_instances.sort(key=lambda instance: instance.spec.time_granularity.base_granularity.to_int()) | ||
assert len(agg_time_dimension_instances) > 0, ( | ||
"Couldn't find requested agg_time_dimension in parent data set. The dataflow plan may have been " | ||
"configured incorrectly." | ||
agg_time_dimension_instance_for_join = self._choose_instance_for_time_spine_join( | ||
[ | ||
instance | ||
for instance in parent_data_set.instance_set.time_dimension_instances | ||
if instance.spec.element_name == join_on_time_dimension_sample.element_name | ||
and instance.spec.entity_links == join_on_time_dimension_sample.entity_links | ||
] | ||
) | ||
agg_time_dimension_instance_for_join = agg_time_dimension_instances[0] | ||
|
||
# Build time spine data set using the requested agg_time_dimension name. | ||
# Build time spine data set with just the agg_time_dimension instance needed for the join. | ||
time_spine_alias = self._next_unique_table_alias() | ||
time_spine_dataset = self._make_time_spine_data_set( | ||
agg_time_dimension_instances=(agg_time_dimension_instance_for_join,), | ||
|
@@ -1432,24 +1427,14 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet | |
time_dimensions_to_select_from_parent: Tuple[TimeDimensionInstance, ...] = () | ||
time_dimensions_to_select_from_time_spine: Tuple[TimeDimensionInstance, ...] = () | ||
for time_dimension_instance in parent_data_set.instance_set.time_dimension_instances: | ||
if ( | ||
time_dimension_instance.spec.element_name == agg_time_element_name | ||
and time_dimension_instance.spec.entity_links == agg_time_entity_links | ||
): | ||
if time_dimension_instance in agg_time_dimension_instances: | ||
time_dimensions_to_select_from_time_spine += (time_dimension_instance,) | ||
else: | ||
time_dimensions_to_select_from_parent += (time_dimension_instance,) | ||
parent_instance_set = InstanceSet( | ||
measure_instances=parent_data_set.instance_set.measure_instances, | ||
dimension_instances=parent_data_set.instance_set.dimension_instances, | ||
time_dimension_instances=tuple( | ||
time_dimension_instance | ||
for time_dimension_instance in parent_data_set.instance_set.time_dimension_instances | ||
if not ( | ||
time_dimension_instance.spec.element_name == agg_time_element_name | ||
and time_dimension_instance.spec.entity_links == agg_time_entity_links | ||
) | ||
), | ||
time_dimension_instances=time_dimensions_to_select_from_parent, | ||
entity_instances=parent_data_set.instance_set.entity_instances, | ||
metric_instances=parent_data_set.instance_set.metric_instances, | ||
metadata_instances=parent_data_set.instance_set.metadata_instances, | ||
|
@@ -1481,8 +1466,8 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet | |
) | ||
|
||
# Add requested granularities (if different from time_spine) and date_parts to time spine column. | ||
for time_dimension_instance in time_dimensions_to_select_from_time_spine: | ||
time_dimension_spec = time_dimension_instance.spec | ||
for parent_time_dimension_instance in time_dimensions_to_select_from_time_spine: | ||
time_dimension_spec = parent_time_dimension_instance.spec | ||
if ( | ||
time_dimension_spec.time_granularity.base_granularity.to_int() | ||
< original_time_spine_dim_instance.spec.time_granularity.base_granularity.to_int() | ||
|
@@ -1521,13 +1506,9 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet | |
# Apply date_part to time spine column select expression. | ||
if time_dimension_spec.date_part: | ||
select_expr = SqlExtractExpression.create(date_part=time_dimension_spec.date_part, arg=select_expr) | ||
time_dim_spec = original_time_spine_dim_instance.spec.with_grain_and_date_part( | ||
time_granularity=time_dimension_spec.time_granularity, date_part=time_dimension_spec.date_part | ||
) | ||
time_spine_dim_instance = TimeDimensionInstance( | ||
defined_from=original_time_spine_dim_instance.defined_from, | ||
associated_columns=(self._column_association_resolver.resolve_spec(time_dim_spec),), | ||
spec=time_dim_spec, | ||
|
||
time_spine_dim_instance = parent_time_dimension_instance.with_new_defined_from( | ||
original_time_spine_dim_instance.defined_from | ||
) | ||
time_spine_dim_instances.append(time_spine_dim_instance) | ||
time_spine_select_columns.append( | ||
|
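The join-dimension selection described in the comments of the diff above (prefer metric_time when it was requested, otherwise fall back to the agg_time_dimension, then join on the smallest base granularity available in the parent) can be sketched in isolation. This is a minimal illustration, not the MetricFlow implementation: `TimeDimSpec`, `GRAIN_ORDER`, and `choose_join_spec` are hypothetical stand-ins, and since the body of `_choose_instance_for_time_spine_join` is not shown in this diff, the smallest-grain rule here is an assumption based on the sort that the PR removes.

```python
from dataclasses import dataclass
from typing import Sequence, Tuple

METRIC_TIME = "metric_time"

# Hypothetical ordering of base granularities, smallest first (an assumption for this sketch).
GRAIN_ORDER = {"day": 0, "week": 1, "month": 2, "quarter": 3, "year": 4}


@dataclass(frozen=True)
class TimeDimSpec:
    """Simplified, hypothetical stand-in for a time dimension spec."""

    element_name: str
    entity_links: Tuple[str, ...]
    grain: str

    @property
    def is_metric_time(self) -> bool:
        return self.element_name == METRIC_TIME


def choose_join_spec(requested: Sequence[TimeDimSpec], parent: Sequence[TimeDimSpec]) -> TimeDimSpec:
    """Pick the spec to join to the time spine on.

    `requested` holds the requested agg_time_dimension specs (metric_time included);
    `parent` holds every time dimension spec available in the parent data set.
    """
    if not requested:
        raise ValueError("No agg_time_dimension specs were requested.")

    # Prefer metric_time if it was requested; otherwise use the first agg_time_dimension.
    metric_time_specs = [spec for spec in requested if spec.is_metric_time]
    sample = metric_time_specs[0] if metric_time_specs else requested[0]

    # Look in the parent for every grain of that same dimension, since the
    # grain needed for the join may not have been part of the request.
    candidates = [
        spec
        for spec in parent
        if spec.element_name == sample.element_name and spec.entity_links == sample.entity_links
    ]
    if not candidates:
        raise ValueError("Join dimension not found in the parent data set.")

    # Assumed rule: join on the smallest base granularity available.
    return min(candidates, key=lambda spec: GRAIN_ORDER[spec.grain])


requested = [TimeDimSpec("ds", ("booking",), "month"), TimeDimSpec(METRIC_TIME, (), "month")]
parent = requested + [TimeDimSpec(METRIC_TIME, (), "day"), TimeDimSpec("ds", ("booking",), "day")]
join_spec = choose_join_spec(requested, parent)
```

Because the selection only decides which column is used for the join, every requested instance can still be selected from the time spine afterwards, whether it is metric_time or the agg_time_dimension.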
This error is handled in the __post_init__ for JoinToTimeSpineNode, so I removed the duplication.
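For readers unfamiliar with the pattern mentioned in the comment above, here is a rough sketch of validating a dataclass in `__post_init__` so that downstream code does not need to repeat the check. The class name `JoinToTimeSpineNodeSketch`, its single field, and the exact condition are assumptions for illustration; the actual validation in `JoinToTimeSpineNode.__post_init__` is not shown in this diff.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class JoinToTimeSpineNodeSketch:
    """Hypothetical, reduced stand-in for a dataflow plan node."""

    requested_agg_time_dimension_specs: Sequence[str]

    def __post_init__(self) -> None:
        # Runs right after the generated __init__, so an invalid node can never be constructed.
        # The condition below is assumed for this sketch.
        if len(self.requested_agg_time_dimension_specs) == 0:
            raise ValueError(
                "Must have at least one agg_time_dimension spec. "
                "The dataflow plan may have been configured incorrectly."
            )


node = JoinToTimeSpineNodeSketch(requested_agg_time_dimension_specs=("metric_time__day",))
```

With the check living on the node itself, a visitor that consumes the node can rely on the invariant instead of asserting it again.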