Skip to content

Commit

Permalink
Support object syntax for order_by param
Browse files Browse the repository at this point in the history
  • Loading branch information
courtneyholcomb committed Sep 20, 2023
1 parent 3877253 commit 0afa3fb
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 44 deletions.
11 changes: 6 additions & 5 deletions metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
from metricflow.query.query_parser import MetricFlowQueryParser
from metricflow.random_id import random_id
from metricflow.specs.column_assoc import ColumnAssociationResolver
from metricflow.specs.query_interface import QueryInterfaceMetric, QueryParameter
from metricflow.specs.query_interface import OrderByQueryParameter, QueryInterfaceMetric, QueryParameter
from metricflow.specs.specs import InstanceSpecSet, MetricFlowQuerySpec
from metricflow.sql.optimizer.optimization_levels import SqlQueryOptimizationLevel
from metricflow.telemetry.models import TelemetryLevel
Expand Down Expand Up @@ -106,7 +106,7 @@ class MetricFlowQueryRequest:
time_constraint_end: Optional[datetime.datetime] = None
where_constraint: Optional[str] = None
order_by_names: Optional[Sequence[str]] = None
order_by: Optional[Sequence[QueryParameter]] = None
order_by: Optional[Sequence[OrderByQueryParameter]] = None
output_table: Optional[str] = None
sql_optimization_level: SqlQueryOptimizationLevel = SqlQueryOptimizationLevel.O4
query_type: MetricFlowQueryType = MetricFlowQueryType.METRIC
Expand All @@ -122,7 +122,7 @@ def create_with_random_request_id( # noqa: D
time_constraint_end: Optional[datetime.datetime] = None,
where_constraint: Optional[str] = None,
order_by_names: Optional[Sequence[str]] = None,
order_by: Optional[Sequence[QueryParameter]] = None,
order_by: Optional[Sequence[OrderByQueryParameter]] = None,
output_table: Optional[str] = None,
sql_optimization_level: SqlQueryOptimizationLevel = SqlQueryOptimizationLevel.O4,
query_type: MetricFlowQueryType = MetricFlowQueryType.METRIC,
Expand Down Expand Up @@ -421,7 +421,7 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
time_constraint_start=mf_query_request.time_constraint_start,
time_constraint_end=mf_query_request.time_constraint_end,
where_constraint_str=mf_query_request.where_constraint,
order=mf_query_request.order_by_names,
order_by_names=mf_query_request.order_by_names,
order_by=mf_query_request.order_by,
)
logger.info(f"Query spec is:\n{pformat_big_objects(query_spec)}")
Expand Down Expand Up @@ -461,7 +461,8 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
time_constraint_start=mf_query_request.time_constraint_start,
time_constraint_end=mf_query_request.time_constraint_end,
where_constraint_str=mf_query_request.where_constraint,
order=mf_query_request.order_by_names,
order_by_names=mf_query_request.order_by_names,
order_by=mf_query_request.order_by,
)
logger.warning(f"Query spec updated to:\n{pformat_big_objects(query_spec)}")

Expand Down
86 changes: 47 additions & 39 deletions metricflow/query/query_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from metricflow.naming.linkable_spec_name import StructuredLinkableSpecName
from metricflow.query.query_exceptions import InvalidQueryException
from metricflow.specs.column_assoc import ColumnAssociationResolver
from metricflow.specs.query_interface import QueryInterfaceMetric, QueryParameter
from metricflow.specs.query_interface import OrderByQueryParameter, QueryInterfaceMetric, QueryParameter
from metricflow.specs.specs import (
DimensionSpec,
EntitySpec,
Expand All @@ -43,6 +43,7 @@
WhereFilterSpec,
)
from metricflow.specs.where_filter_transform import WhereSpecFactory
from metricflow.time.date_part import DatePart
from metricflow.time.time_granularity_solver import (
PartialTimeDimensionSpec,
RequestTimeGranularityException,
Expand Down Expand Up @@ -177,9 +178,8 @@ def parse_and_validate_query(
time_constraint_end: Optional[datetime.datetime] = None,
where_constraint: Optional[WhereFilter] = None,
where_constraint_str: Optional[str] = None,
order: Optional[Sequence[str]] = None,
order_by: Optional[Sequence[QueryParameter]] = None,
time_granularity: Optional[TimeGranularity] = None,
order_by_names: Optional[Sequence[str]] = None,
order_by: Optional[Sequence[OrderByQueryParameter]] = None,
) -> MetricFlowQuerySpec:
"""Parse the query into spec objects, validating them in the process.
Expand All @@ -197,9 +197,8 @@ def parse_and_validate_query(
time_constraint_end=time_constraint_end,
where_constraint=where_constraint,
where_constraint_str=where_constraint_str,
order=order,
order_by_names=order_by_names,
order_by=order_by,
time_granularity=time_granularity,
)
finally:
logger.info(f"Parsing the query took: {time.time() - start_time:.2f}s")
Expand Down Expand Up @@ -307,12 +306,6 @@ def _get_where_filter(
PydanticWhereFilter(where_sql_template=where_constraint_str) if where_constraint_str else where_constraint
)

def _get_order(self, order: Optional[Sequence[str]], order_by: Optional[Sequence[QueryParameter]]) -> Sequence[str]:
assert not (
order and order_by
), "Both order_by_names and order_by were set, but if an order by is specified you should only use one of these!"
return order if order else [f"{o.name}__{o.grain}" if o.grain else o.name for o in order_by] if order_by else []

def _parse_and_validate_query(
self,
metric_names: Optional[Sequence[str]] = None,
Expand All @@ -324,13 +317,11 @@ def _parse_and_validate_query(
time_constraint_end: Optional[datetime.datetime] = None,
where_constraint: Optional[WhereFilter] = None,
where_constraint_str: Optional[str] = None,
order: Optional[Sequence[str]] = None,
order_by: Optional[Sequence[QueryParameter]] = None,
time_granularity: Optional[TimeGranularity] = None,
order_by_names: Optional[Sequence[str]] = None,
order_by: Optional[Sequence[OrderByQueryParameter]] = None,
) -> MetricFlowQuerySpec:
metric_names = self._get_metric_names(metric_names, metrics)
where_filter = self._get_where_filter(where_constraint, where_constraint_str)
order = self._get_order(order, order_by)

# Get metric references used for validations
# In a case of derived metric, all the input metrics would be here.
Expand Down Expand Up @@ -416,7 +407,11 @@ def _parse_and_validate_query(
self._time_granularity_solver.validate_time_granularity(metric_references, time_dimension_specs)
self._validate_date_part(metric_references, time_dimension_specs)

order_by_specs = self._parse_order_by(order or [], partial_time_dimension_spec_replacements)
order_by_specs = self._parse_order_by(
order_by_names=order_by_names,
order_by=order_by,
time_dimension_spec_replacements=partial_time_dimension_spec_replacements,
)

# For each metric, verify that it's possible to retrieve all group by elements, including the ones as required
# by the filters.
Expand Down Expand Up @@ -813,40 +808,52 @@ def _get_invalid_linkable_specs(

def _parse_order_by(
self,
order_by_names: Sequence[str],
time_dimension_spec_replacements: Dict[PartialTimeDimensionSpec, TimeDimensionSpec],
order_by_names: Optional[Sequence[str]] = None,
order_by: Optional[Sequence[OrderByQueryParameter]] = None,
) -> Tuple[OrderBySpec, ...]:
"""time_dimension_spec_replacements is used to replace a partial spec from parsing the names to a full one."""
assert not (
order_by_names and order_by
), "Both order_by_names and order_by were set, but if an order by is specified you should only use one of these!"

# TODO: Validate entity links
# TODO: Validate order by items are in the query
order_by_specs: List[OrderBySpec] = []
for order_by_name in order_by_names:
descending = False
if order_by_name.startswith("-"):
order_by_name = order_by_name[1:]
descending = True
parsed_name = StructuredLinkableSpecName.from_name(order_by_name)
for order in order_by_names or order_by or []:
if isinstance(order, str):
descending = False
if order.startswith("-"):
order = order[1:]
descending = True
parsed_name = StructuredLinkableSpecName.from_name(order)
time_granularity = parsed_name.time_granularity
date_part: Optional[DatePart] = None
else:
descending = order.descending
parsed_name = StructuredLinkableSpecName.from_name(order.order_by.name)
time_granularity = order.order_by.grain
if parsed_name.time_granularity and parsed_name.time_granularity != time_granularity:
raise InvalidQueryException("Must use object syntax to request a time granularity.")
date_part = order.order_by.date_part

if MetricReference(element_name=parsed_name.element_name) in self._known_metric_names:
if parsed_name.time_granularity:
if time_granularity:
raise InvalidQueryException(
f"Order by item '{order_by_name}' references a metric but has a time granularity"
f"Order by item '{order}' references a metric but has a time granularity"
)
if parsed_name.entity_link_names:
raise InvalidQueryException(
f"Order by item '{order_by_name}' references a metric but has entity links"
)
raise InvalidQueryException(f"Order by item '{order}' references a metric but has entity links")
order_by_specs.append(
OrderBySpec(
metric_spec=MetricSpec(element_name=parsed_name.element_name),
descending=descending,
)
)
elif DimensionReference(element_name=parsed_name.element_name) in self._known_dimension_element_references:
if parsed_name.time_granularity:
if time_granularity:
raise InvalidQueryException(
f"Order by item '{order_by_name}' references a categorical dimension but has a time "
f"granularity"
f"Order by item '{order}' references a categorical dimension but has a time granularity"
)
order_by_specs.append(
OrderBySpec(
Expand All @@ -862,14 +869,14 @@ def _parse_order_by(
in self._known_time_dimension_element_references
):
entity_links = tuple(EntityReference(element_name=x) for x in parsed_name.entity_link_names)
if parsed_name.time_granularity:
if time_granularity:
order_by_specs.append(
OrderBySpec(
time_dimension_spec=TimeDimensionSpec(
element_name=parsed_name.element_name,
entity_links=entity_links,
time_granularity=parsed_name.time_granularity,
date_part=parsed_name.date_part,
time_granularity=time_granularity,
date_part=date_part,
),
descending=descending,
)
Expand All @@ -880,6 +887,7 @@ def _parse_order_by(
partial_time_dimension_spec = PartialTimeDimensionSpec(
element_name=parsed_name.element_name,
entity_links=entity_links,
date_part=date_part,
)

if partial_time_dimension_spec in time_dimension_spec_replacements:
Expand All @@ -891,14 +899,14 @@ def _parse_order_by(
)
else:
raise RequestTimeGranularityException(
f"Order by item '{order_by_name}' does not specify a time granularity and it does not "
f"Order by item '{order}' does not specify a time granularity and it does not "
f"match a requested time dimension"
)

elif EntityReference(element_name=parsed_name.element_name) in self._known_entity_element_references:
if parsed_name.time_granularity:
if time_granularity:
raise InvalidQueryException(
f"Order by item '{order_by_name}' references an entity but has a time granularity"
f"Order by item '{order}' references an entity but has a time granularity"
)
order_by_specs.append(
OrderBySpec(
Expand All @@ -910,6 +918,6 @@ def _parse_order_by(
)
)
else:
raise InvalidQueryException(f"Order by item '{order_by_name}' references an element that is not known")
raise InvalidQueryException(f"Order by item '{order}' references an element that is not known")

return tuple(order_by_specs)

0 comments on commit 0afa3fb

Please sign in to comment.