Conversion Metrics (#916)
* Added AddGeneratedUuidColumnNode

* add JoinConversionEventsNode

* fix lookups and resolvers

* integrate conversion node into the dataflow plan builder

* added constant properties support

* filter unnecessary columns to avoid a DISTINCT select over a large number of columns

* bump DSI version

* changelog

* added fixtures and tests

* updated conversion metric snapshots

* addressed reviews

* fix merge conflicts from ambiguous group by changes

* updated snapshots

* update frame clause to the default, as Redshift requires it
WilliamDee authored Dec 16, 2023
1 parent 50115e0 commit b63619a
Showing 515 changed files with 38,321 additions and 15,333 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231210-165636.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Support for Conversion Metrics
time: 2023-12-10T16:56:36.680817-05:00
custom:
  Author: WilliamDee
  Issue: "252"
2 changes: 2 additions & 0 deletions metricflow/dag/id_generation.py
@@ -23,6 +23,8 @@
DATAFLOW_NODE_SET_MEASURE_AGGREGATION_TIME = "sma"
DATAFLOW_NODE_SEMI_ADDITIVE_JOIN_ID_PREFIX = "saj"
DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX = "jts"
DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX = "auid"
DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce"

SQL_EXPR_COLUMN_REFERENCE_ID_PREFIX = "cr"
SQL_EXPR_COMPARISON_ID_PREFIX = "cmp"
291 changes: 270 additions & 21 deletions metricflow/dataflow/builder/dataflow_plan_builder.py

Large diffs are not rendered by default.

179 changes: 179 additions & 0 deletions metricflow/dataflow/dataflow_plan.py
@@ -16,10 +16,12 @@
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity

from metricflow.dag.id_generation import (
    DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX,
    DATAFLOW_NODE_AGGREGATE_MEASURES_ID_PREFIX,
    DATAFLOW_NODE_COMBINE_AGGREGATED_OUTPUTS_ID_PREFIX,
    DATAFLOW_NODE_COMPUTE_METRICS_ID_PREFIX,
    DATAFLOW_NODE_CONSTRAIN_TIME_RANGE_ID_PREFIX,
    DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX,
    DATAFLOW_NODE_JOIN_SELF_OVER_TIME_RANGE_ID_PREFIX,
    DATAFLOW_NODE_JOIN_TO_STANDARD_OUTPUT_ID_PREFIX,
    DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX,
@@ -40,8 +42,12 @@
from metricflow.dataset.sql_dataset import SqlDataSet
from metricflow.filters.time_constraint import TimeRangeConstraint
from metricflow.specs.specs import (
    ConstantPropertySpec,
    EntitySpec,
    InstanceSpec,
    InstanceSpecSet,
    LinklessEntitySpec,
    MeasureSpec,
    MetricInputMeasureSpec,
    MetricSpec,
    OrderBySpec,
@@ -175,6 +181,14 @@ def visit_metric_time_dimension_transform_node( # noqa: D
    def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> VisitorOutputT:  # noqa: D
        pass

    @abstractmethod
    def visit_add_generated_uuid_column_node(self, node: AddGeneratedUuidColumnNode) -> VisitorOutputT:  # noqa: D
        pass

    @abstractmethod
    def visit_join_conversion_events_node(self, node: JoinConversionEventsNode) -> VisitorOutputT:  # noqa: D
        pass


class BaseOutput(DataflowPlanNode, ABC):
    """A node that outputs data in a "base" format.
@@ -1248,6 +1262,171 @@ def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> ConstrainT
        )


class AddGeneratedUuidColumnNode(BaseOutput):
    """Adds a UUID column."""

    def __init__(self, parent_node: BaseOutput) -> None:  # noqa: D
        super().__init__(node_id=self.create_unique_id(), parent_nodes=[parent_node])

    @classmethod
    def id_prefix(cls) -> str:  # noqa: D
        return DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX

    def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT:  # noqa: D
        return visitor.visit_add_generated_uuid_column_node(self)

    @property
    def description(self) -> str:  # noqa: D
        return "Adds an internally generated UUID column"

    @property
    def parent_node(self) -> DataflowPlanNode:  # noqa: D
        assert len(self.parent_nodes) == 1
        return self.parent_nodes[0]

    @property
    def displayed_properties(self) -> List[DisplayedProperty]:  # noqa: D
        return super().displayed_properties

    def functionally_identical(self, other_node: DataflowPlanNode) -> bool:  # noqa: D
        return isinstance(other_node, self.__class__)

    def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> AddGeneratedUuidColumnNode:  # noqa: D
        assert len(new_parent_nodes) == 1
        return AddGeneratedUuidColumnNode(parent_node=new_parent_nodes[0])


class JoinConversionEventsNode(BaseOutput):
    """Builds a data set containing successful conversion events."""

    def __init__(
        self,
        base_node: BaseOutput,
        base_time_dimension_spec: TimeDimensionSpec,
        conversion_node: BaseOutput,
        conversion_measure_spec: MeasureSpec,
        conversion_time_dimension_spec: TimeDimensionSpec,
        unique_identifier_keys: Sequence[InstanceSpec],
        entity_spec: EntitySpec,
        window: Optional[MetricTimeWindow] = None,
        constant_properties: Optional[Sequence[ConstantPropertySpec]] = None,
    ) -> None:
"""Constructor.
Args:
base_node: node containing dataset for computing base events.
base_time_dimension_spec: time dimension for the base events to compute against.
conversion_node: node containing dataset to join base node for computing conversion events.
conversion_measure_spec: expose this measure in the resulting dataset for aggregation.
conversion_time_dimension_spec: time dimension for the conversion events to compute against.
unique_identifier_keys: columns to uniquely identify each conversion event.
entity_spec: the specific entity in which the conversion is happening for.
window: time range bound for when a conversion is still considered valid (default: INF).
constant_properties: optional set of elements (either dimension/entity) to join the base
event to the conversion event.
"""
        self._base_node = base_node
        self._conversion_node = conversion_node
        self._base_time_dimension_spec = base_time_dimension_spec
        self._conversion_measure_spec = conversion_measure_spec
        self._conversion_time_dimension_spec = conversion_time_dimension_spec
        self._unique_identifier_keys = unique_identifier_keys
        self._entity_spec = entity_spec
        self._window = window
        self._constant_properties = constant_properties
        super().__init__(node_id=self.create_unique_id(), parent_nodes=[base_node, conversion_node])

    @classmethod
    def id_prefix(cls) -> str:  # noqa: D
        return DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX

    def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT:  # noqa: D
        return visitor.visit_join_conversion_events_node(self)

    @property
    def base_node(self) -> DataflowPlanNode:  # noqa: D
        return self._base_node

    @property
    def conversion_node(self) -> DataflowPlanNode:  # noqa: D
        return self._conversion_node

    @property
    def conversion_measure_spec(self) -> MeasureSpec:  # noqa: D
        return self._conversion_measure_spec

    @property
    def base_time_dimension_spec(self) -> TimeDimensionSpec:  # noqa: D
        return self._base_time_dimension_spec

    @property
    def conversion_time_dimension_spec(self) -> TimeDimensionSpec:  # noqa: D
        return self._conversion_time_dimension_spec

    @property
    def unique_identifier_keys(self) -> Sequence[InstanceSpec]:  # noqa: D
        return self._unique_identifier_keys

    @property
    def entity_spec(self) -> EntitySpec:  # noqa: D
        return self._entity_spec

    @property
    def window(self) -> Optional[MetricTimeWindow]:  # noqa: D
        return self._window

    @property
    def constant_properties(self) -> Optional[Sequence[ConstantPropertySpec]]:  # noqa: D
        return self._constant_properties

    @property
    def description(self) -> str:  # noqa: D
        return f"Find conversions for {self.entity_spec.qualified_name} within the range of {f'{self.window.count} {self.window.granularity.value}' if self.window else 'INF'}"

    @property
    def displayed_properties(self) -> List[DisplayedProperty]:  # noqa: D
        return (
            super().displayed_properties
            + [
                DisplayedProperty("base_time_dimension_spec", self.base_time_dimension_spec),
                DisplayedProperty("conversion_time_dimension_spec", self.conversion_time_dimension_spec),
                DisplayedProperty("entity_spec", self.entity_spec),
                DisplayedProperty("window", self.window),
            ]
            + [DisplayedProperty("unique_key_specs", unique_spec) for unique_spec in self.unique_identifier_keys]
            + [
                DisplayedProperty("constant_property", constant_property)
                for constant_property in self.constant_properties or []
            ]
        )

    def functionally_identical(self, other_node: DataflowPlanNode) -> bool:  # noqa: D
        return (
            isinstance(other_node, self.__class__)
            and other_node.base_time_dimension_spec == self.base_time_dimension_spec
            and other_node.conversion_time_dimension_spec == self.conversion_time_dimension_spec
            and other_node.conversion_measure_spec == self.conversion_measure_spec
            and other_node.unique_identifier_keys == self.unique_identifier_keys
            and other_node.entity_spec == self.entity_spec
            and other_node.window == self.window
            and other_node.constant_properties == self.constant_properties
        )

    def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> JoinConversionEventsNode:  # noqa: D
        assert len(new_parent_nodes) == 2
        return JoinConversionEventsNode(
            base_node=new_parent_nodes[0],
            base_time_dimension_spec=self.base_time_dimension_spec,
            conversion_node=new_parent_nodes[1],
            conversion_measure_spec=self.conversion_measure_spec,
            conversion_time_dimension_spec=self.conversion_time_dimension_spec,
            unique_identifier_keys=self.unique_identifier_keys,
            entity_spec=self.entity_spec,
            window=self.window,
            constant_properties=self.constant_properties,
        )


class DataflowPlan(MetricFlowDag[SinkOutput]):
"""Describes the flow of metric data as it goes from source nodes to sink nodes in the graph."""

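For orientation, a minimal sketch of how a builder might wire the two new nodes together. It is illustrative only and not taken from this commit: the parent nodes, the conversion MeasureSpec, and the unique-identifier specs are hypothetical placeholders, while the node classes, their constructor arguments, and the from_name helpers all appear in the diffs on this page.

from metricflow.dataflow.dataflow_plan import AddGeneratedUuidColumnNode, JoinConversionEventsNode
from metricflow.specs.specs import EntitySpec, TimeDimensionSpec

# Tag each conversion row with a generated UUID so individual conversion events
# can be deduplicated after the join (conversion_source_node is a placeholder).
conversion_events_with_uuid = AddGeneratedUuidColumnNode(parent_node=conversion_source_node)

# Join base events to conversion events for the "user" entity within the conversion window.
join_conversions = JoinConversionEventsNode(
    base_node=base_source_node,  # placeholder: reads the base-event data set
    base_time_dimension_spec=TimeDimensionSpec.from_name("metric_time"),
    conversion_node=conversion_events_with_uuid,
    conversion_measure_spec=conversion_measure_spec,  # placeholder: MeasureSpec of the conversion measure
    conversion_time_dimension_spec=TimeDimensionSpec.from_name("metric_time"),
    unique_identifier_keys=unique_identifier_keys,  # placeholder: includes the generated UUID column spec
    entity_spec=EntitySpec.from_name("user"),
    window=None,  # None keeps the unbounded ("INF") conversion window
    constant_properties=None,  # optional dimension/entity pairs that must also match across the join
)

With window=None, the node's description property above renders the window as "INF"; passing a window instead bounds how long after a base event a conversion still counts.
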
14 changes: 14 additions & 0 deletions metricflow/dataflow/optimizer/source_scan/cm_branch_combiner.py
@@ -5,6 +5,7 @@
from typing import List, Optional, Sequence

from metricflow.dataflow.dataflow_plan import (
    AddGeneratedUuidColumnNode,
    AggregateMeasuresNode,
    BaseOutput,
    CombineAggregatedOutputsNode,
@@ -13,6 +14,7 @@
    DataflowPlanNode,
    DataflowPlanNodeVisitor,
    FilterElementsNode,
    JoinConversionEventsNode,
    JoinOverTimeRangeNode,
    JoinToBaseOutputNode,
    JoinToTimeSpineNode,
@@ -401,3 +403,15 @@ def visit_metric_time_dimension_transform_node( # noqa: D
    def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> ComputeMetricsBranchCombinerResult:  # noqa: D
        self._log_visit_node_type(node)
        return self._default_handler(node)

    def visit_add_generated_uuid_column_node(  # noqa: D
        self, node: AddGeneratedUuidColumnNode
    ) -> ComputeMetricsBranchCombinerResult:
        self._log_visit_node_type(node)
        return self._default_handler(node)

    def visit_join_conversion_events_node(  # noqa: D
        self, node: JoinConversionEventsNode
    ) -> ComputeMetricsBranchCombinerResult:
        self._log_visit_node_type(node)
        return self._default_handler(node)
10 changes: 10 additions & 0 deletions metricflow/dataflow/optimizer/source_scan/source_scan_optimizer.py
@@ -6,6 +6,7 @@

from metricflow.dag.id_generation import OPTIMIZED_DATAFLOW_PLAN_PREFIX, IdGeneratorRegistry
from metricflow.dataflow.dataflow_plan import (
    AddGeneratedUuidColumnNode,
    AggregateMeasuresNode,
    BaseOutput,
    CombineAggregatedOutputsNode,
@@ -15,6 +16,7 @@
    DataflowPlanNode,
    DataflowPlanNodeVisitor,
    FilterElementsNode,
    JoinConversionEventsNode,
    JoinOverTimeRangeNode,
    JoinToBaseOutputNode,
    JoinToTimeSpineNode,
@@ -331,3 +333,11 @@ def optimize(self, dataflow_plan: DataflowPlan) -> DataflowPlan: # noqa: D
    def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> OptimizeBranchResult:  # noqa: D
        self._log_visit_node_type(node)
        return self._default_base_output_handler(node)

    def visit_add_generated_uuid_column_node(self, node: AddGeneratedUuidColumnNode) -> OptimizeBranchResult:  # noqa: D
        self._log_visit_node_type(node)
        return self._default_base_output_handler(node)

    def visit_join_conversion_events_node(self, node: JoinConversionEventsNode) -> OptimizeBranchResult:  # noqa: D
        self._log_visit_node_type(node)
        return self._default_base_output_handler(node)
10 changes: 10 additions & 0 deletions metricflow/model/semantics/linkable_spec_resolver.py
@@ -500,6 +500,16 @@ def __init__(
                    or metric.type is MetricType.RATIO
                ):
                    linkable_sets_for_measure.append(self._get_linkable_element_set_for_measure(measure))
                elif metric.type is MetricType.CONVERSION:
                    conversion_type_params = metric.type_params.conversion_type_params
                    assert (
                        conversion_type_params
                    ), "A conversion metric should have type_params.conversion_type_params defined."
                    if measure == conversion_type_params.base_measure.measure_reference:
                        # Can only query against the base measure's linkable elements,
                        # since everything is joined back to the base measure's data set,
                        # so there is no way to get the conversion elements.
                        linkable_sets_for_measure.append(self._get_linkable_element_set_for_measure(measure))
                else:
                    assert_values_exhausted(metric.type)

4 changes: 3 additions & 1 deletion metricflow/model/semantics/metric_lookup.py
@@ -141,7 +141,9 @@ def configured_input_measure_for_metric( # noqa: D
        if metric.type is MetricType.CUMULATIVE or metric.type is MetricType.SIMPLE:
            assert len(metric.input_measures) == 1, "Simple and cumulative metrics should have one input measure."
            return metric.input_measures[0]
-        elif metric.type is MetricType.RATIO or metric.type is MetricType.DERIVED:
+        elif (
+            metric.type is MetricType.RATIO or metric.type is MetricType.DERIVED or metric.type is MetricType.CONVERSION
+        ):
            return None
        else:
            assert_values_exhausted(metric.type)
29 changes: 28 additions & 1 deletion metricflow/model/semantics/semantic_model_lookup.py
@@ -28,7 +28,14 @@
from metricflow.model.semantics.element_group import ElementGrouper
from metricflow.model.spec_converters import MeasureConverter
from metricflow.protocols.semantics import SemanticModelAccessor
-from metricflow.specs.specs import MeasureSpec, NonAdditiveDimensionSpec
+from metricflow.specs.specs import (
+    DimensionSpec,
+    EntitySpec,
+    LinkableInstanceSpec,
+    MeasureSpec,
+    NonAdditiveDimensionSpec,
+    TimeDimensionSpec,
+)

logger = logging.getLogger(__name__)

@@ -57,6 +64,9 @@ def __init__( # noqa: D
        self._entity_ref_to_entity: Dict[EntityReference, Optional[str]] = {}
        self._semantic_model_names: Set[str] = set()

        self._dimension_ref_to_spec: Dict[DimensionReference, DimensionSpec] = {}
        self._entity_ref_to_spec: Dict[EntityReference, EntitySpec] = {}

        self._semantic_model_to_aggregation_time_dimensions: Dict[
            SemanticModelReference, ElementGrouper[TimeDimensionReference, MeasureSpec]
        ] = {}
@@ -242,10 +252,17 @@ def _add_semantic_model(self, semantic_model: SemanticModel) -> None:
        for dim in semantic_model.dimensions:
            self._linkable_reference_index[dim.reference].append(semantic_model)
            self._dimension_index[dim.reference].append(semantic_model)
            self._dimension_ref_to_spec[dim.time_dimension_reference or dim.reference] = (
                TimeDimensionSpec.from_name(dim.name)
                if dim.type is DimensionType.TIME
                else DimensionSpec.from_name(dim.name)
            )

        for entity in semantic_model.entities:
            self._entity_ref_to_entity[entity.reference] = entity.name
            self._entity_index[entity.name].append(semantic_model)
            self._linkable_reference_index[entity.reference].append(semantic_model)
            self._entity_ref_to_spec[entity.reference] = EntitySpec.from_name(entity.name)

        self._semantic_model_reference_to_semantic_model[semantic_model.reference] = semantic_model

@@ -320,3 +337,13 @@ def entity_links_for_local_elements(semantic_model: SemanticModel) -> Sequence[E
                possible_entity_links.add(entity.reference)

        return sorted(possible_entity_links, key=lambda entity_reference: entity_reference.element_name)

    def get_element_spec_for_name(self, element_name: str) -> LinkableInstanceSpec:  # noqa: D
        if TimeDimensionReference(element_name=element_name) in self._dimension_ref_to_spec:
            return self._dimension_ref_to_spec[TimeDimensionReference(element_name=element_name)]
        elif DimensionReference(element_name=element_name) in self._dimension_ref_to_spec:
            return self._dimension_ref_to_spec[DimensionReference(element_name=element_name)]
        elif EntityReference(element_name=element_name) in self._entity_ref_to_spec:
            return self._entity_ref_to_spec[EntityReference(element_name=element_name)]
        else:
            raise ValueError(f"Unable to find linkable element {element_name} in manifest")
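
A quick, hypothetical illustration of the lookup order implemented by get_element_spec_for_name above (the lookup instance and element names are invented; only the method and the specs it returns come from this diff):

# Resolution order: time dimension, then dimension, then entity; unknown names raise ValueError.
ds_spec = semantic_model_lookup.get_element_spec_for_name("ds")  # -> TimeDimensionSpec if "ds" is a time dimension
user_spec = semantic_model_lookup.get_element_spec_for_name("user")  # -> EntitySpec if "user" is an entity
semantic_model_lookup.get_element_spec_for_name("nonexistent")  # -> raises ValueError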