Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataflowPlan updates for group by metrics #1147

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
0a1d91b
Add helper method SemanticModelJoinPath.from_single_element()
courtneyholcomb Apr 15, 2024
050ee32
Add MetricSubqueryJoinPath class
courtneyholcomb Apr 15, 2024
11186c8
Rename method to specify 'valid' join (as opposed to fan-out join)
courtneyholcomb Apr 15, 2024
98c0fec
Update LinkableMetric to use MetricSubqueryJoinPath and remove redund…
courtneyholcomb Apr 15, 2024
735a451
Map joinable metrics to entities instead of semantic models for more …
courtneyholcomb Apr 15, 2024
7d7d163
Use helper properties
courtneyholcomb Apr 15, 2024
0608c91
Get more specific about avoiding cycle creation
courtneyholcomb Apr 15, 2024
b5bf038
Types cleanup
courtneyholcomb Apr 17, 2024
268cf40
Fix bug in snapshot generation
courtneyholcomb Apr 17, 2024
324f64a
Changelog
courtneyholcomb Apr 17, 2024
f05970e
Move setup dict to avoid circular logic error
courtneyholcomb Apr 18, 2024
23f976c
Privatize method
courtneyholcomb Apr 18, 2024
ecbb5e9
Set JOINED and MULTI_HOP properties from join_path length (cleanup)
courtneyholcomb Apr 18, 2024
5b9210d
Move logic for directly linkable metrics to _get_joined_elements (sem…
courtneyholcomb Apr 18, 2024
b3d54a8
Use new logic for finding linkable metrics (from previous commit) in …
courtneyholcomb Apr 18, 2024
ed59e3d
Update snapshots
courtneyholcomb Apr 18, 2024
4f4c45a
Add test with multi-hop join manifest
courtneyholcomb Apr 18, 2024
054e26f
Update documentation
courtneyholcomb Apr 18, 2024
c4487f4
Update changelog to be more specific
courtneyholcomb Apr 23, 2024
96408a8
Add test for filtering by the same metric you're querying
courtneyholcomb Apr 23, 2024
468a7d9
Update SQL engine snapshots
courtneyholcomb Apr 23, 2024
a769baa
Update method to handle group by metrics
courtneyholcomb Apr 23, 2024
dab28d7
Write test for 2-hop metric subquery join
courtneyholcomb Apr 23, 2024
08d8c0f
Use query parser to build group by metric source node
courtneyholcomb Apr 23, 2024
448c5dd
Parse entity links in Metric.group_by param
courtneyholcomb Apr 23, 2024
0e6830a
Add separate multi-hop test without explicity entity link
courtneyholcomb Apr 23, 2024
1ffc496
Handle group by metrics where missed
courtneyholcomb Apr 23, 2024
da0c8d0
Update LinkableMetrics & related classes to track metric_to_entity_jo…
courtneyholcomb Apr 24, 2024
c9a4c19
Add metric_subquery_entity_links to GroupByMetricSpec
courtneyholcomb Apr 24, 2024
c7681a3
Update DFP join logic to account for goup by metric source nodes
courtneyholcomb Apr 25, 2024
a11cf18
Capture metric_specs missed in ToElementNameSet transform function
courtneyholcomb Apr 25, 2024
5befd3e
Add multi-hop integration test
courtneyholcomb Apr 25, 2024
f42b345
Update AddGroupByMetrics to be singular AddGroupByMetric - should onl…
courtneyholcomb Apr 25, 2024
51a4a78
Remove cumulative and offset metrics from linkable metric options
courtneyholcomb Apr 25, 2024
5416b30
Delete unused expression
courtneyholcomb Apr 25, 2024
48404d7
Test that all expected single-hop filters can successfully build data…
courtneyholcomb Apr 25, 2024
40b80da
WIP - Test all expected metric filters work
courtneyholcomb Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240416-202600.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Refactor group by resolution for metric filters to allow options that were previously excluded inadvertently.
time: 2024-04-16T20:26:00.637184-07:00
custom:
Author: courtneyholcomb
Issue: "1139"
6 changes: 5 additions & 1 deletion metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from metricflow.plan_conversion.node_processor import PreJoinNodeProcessor
from metricflow.query.group_by_item.filter_spec_resolution.filter_location import WhereFilterLocation
from metricflow.query.group_by_item.filter_spec_resolution.filter_spec_lookup import FilterSpecResolutionLookUp
from metricflow.query.query_parser import MetricFlowQueryParser
from metricflow.specs.column_assoc import ColumnAssociationResolver
from metricflow.specs.specs import (
ConstantPropertySpec,
Expand Down Expand Up @@ -123,13 +124,15 @@ def __init__( # noqa: D107
semantic_manifest_lookup: SemanticManifestLookup,
node_output_resolver: DataflowPlanNodeOutputDataSetResolver,
column_association_resolver: ColumnAssociationResolver,
query_parser: MetricFlowQueryParser,
) -> None:
self._semantic_model_lookup = semantic_manifest_lookup.semantic_model_lookup
self._metric_lookup = semantic_manifest_lookup.metric_lookup
self._metric_time_dimension_reference = DataSet.metric_time_dimension_reference()
self._source_node_set = source_node_set
self._column_association_resolver = column_association_resolver
self._node_data_set_resolver = node_output_resolver
self._query_parser = query_parser

def build_plan(
self,
Expand Down Expand Up @@ -825,7 +828,8 @@ def _find_dataflow_recipe(
# MetricGroupBy source nodes could be extremely large (and potentially slow).
candidate_nodes_for_right_side_of_join += [
self._build_query_output_node(
query_spec=group_by_metric_spec.query_spec_for_source_node, for_group_by_source_node=True
query_spec=self._query_parser.build_query_spec_for_group_by_metric_source_node(group_by_metric_spec),
for_group_by_source_node=True,
)
for group_by_metric_spec in linkable_spec_set.group_by_metric_specs
]
Expand Down
5 changes: 0 additions & 5 deletions metricflow/dataflow/builder/node_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,6 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs(
# then produce the linkable spec. See comments further below for more details.

for entity_spec_in_right_node in entity_specs_in_right_node:
# If an entity has links, what that means and whether it can be used is unclear at the moment,
# so skip it.
if len(entity_spec_in_right_node.entity_links) > 0:
continue

entity_instance_in_right_node = None
for instance in data_set_in_right_node.instance_set.entity_instances:
if instance.spec == entity_spec_in_right_node:
Expand Down
44 changes: 25 additions & 19 deletions metricflow/dataset/sql_dataset.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from __future__ import annotations

from typing import Optional, Sequence
from typing import List, Optional, Sequence

from dbt_semantic_interfaces.references import SemanticModelReference
from typing_extensions import override

from metricflow.assert_one_arg import assert_exactly_one_arg_set
from metricflow.dataset.dataset import DataSet
from metricflow.instances import (
InstanceSet,
)
from metricflow.instances import EntityInstance, InstanceSet
from metricflow.specs.column_assoc import ColumnAssociation
from metricflow.specs.specs import DimensionSpec, EntitySpec, TimeDimensionSpec
from metricflow.sql.sql_plan import (
Expand Down Expand Up @@ -60,25 +58,33 @@ def column_associations_for_entity(
entity_spec: EntitySpec,
) -> Sequence[ColumnAssociation]:
"""Given the name of the entity, return the set of columns associated with it in the data set."""
matching_instances = 0
column_associations_to_return = None
matching_instances_with_same_entity_links: List[EntityInstance] = []
matching_instances_with_different_entity_links: List[EntityInstance] = []
for linkable_instance in self.instance_set.entity_instances:
if (
entity_spec.element_name == linkable_instance.spec.element_name
and entity_spec.entity_links == linkable_instance.spec.entity_links
):
column_associations_to_return = linkable_instance.associated_columns
matching_instances += 1

if matching_instances > 1:
if entity_spec.element_name == linkable_instance.spec.element_name:
if entity_spec.entity_links == linkable_instance.spec.entity_links:
matching_instances_with_same_entity_links.append(linkable_instance)
else:
matching_instances_with_different_entity_links.append(linkable_instance)

# Prioritize instances with matching entity links, but use mismatched links if matching links not found.
# Semantic model source data sets might have multiple instances of the same entity, in which case we want the one without
# links. But group by metric source data sets might only have an instance of the entity with links, and we can join to that.
matching_instances = matching_instances_with_same_entity_links or matching_instances_with_different_entity_links

if len(matching_instances) != 1:
raise RuntimeError(
f"More than one instance with spec {entity_spec} in " f"instance set: {self.instance_set}"
f"Expected exactly one matching instance for {entity_spec} in instance set, but found: {matching_instances}"
)
matching_instance = matching_instances[0]
if not matching_instance.associated_columns:
print("entity links to compare:", entity_spec.entity_links, linkable_instance.spec.entity_links)
raise RuntimeError(
f"No associated columns for entity instance {matching_instance} in data set."
"This indicates internal misconfiguration."
)

if not column_associations_to_return:
raise RuntimeError(f"No instances with spec {entity_spec} in instance set: {self.instance_set}")

return column_associations_to_return
return matching_instance.associated_columns

def column_association_for_dimension(
self,
Expand Down
9 changes: 5 additions & 4 deletions metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,16 @@ def __init__(
)
node_output_resolver.cache_output_data_sets(source_node_set.all_nodes)

self._query_parser = query_parser or MetricFlowQueryParser(
semantic_manifest_lookup=self._semantic_manifest_lookup,
)

self._dataflow_plan_builder = DataflowPlanBuilder(
source_node_set=source_node_set,
semantic_manifest_lookup=self._semantic_manifest_lookup,
column_association_resolver=self._column_association_resolver,
node_output_resolver=node_output_resolver,
query_parser=self._query_parser,
)
self._to_sql_query_plan_converter = DataflowToSqlQueryPlanConverter(
column_association_resolver=self._column_association_resolver,
Expand All @@ -399,10 +404,6 @@ def __init__(
)
self._executor = SequentialPlanExecutor()

self._query_parser = query_parser or MetricFlowQueryParser(
semantic_manifest_lookup=self._semantic_manifest_lookup,
)

@log_call(module_name=__name__, telemetry_reporter=_telemetry_reporter)
def query(self, mf_request: MetricFlowQueryRequest) -> MetricFlowQueryResult: # noqa: D102
logger.info(f"Starting query request:\n{indent(mf_pformat(mf_request))}")
Expand Down
Loading
Loading