Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversion Metrics #916

Merged
merged 14 commits into from
Dec 16, 2023
Merged
  •  
  •  
  •  
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231210-165636.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Unreleased changelog entry (changie format — lives under .changes/unreleased/)
# announcing support for conversion metrics.
kind: Features
body: Support for Conversion Metrics
time: 2023-12-10T16:56:36.680817-05:00
custom:
  Author: WilliamDee
  Issue: "252"
2 changes: 2 additions & 0 deletions metricflow/dag/id_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
# Short prefixes used when generating unique IDs for dataflow-plan nodes
# (each node class returns one of these from its id_prefix() classmethod).
DATAFLOW_NODE_SET_MEASURE_AGGREGATION_TIME = "sma"
DATAFLOW_NODE_SEMI_ADDITIVE_JOIN_ID_PREFIX = "saj"
DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX = "jts"
DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX = "auid"
DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX = "jce"

# Prefixes for generated SQL-expression IDs.
SQL_EXPR_COLUMN_REFERENCE_ID_PREFIX = "cr"
SQL_EXPR_COMPARISON_ID_PREFIX = "cmp"
Expand Down
291 changes: 270 additions & 21 deletions metricflow/dataflow/builder/dataflow_plan_builder.py

Large diffs are not rendered by default.

179 changes: 179 additions & 0 deletions metricflow/dataflow/dataflow_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity

from metricflow.dag.id_generation import (
DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX,
DATAFLOW_NODE_AGGREGATE_MEASURES_ID_PREFIX,
DATAFLOW_NODE_COMBINE_AGGREGATED_OUTPUTS_ID_PREFIX,
DATAFLOW_NODE_COMPUTE_METRICS_ID_PREFIX,
DATAFLOW_NODE_CONSTRAIN_TIME_RANGE_ID_PREFIX,
DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX,
DATAFLOW_NODE_JOIN_SELF_OVER_TIME_RANGE_ID_PREFIX,
DATAFLOW_NODE_JOIN_TO_STANDARD_OUTPUT_ID_PREFIX,
DATAFLOW_NODE_JOIN_TO_TIME_SPINE_ID_PREFIX,
Expand All @@ -40,8 +42,12 @@
from metricflow.dataset.sql_dataset import SqlDataSet
from metricflow.filters.time_constraint import TimeRangeConstraint
from metricflow.specs.specs import (
ConstantPropertySpec,
EntitySpec,
InstanceSpec,
InstanceSpecSet,
LinklessEntitySpec,
MeasureSpec,
MetricInputMeasureSpec,
MetricSpec,
OrderBySpec,
Expand Down Expand Up @@ -175,6 +181,14 @@ def visit_metric_time_dimension_transform_node( # noqa: D
def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> VisitorOutputT: # noqa: D
pass

@abstractmethod
def visit_add_generated_uuid_column_node(self, node: AddGeneratedUuidColumnNode) -> VisitorOutputT:
    """Handle a node that appends an internally generated UUID column."""

@abstractmethod
def visit_join_conversion_events_node(self, node: JoinConversionEventsNode) -> VisitorOutputT:
    """Handle a node that joins base events to conversion events."""


class BaseOutput(DataflowPlanNode, ABC):
"""A node that outputs data in a "base" format.
Expand Down Expand Up @@ -1248,6 +1262,171 @@ def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> ConstrainT
)


class AddGeneratedUuidColumnNode(BaseOutput):
    """Adds an internally generated UUID column to the parent node's output."""

    def __init__(self, parent_node: BaseOutput) -> None:
        """Initialize with the single upstream node whose output gets the UUID column."""
        super().__init__(node_id=self.create_unique_id(), parent_nodes=[parent_node])

    @classmethod
    def id_prefix(cls) -> str:
        """Prefix used when generating unique IDs for nodes of this type."""
        return DATAFLOW_NODE_ADD_UUID_COLUMN_PREFIX

    def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT:
        """Dispatch to the visitor's handler for this node type."""
        return visitor.visit_add_generated_uuid_column_node(self)

    @property
    def description(self) -> str:
        """Human-readable description of this node."""
        return "Adds an internally generated UUID column"

    @property
    def parent_node(self) -> DataflowPlanNode:
        """The single parent of this node."""
        assert len(self.parent_nodes) == 1
        return self.parent_nodes[0]

    @property
    def displayed_properties(self) -> List[DisplayedProperty]:
        """No extra displayed properties beyond the defaults."""
        return super().displayed_properties

    def functionally_identical(self, other_node: DataflowPlanNode) -> bool:
        """Any two nodes of this type are interchangeable — there is no configuration."""
        return isinstance(other_node, self.__class__)

    def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> AddGeneratedUuidColumnNode:
        """Recreate this node on top of a different (single) parent."""
        assert len(new_parent_nodes) == 1
        new_parent = new_parent_nodes[0]
        return AddGeneratedUuidColumnNode(parent_node=new_parent)


class JoinConversionEventsNode(BaseOutput):
    """Builds a data set containing successful conversion events.

    Joins a "base" event data set to a "conversion" event data set on the given
    entity, exposing the conversion measure for downstream aggregation.
    """

    def __init__(
        self,
        base_node: BaseOutput,
        base_time_dimension_spec: TimeDimensionSpec,
        conversion_node: BaseOutput,
        conversion_measure_spec: MeasureSpec,
        conversion_time_dimension_spec: TimeDimensionSpec,
        unique_identifier_keys: Sequence[InstanceSpec],
        entity_spec: EntitySpec,
        window: Optional[MetricTimeWindow] = None,
        constant_properties: Optional[Sequence[ConstantPropertySpec]] = None,
    ) -> None:
        """Constructor.

        Args:
            base_node: node containing the dataset for computing base events.
            base_time_dimension_spec: time dimension the base events are compared against.
            conversion_node: node containing the dataset joined to the base node to find conversion events.
            conversion_measure_spec: measure exposed in the resulting dataset for aggregation.
            conversion_time_dimension_spec: time dimension the conversion events are compared against.
            unique_identifier_keys: columns that uniquely identify each conversion event.
            entity_spec: the entity that the conversion is tracked on.
            window: time range bound for when a conversion is still considered valid (default: INF).
            constant_properties: optional set of elements (dimension/entity) used to join the base
                event to the conversion event.
        """
        self._base_node = base_node
        self._base_time_dimension_spec = base_time_dimension_spec
        self._conversion_node = conversion_node
        self._conversion_measure_spec = conversion_measure_spec
        self._conversion_time_dimension_spec = conversion_time_dimension_spec
        self._unique_identifier_keys = unique_identifier_keys
        self._entity_spec = entity_spec
        self._window = window
        self._constant_properties = constant_properties
        super().__init__(node_id=self.create_unique_id(), parent_nodes=[base_node, conversion_node])

    @classmethod
    def id_prefix(cls) -> str:
        """Prefix used when generating unique IDs for nodes of this type."""
        return DATAFLOW_NODE_JOIN_CONVERSION_EVENTS_PREFIX

    def accept(self, visitor: DataflowPlanNodeVisitor[VisitorOutputT]) -> VisitorOutputT:
        """Dispatch to the visitor's handler for this node type."""
        return visitor.visit_join_conversion_events_node(self)

    @property
    def base_node(self) -> DataflowPlanNode:
        """Node providing the base-event dataset."""
        return self._base_node

    @property
    def conversion_node(self) -> DataflowPlanNode:
        """Node providing the conversion-event dataset."""
        return self._conversion_node

    @property
    def conversion_measure_spec(self) -> MeasureSpec:
        """Measure exposed in the resulting dataset for aggregation."""
        return self._conversion_measure_spec

    @property
    def base_time_dimension_spec(self) -> TimeDimensionSpec:
        """Time dimension the base events are compared against."""
        return self._base_time_dimension_spec

    @property
    def conversion_time_dimension_spec(self) -> TimeDimensionSpec:
        """Time dimension the conversion events are compared against."""
        return self._conversion_time_dimension_spec

    @property
    def unique_identifier_keys(self) -> Sequence[InstanceSpec]:
        """Columns that uniquely identify each conversion event."""
        return self._unique_identifier_keys

    @property
    def entity_spec(self) -> EntitySpec:
        """The entity that the conversion is tracked on."""
        return self._entity_spec

    @property
    def window(self) -> Optional[MetricTimeWindow]:
        """Validity window for conversions; None means unbounded (INF)."""
        return self._window

    @property
    def constant_properties(self) -> Optional[Sequence[ConstantPropertySpec]]:
        """Optional elements that must match between base and conversion events."""
        return self._constant_properties

    @property
    def description(self) -> str:
        """Human-readable description of this node."""
        if self.window:
            window_description = f"{self.window.count} {self.window.granularity.value}"
        else:
            window_description = "INF"
        return f"Find conversions for {self.entity_spec.qualified_name} within the range of {window_description}"

    @property
    def displayed_properties(self) -> List[DisplayedProperty]:
        """Properties shown when rendering the plan."""
        properties = list(super().displayed_properties)
        properties.append(DisplayedProperty("base_time_dimension_spec", self.base_time_dimension_spec))
        properties.append(DisplayedProperty("conversion_time_dimension_spec", self.conversion_time_dimension_spec))
        properties.append(DisplayedProperty("entity_spec", self.entity_spec))
        properties.append(DisplayedProperty("window", self.window))
        properties.extend(
            DisplayedProperty("unique_key_specs", unique_spec) for unique_spec in self.unique_identifier_keys
        )
        properties.extend(
            DisplayedProperty("constant_property", constant_property)
            for constant_property in self.constant_properties or []
        )
        return properties

    def functionally_identical(self, other_node: DataflowPlanNode) -> bool:
        """Nodes are interchangeable when they are the same type with equal configuration."""
        if not isinstance(other_node, self.__class__):
            return False
        return (
            self.base_time_dimension_spec == other_node.base_time_dimension_spec
            and self.conversion_time_dimension_spec == other_node.conversion_time_dimension_spec
            and self.conversion_measure_spec == other_node.conversion_measure_spec
            and self.unique_identifier_keys == other_node.unique_identifier_keys
            and self.entity_spec == other_node.entity_spec
            and self.window == other_node.window
            and self.constant_properties == other_node.constant_properties
        )

    def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> JoinConversionEventsNode:
        """Recreate this node with a new (base, conversion) parent pair."""
        assert len(new_parent_nodes) == 2
        new_base_node, new_conversion_node = new_parent_nodes
        return JoinConversionEventsNode(
            base_node=new_base_node,
            base_time_dimension_spec=self.base_time_dimension_spec,
            conversion_node=new_conversion_node,
            conversion_measure_spec=self.conversion_measure_spec,
            conversion_time_dimension_spec=self.conversion_time_dimension_spec,
            unique_identifier_keys=self.unique_identifier_keys,
            entity_spec=self.entity_spec,
            window=self.window,
            constant_properties=self.constant_properties,
        )


class DataflowPlan(MetricFlowDag[SinkOutput]):
"""Describes the flow of metric data as it goes from source nodes to sink nodes in the graph."""

Expand Down
14 changes: 14 additions & 0 deletions metricflow/dataflow/optimizer/source_scan/cm_branch_combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import List, Optional, Sequence

from metricflow.dataflow.dataflow_plan import (
AddGeneratedUuidColumnNode,
AggregateMeasuresNode,
BaseOutput,
CombineAggregatedOutputsNode,
Expand All @@ -13,6 +14,7 @@
DataflowPlanNode,
DataflowPlanNodeVisitor,
FilterElementsNode,
JoinConversionEventsNode,
JoinOverTimeRangeNode,
JoinToBaseOutputNode,
JoinToTimeSpineNode,
Expand Down Expand Up @@ -401,3 +403,15 @@ def visit_metric_time_dimension_transform_node( # noqa: D
def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> ComputeMetricsBranchCombinerResult:
    """Time-spine join nodes get the shared default combining behavior."""
    self._log_visit_node_type(node)
    return self._default_handler(node)

def visit_add_generated_uuid_column_node(
    self, node: AddGeneratedUuidColumnNode
) -> ComputeMetricsBranchCombinerResult:
    """UUID-column nodes get the shared default combining behavior."""
    self._log_visit_node_type(node)
    return self._default_handler(node)

def visit_join_conversion_events_node(
    self, node: JoinConversionEventsNode
) -> ComputeMetricsBranchCombinerResult:
    """Conversion-join nodes get the shared default combining behavior."""
    self._log_visit_node_type(node)
    return self._default_handler(node)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from metricflow.dag.id_generation import OPTIMIZED_DATAFLOW_PLAN_PREFIX, IdGeneratorRegistry
from metricflow.dataflow.dataflow_plan import (
AddGeneratedUuidColumnNode,
AggregateMeasuresNode,
BaseOutput,
CombineAggregatedOutputsNode,
Expand All @@ -15,6 +16,7 @@
DataflowPlanNode,
DataflowPlanNodeVisitor,
FilterElementsNode,
JoinConversionEventsNode,
JoinOverTimeRangeNode,
JoinToBaseOutputNode,
JoinToTimeSpineNode,
Expand Down Expand Up @@ -331,3 +333,11 @@ def optimize(self, dataflow_plan: DataflowPlan) -> DataflowPlan: # noqa: D
def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> OptimizeBranchResult:
    """Time-spine join nodes use the default base-output optimization handling."""
    self._log_visit_node_type(node)
    return self._default_base_output_handler(node)

def visit_add_generated_uuid_column_node(self, node: AddGeneratedUuidColumnNode) -> OptimizeBranchResult:
    """UUID-column nodes use the default base-output optimization handling."""
    self._log_visit_node_type(node)
    return self._default_base_output_handler(node)

def visit_join_conversion_events_node(self, node: JoinConversionEventsNode) -> OptimizeBranchResult:
    """Conversion-join nodes use the default base-output optimization handling."""
    self._log_visit_node_type(node)
    return self._default_base_output_handler(node)
10 changes: 10 additions & 0 deletions metricflow/model/semantics/linkable_spec_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,16 @@ def __init__(
or metric.type is MetricType.RATIO
):
linkable_sets_for_measure.append(self._get_linkable_element_set_for_measure(measure))
elif metric.type is MetricType.CONVERSION:
conversion_type_params = metric.type_params.conversion_type_params
assert (
conversion_type_params
), "A conversion metric should have type_params.conversion_type_params defined."
if measure == conversion_type_params.base_measure.measure_reference:
# Can only query against the base measure's linkable elements
# as it joins everything back to the base measure data set so
# there is no way of getting the conversion elements
linkable_sets_for_measure.append(self._get_linkable_element_set_for_measure(measure))
else:
assert_values_exhausted(metric.type)

Expand Down
4 changes: 3 additions & 1 deletion metricflow/model/semantics/metric_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ def configured_input_measure_for_metric( # noqa: D
if metric.type is MetricType.CUMULATIVE or metric.type is MetricType.SIMPLE:
assert len(metric.input_measures) == 1, "Simple and cumulative metrics should have one input measure."
return metric.input_measures[0]
elif metric.type is MetricType.RATIO or metric.type is MetricType.DERIVED:
elif (
metric.type is MetricType.RATIO or metric.type is MetricType.DERIVED or metric.type is MetricType.CONVERSION
):
return None
else:
assert_values_exhausted(metric.type)
Expand Down
29 changes: 28 additions & 1 deletion metricflow/model/semantics/semantic_model_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@
from metricflow.model.semantics.element_group import ElementGrouper
from metricflow.model.spec_converters import MeasureConverter
from metricflow.protocols.semantics import SemanticModelAccessor
from metricflow.specs.specs import MeasureSpec, NonAdditiveDimensionSpec
from metricflow.specs.specs import (
DimensionSpec,
EntitySpec,
LinkableInstanceSpec,
MeasureSpec,
NonAdditiveDimensionSpec,
TimeDimensionSpec,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -57,6 +64,9 @@ def __init__( # noqa: D
self._entity_ref_to_entity: Dict[EntityReference, Optional[str]] = {}
self._semantic_model_names: Set[str] = set()

self._dimension_ref_to_spec: Dict[DimensionReference, DimensionSpec] = {}
self._entity_ref_to_spec: Dict[EntityReference, EntitySpec] = {}

self._semantic_model_to_aggregation_time_dimensions: Dict[
SemanticModelReference, ElementGrouper[TimeDimensionReference, MeasureSpec]
] = {}
Expand Down Expand Up @@ -242,10 +252,17 @@ def _add_semantic_model(self, semantic_model: SemanticModel) -> None:
for dim in semantic_model.dimensions:
self._linkable_reference_index[dim.reference].append(semantic_model)
self._dimension_index[dim.reference].append(semantic_model)
self._dimension_ref_to_spec[dim.time_dimension_reference or dim.reference] = (
TimeDimensionSpec.from_name(dim.name)
if dim.type is DimensionType.TIME
else DimensionSpec.from_name(dim.name)
)

for entity in semantic_model.entities:
self._entity_ref_to_entity[entity.reference] = entity.name
self._entity_index[entity.name].append(semantic_model)
self._linkable_reference_index[entity.reference].append(semantic_model)
self._entity_ref_to_spec[entity.reference] = EntitySpec.from_name(entity.name)

self._semantic_model_reference_to_semantic_model[semantic_model.reference] = semantic_model

Expand Down Expand Up @@ -320,3 +337,13 @@ def entity_links_for_local_elements(semantic_model: SemanticModel) -> Sequence[E
possible_entity_links.add(entity.reference)

return sorted(possible_entity_links, key=lambda entity_reference: entity_reference.element_name)

def get_element_spec_for_name(self, element_name: str) -> LinkableInstanceSpec:
    """Returns the spec for the linkable element (dimension or entity) with the given name.

    Candidates are checked in priority order: time dimension, then categorical
    dimension, then entity.

    Raises:
        ValueError: if no dimension or entity with this name is registered.
    """
    # Use .get() so each candidate reference is constructed and looked up once,
    # instead of an `in` check followed by a second subscript lookup.
    for index, reference in (
        (self._dimension_ref_to_spec, TimeDimensionReference(element_name=element_name)),
        (self._dimension_ref_to_spec, DimensionReference(element_name=element_name)),
        (self._entity_ref_to_spec, EntityReference(element_name=element_name)),
    ):
        spec = index.get(reference)
        if spec is not None:
            return spec
    raise ValueError(f"Unable to find linkable element {element_name} in manifest")
Loading
Loading