From b8b62aa56e2a11e4528d311b49014e80b14c088e Mon Sep 17 00:00:00 2001 From: Courtney Holcomb Date: Fri, 10 Nov 2023 15:50:15 -0800 Subject: [PATCH] Add join_type to JoinDescription --- .../dataflow/builder/dataflow_plan_builder.py | 71 ++++--------------- metricflow/dataflow/builder/node_evaluator.py | 50 ++++++++++--- metricflow/dataflow/dataflow_plan.py | 4 +- metricflow/plan_conversion/dataflow_to_sql.py | 45 ++++++------ metricflow/plan_conversion/node_processor.py | 14 ++-- .../plan_conversion/sql_join_builder.py | 21 ++++-- .../dataflow/builder/test_node_data_set.py | 3 +- .../dataflow/builder/test_node_evaluator.py | 32 +++++++-- .../test_cases/itest_dimensions.yaml | 2 +- .../test_dataflow_to_sql_plan.py | 6 ++ .../DataflowPlan/test_cyclic_join__dfp_0.xml | 1 + .../test_common_semantic_model__dfp_0.xml | 2 + ..._distinct_values_plan_with_join__dfp_0.xml | 9 +-- .../DataflowPlan/test_joined_plan__dfp_0.xml | 1 + .../test_measure_constraint_plan__dfp_0.xml | 2 + ...mantic_model_ratio_metrics_plan__dfp_0.xml | 2 + .../test_multihop_join_plan__dfp_0.xml | 2 + ...mantic_model_ratio_metrics_plan__dfp_0.xml | 2 + .../test_where_constrained_plan__dfp_0.xml | 1 + ...ained_with_common_linkable_plan__dfp_0.xml | 1 + ...ompute_metrics_node_simple_expr__plan0.xml | 1 + ...2_metrics_from_1_semantic_model__dfp_0.xml | 2 + ..._metrics_from_1_semantic_model__dfpo_0.xml | 1 + 23 files changed, 166 insertions(+), 109 deletions(-) diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 2f0a3c802b..dbbef071ef 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -28,7 +28,6 @@ ConstrainTimeRangeNode, DataflowPlan, FilterElementsNode, - JoinDescription, JoinOverTimeRangeNode, JoinToBaseOutputNode, JoinToTimeSpineNode, @@ -77,53 +76,6 @@ class DataflowRecipe: required_local_linkable_specs: Tuple[LinkableInstanceSpec, ...] join_linkable_instances_recipes: Tuple[JoinLinkableInstancesRecipe, ...] - @property - def join_targets(self) -> List[JoinDescription]: - """Joins to be made to source node.""" - join_targets = [] - for join_recipe in self.join_linkable_instances_recipes: - # Figure out what elements to filter from the joined node. - - # Sanity check - all linkable specs should have a link, or else why would we be joining them. - assert all([len(x.entity_links) > 0 for x in join_recipe.satisfiable_linkable_specs]) - - # If we're joining something in, then we need the associated entity, partitions, and time dimension - # specs defining the validity window (if necessary) - include_specs: List[LinkableInstanceSpec] = [ - LinklessEntitySpec.from_reference(x.entity_links[0]) for x in join_recipe.satisfiable_linkable_specs - ] - include_specs.extend([x.node_to_join_dimension_spec for x in join_recipe.join_on_partition_dimensions]) - include_specs.extend( - [x.node_to_join_time_dimension_spec for x in join_recipe.join_on_partition_time_dimensions] - ) - if join_recipe.validity_window: - include_specs.extend( - [ - join_recipe.validity_window.window_start_dimension, - join_recipe.validity_window.window_end_dimension, - ] - ) - - # satisfiable_linkable_specs describes what can be satisfied after the join, so remove the entity - # link when filtering before the join. - # e.g. if the node is used to satisfy "user_id__country", then the node must have the entity - # "user_id" and the "country" dimension so that it can be joined to the measure node. - include_specs.extend([x.without_first_entity_link for x in join_recipe.satisfiable_linkable_specs]) - filtered_node_to_join = FilterElementsNode( - parent_node=join_recipe.node_to_join, - include_specs=InstanceSpecSet.create_from_linkable_specs(include_specs), - ) - join_targets.append( - JoinDescription( - join_node=filtered_node_to_join, - join_on_entity=join_recipe.join_on_entity, - join_on_partition_dimensions=join_recipe.join_on_partition_dimensions, - join_on_partition_time_dimensions=join_recipe.join_on_partition_time_dimensions, - validity_window=join_recipe.validity_window, - ) - ) - return join_targets - @dataclass(frozen=True) class MeasureSpecProperties: @@ -306,10 +258,11 @@ def build_plan_for_distinct_values(self, query_spec: MetricFlowQuerySpec) -> Dat raise UnableToSatisfyQueryError(f"Recipe not found for linkable specs: {query_spec.linkable_specs}") joined_node: Optional[JoinToBaseOutputNode] = None - if dataflow_recipe.join_targets: - joined_node = JoinToBaseOutputNode( - left_node=dataflow_recipe.source_node, join_targets=dataflow_recipe.join_targets - ) + if dataflow_recipe.join_linkable_instances_recipes: + join_targets = [ + join_recipe.join_description for join_recipe in dataflow_recipe.join_linkable_instances_recipes + ] + joined_node = JoinToBaseOutputNode(left_node=dataflow_recipe.source_node, join_targets=join_targets) where_constraint_node: Optional[WhereConstraintNode] = None if query_spec.where_constraint: @@ -485,6 +438,7 @@ def _find_dataflow_recipe( potential_source_nodes: Sequence[BaseOutput] = self._select_source_nodes_with_measures( measure_specs=set(measure_spec_properties.measure_specs), source_nodes=source_nodes ) + default_join_type = SqlJoinType.LEFT_OUTER else: # Only read nodes can be source nodes for queries without measures source_nodes = self._read_nodes @@ -492,6 +446,7 @@ def _find_dataflow_recipe( linkable_specs=linkable_spec_set, read_nodes=source_nodes ) potential_source_nodes = list(source_nodes_to_linkable_specs.keys()) + default_join_type = SqlJoinType.FULL_OUTER logger.info(f"There are {len(potential_source_nodes)} potential source nodes") @@ -518,7 +473,9 @@ def _find_dataflow_recipe( f"After removing unnecessary nodes, there are {len(nodes_available_for_joins)} nodes available for joins" ) if DataflowPlanBuilder._contains_multihop_linkables(linkable_specs): - nodes_available_for_joins = node_processor.add_multi_hop_joins(linkable_specs, source_nodes) + nodes_available_for_joins = node_processor.add_multi_hop_joins( + desired_linkable_specs=linkable_specs, nodes=source_nodes, join_type=default_join_type + ) logger.info( f"After adding multi-hop nodes, there are {len(nodes_available_for_joins)} nodes available for joins:\n" f"{pformat_big_objects(nodes_available_for_joins)}" @@ -552,7 +509,9 @@ def _find_dataflow_recipe( logger.debug(f"Evaluating source node:\n{pformat_big_objects(source_node=dataflow_dag_as_text(node))}") start_time = time.time() - evaluation = node_evaluator.evaluate_node(start_node=node, required_linkable_specs=list(linkable_specs)) + evaluation = node_evaluator.evaluate_node( + start_node=node, required_linkable_specs=list(linkable_specs), default_join_type=default_join_type + ) logger.info(f"Evaluation of {node} took {time.time() - start_time:.2f}s") logger.debug( @@ -597,7 +556,7 @@ def _find_dataflow_recipe( # Nodes containing the linkable instances will be joined to the source node, so these # entities will need to be present in the source node. - required_local_entity_specs = tuple(x.join_on_entity for x in evaluation.join_recipes) + required_local_entity_specs = tuple(x.join_on_entity for x in evaluation.join_recipes if x.join_on_entity) # Same thing with partitions. required_local_dimension_specs = tuple( y.start_node_dimension_spec for x in evaluation.join_recipes for y in x.join_on_partition_dimensions @@ -780,7 +739,7 @@ def _build_aggregated_measure_from_measure_source_node( ), ) - join_targets = measure_recipe.join_targets + join_targets = [join_recipe.join_description for join_recipe in measure_recipe.join_linkable_instances_recipes] unaggregated_measure_node: BaseOutput if len(join_targets) > 0: filtered_measures_with_joined_elements = JoinToBaseOutputNode( diff --git a/metricflow/dataflow/builder/node_evaluator.py b/metricflow/dataflow/builder/node_evaluator.py index 5ab12bed9f..daac7d0d81 100644 --- a/metricflow/dataflow/builder/node_evaluator.py +++ b/metricflow/dataflow/builder/node_evaluator.py @@ -27,6 +27,7 @@ from metricflow.dataflow.builder.partitions import PartitionJoinResolver from metricflow.dataflow.dataflow_plan import ( BaseOutput, + FilterElementsNode, JoinDescription, PartitionDimensionJoinDescription, PartitionTimeDimensionJoinDescription, @@ -37,10 +38,8 @@ from metricflow.model.semantics.semantic_model_join_evaluator import SemanticModelJoinEvaluator from metricflow.plan_conversion.instance_converters import CreateValidityWindowJoinDescription from metricflow.protocols.semantics import SemanticModelAccessor -from metricflow.specs.specs import ( - LinkableInstanceSpec, - LinklessEntitySpec, -) +from metricflow.specs.specs import InstanceSpecSet, LinkableInstanceSpec, LinklessEntitySpec +from metricflow.sql.sql_plan import SqlJoinType logger = logging.getLogger(__name__) @@ -55,12 +54,14 @@ class JoinLinkableInstancesRecipe: """ node_to_join: BaseOutput - # The entity to join "node_to_join" on. - join_on_entity: LinklessEntitySpec + # The entity to join "node_to_join" on. Not needed for cross-joins. + join_on_entity: Optional[LinklessEntitySpec] # The linkable instances from the query that can be satisfied if we join this node. Note that this is different from # the linkable specs in the node that can help to satisfy the query. e.g. "user_id__country" might be one of the # "satisfiable_linkable_specs", but "country" is the linkable spec in the node. satisfiable_linkable_specs: List[LinkableInstanceSpec] + # Join type to use when joining node + join_type: SqlJoinType # The partitions to join on, if there are matching partitions between the start_node and node_to_join. join_on_partition_dimensions: Tuple[PartitionDimensionJoinDescription, ...] @@ -71,12 +72,37 @@ class JoinLinkableInstancesRecipe: @property def join_description(self) -> JoinDescription: """The recipe as a join description to use in the dataflow plan node.""" + # Figure out what elements to filter from the joined node. + include_specs: List[LinkableInstanceSpec] = [] + if not self.join_type == SqlJoinType.CROSS_JOIN: + assert all([len(spec.entity_links) > 0 for spec in self.satisfiable_linkable_specs]) + include_specs.extend( + [LinklessEntitySpec.from_reference(spec.entity_links[0]) for spec in self.satisfiable_linkable_specs] + ) + + include_specs.extend([join.node_to_join_dimension_spec for join in self.join_on_partition_dimensions]) + include_specs.extend([join.node_to_join_time_dimension_spec for join in self.join_on_partition_time_dimensions]) + if self.validity_window: + include_specs.extend( + [self.validity_window.window_start_dimension, self.validity_window.window_end_dimension] + ) + + # `satisfiable_linkable_specs` describes what can be satisfied after the join, so remove the entity + # link when filtering before the join. + # e.g. if the node is used to satisfy "user_id__country", then the node must have the entity + # "user_id" and the "country" dimension so that it can be joined to the source node. + include_specs.extend([spec.without_first_entity_link for spec in self.satisfiable_linkable_specs]) + filtered_node_to_join = FilterElementsNode( + parent_node=self.node_to_join, include_specs=InstanceSpecSet.create_from_linkable_specs(include_specs) + ) + return JoinDescription( - join_node=self.node_to_join, + join_node=filtered_node_to_join, join_on_entity=self.join_on_entity, join_on_partition_dimensions=self.join_on_partition_dimensions, join_on_partition_time_dimensions=self.join_on_partition_time_dimensions, validity_window=self.validity_window, + join_type=self.join_type, ) @@ -133,6 +159,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( self, start_node_instance_set: InstanceSet, needed_linkable_specs: List[LinkableInstanceSpec], + join_type: SqlJoinType, ) -> List[JoinLinkableInstancesRecipe]: """Get nodes that can be joined to get 1 or more of the "needed_linkable_specs". @@ -257,6 +284,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( join_on_partition_dimensions=join_on_partition_dimensions, join_on_partition_time_dimensions=join_on_partition_time_dimensions, validity_window=validity_window_join_description, + join_type=join_type, ) ) @@ -271,6 +299,7 @@ def _find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( def _update_candidates_that_can_satisfy_linkable_specs( candidates_for_join: List[JoinLinkableInstancesRecipe], already_satisfisfied_linkable_specs: List[LinkableInstanceSpec], + join_type: SqlJoinType, ) -> List[JoinLinkableInstancesRecipe]: """Update / filter candidates_for_join based on linkable instance specs that we have already satisfied. @@ -294,6 +323,7 @@ def _update_candidates_that_can_satisfy_linkable_specs( join_on_partition_dimensions=candidate_for_join.join_on_partition_dimensions, join_on_partition_time_dimensions=candidate_for_join.join_on_partition_time_dimensions, validity_window=candidate_for_join.validity_window, + join_type=join_type, ) ) return sorted( @@ -306,6 +336,7 @@ def evaluate_node( self, start_node: BaseOutput, required_linkable_specs: Sequence[LinkableInstanceSpec], + default_join_type: SqlJoinType, ) -> LinkableInstanceSatisfiabilityEvaluation: """Evaluates if the "required_linkable_specs" can be realized by joining the "start_node" with other nodes. @@ -345,7 +376,9 @@ def evaluate_node( possibly_joinable_linkable_specs.append(required_linkable_spec) candidates_for_join = self._find_joinable_candidate_nodes_that_can_satisfy_linkable_specs( - start_node_instance_set=candidate_instance_set, needed_linkable_specs=possibly_joinable_linkable_specs + start_node_instance_set=candidate_instance_set, + needed_linkable_specs=possibly_joinable_linkable_specs, + join_type=default_join_type, ) join_candidates: List[JoinLinkableInstancesRecipe] = [] @@ -378,6 +411,7 @@ def evaluate_node( candidates_for_join = self._update_candidates_that_can_satisfy_linkable_specs( candidates_for_join=candidates_for_join, already_satisfisfied_linkable_specs=next_candidate.satisfiable_linkable_specs, + join_type=default_join_type, ) # The once possibly joinable specs are definitely joinable and no longer need to be searched for. diff --git a/metricflow/dataflow/dataflow_plan.py b/metricflow/dataflow/dataflow_plan.py index f3b101d3ef..e75ff38d0f 100644 --- a/metricflow/dataflow/dataflow_plan.py +++ b/metricflow/dataflow/dataflow_plan.py @@ -249,7 +249,8 @@ class JoinDescription: """Describes how data from a node should be joined to data from another node.""" join_node: BaseOutput - join_on_entity: LinklessEntitySpec + join_on_entity: Optional[LinklessEntitySpec] + join_type: SqlJoinType join_on_partition_dimensions: Tuple[PartitionDimensionJoinDescription, ...] join_on_partition_time_dimensions: Tuple[PartitionTimeDimensionJoinDescription, ...] @@ -339,6 +340,7 @@ def with_new_parents(self, new_parent_nodes: Sequence[BaseOutput]) -> JoinToBase join_on_partition_dimensions=old_join_target.join_on_partition_dimensions, join_on_partition_time_dimensions=old_join_target.join_on_partition_time_dimensions, validity_window=old_join_target.validity_window, + join_type=old_join_target.join_type, ) for i, old_join_target in enumerate(self._join_targets) ], diff --git a/metricflow/plan_conversion/dataflow_to_sql.py b/metricflow/plan_conversion/dataflow_to_sql.py index 66607417fa..b74bb1ed0e 100644 --- a/metricflow/plan_conversion/dataflow_to_sql.py +++ b/metricflow/plan_conversion/dataflow_to_sql.py @@ -396,28 +396,29 @@ def visit_join_to_base_output_node(self, node: JoinToBaseOutputNode) -> SqlDataS ) ) - # Remove the linkable instances with the join_on_entity as the leading link as the next step adds the - # link. This is to avoid cases where there is a primary entity and a dimension in the data set, and we - # create an instance in the next step that has the same entity link. - # e.g. a data set has the dimension "listing__country_latest" and "listing" is a primary entity in the - # data set. The next step would create an instance like "listing__listing__country_latest" without this - # filter. - - # logger.error(f"before filter is:\n{pformat_big_objects(right_data_set.instance_set.spec_set)}") - right_data_set_instance_set_filtered = FilterLinkableInstancesWithLeadingLink( - entity_link=join_on_entity, - ).transform(right_data_set.instance_set) - # logger.error(f"after filter is:\n{pformat_big_objects(right_data_set_instance_set_filtered.spec_set)}") - - # After the right data set is joined to the "from" data set, we need to change the links for some of the - # instances that represent the right data set. For example, if the "from" data set contains the "bookings" - # measure instance and the right dataset contains the "country" dimension instance, then after the join, - # the output data set should have the "country" dimension instance with the "user_id" entity link - # (if "user_id" equality was the join condition). "country" -> "user_id__country" - right_data_set_instance_set_after_join = right_data_set_instance_set_filtered.transform( - AddLinkToLinkableElements(join_on_entity=join_on_entity) - ) - table_alias_to_instance_set[right_data_set_alias] = right_data_set_instance_set_after_join + if join_description.join_type == SqlJoinType.CROSS_JOIN: + table_alias_to_instance_set[right_data_set_alias] = right_data_set.instance_set + else: + # Remove the linkable instances with the join_on_entity as the leading link as the next step adds the + # link. This is to avoid cases where there is a primary entity and a dimension in the data set, and we + # create an instance in the next step that has the same entity link. + # e.g. a data set has the dimension "listing__country_latest" and "listing" is a primary entity in the + # data set. The next step would create an instance like "listing__listing__country_latest" without this + # filter. + assert join_on_entity + right_data_set_instance_set_filtered = FilterLinkableInstancesWithLeadingLink( + entity_link=join_on_entity + ).transform(right_data_set.instance_set) + + # After the right data set is joined to the "from" data set, we need to change the links for some of the + # instances that represent the right data set. For example, if the "from" data set contains the "bookings" + # measure instance and the right dataset contains the "country" dimension instance, then after the join, + # the output data set should have the "country" dimension instance with the "user_id" entity link + # (if "user_id" equality was the join condition). "country" -> "user_id__country" + right_data_set_instance_set_after_join = right_data_set_instance_set_filtered.transform( + AddLinkToLinkableElements(join_on_entity=join_on_entity) + ) + table_alias_to_instance_set[right_data_set_alias] = right_data_set_instance_set_after_join from_data_set_output_instance_set = from_data_set.instance_set.transform( FilterElements(include_specs=from_data_set.instance_set.spec_set) diff --git a/metricflow/plan_conversion/node_processor.py b/metricflow/plan_conversion/node_processor.py index 1b94e4ceac..12968b0d84 100644 --- a/metricflow/plan_conversion/node_processor.py +++ b/metricflow/plan_conversion/node_processor.py @@ -21,6 +21,7 @@ from metricflow.protocols.semantics import SemanticModelAccessor from metricflow.specs.spec_set_transforms import ToElementNameSet from metricflow.specs.specs import InstanceSpecSet, LinkableInstanceSpec, LinklessEntitySpec +from metricflow.sql.sql_plan import SqlJoinType logger = logging.getLogger(__name__) @@ -149,9 +150,7 @@ def _node_contains_entity( return False def _get_candidates_nodes_for_multi_hop( - self, - desired_linkable_spec: LinkableInstanceSpec, - nodes: Sequence[BaseOutput], + self, desired_linkable_spec: LinkableInstanceSpec, nodes: Sequence[BaseOutput], join_type: SqlJoinType ) -> Sequence[MultiHopJoinCandidate]: """Assemble nodes representing all possible one-hop joins.""" if len(desired_linkable_spec.entity_links) > MAX_JOIN_HOPS: @@ -249,6 +248,7 @@ def _get_candidates_nodes_for_multi_hop( ), join_on_partition_dimensions=join_on_partition_dimensions, join_on_partition_time_dimensions=join_on_partition_time_dimensions, + join_type=join_type, ) ], ), @@ -276,7 +276,10 @@ def _get_candidates_nodes_for_multi_hop( return multi_hop_join_candidates def add_multi_hop_joins( - self, desired_linkable_specs: Sequence[LinkableInstanceSpec], nodes: Sequence[BaseOutput] + self, + desired_linkable_specs: Sequence[LinkableInstanceSpec], + nodes: Sequence[BaseOutput], + join_type: SqlJoinType, ) -> Sequence[BaseOutput]: """Assemble nodes representing all possible one-hop joins.""" all_multi_hop_join_candidates: List[MultiHopJoinCandidate] = [] @@ -284,8 +287,7 @@ def add_multi_hop_joins( for desired_linkable_spec in desired_linkable_specs: for multi_hop_join_candidate in self._get_candidates_nodes_for_multi_hop( - desired_linkable_spec=desired_linkable_spec, - nodes=nodes, + desired_linkable_spec=desired_linkable_spec, nodes=nodes, join_type=join_type ): # Dedupe candidates that are the same join. if multi_hop_join_candidate.lineage not in lineage_for_all_multi_hop_join_candidates: diff --git a/metricflow/plan_conversion/sql_join_builder.py b/metricflow/plan_conversion/sql_join_builder.py index 2311e5f87e..a7d9f686a0 100644 --- a/metricflow/plan_conversion/sql_join_builder.py +++ b/metricflow/plan_conversion/sql_join_builder.py @@ -147,8 +147,21 @@ def make_base_output_join_description( In addition to the entity equality condition, this will ensure datasets are joined on all partition columns and account for validity windows, if those are defined in one of the datasets. """ - join_on_entity = join_description.join_on_entity + validity_conditions = SqlQueryPlanJoinBuilder._make_validity_window_on_conditions( + left_data_set=left_data_set, right_data_set=right_data_set, join_description=join_description + ) + if join_description.join_type == SqlJoinType.CROSS_JOIN: + return SqlQueryPlanJoinBuilder.make_column_equality_sql_join_description( + right_source_node=right_data_set.data_set.sql_select_node, + left_source_alias=left_data_set.alias, + right_source_alias=right_data_set.alias, + column_equality_descriptions=[], + join_type=join_description.join_type, + additional_on_conditions=validity_conditions, + ) + join_on_entity = join_description.join_on_entity + assert join_on_entity, "Join on entity required unless using cross join." # Figure out which columns in the "left" data set correspond to the entity that we want to join on. # The column associations tell us which columns correspond to which instances in the data set. left_data_set_entity_column_associations = left_data_set.data_set.column_associations_for_entity(join_on_entity) @@ -198,16 +211,12 @@ def make_base_output_join_description( ) ) - validity_conditions = SqlQueryPlanJoinBuilder._make_validity_window_on_conditions( - left_data_set=left_data_set, right_data_set=right_data_set, join_description=join_description - ) - return SqlQueryPlanJoinBuilder.make_column_equality_sql_join_description( right_source_node=right_data_set.data_set.sql_select_node, left_source_alias=left_data_set.alias, right_source_alias=right_data_set.alias, column_equality_descriptions=column_equality_descriptions, - join_type=SqlJoinType.LEFT_OUTER, + join_type=join_description.join_type, additional_on_conditions=validity_conditions, ) diff --git a/metricflow/test/dataflow/builder/test_node_data_set.py b/metricflow/test/dataflow/builder/test_node_data_set.py index f9f696211a..772abb41cf 100644 --- a/metricflow/test/dataflow/builder/test_node_data_set.py +++ b/metricflow/test/dataflow/builder/test_node_data_set.py @@ -23,7 +23,7 @@ MeasureSpec, ) from metricflow.sql.sql_exprs import SqlColumnReference, SqlColumnReferenceExpression -from metricflow.sql.sql_plan import SqlSelectColumn, SqlSelectStatementNode, SqlTableFromClauseNode +from metricflow.sql.sql_plan import SqlJoinType, SqlSelectColumn, SqlSelectStatementNode, SqlTableFromClauseNode from metricflow.test.fixtures.model_fixtures import ConsistentIdObjectRepository from metricflow.test.fixtures.setup_fixtures import MetricFlowTestSessionState from metricflow.test.snapshot_utils import assert_spec_set_snapshot_equal @@ -111,6 +111,7 @@ def test_joined_node_data_set( # noqa: D join_on_entity=LinklessEntitySpec.from_element_name("user"), join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ) ], ) diff --git a/metricflow/test/dataflow/builder/test_node_evaluator.py b/metricflow/test/dataflow/builder/test_node_evaluator.py index c88fa7319e..991843a76d 100644 --- a/metricflow/test/dataflow/builder/test_node_evaluator.py +++ b/metricflow/test/dataflow/builder/test_node_evaluator.py @@ -27,6 +27,7 @@ LinklessEntitySpec, TimeDimensionSpec, ) +from metricflow.sql.sql_plan import SqlJoinType from metricflow.test.fixtures.model_fixtures import ConsistentIdObjectRepository logger = logging.getLogger(__name__) @@ -77,7 +78,9 @@ def make_multihop_node_evaluator( ) nodes_available_for_joins = node_processor.add_multi_hop_joins( - desired_linkable_specs=desired_linkable_specs, nodes=nodes_available_for_joins + desired_linkable_specs=desired_linkable_specs, + nodes=nodes_available_for_joins, + join_type=SqlJoinType.LEFT_OUTER, ) return NodeEvaluatorForLinkableInstances( @@ -92,7 +95,9 @@ def test_node_evaluator_with_no_linkable_specs( # noqa: D node_evaluator: NodeEvaluatorForLinkableInstances, ) -> None: bookings_source_node = consistent_id_object_repository.simple_model_read_nodes["bookings_source"] - evaluation = node_evaluator.evaluate_node(required_linkable_specs=[], start_node=bookings_source_node) + evaluation = node_evaluator.evaluate_node( + required_linkable_specs=[], start_node=bookings_source_node, default_join_type=SqlJoinType.LEFT_OUTER + ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( local_linkable_specs=(), joinable_linkable_specs=(), join_recipes=(), unjoinable_linkable_specs=() ) @@ -111,6 +116,7 @@ def test_node_evaluator_with_unjoinable_specs( # noqa: D ) ], start_node=bookings_source_node, + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( local_linkable_specs=(), @@ -134,6 +140,7 @@ def test_node_evaluator_with_local_spec( # noqa: D evaluation = node_evaluator.evaluate_node( required_linkable_specs=[DimensionSpec(element_name="is_instant", entity_links=(EntityReference("booking"),))], start_node=bookings_source_node, + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( local_linkable_specs=(DimensionSpec(element_name="is_instant", entity_links=(EntityReference("booking"),)),), @@ -154,6 +161,7 @@ def test_node_evaluator_with_local_spec_using_primary_entity( # noqa: D DimensionSpec(element_name="home_state_latest", entity_links=(EntityReference(element_name="user"),)) ], start_node=bookings_source_node, + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == ( @@ -190,6 +198,7 @@ def test_node_evaluator_with_joined_spec( # noqa: D ), ], start_node=bookings_source_node, + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( @@ -220,6 +229,7 @@ def test_node_evaluator_with_joined_spec( # noqa: D ], join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ), ), unjoinable_linkable_specs=(), @@ -240,6 +250,7 @@ def test_node_evaluator_with_joined_spec_on_unique_id( # noqa: D ), ], start_node=listings_node, + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( @@ -262,6 +273,7 @@ def test_node_evaluator_with_joined_spec_on_unique_id( # noqa: D ], join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ), ), unjoinable_linkable_specs=(), @@ -286,6 +298,7 @@ def test_node_evaluator_with_multiple_joined_specs( # noqa: D ), ], start_node=views_source, + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( @@ -312,6 +325,7 @@ def test_node_evaluator_with_multiple_joined_specs( # noqa: D ], join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ), JoinLinkableInstancesRecipe( node_to_join=consistent_id_object_repository.simple_model_read_nodes["users_latest"], @@ -324,6 +338,7 @@ def test_node_evaluator_with_multiple_joined_specs( # noqa: D ], join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ), ), unjoinable_linkable_specs=(), @@ -354,8 +369,7 @@ def test_node_evaluator_with_multihop_joined_spec( # noqa: D ) evaluation = multihop_node_evaluator.evaluate_node( - required_linkable_specs=linkable_specs, - start_node=txn_source, + required_linkable_specs=linkable_specs, start_node=txn_source, default_join_type=SqlJoinType.LEFT_OUTER ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( @@ -395,6 +409,7 @@ def test_node_evaluator_with_multihop_joined_spec( # noqa: D ), ), ), + join_type=SqlJoinType.LEFT_OUTER, ), ), unjoinable_linkable_specs=(), @@ -414,6 +429,7 @@ def test_node_evaluator_with_partition_joined_spec( # noqa: D ), ], start_node=consistent_id_object_repository.simple_model_read_nodes["id_verifications"], + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( @@ -447,6 +463,7 @@ def test_node_evaluator_with_partition_joined_spec( # noqa: D ), ), ), + join_type=SqlJoinType.LEFT_OUTER, ), ), unjoinable_linkable_specs=(), @@ -480,6 +497,7 @@ def test_node_evaluator_with_scd_target( ) ], start_node=consistent_id_object_repository.scd_model_read_nodes["bookings_source"], + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( @@ -506,6 +524,7 @@ def test_node_evaluator_with_scd_target( window_start_dimension=TimeDimensionSpec(element_name="window_start", entity_links=()), window_end_dimension=TimeDimensionSpec(element_name="window_end", entity_links=()), ), + join_type=SqlJoinType.LEFT_OUTER, ), ), unjoinable_linkable_specs=(), @@ -531,6 +550,7 @@ def test_node_evaluator_with_multi_hop_scd_target( evaluation = node_evaluator.evaluate_node( required_linkable_specs=linkable_specs, start_node=consistent_id_object_repository.scd_model_read_nodes["bookings_source"], + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( @@ -567,6 +587,7 @@ def test_node_evaluator_with_multi_hop_scd_target( element_name="window_end", entity_links=(EntityReference(element_name="lux_listing"),) ), ), + join_type=SqlJoinType.LEFT_OUTER, ), ), unjoinable_linkable_specs=(), @@ -592,6 +613,7 @@ def test_node_evaluator_with_multi_hop_through_scd( evaluation = node_evaluator.evaluate_node( required_linkable_specs=linkable_specs, start_node=consistent_id_object_repository.scd_model_read_nodes["bookings_source"], + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( @@ -624,6 +646,7 @@ def test_node_evaluator_with_multi_hop_through_scd( window_start_dimension=TimeDimensionSpec(element_name="window_start", entity_links=()), window_end_dimension=TimeDimensionSpec(element_name="window_end", entity_links=()), ), + join_type=SqlJoinType.LEFT_OUTER, ), ), unjoinable_linkable_specs=(), @@ -648,6 +671,7 @@ def test_node_evaluator_with_invalid_multi_hop_scd( evaluation = node_evaluator.evaluate_node( required_linkable_specs=linkable_specs, start_node=consistent_id_object_repository.scd_model_read_nodes["bookings_source"], + default_join_type=SqlJoinType.LEFT_OUTER, ) assert evaluation == LinkableInstanceSatisfiabilityEvaluation( diff --git a/metricflow/test/integration/test_cases/itest_dimensions.yaml b/metricflow/test/integration/test_cases/itest_dimensions.yaml index ed050c2233..6a04ee73b2 100644 --- a/metricflow/test/integration/test_cases/itest_dimensions.yaml +++ b/metricflow/test/integration/test_cases/itest_dimensions.yaml @@ -139,7 +139,7 @@ integration_test: u.home_state_latest AS user__home_state_latest , l.is_lux AS listing__is_lux_latest FROM {{ source_schema }}.dim_listings_latest l - LEFT OUTER JOIN {{ source_schema }}.dim_users_latest u + FULL OUTER JOIN {{ source_schema }}.dim_users_latest u ON u.user_id = l.user_id GROUP BY u.home_state_latest diff --git a/metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py b/metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py index caebaf4b6d..4ad14cf3f5 100644 --- a/metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py +++ b/metricflow/test/plan_conversion/test_dataflow_to_sql_plan.py @@ -290,6 +290,7 @@ def test_single_join_node( # noqa: D join_on_entity=entity_spec, join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ) ], ) @@ -343,12 +344,14 @@ def test_multi_join_node( join_on_entity=LinklessEntitySpec.from_element_name(element_name="listing"), join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ), JoinDescription( join_node=filtered_dimension_node, join_on_entity=LinklessEntitySpec.from_element_name(element_name="listing"), join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ), ], ) @@ -406,6 +409,7 @@ def test_compute_metrics_node( join_on_entity=entity_spec, join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ) ], ) @@ -467,6 +471,7 @@ def test_compute_metrics_node_simple_expr( join_on_entity=entity_spec, join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ) ], ) @@ -747,6 +752,7 @@ def test_compute_metrics_node_ratio_from_single_semantic_model( join_on_entity=entity_spec, join_on_partition_dimensions=(), join_on_partition_time_dimensions=(), + join_type=SqlJoinType.LEFT_OUTER, ) ], ) diff --git a/metricflow/test/snapshots/test_cyclic_join.py/DataflowPlan/test_cyclic_join__dfp_0.xml b/metricflow/test/snapshots/test_cyclic_join.py/DataflowPlan/test_cyclic_join__dfp_0.xml index 2382b41493..fe97bc8b8a 100644 --- a/metricflow/test/snapshots/test_cyclic_join.py/DataflowPlan/test_cyclic_join__dfp_0.xml +++ b/metricflow/test/snapshots/test_cyclic_join.py/DataflowPlan/test_cyclic_join__dfp_0.xml @@ -39,6 +39,7 @@ + diff --git a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_common_semantic_model__dfp_0.xml b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_common_semantic_model__dfp_0.xml index 995d2264c8..1f786abff0 100644 --- a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_common_semantic_model__dfp_0.xml +++ b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_common_semantic_model__dfp_0.xml @@ -48,6 +48,7 @@ + @@ -151,6 +152,7 @@ + diff --git a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml index 44139d9477..86301bfc80 100644 --- a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml +++ b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_distinct_values_plan_with_join__dfp_0.xml @@ -17,7 +17,7 @@ - + @@ -44,12 +44,13 @@ - + - + + @@ -64,7 +65,7 @@ - + diff --git a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_joined_plan__dfp_0.xml b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_joined_plan__dfp_0.xml index 07ef1d04f2..e30c82f587 100644 --- a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_joined_plan__dfp_0.xml +++ b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_joined_plan__dfp_0.xml @@ -42,6 +42,7 @@ + diff --git a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_measure_constraint_plan__dfp_0.xml b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_measure_constraint_plan__dfp_0.xml index c284514433..ff0aefadcc 100644 --- a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_measure_constraint_plan__dfp_0.xml +++ b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_measure_constraint_plan__dfp_0.xml @@ -99,6 +99,7 @@ + @@ -244,6 +245,7 @@ + diff --git a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_multi_semantic_model_ratio_metrics_plan__dfp_0.xml b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_multi_semantic_model_ratio_metrics_plan__dfp_0.xml index 2688100204..8bb478d6ca 100644 --- a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_multi_semantic_model_ratio_metrics_plan__dfp_0.xml +++ b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_multi_semantic_model_ratio_metrics_plan__dfp_0.xml @@ -58,6 +58,7 @@ + @@ -161,6 +162,7 @@ + diff --git a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_multihop_join_plan__dfp_0.xml b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_multihop_join_plan__dfp_0.xml index d5210d4e79..70d034e35b 100644 --- a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_multihop_join_plan__dfp_0.xml +++ b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_multihop_join_plan__dfp_0.xml @@ -39,6 +39,7 @@ + @@ -118,6 +119,7 @@ + diff --git a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_single_semantic_model_ratio_metrics_plan__dfp_0.xml b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_single_semantic_model_ratio_metrics_plan__dfp_0.xml index ad2e199a63..f9d0c651ae 100644 --- a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_single_semantic_model_ratio_metrics_plan__dfp_0.xml +++ b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_single_semantic_model_ratio_metrics_plan__dfp_0.xml @@ -58,6 +58,7 @@ + @@ -161,6 +162,7 @@ + diff --git a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_where_constrained_plan__dfp_0.xml b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_where_constrained_plan__dfp_0.xml index 062689be39..e7d35170b0 100644 --- a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_where_constrained_plan__dfp_0.xml +++ b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_where_constrained_plan__dfp_0.xml @@ -70,6 +70,7 @@ + diff --git a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_where_constrained_with_common_linkable_plan__dfp_0.xml b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_where_constrained_with_common_linkable_plan__dfp_0.xml index 1ef34a8a04..58ceffeb67 100644 --- a/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_where_constrained_with_common_linkable_plan__dfp_0.xml +++ b/metricflow/test/snapshots/test_dataflow_plan_builder.py/DataflowPlan/test_where_constrained_with_common_linkable_plan__dfp_0.xml @@ -52,6 +52,7 @@ + diff --git a/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/DataflowPlan/test_compute_metrics_node_simple_expr__plan0.xml b/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/DataflowPlan/test_compute_metrics_node_simple_expr__plan0.xml index d1662fcdda..a03c45a687 100644 --- a/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/DataflowPlan/test_compute_metrics_node_simple_expr__plan0.xml +++ b/metricflow/test/snapshots/test_dataflow_to_sql_plan.py/DataflowPlan/test_compute_metrics_node_simple_expr__plan0.xml @@ -24,6 +24,7 @@ + diff --git a/metricflow/test/snapshots/test_source_scan_optimizer.py/DataflowPlan/test_2_metrics_from_1_semantic_model__dfp_0.xml b/metricflow/test/snapshots/test_source_scan_optimizer.py/DataflowPlan/test_2_metrics_from_1_semantic_model__dfp_0.xml index 995d2264c8..1f786abff0 100644 --- a/metricflow/test/snapshots/test_source_scan_optimizer.py/DataflowPlan/test_2_metrics_from_1_semantic_model__dfp_0.xml +++ b/metricflow/test/snapshots/test_source_scan_optimizer.py/DataflowPlan/test_2_metrics_from_1_semantic_model__dfp_0.xml @@ -48,6 +48,7 @@ + @@ -151,6 +152,7 @@ + diff --git a/metricflow/test/snapshots/test_source_scan_optimizer.py/DataflowPlan/test_2_metrics_from_1_semantic_model__dfpo_0.xml b/metricflow/test/snapshots/test_source_scan_optimizer.py/DataflowPlan/test_2_metrics_from_1_semantic_model__dfpo_0.xml index 878492035e..427c764d62 100644 --- a/metricflow/test/snapshots/test_source_scan_optimizer.py/DataflowPlan/test_2_metrics_from_1_semantic_model__dfpo_0.xml +++ b/metricflow/test/snapshots/test_source_scan_optimizer.py/DataflowPlan/test_2_metrics_from_1_semantic_model__dfpo_0.xml @@ -56,6 +56,7 @@ +