-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improved source node logic for custom granularities #1427
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,7 +102,7 @@ | |
|
||
|
||
@dataclass(frozen=True) | ||
class DataflowRecipe: | ||
class SourceNodeRecipe: | ||
"""Get a recipe for how to build a dataflow plan node that outputs measures and linkable instances as needed.""" | ||
|
||
source_node: DataflowPlanNode | ||
|
@@ -277,13 +277,13 @@ def _build_aggregated_conversion_node( | |
queried_linkable_specs=queried_linkable_specs, | ||
filter_specs=base_measure_spec.filter_specs, | ||
) | ||
base_measure_recipe = self._find_dataflow_recipe( | ||
base_measure_recipe = self._find_source_node_recipe( | ||
measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]), | ||
predicate_pushdown_state=time_range_only_pushdown_state, | ||
linkable_spec_set=base_required_linkable_specs, | ||
) | ||
logger.debug(LazyFormat(lambda: f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}")) | ||
conversion_measure_recipe = self._find_dataflow_recipe( | ||
conversion_measure_recipe = self._find_source_node_recipe( | ||
measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]), | ||
predicate_pushdown_state=disabled_pushdown_state, | ||
linkable_spec_set=LinkableSpecSet(), | ||
|
@@ -368,7 +368,7 @@ def _build_aggregated_conversion_node( | |
) | ||
|
||
# Aggregate the conversion events with the JoinConversionEventsNode as the source node | ||
recipe_with_join_conversion_source_node = DataflowRecipe( | ||
recipe_with_join_conversion_source_node = SourceNodeRecipe( | ||
source_node=join_conversion_node, | ||
required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs, | ||
join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes, | ||
|
@@ -800,7 +800,7 @@ def _build_plan_for_distinct_values( | |
predicate_pushdown_state = PredicatePushdownState( | ||
time_range_constraint=query_spec.time_range_constraint, where_filter_specs=query_level_filter_specs | ||
) | ||
dataflow_recipe = self._find_dataflow_recipe( | ||
dataflow_recipe = self._find_source_node_recipe( | ||
linkable_spec_set=required_linkable_specs, predicate_pushdown_state=predicate_pushdown_state | ||
) | ||
if not dataflow_recipe: | ||
|
@@ -907,18 +907,8 @@ def _select_source_nodes_with_linkable_specs( | |
# Use a dictionary to dedupe for consistent ordering. | ||
selected_nodes: Dict[DataflowPlanNode, None] = {} | ||
|
||
# Find the source node that will satisfy the base granularity. Custom granularities will be joined in later. | ||
linkable_specs_set_with_base_granularities: Set[LinkableInstanceSpec] = set() | ||
# TODO: Add support for no-metrics queries for custom grains without a join (i.e., select directly from time spine). | ||
for linkable_spec in linkable_specs.as_tuple: | ||
if isinstance(linkable_spec, TimeDimensionSpec) and linkable_spec.time_granularity.is_custom_granularity: | ||
linkable_spec_with_base_grain = linkable_spec.with_grain( | ||
ExpandedTimeGranularity.from_time_granularity(linkable_spec.time_granularity.base_granularity) | ||
) | ||
linkable_specs_set_with_base_granularities.add(linkable_spec_with_base_grain) | ||
else: | ||
linkable_specs_set_with_base_granularities.add(linkable_spec) | ||
|
||
linkable_specs_set_with_base_granularities = set(linkable_specs.as_tuple) | ||
for source_node in source_nodes: | ||
output_spec_set = self._node_data_set_resolver.get_output_data_set(source_node).instance_set.spec_set | ||
all_linkable_specs_in_node = set(output_spec_set.linkable_specs) | ||
|
@@ -984,16 +974,21 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) - | |
non_additive_dimension_spec=non_additive_dimension_spec, | ||
) | ||
|
||
def _find_dataflow_recipe( | ||
def _find_source_node_recipe( | ||
self, | ||
linkable_spec_set: LinkableSpecSet, | ||
predicate_pushdown_state: PredicatePushdownState, | ||
measure_spec_properties: Optional[MeasureSpecProperties] = None, | ||
) -> Optional[DataflowRecipe]: | ||
linkable_specs = linkable_spec_set.as_tuple | ||
) -> Optional[SourceNodeRecipe]: | ||
"""Find the most suitable source nodes to satisfy the requested specs, as well as how to join them.""" | ||
candidate_nodes_for_left_side_of_join: List[DataflowPlanNode] = [] | ||
candidate_nodes_for_right_side_of_join: List[DataflowPlanNode] = [] | ||
|
||
# Replace any custom granularities with their base granularities. The custom granularity will be joined in | ||
# later, since custom granularities cannot be satisfied by source nodes. But we will need the dimension at | ||
# base granularity from the source node in order to join to the appropriate time spine later. | ||
Comment on lines +987 to +989
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
linkable_specs_to_satisfy = linkable_spec_set.replace_custom_granularity_with_base_granularity() | ||
linkable_specs_to_satisfy_tuple = linkable_specs_to_satisfy.as_tuple | ||
if measure_spec_properties: | ||
candidate_nodes_for_right_side_of_join += self._source_node_set.source_nodes_for_metric_queries | ||
candidate_nodes_for_left_side_of_join += self._select_source_nodes_with_measures( | ||
|
@@ -1005,15 +1000,15 @@ def _find_dataflow_recipe( | |
candidate_nodes_for_right_side_of_join += list(self._source_node_set.source_nodes_for_group_by_item_queries) | ||
candidate_nodes_for_left_side_of_join += list( | ||
self._select_source_nodes_with_linkable_specs( | ||
linkable_specs=linkable_spec_set, | ||
linkable_specs=linkable_specs_to_satisfy, | ||
source_nodes=self._source_node_set.source_nodes_for_group_by_item_queries, | ||
) | ||
) | ||
# If metric_time is requested without metrics, choose appropriate time spine node to select those values from. | ||
if linkable_spec_set.metric_time_specs: | ||
if linkable_specs_to_satisfy.metric_time_specs: | ||
time_spine_node = self._source_node_set.time_spine_nodes[ | ||
TimeSpineSource.choose_time_spine_source( | ||
required_time_spine_specs=linkable_spec_set.metric_time_specs, | ||
required_time_spine_specs=linkable_specs_to_satisfy.metric_time_specs, | ||
time_spine_sources=self._source_node_builder.time_spine_sources, | ||
).base_granularity | ||
] | ||
|
@@ -1052,7 +1047,7 @@ def _find_dataflow_recipe( | |
) | ||
|
||
candidate_nodes_for_right_side_of_join = node_processor.remove_unnecessary_nodes( | ||
desired_linkable_specs=linkable_specs, | ||
desired_linkable_specs=linkable_specs_to_satisfy_tuple, | ||
nodes=candidate_nodes_for_right_side_of_join, | ||
metric_time_dimension_reference=self._metric_time_dimension_reference, | ||
time_spine_nodes=self._source_node_set.time_spine_nodes_tuple, | ||
|
@@ -1064,10 +1059,10 @@ def _find_dataflow_recipe( | |
) | ||
) | ||
# TODO: test multi-hop with custom grains | ||
if DataflowPlanBuilder._contains_multihop_linkables(linkable_specs): | ||
if DataflowPlanBuilder._contains_multihop_linkables(linkable_specs_to_satisfy_tuple): | ||
candidate_nodes_for_right_side_of_join = list( | ||
node_processor.add_multi_hop_joins( | ||
desired_linkable_specs=linkable_specs, | ||
desired_linkable_specs=linkable_specs_to_satisfy_tuple, | ||
nodes=candidate_nodes_for_right_side_of_join, | ||
join_type=default_join_type, | ||
) | ||
|
@@ -1084,14 +1079,16 @@ def _find_dataflow_recipe( | |
# We do this at query time instead of during usual source node generation because the number of potential | ||
# MetricGroupBy source nodes could be extremely large (and potentially slow). | ||
logger.debug( | ||
LazyFormat(lambda: f"Building source nodes for group by metrics: {linkable_spec_set.group_by_metric_specs}") | ||
LazyFormat( | ||
lambda: f"Building source nodes for group by metrics: {linkable_specs_to_satisfy.group_by_metric_specs}" | ||
) | ||
) | ||
candidate_nodes_for_right_side_of_join += [ | ||
self._build_query_output_node( | ||
query_spec=self._source_node_builder.build_source_node_inputs_for_group_by_metric(group_by_metric_spec), | ||
for_group_by_source_node=True, | ||
) | ||
for group_by_metric_spec in linkable_spec_set.group_by_metric_specs | ||
for group_by_metric_spec in linkable_specs_to_satisfy.group_by_metric_specs | ||
] | ||
|
||
logger.debug(LazyFormat(lambda: f"Processing nodes took: {time.time()-start_time:.2f}s")) | ||
|
@@ -1134,7 +1131,7 @@ def _find_dataflow_recipe( | |
start_time = time.time() | ||
evaluation = node_evaluator.evaluate_node( | ||
left_node=node, | ||
required_linkable_specs=list(linkable_specs), | ||
required_linkable_specs=list(linkable_specs_to_satisfy_tuple), | ||
default_join_type=default_join_type, | ||
) | ||
logger.debug(LazyFormat(lambda: f"Evaluation of {node} took {time.time() - start_time:.2f}s")) | ||
|
@@ -1203,7 +1200,7 @@ def _find_dataflow_recipe( | |
for x in evaluation.join_recipes | ||
for y in x.join_on_partition_time_dimensions | ||
) | ||
return DataflowRecipe( | ||
return SourceNodeRecipe( | ||
source_node=node_with_lowest_cost_plan, | ||
required_local_linkable_specs=( | ||
evaluation.local_linkable_specs | ||
|
@@ -1392,7 +1389,7 @@ def build_aggregated_measure( | |
metric_input_measure_spec: MetricInputMeasureSpec, | ||
queried_linkable_specs: LinkableSpecSet, | ||
predicate_pushdown_state: PredicatePushdownState, | ||
measure_recipe: Optional[DataflowRecipe] = None, | ||
measure_recipe: Optional[SourceNodeRecipe] = None, | ||
) -> DataflowPlanNode: | ||
"""Returns a node where the measures are aggregated by the linkable specs and constrained appropriately. | ||
|
||
|
@@ -1457,7 +1454,7 @@ def _build_aggregated_measure_from_measure_source_node( | |
metric_input_measure_spec: MetricInputMeasureSpec, | ||
queried_linkable_specs: LinkableSpecSet, | ||
predicate_pushdown_state: PredicatePushdownState, | ||
measure_recipe: Optional[DataflowRecipe] = None, | ||
measure_recipe: Optional[SourceNodeRecipe] = None, | ||
) -> DataflowPlanNode: | ||
measure_spec = metric_input_measure_spec.measure_spec | ||
cumulative = metric_input_measure_spec.cumulative_description is not None | ||
|
@@ -1531,7 +1528,7 @@ def _build_aggregated_measure_from_measure_source_node( | |
) | ||
|
||
find_recipe_start_time = time.time() | ||
measure_recipe = self._find_dataflow_recipe( | ||
measure_recipe = self._find_source_node_recipe( | ||
measure_spec_properties=measure_properties, | ||
predicate_pushdown_state=measure_pushdown_state, | ||
linkable_spec_set=required_linkable_specs, | ||
|
@@ -1613,10 +1610,7 @@ def _build_aggregated_measure_from_measure_source_node( | |
|
||
specs_to_keep_after_join = InstanceSpecSet(measure_specs=(measure_spec,)).merge( | ||
InstanceSpecSet.create_from_specs( | ||
[ | ||
spec.with_base_grain() if isinstance(spec, TimeDimensionSpec) else spec | ||
for spec in required_linkable_specs.as_tuple | ||
] | ||
required_linkable_specs.replace_custom_granularity_with_base_granularity().as_tuple | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ok, this class is fairly complex, so I'm going to give up and just ask - I thought that by moving one of the […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You're right, that was the plan! I hadn't realized this use case was outside of that function. |
||
), | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.