Skip to content

Commit

Permalink
Use query parser to build group by metric source node
Browse files Browse the repository at this point in the history
Needed to resolve the appropriate join path for multi-hop joins. Moved source node-building logic to query parser class to avoid circular import errors. This change also required adding the query parser as a dependency for the dataflow plan builder.
  • Loading branch information
courtneyholcomb committed Apr 23, 2024
1 parent dab28d7 commit 08d8c0f
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 13 deletions.
6 changes: 5 additions & 1 deletion metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from metricflow.plan_conversion.node_processor import PreJoinNodeProcessor
from metricflow.query.group_by_item.filter_spec_resolution.filter_location import WhereFilterLocation
from metricflow.query.group_by_item.filter_spec_resolution.filter_spec_lookup import FilterSpecResolutionLookUp
from metricflow.query.query_parser import MetricFlowQueryParser
from metricflow.specs.column_assoc import ColumnAssociationResolver
from metricflow.specs.specs import (
ConstantPropertySpec,
Expand Down Expand Up @@ -123,13 +124,15 @@ def __init__( # noqa: D107
semantic_manifest_lookup: SemanticManifestLookup,
node_output_resolver: DataflowPlanNodeOutputDataSetResolver,
column_association_resolver: ColumnAssociationResolver,
query_parser: MetricFlowQueryParser,
) -> None:
self._semantic_model_lookup = semantic_manifest_lookup.semantic_model_lookup
self._metric_lookup = semantic_manifest_lookup.metric_lookup
self._metric_time_dimension_reference = DataSet.metric_time_dimension_reference()
self._source_node_set = source_node_set
self._column_association_resolver = column_association_resolver
self._node_data_set_resolver = node_output_resolver
self._query_parser = query_parser

def build_plan(
self,
Expand Down Expand Up @@ -825,7 +828,8 @@ def _find_dataflow_recipe(
# MetricGroupBy source nodes could be extremely large (and potentially slow).
candidate_nodes_for_right_side_of_join += [
self._build_query_output_node(
query_spec=group_by_metric_spec.query_spec_for_source_node, for_group_by_source_node=True
query_spec=self._query_parser.build_query_spec_for_group_by_metric_source_node(group_by_metric_spec),
for_group_by_source_node=True,
)
for group_by_metric_spec in linkable_spec_set.group_by_metric_specs
]
Expand Down
9 changes: 5 additions & 4 deletions metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,16 @@ def __init__(
)
node_output_resolver.cache_output_data_sets(source_node_set.all_nodes)

self._query_parser = query_parser or MetricFlowQueryParser(
semantic_manifest_lookup=self._semantic_manifest_lookup,
)

self._dataflow_plan_builder = DataflowPlanBuilder(
source_node_set=source_node_set,
semantic_manifest_lookup=self._semantic_manifest_lookup,
column_association_resolver=self._column_association_resolver,
node_output_resolver=node_output_resolver,
query_parser=self._query_parser,
)
self._to_sql_query_plan_converter = DataflowToSqlQueryPlanConverter(
column_association_resolver=self._column_association_resolver,
Expand All @@ -399,10 +404,6 @@ def __init__(
)
self._executor = SequentialPlanExecutor()

self._query_parser = query_parser or MetricFlowQueryParser(
semantic_manifest_lookup=self._semantic_manifest_lookup,
)

@log_call(module_name=__name__, telemetry_reporter=_telemetry_reporter)
def query(self, mf_request: MetricFlowQueryRequest) -> MetricFlowQueryResult: # noqa: D102
logger.info(f"Starting query request:\n{indent(mf_pformat(mf_request))}")
Expand Down
11 changes: 11 additions & 0 deletions metricflow/query/query_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
from metricflow.specs.patterns.base_time_grain import BaseTimeGrainPattern
from metricflow.specs.patterns.metric_time_pattern import MetricTimePattern
from metricflow.specs.patterns.none_date_part import NoneDatePartPattern
from metricflow.specs.query_param_implementations import DimensionOrEntityParameter, MetricParameter
from metricflow.specs.specs import (
GroupByMetricSpec,
InstanceSpec,
InstanceSpecSet,
MetricFlowQuerySpec,
Expand Down Expand Up @@ -511,3 +513,12 @@ def _parse_and_validate_query(
return query_spec.with_time_range_constraint(time_constraint)

return query_spec

def build_query_spec_for_group_by_metric_source_node(
    self, group_by_metric_spec: GroupByMetricSpec
) -> MetricFlowQuerySpec:
    """Query spec that can be used to build a source node for this spec in the DFP.

    The spec is turned into query parameters and passed through
    `parse_and_validate_query`: the metric is identified by the element name of
    the spec's reference, and the single group-by item is the qualified name of
    the spec's `entity_spec`.
    """
    metric_parameter = MetricParameter(group_by_metric_spec.reference.element_name)
    group_by_parameter = DimensionOrEntityParameter(group_by_metric_spec.entity_spec.qualified_name)
    return self.parse_and_validate_query(
        metrics=(metric_parameter,),
        group_by=(group_by_parameter,),
    )
18 changes: 10 additions & 8 deletions metricflow/specs/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ def without_first_entity_link(self) -> GroupByMetricSpec: # noqa: D102
def without_entity_links(self) -> GroupByMetricSpec:
    """Return a spec with this spec's element name and an empty tuple of entity links."""
    name = self.element_name
    return GroupByMetricSpec(element_name=name, entity_links=())

@property
def last_entity_link(self) -> EntityReference:
    """Return the final element of `entity_links`.

    Asserts that at least one entity link is present.
    """
    links = self.entity_links
    assert len(links) > 0, f"Spec does not have any entity links: {self}"
    return links[-1]

@staticmethod
def from_name(name: str) -> GroupByMetricSpec: # noqa: D102
structured_name = StructuredLinkableSpecName.from_name(name)
Expand All @@ -261,6 +266,11 @@ def from_name(name: str) -> GroupByMetricSpec: # noqa: D102
element_name=structured_name.element_name,
)

@property
def entity_spec(self) -> EntitySpec:
    """Entity that the metric will be grouped by on aggregation.

    Built from the last entity link (which supplies the element name) and all
    preceding entity links (which become the new spec's entity links).
    """
    grouping_entity_name = self.last_entity_link.element_name
    preceding_links = self.entity_links[:-1]
    return EntitySpec(element_name=grouping_entity_name, entity_links=preceding_links)

def __eq__(self, other: Any) -> bool: # type: ignore[misc] # noqa: D105
if not isinstance(other, GroupByMetricSpec):
return False
Expand All @@ -281,14 +291,6 @@ def as_spec_set(self) -> InstanceSpecSet:
def accept(self, visitor: InstanceSpecVisitor[VisitorOutputT]) -> VisitorOutputT:
    """Pass this spec to the visitor's `visit_group_by_metric_spec` and return its result."""
    visit = visitor.visit_group_by_metric_spec
    return visit(self)

@property
def query_spec_for_source_node(self) -> MetricFlowQuerySpec:
    """Query spec that can be used to build a source node for this spec in the DFP.

    The query contains one metric spec named after this spec's element name, and
    one entity spec per entity link, each built from that link's element name.
    """
    metric_specs = (MetricSpec(element_name=self.element_name),)
    entity_specs = tuple(EntitySpec.from_name(link.element_name) for link in self.entity_links)
    return MetricFlowQuerySpec(metric_specs=metric_specs, entity_specs=entity_specs)


@dataclass(frozen=True)
class LinklessEntitySpec(EntitySpec, SerializableDataclass):
Expand Down
1 change: 1 addition & 0 deletions tests/fixtures/manifest_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def dataflow_plan_builder(self) -> DataflowPlanBuilder:
semantic_manifest_lookup=self.semantic_manifest_lookup,
node_output_resolver=self._node_output_resolver.copy(),
column_association_resolver=self.column_association_resolver,
query_parser=self.query_parser,
)

@staticmethod
Expand Down

0 comments on commit 08d8c0f

Please sign in to comment.