From ab57be676e98c90ea84993603edb036d047f8da1 Mon Sep 17 00:00:00 2001 From: Courtney Holcomb Date: Wed, 24 Apr 2024 16:03:36 -0700 Subject: [PATCH] WIP --- metricflow/dataflow/builder/node_evaluator.py | 12 +- metricflow/plan_conversion/dataflow_to_sql.py | 36 +-- .../plan_conversion/instance_converters.py | 1 + metricflow/plan_conversion/node_processor.py | 6 +- metricflow/specs/spec_set_transforms.py | 1 + .../test_metric_filter_rendering.py | 207 +++++++++++++++++- 6 files changed, 229 insertions(+), 34 deletions(-) diff --git a/metricflow/dataflow/builder/node_evaluator.py b/metricflow/dataflow/builder/node_evaluator.py index 995d7fca29..74ec2e8a65 100644 --- a/metricflow/dataflow/builder/node_evaluator.py +++ b/metricflow/dataflow/builder/node_evaluator.py @@ -219,11 +219,6 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( # then produce the linkable spec. See comments further below for more details. for entity_spec_in_right_node in entity_specs_in_right_node: - # If an entity has links, what that means and whether it can be used is unclear at the moment, - # so skip it. - if len(entity_spec_in_right_node.entity_links) > 0: - continue - entity_instance_in_right_node = None for instance in data_set_in_right_node.instance_set.entity_instances: if instance.spec == entity_spec_in_right_node: @@ -253,6 +248,13 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( if entity_instance_in_left_node is None: # The right node can have a superset of entities. + if isinstance(right_node, ComputeMetricsNode): + print("exited here") + print( + "left node entities:", + [inst.spec.reference for inst in left_node_instance_set.entity_instances], + ) + print("entity_spec_in_right_node:", entity_spec_in_right_node.reference) continue assert len(entity_instance_in_left_node.defined_from) == 1 diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py index fb4d065180..0294e75926 100644 --- a/metricflow/plan_conversion/dataflow_to_sql.py +++ b/metricflow/plan_conversion/dataflow_to_sql.py @@ -67,6 +67,7 @@ CreateSqlColumnReferencesForInstances, FilterElements, FilterLinkableInstancesWithLeadingLink, + InstanceSetTransform, RemoveMeasures, RemoveMetrics, UpdateMeasureFillNullsWith, @@ -608,8 +609,7 @@ def visit_compute_metrics_node(self, node: ComputeMetricsNode) -> SqlDataSet: # Add select columns that would compute the metrics to the select columns. metric_select_columns = [] - metric_instances = [] - group_by_metric_instances = [] + metric_instances: List[MetricInstance] = [] for metric_spec in node.metric_specs: metric = self._metric_lookup.get_metric(metric_spec.reference) @@ -723,23 +723,25 @@ def visit_compute_metrics_node(self, node: ComputeMetricsNode) -> SqlDataSet: spec=metric_spec, ) ) - group_by_metric_instances.append( - GroupByMetricInstance( - associated_columns=(output_column_association,), - defined_from=MetricModelReference(metric_name=metric_spec.element_name), - spec=GroupByMetricSpec( - element_name=metric_spec.element_name, - entity_links=(), - metric_subquery_entity_links=(), # TODO - ), - ) + + transform_func: InstanceSetTransform = AddMetrics(metric_instances) + if node.for_group_by_source_node: + assert ( + len(metric_instances) == 1 and len(output_instance_set.entity_instances) == 1 + ), "Group by metrics currently only support exactly one metric grouped by exactly one entity." + metric_instance = metric_instances[0] + entity_instance = output_instance_set.entity_instances[0] + group_by_metric_instance = GroupByMetricInstance( + associated_columns=metric_instance.associated_columns, + defined_from=metric_instance.defined_from, + spec=GroupByMetricSpec( + element_name=metric_spec.element_name, + entity_links=(), # check this + metric_subquery_entity_links=entity_instance.spec.entity_links, + ), ) + transform_func = AddGroupByMetrics([group_by_metric_instance]) - transform_func = ( - AddGroupByMetrics(group_by_metric_instances) - if node.for_group_by_source_node - else AddMetrics(metric_instances) - ) output_instance_set = output_instance_set.transform(transform_func) combined_select_column_set = non_metric_select_column_set.merge( diff --git a/metricflow/plan_conversion/instance_converters.py b/metricflow/plan_conversion/instance_converters.py index e7c4b60a4f..550d90f452 100644 --- a/metricflow/plan_conversion/instance_converters.py +++ b/metricflow/plan_conversion/instance_converters.py @@ -745,6 +745,7 @@ def transform(self, instance_set: InstanceSet) -> InstanceSet: # noqa: D102 ) +# TODO: should this be singular, allowing only one group by metric? class AddGroupByMetrics(InstanceSetTransform[InstanceSet]): """Adds the given metric instances to the instance set.""" diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index 89dbaa14b4..434bc2e0a2 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -11,6 +11,7 @@ from metricflow.dataflow.dataflow_plan import ( BaseOutput, ) +from metricflow.dataflow.nodes.compute_metrics import ComputeMetricsNode from metricflow.dataflow.nodes.constrain_time import ConstrainTimeRangeNode from metricflow.dataflow.nodes.filter_elements import FilterElementsNode from metricflow.dataflow.nodes.join_to_base import JoinDescription, JoinToBaseOutputNode @@ -130,9 +131,6 @@ def _node_contains_entity( if entity_spec_in_first_node.reference != entity_reference: continue - if len(entity_spec_in_first_node.entity_links) > 0: - continue - assert ( len(entity_instance_in_first_node.defined_from) == 1 ), "Multiple items in defined_from not yet supported" @@ -216,6 +214,8 @@ def _get_candidates_nodes_for_multi_hop( right_instance_set=data_set_of_second_node_that_can_be_joined.instance_set, on_entity_reference=entity_reference_to_join_first_and_second_nodes, ): + if isinstance(second_node_that_could_be_joined, ComputeMetricsNode): + print("exited multi hop here") continue # filter measures out of joinable_node diff --git a/metricflow/specs/spec_set_transforms.py b/metricflow/specs/spec_set_transforms.py index ad6e26cacd..f2241daa28 100644 --- a/metricflow/specs/spec_set_transforms.py +++ b/metricflow/specs/spec_set_transforms.py @@ -15,5 +15,6 @@ def transform(self, spec_set: InstanceSpecSet) -> Set[str]: # noqa: D102 .union({x.element_name for x in spec_set.dimension_specs}) .union({x.element_name for x in spec_set.time_dimension_specs}) .union({x.element_name for x in spec_set.entity_specs}) + .union({x.element_name for x in spec_set.metric_specs}) .union({x.element_name for x in spec_set.group_by_metric_specs}) ) diff --git a/tests/query_rendering/test_metric_filter_rendering.py b/tests/query_rendering/test_metric_filter_rendering.py index 7e0a58fc9e..225cc89a49 100644 --- a/tests/query_rendering/test_metric_filter_rendering.py +++ b/tests/query_rendering/test_metric_filter_rendering.py @@ -1,5 +1,7 @@ from __future__ import annotations +import string + import pytest from _pytest.fixtures import FixtureRequest from dbt_semantic_interfaces.implementations.filters.where_filter import PydanticWhereFilter @@ -283,8 +285,43 @@ def test_multi_hop_with_explicit_entity_link( ) -@pytest.mark.sql_engine_snapshot -def test_multi_hop_without_explicit_entity_link( +# TODO - need a different example because this one won't work due to ambiguous join path +# @pytest.mark.sql_engine_snapshot +# def test_multi_hop_without_explicit_entity_link( +# request: FixtureRequest, +# mf_test_configuration: MetricFlowTestConfiguration, +# dataflow_plan_builder: DataflowPlanBuilder, +# sql_client: SqlClient, +# dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter, +# query_parser: MetricFlowQueryParser, +# ) -> None: +# """Tests a metric filter that requires multiple join hops and does not state the entity link. + +# Should return the same SQL as if the entity link was stated (group by resolution determines the entity link). +# """ +# query_spec = query_parser.parse_and_validate_query( +# metric_names=("listings",), +# where_constraint=PydanticWhereFilter( +# where_sql_template="{{ Metric('instant_bookings', ['company']) }} > 2", +# ), +# ) +# dataflow_plan = dataflow_plan_builder.build_plan(query_spec) + +# convert_and_check( +# request=request, +# mf_test_configuration=mf_test_configuration, +# dataflow_to_sql_converter=dataflow_to_sql_converter, +# sql_client=sql_client, +# node=dataflow_plan.sink_output_nodes[0].parent_node, +# ) + + +# Remove from group by options: cumulative & time offset metrics (temporary) - TODO +# Fix join path resolution for these. MORE IMPORTANT! +# The group by shows you the path to get from the query-level measure to the entity, not the filter-level metric to the entity. That's confusing. Is that how it should be?? + + +def test_all_available_single_hop_metric_filters( request: FixtureRequest, mf_test_configuration: MetricFlowTestConfiguration, dataflow_plan_builder: DataflowPlanBuilder, @@ -292,16 +329,167 @@ def test_multi_hop_without_explicit_entity_link( dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter, query_parser: MetricFlowQueryParser, ) -> None: - """Tests a metric filter that requires multiple join hops and does not state the entity link. + """Checks that all allowed metric filters do not error.""" + # TODO: get filter options from linkable spec resolver instead of hard coding + filter_strs = [ + # "listing__active_listings", # why doesn't this work? + "listing__approximate_continuous_booking_value_p99", + "listing__approximate_discrete_booking_value_p99", + "listing__average_booking_value", + # "listing__average_instant_booking_value", + "listing__bookers", + "listing__booking_fees", + # "listing__booking_fees_last_week_per_booker_this_week", # offset metrics require metric_time + "listing__booking_fees_per_booker", + # "listing__booking_fees_since_start_of_month", + "listing__booking_payments", + "listing__booking_value", + # "listing__booking_value_for_non_null_listing_id", + # "listing__booking_value_p99", + # "listing__booking_value_per_view", + # "listing__booking_value_sub_instant", + # "listing__booking_value_sub_instant_add_10", + "listing__bookings", + # "listing__bookings_5_day_lag", + # "listing__bookings_at_start_of_month", + "listing__bookings_fill_nulls_with_0", + "listing__bookings_fill_nulls_with_0_without_time_spine", + # "listing__bookings_growth_2_weeks", + # "listing__bookings_growth_2_weeks_fill_nulls_with_0", + # "listing__bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset", + # "listing__bookings_growth_since_start_of_month", + "listing__bookings_join_to_time_spine", + # "listing__bookings_month_start_compared_to_1_month_prior", + # "listing__bookings_offset_once", + # "listing__bookings_offset_twice", + "listing__bookings_per_booker", + "listing__bookings_per_dollar", + "listing__bookings_per_listing", + # "listing__bookings_per_lux_listing_derived", + "listing__bookings_per_view", + # "listing__every_2_days_bookers_2_days_ago", + # "listing__every_two_days_bookers", + # "listing__every_two_days_bookers_fill_nulls_with_0", + # "listing__instant_booking_fraction_of_max_value", + # "listing__instant_booking_value", + # "listing__instant_booking_value_ratio", + "listing__instant_bookings", + # "listing__instant_lux_booking_value_rate", + "listing__instant_plus_non_referred_bookings_pct", + "listing__largest_listing", + "listing__listings", + # "listing__lux_booking_fraction_of_max_value", + # "listing__lux_booking_value_rate_expr", + # "listing__lux_listings", + # "listing__max_booking_value", + # "listing__median_booking_value", + # "listing__min_booking_value", + "listing__nested_fill_nulls_without_time_spine", + "listing__non_referred_bookings_pct", + "listing__referred_bookings", + "listing__smallest_listing", + "listing__twice_bookings_fill_nulls_with_0_without_time_spine", + "listing__views", + "listing__views_times_booking_value", + # "user__active_listings", + # "user__approximate_continuous_booking_value_p99", + # "user__approximate_discrete_booking_value_p99", + # "user__average_booking_value", + # "user__average_instant_booking_value", + # "user__bookers", + # "user__booking_fees", + # "user__booking_fees_last_week_per_booker_this_week", + # "user__booking_fees_per_booker", + # "user__booking_fees_since_start_of_month", + # "user__booking_payments", + # "user__booking_value", + # "user__booking_value_for_non_null_listing_id", + # "user__booking_value_p99", + # "user__booking_value_per_view", + # "user__booking_value_sub_instant", + # "user__booking_value_sub_instant_add_10", + # "user__bookings", + # "user__bookings_5_day_lag", + # "user__bookings_at_start_of_month", + # "user__bookings_fill_nulls_with_0", + # "user__bookings_fill_nulls_with_0_without_time_spine", + # "user__bookings_growth_2_weeks", + # "user__bookings_growth_2_weeks_fill_nulls_with_0", + # "user__bookings_growth_2_weeks_fill_nulls_with_0_for_non_offset", + # "user__bookings_growth_since_start_of_month", + # "user__bookings_join_to_time_spine", + # "user__bookings_month_start_compared_to_1_month_prior", + # "user__bookings_offset_once", + # "user__bookings_offset_twice", + # "user__bookings_per_booker", + # "user__bookings_per_dollar", + # "user__bookings_per_listing", + # "user__bookings_per_lux_listing_derived", + # "user__bookings_per_view", + # "user__every_2_days_bookers_2_days_ago", + # "user__every_two_days_bookers", + # "user__every_two_days_bookers_fill_nulls_with_0", + "user__identity_verifications", + # "user__instant_booking_fraction_of_max_value", + # "user__instant_booking_value", + # "user__instant_booking_value_ratio", + # "user__instant_bookings", + # "user__instant_lux_booking_value_rate", + # "user__instant_plus_non_referred_bookings_pct", + "user__largest_listing", + "user__listings", + # "user__lux_booking_fraction_of_max_value", + # "user__lux_booking_value_rate_expr", + # "user__lux_listings", + # "user__max_booking_value", + # "user__median_booking_value", + # "user__min_booking_value", + # "user__nested_fill_nulls_without_time_spine", + # "user__non_referred_bookings_pct", + # "user__referred_bookings", + # "user__regional_starting_balance_ratios", + "user__revenue", + "user__revenue_all_time", + "user__revenue_mtd", + "user__smallest_listing", + # "user__total_account_balance_first_day", + "user__trailing_2_months_revenue", + "user__trailing_2_months_revenue_sub_10", + # "user__twice_bookings_fill_nulls_with_0_without_time_spine", + "user__views", + # "user__views_times_booking_value", + "user__visit_buy_conversion_rate", + "user__visit_buy_conversion_rate_7days", + "user__visit_buy_conversion_rate_7days_fill_nulls_with_0", + "user__visit_buy_conversion_rate_by_session", + "user__visit_buy_conversions", + ] + for filter_str in filter_strs: + entity_name, metric_name = filter_str.split("__") + query_spec = query_parser.parse_and_validate_query( + metric_names=("listings",), + where_constraint=PydanticWhereFilter( + where_sql_template=string.Template("{{ Metric('$metric_name', ['$entity_name']) }} > 2").substitute( + metric_name=metric_name, entity_name=entity_name + ), + ), + ) + dataflow_plan_builder.build_plan(query_spec) - Should return the same SQL as if the entity link was stated (group by resolution determines the entity link). - """ + +@pytest.mark.sql_engine_snapshot +def test_testy_test( + request: FixtureRequest, + mf_test_configuration: MetricFlowTestConfiguration, + dataflow_plan_builder: DataflowPlanBuilder, + sql_client: SqlClient, + dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter, + query_parser: MetricFlowQueryParser, +) -> None: query_spec = query_parser.parse_and_validate_query( - metric_names=("listings",), - where_constraint=PydanticWhereFilter( - where_sql_template="{{ Metric('instant_bookings', ['company']) }} > 2", - ), + metric_names=("instant_bookings",), group_by_names=("user__company",) ) + assert 0, query_spec.entity_specs dataflow_plan = dataflow_plan_builder.build_plan(query_spec) convert_and_check( @@ -311,3 +499,4 @@ def test_multi_hop_without_explicit_entity_link( sql_client=sql_client, node=dataflow_plan.sink_output_nodes[0].parent_node, ) + # Join path used: listing__user__company