Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataflow Plan for Min & Max of Distinct Values Query #854

Merged
merged 17 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metricflow/plan_conversion/column_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def visit_entity_spec(self, entity_spec: EntitySpec) -> ColumnAssociation: # no

def visit_metadata_spec(self, metadata_spec: MetadataSpec) -> ColumnAssociation: # noqa: D
return ColumnAssociation(
column_name=metadata_spec.element_name,
column_name=metadata_spec.qualified_name,
single_column_correlation_key=SingleColumnCorrelationKey(),
)

Expand Down
48 changes: 27 additions & 21 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,15 @@
from metricflow.dataset.dataset import DataSet
from metricflow.dataset.sql_dataset import SqlDataSet
from metricflow.filters.time_constraint import TimeRangeConstraint
from metricflow.instances import (
InstanceSet,
MetricInstance,
TimeDimensionInstance,
)
from metricflow.instances import InstanceSet, MetadataInstance, MetricInstance, TimeDimensionInstance
from metricflow.model.semantic_manifest_lookup import SemanticManifestLookup
from metricflow.plan_conversion.instance_converters import (
AddLinkToLinkableElements,
AddMetrics,
AliasAggregatedMeasures,
ChangeAssociatedColumns,
ChangeMeasureAggregationState,
ConvertToMetadata,
CreateSelectColumnForCombineOutputNode,
CreateSelectColumnsForInstances,
CreateSelectColumnsWithMeasuresAggregated,
Expand Down Expand Up @@ -75,6 +72,7 @@
from metricflow.specs.column_assoc import ColumnAssociation, ColumnAssociationResolver, SingleColumnCorrelationKey
from metricflow.specs.specs import (
MeasureSpec,
MetadataSpec,
MetricSpec,
TimeDimensionSpec,
)
Expand Down Expand Up @@ -1324,31 +1322,39 @@ def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> SqlDataSet

def visit_min_max_node(self, node: MinMaxNode) -> SqlDataSet: # noqa: D
parent_data_set = node.parent_node.accept(self)
parent_alias = self._next_unique_table_alias()

parent_table_alias = self._next_unique_table_alias()
assert (
len(parent_data_set.sql_select_node.select_columns) == 1
), "MinMaxNode supports exactly one parent select column."
parent_column_alias_expr = SqlStringExpression(parent_data_set.sql_select_node.select_columns[0].column_alias)

# Build aggregate columns, labeled "min" & "max"
select_columns = [
SqlSelectColumn(
expr=SqlAggregateFunctionExpression(
sql_function=SqlFunction.from_aggregation_type(aggregation_type=agg_type),
sql_function_args=[parent_column_alias_expr],
),
column_alias=agg_type.value,
parent_column_alias = parent_data_set.sql_select_node.select_columns[0].column_alias

select_columns: List[SqlSelectColumn] = []
metadata_instances: List[MetadataInstance] = []
for agg_type in (AggregationType.MIN, AggregationType.MAX):
metadata_spec = MetadataSpec.from_name(name=parent_column_alias, agg_type=agg_type)
output_column_association = self._column_association_resolver.resolve_spec(metadata_spec)
select_columns.append(
SqlSelectColumn(
expr=SqlFunctionExpression.build_expression_from_aggregation_type(
aggregation_type=agg_type,
sql_column_expression=SqlColumnReferenceExpression(
SqlColumnReference(table_alias=parent_table_alias, column_name=parent_column_alias)
),
),
column_alias=output_column_association.column_name,
)
)
for agg_type in (AggregationType.MIN, AggregationType.MAX)
]
metadata_instances.append(
MetadataInstance(associated_columns=(output_column_association,), spec=metadata_spec)
)

return SqlDataSet(
instance_set=parent_data_set.instance_set,
instance_set=parent_data_set.instance_set.transform(ConvertToMetadata(metadata_instances)),
sql_select_node=SqlSelectStatementNode(
description=node.description,
select_columns=tuple(select_columns),
from_source=parent_data_set.sql_select_node,
from_source_alias=parent_alias,
from_source_alias=parent_table_alias,
joins_descs=(),
group_bys=(),
order_bys=(),
Expand Down
12 changes: 12 additions & 0 deletions metricflow/plan_conversion/instance_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,18 @@ def transform(self, instance_set: InstanceSet) -> InstanceSet: # noqa: D
)


class ConvertToMetadata(InstanceSetTransform[InstanceSet]):
    """Discard every instance in the input set, keeping only the configured metadata instances."""

    def __init__(self, metadata_instances: Sequence[MetadataInstance]) -> None:  # noqa: D
        self._metadata_instances = metadata_instances

    def transform(self, instance_set: InstanceSet) -> InstanceSet:  # noqa: D
        # The incoming instance_set is intentionally ignored; the output set
        # carries nothing but the metadata instances supplied at construction.
        replacement_instances = tuple(self._metadata_instances)
        return InstanceSet(metadata_instances=replacement_instances)


def create_select_columns_for_instance_sets(
column_resolver: ColumnAssociationResolver,
table_alias_to_instance_set: OrderedDict[str, InstanceSet],
Expand Down
7 changes: 4 additions & 3 deletions metricflow/specs/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,15 @@ class MetadataSpec(InstanceSpec):
"""A specification for a value that is built during dataflow plan construction and not defined in config."""

element_name: str
agg_type: Optional[AggregationType] = None

@property
def qualified_name(self) -> str: # noqa: D
return self.element_name
return f"{self.agg_type.value}_{self.element_name}" if self.agg_type else self.element_name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a thought here.

We do something similar elsewhere in a temporary way for semi-additive metrics (and we overload aggregation state for it). We likely need to expand that to cover things like auto-aliasing derived metric offsets, since offset metrics have the same name, fundamentally, as non-offset metrics.

I think we'd be better off having a structured representation of these added bits of information. I'll see if I can get a PR up with what I've been thinking about on Monday. If not maybe we put this in and then I'll fast-follow with an update.

We can have the DUNDER conversation over there, since that PR would centralize all of these glued on bits of internal state information that we need to communicate across subqueries and, at least in this case, in the final output column names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good - will leave this PR until next week!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following up on this - have you thought about the above yet?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have! Just chatted with Paul and we think it makes sense to move to name__max and name__min for this PR.

The idea is, eventually, to take this thing:

https://github.com/dbt-labs/metricflow/blob/main/metricflow/plan_conversion/dataflow_to_sql.py#L1103-L1104

and generalize it to a standard property. What that does in the resolver is glue on the aggregation state with a DUNDER to the end of the column name.

So I think we'll move to that model more broadly. If you use name__max and name__min here I can update the naming logic and consolidate it when I fix up the rest of it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! Just updated to use those names.

Anything else missing for this PR? Would love to get it merged if not!


@staticmethod
def from_name(name: str) -> MetadataSpec: # noqa: D
return MetadataSpec(element_name=name)
def from_name(name: str, agg_type: Optional[AggregationType] = None) -> MetadataSpec: # noqa: D
return MetadataSpec(element_name=name, agg_type=agg_type)

def accept(self, visitor: InstanceSpecVisitor[VisitorOutputT]) -> VisitorOutputT: # noqa: D
return visitor.visit_metadata_spec(self)
Expand Down
12 changes: 6 additions & 6 deletions metricflow/test/integration/test_cases/itest_dimensions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ integration_test:
min_max_only: true
check_query: |
SELECT
MIN(user__home_state) AS min
, MAX(user__home_state) AS max
MIN(user__home_state) AS min_user__home_state
, MAX(user__home_state) AS max_user__home_state
FROM (
SELECT
u.home_state AS user__home_state
Expand All @@ -209,8 +209,8 @@ integration_test:
min_max_only: true
check_query: |
SELECT
MIN(verification__ds__day) AS min
, MAX(verification__ds__day) AS max
MIN(verification__ds__day) AS min_verification__ds__day
, MAX(verification__ds__day) AS max_verification__ds__day
FROM (
SELECT
v.ds as verification__ds__day
Expand All @@ -227,8 +227,8 @@ integration_test:
min_max_only: true
check_query: |
SELECT
MIN(verification__ds__month) AS min
, MAX(verification__ds__month) AS max
MIN(verification__ds__month) AS min_verification__ds__month
, MAX(verification__ds__month) AS max_verification__ds__month
FROM (
SELECT
{{ render_date_trunc("v.ds", TimeGranularity.MONTH) }} as verification__ds__month
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Calculate min and max
SELECT
MIN(listing__country_latest) AS min
, MAX(listing__country_latest) AS max
MIN(subq_1.listing__country_latest) AS min_listing__country_latest
, MAX(subq_1.listing__country_latest) AS max_listing__country_latest
FROM (
-- Pass Only Elements:
-- ['listing__country_latest']
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Calculate min and max
SELECT
MIN(listing__country_latest) AS min
, MAX(listing__country_latest) AS max
MIN(listing__country_latest) AS min_listing__country_latest
, MAX(listing__country_latest) AS max_listing__country_latest
FROM (
-- Read Elements From Semantic Model 'listings_latest'
-- Pass Only Elements:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Calculate min and max
SELECT
MIN(booking__paid_at__day) AS min
, MAX(booking__paid_at__day) AS max
MIN(subq_1.booking__paid_at__day) AS min_booking__paid_at__day
, MAX(subq_1.booking__paid_at__day) AS max_booking__paid_at__day
FROM (
-- Pass Only Elements:
-- ['booking__paid_at__day']
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Calculate min and max
SELECT
MIN(booking__paid_at__day) AS min
, MAX(booking__paid_at__day) AS max
MIN(booking__paid_at__day) AS min_booking__paid_at__day
, MAX(booking__paid_at__day) AS max_booking__paid_at__day
FROM (
-- Read Elements From Semantic Model 'bookings_source'
-- Pass Only Elements:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Calculate min and max
SELECT
MIN(booking__paid_at__quarter) AS min
, MAX(booking__paid_at__quarter) AS max
MIN(subq_1.booking__paid_at__quarter) AS min_booking__paid_at__quarter
, MAX(subq_1.booking__paid_at__quarter) AS max_booking__paid_at__quarter
FROM (
-- Pass Only Elements:
-- ['booking__paid_at__quarter']
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Calculate min and max
SELECT
MIN(booking__paid_at__quarter) AS min
, MAX(booking__paid_at__quarter) AS max
MIN(booking__paid_at__quarter) AS min_booking__paid_at__quarter
, MAX(booking__paid_at__quarter) AS max_booking__paid_at__quarter
FROM (
-- Read Elements From Semantic Model 'bookings_source'
-- Pass Only Elements:
Expand Down
Loading