Skip to content

Commit

Permalink
addressed reviews
Browse files: browse the repository at this point in the history
  • Loading branch information
WilliamDee committed Dec 16, 2023
1 parent 64398ac commit 4e34a5a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 10 deletions.
3 changes: 1 addition & 2 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -270,7 +270,6 @@ def _build_aggregated_conversion_node(
unaggregated_conversion_measure_node = AddGeneratedUuidColumnNode(
parent_node=conversion_measure_recipe.source_node
)
primary_key_specs = (MetadataSpec.from_name(MetricFlowReservedKeywords.MF_INTERNAL_UUID.value),)

# Get the agg time dimension for each measure used for matching conversion time windows
base_time_dimension_spec = TimeDimensionSpec.from_reference(
Expand Down Expand Up @@ -320,7 +319,7 @@ def _build_aggregated_conversion_node(
conversion_node=unaggregated_conversion_measure_node,
conversion_measure_spec=conversion_measure_spec.measure_spec,
conversion_time_dimension_spec=conversion_time_dimension_spec,
conversion_primary_key_specs=primary_key_specs,
unique_identifier_keys=(MetadataSpec.from_name(MetricFlowReservedKeywords.MF_INTERNAL_UUID.value),),
entity_spec=entity_spec,
window=window,
constant_properties=constant_property_specs,
Expand Down
15 changes: 7 additions & 8 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -1473,14 +1473,13 @@ def visit_join_conversion_events_node(self, node: JoinConversionEventsNode) -> S
CreateSqlColumnReferencesForInstances(base_data_set_alias, self._column_association_resolver)
)

conversion_primary_key_col_names = tuple(
self._column_association_resolver.resolve_spec(spec).column_name
for spec in node.conversion_primary_key_specs
unique_conversion_col_names = tuple(
self._column_association_resolver.resolve_spec(spec).column_name for spec in node.unique_identifier_keys
)
partition_by_columns: Tuple[str, ...] = (
entity_column_name,
conversion_time_dimension_column_name,
) + conversion_primary_key_col_names
) + unique_conversion_col_names
if node.constant_properties:
partition_by_columns += tuple(
conversion_column_name for _, conversion_column_name in constant_property_column_names
Expand Down Expand Up @@ -1528,7 +1527,7 @@ def visit_join_conversion_events_node(self, node: JoinConversionEventsNode) -> S
)

# Deduplicate the fanout results
conversion_primary_key_select_columns = tuple(
conversion_unique_key_select_columns = tuple(
SqlSelectColumn(
expr=SqlColumnReferenceExpression(
SqlColumnReference(
Expand All @@ -1538,15 +1537,15 @@ def visit_join_conversion_events_node(self, node: JoinConversionEventsNode) -> S
),
column_alias=column_name,
)
for column_name in conversion_primary_key_col_names
for column_name in unique_conversion_col_names
)
additional_conversion_select_columns = conversion_data_set_output_instance_set.transform(
CreateSelectColumnsForInstances(conversion_data_set_alias, self._column_association_resolver)
).as_tuple()
deduped_sql_select_node = SqlSelectStatementNode(
description=f"Dedupe the fanout on {node.conversion_primary_key_specs} in the conversion data set",
description=f"Dedupe the fanout with {','.join(spec.qualified_name for spec in node.unique_identifier_keys)} in the conversion data set",
select_columns=base_sql_select_columns
+ conversion_primary_key_select_columns
+ conversion_unique_key_select_columns
+ additional_conversion_select_columns,
from_source=base_data_set.sql_select_node,
from_source_alias=base_data_set_alias,
Expand Down

0 comments on commit 4e34a5a

Please sign in to comment.