Commit 1374fb8: Merge main
courtneyholcomb committed Dec 8, 2023
2 parents: e44d4d7 + 868748a
Showing 20 changed files with 2,645 additions and 3 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20231206-155814.yaml
@@ -0,0 +1,7 @@
kind: Fixes
body: 'Bug fix: Keep where constraint column until used for nested derived offset
metric queries.'
time: 2023-12-06T15:58:14.895141-08:00
custom:
Author: courtneyholcomb
Issue: "930"
15 changes: 14 additions & 1 deletion metricflow/dataflow/builder/dataflow_plan_builder.py
@@ -229,6 +229,10 @@ def _build_derived_metric_output_node(
f"{pformat_big_objects(metric_input_specs=metric_input_specs)}"
)

required_linkable_specs, extraneous_linkable_specs = self.__get_required_and_extraneous_linkable_specs(
queried_linkable_specs=queried_linkable_specs, where_constraint=where_constraint
)

parent_nodes: List[BaseOutput] = []
for metric_input_spec in metric_input_specs:
parent_nodes.append(
@@ -240,7 +244,9 @@
offset_window=metric_input_spec.offset_window,
offset_to_grain=metric_input_spec.offset_to_grain,
),
queried_linkable_specs=queried_linkable_specs,
queried_linkable_specs=queried_linkable_specs
if not metric_spec.has_time_offset
else required_linkable_specs,
# If metric is offset, we'll apply where constraint after offset to avoid removing values unexpectedly.
# Time constraint will be applied by INNER JOINing to time spine.
where_constraint=where_constraint if not metric_spec.has_time_offset else None,
@@ -268,6 +274,13 @@
)
if where_constraint:
output_node = WhereConstraintNode(parent_node=output_node, where_constraint=where_constraint)
if not extraneous_linkable_specs.is_subset_of(queried_linkable_specs):
output_node = FilterElementsNode(
parent_node=output_node,
include_specs=InstanceSpecSet(metric_specs=(metric_spec.without_offset(),)).merge(
queried_linkable_specs.as_spec_set
),
)
return output_node

def _build_any_metric_output_node(
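The hunks above change how offset metrics interact with where filters: the columns a where filter references are kept through the offset join, the filter is applied after the join, and any filter-only columns are pruned at the end. Below is a minimal sketch of that ordering, using plain frozensets of column names in place of MetricFlow's linkable-spec containers; every name in it is illustrative, not the real API.

from typing import FrozenSet, List


def plan_offset_metric_columns(
    queried_columns: FrozenSet[str],
    where_filter_columns: FrozenSet[str],
    has_time_offset: bool,
) -> List[str]:
    """Return the ordered steps a toy planner would emit for one metric."""
    # Required = everything the query selects plus everything the where filter references;
    # extraneous = the filter-only columns that must not leak into the final output.
    required_columns = queried_columns | where_filter_columns
    extraneous_columns = required_columns - queried_columns

    if not has_time_offset:
        # No offset: the filter can be applied up front, so only queried columns flow through.
        return [f"read columns {sorted(queried_columns)} with the where filter applied up front"]

    steps = [
        # Keep the filter columns through the offset join so the filter can still be evaluated later.
        f"read columns {sorted(required_columns)} without the where filter",
        "join to the time spine to apply the offset",
        "apply the where filter after the join",
    ]
    if extraneous_columns:
        # Mirrors the FilterElementsNode added above: drop filter-only columns at the end.
        steps.append(f"prune filter-only columns {sorted(extraneous_columns)}")
    return steps


# Example: querying metric_time__day while filtering on booking__is_instant.
for step in plan_offset_metric_columns(
    queried_columns=frozenset({"metric_time__day"}),
    where_filter_columns=frozenset({"booking__is_instant"}),
    has_time_offset=True,
):
    print(step)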
9 changes: 8 additions & 1 deletion metricflow/engine/metricflow_engine.py
@@ -27,7 +27,7 @@
from metricflow.dataset.convert_semantic_model import SemanticModelToDataSetConverter
from metricflow.dataset.dataset import DataSet
from metricflow.dataset.semantic_model_adapter import SemanticModelDataSet
from metricflow.engine.models import Dimension, Entity, Measure, Metric
from metricflow.engine.models import Dimension, Entity, Measure, Metric, SavedQuery
from metricflow.engine.time_source import ServerTimeSource
from metricflow.errors.errors import ExecutionException
from metricflow.execution.execution_plan import ExecutionPlan, SqlQuery
@@ -679,6 +679,13 @@ def list_metrics(self) -> List[Metric]: # noqa: D
for metric in metrics
]

@log_call(module_name=__name__, telemetry_reporter=_telemetry_reporter)
def list_saved_queries(self) -> List[SavedQuery]: # noqa: D
return [
SavedQuery.from_pydantic(saved_query)
for saved_query in self._semantic_manifest_lookup.semantic_manifest.saved_queries
]

@log_call(module_name=__name__, telemetry_reporter=_telemetry_reporter)
def get_dimension_values( # noqa: D
self,
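A short usage sketch for the new list_saved_queries API. The engine class and its construction are outside this diff, so the MetricFlowEngine import path below and the pre-built engine instance are assumptions, not something this commit shows.

from typing import List

from metricflow.engine.metricflow_engine import MetricFlowEngine  # assumed import path
from metricflow.engine.models import SavedQuery


def summarize_saved_queries(engine: MetricFlowEngine) -> List[str]:
    """Return a one-line summary per saved query defined in the semantic manifest."""
    saved_queries: List[SavedQuery] = engine.list_saved_queries()  # method added in this diff
    return [
        f"{saved_query.label or saved_query.name}: {saved_query.description or 'no description'}"
        for saved_query in saved_queries
    ]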
28 changes: 28 additions & 0 deletions metricflow/engine/models.py
@@ -16,6 +16,12 @@
from dbt_semantic_interfaces.protocols.metadata import Metadata
from dbt_semantic_interfaces.protocols.metric import Metric as SemanticManifestMetric
from dbt_semantic_interfaces.protocols.metric import MetricInputMeasure, MetricType, MetricTypeParams
from dbt_semantic_interfaces.protocols.saved_query import (
SavedQuery as SemanticManifestSavedQuery,
)
from dbt_semantic_interfaces.protocols.saved_query import (
SavedQueryQueryParams,
)
from dbt_semantic_interfaces.protocols.where_filter import WhereFilterIntersection
from dbt_semantic_interfaces.transformations.add_input_metric_measures import AddInputMetricMeasuresRule
from dbt_semantic_interfaces.type_enums.aggregation_type import AggregationType
@@ -144,3 +150,25 @@ class Measure:
description: Optional[str] = None
expr: Optional[str] = None
agg_params: Optional[MeasureAggregationParameters] = None


@dataclass(frozen=True)
class SavedQuery:
"""Dataclass representation of a SavedQuery."""

name: str
description: Optional[str]
label: Optional[str]
query_params: SavedQueryQueryParams
metadata: Optional[Metadata]

@classmethod
def from_pydantic(cls, pydantic_saved_query: SemanticManifestSavedQuery) -> SavedQuery:
"""Build from pydantic SavedQuery object."""
return cls(
name=pydantic_saved_query.name,
description=pydantic_saved_query.description,
label=pydantic_saved_query.label,
query_params=pydantic_saved_query.query_params,
metadata=pydantic_saved_query.metadata,
)
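The new SavedQuery class follows the convention used by the other classes in this module: a frozen dataclass that mirrors a dbt-semantic-interfaces protocol object and is built through a from_pydantic classmethod, so the engine exposes an immutable view while the manifest object remains the source of truth. A self-contained toy version of that convention follows; the protocol below is a stand-in, not the real SemanticManifestSavedQuery.

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional, Protocol


class ManifestObjectLike(Protocol):
    """Toy stand-in for a dbt-semantic-interfaces protocol object."""

    name: str
    description: Optional[str]


@dataclass(frozen=True)
class EngineView:
    """Immutable, engine-facing copy of the fields the API wants to expose."""

    name: str
    description: Optional[str]

    @classmethod
    def from_pydantic(cls, manifest_object: ManifestObjectLike) -> EngineView:
        # Copy only the exposed fields; extra manifest fields are intentionally not carried over.
        return cls(name=manifest_object.name, description=manifest_object.description)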
4 changes: 4 additions & 0 deletions metricflow/specs/specs.py
@@ -556,6 +556,10 @@ def reference(self) -> MetricReference:
def has_time_offset(self) -> bool: # noqa: D
return bool(self.offset_window or self.offset_to_grain)

def without_offset(self) -> MetricSpec:
"""Represents the metric spec with any time offsets removed."""
return MetricSpec(element_name=self.element_name, constraint=self.constraint, alias=self.alias)


@dataclass(frozen=True)
class CumulativeMeasureDescription:
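A quick illustration of why without_offset exists: after the offset join, the outer nodes of the plan (the where filter and the final column pruning added above) need to refer to the metric by a spec with no offset attached. The snippet mimics that behavior with a simplified dataclass; the real MetricSpec carries additional fields (constraint, alias, typed offset windows) that are omitted here.

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class MetricSpecSketch:
    """Simplified stand-in for MetricSpec; only the offset-related fields are modeled."""

    element_name: str
    offset_window: Optional[str] = None
    offset_to_grain: Optional[str] = None

    @property
    def has_time_offset(self) -> bool:
        return bool(self.offset_window or self.offset_to_grain)

    def without_offset(self) -> MetricSpecSketch:
        # Drop only the offset fields; the metric's identity (element_name) is preserved.
        return MetricSpecSketch(element_name=self.element_name)


offset_spec = MetricSpecSketch(element_name="bookings", offset_window="5 days")
assert offset_spec.has_time_offset
assert not offset_spec.without_offset().has_time_offset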
50 changes: 50 additions & 0 deletions metricflow/test/integration/test_cases/itest_metrics.yaml
@@ -1693,3 +1693,53 @@ integration_test:
) subq_4
ON {{ render_date_sub("subq_5", "metric_time__day", 2, TimeGranularity.DAY) }} = subq_4.metric_time__day
GROUP BY subq_5.metric_time__day
---
integration_test:
name: nested_derived_metric_offset_with_joined_where_constraint_not_selected
  description: Tests a nested derived metric where the outer metric has an offset and a where constraint that requires an additional join, and the constraint column is not used in the select statement.
model: SIMPLE_MODEL
metrics: ["bookings_offset_twice"]
group_by_objs: [{"name": "metric_time", "grain": "day"}]
where_filter: "{{ render_dimension_template('booking__is_instant') }}"
check_query: |
SELECT
metric_time__day
, 2 * bookings_offset_once AS bookings_offset_twice
FROM (
SELECT
metric_time__day
, bookings_offset_once
FROM (
SELECT
subq_10.ds AS metric_time__day
, subq_8.booking__is_instant AS booking__is_instant
, subq_8.bookings_offset_once AS bookings_offset_once
FROM {{ source_schema }}.mf_time_spine subq_10
INNER JOIN (
SELECT
metric_time__day
, booking__is_instant
, 2 * bookings AS bookings_offset_once
FROM (
SELECT
subq_3.ds AS metric_time__day
, subq_1.booking__is_instant AS booking__is_instant
, SUM(subq_1.bookings) AS bookings
FROM {{ source_schema }}.mf_time_spine subq_3
INNER JOIN (
SELECT
{{ render_date_trunc("ds", TimeGranularity.DAY) }} AS metric_time__day
, is_instant AS booking__is_instant
, 1 AS bookings
FROM {{ source_schema }}.fct_bookings
) subq_1
ON {{ render_date_sub("subq_3", "ds", 5, TimeGranularity.DAY) }} = subq_1.metric_time__day
GROUP BY
subq_3.ds
, subq_1.booking__is_instant
) subq_7
) subq_8
ON {{ render_date_sub("subq_10", "ds", 2, TimeGranularity.DAY) }} = subq_8.metric_time__day
) subq_11
WHERE booking__is_instant
)
31 changes: 31 additions & 0 deletions metricflow/test/query_rendering/test_derived_metric_rendering.py
@@ -496,3 +496,34 @@ def test_cumulative_time_offset_metric_with_time_constraint( # noqa: D
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)


@pytest.mark.sql_engine_snapshot
def test_nested_derived_metric_offset_with_joined_where_constraint_not_selected( # noqa: D
request: FixtureRequest,
mf_test_session_state: MetricFlowTestSessionState,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
create_source_tables: bool,
column_association_resolver: ColumnAssociationResolver,
) -> None:
dataflow_plan = dataflow_plan_builder.build_plan(
query_spec=MetricFlowQuerySpec(
metric_specs=(MetricSpec(element_name="bookings_offset_twice"),),
time_dimension_specs=(MTD_SPEC_DAY,),
where_constraint=WhereSpecFactory(
column_association_resolver=column_association_resolver,
).create_from_where_filter(
PydanticWhereFilter(where_sql_template=("{{ Dimension('booking__is_instant') }}"))
),
)
)

convert_and_check(
request=request,
mf_test_session_state=mf_test_session_state,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
node=dataflow_plan.sink_output_nodes[0].parent_node,
)