From f891d6c9f0b0f1cbf7cced7ad1d5aa6b91ec3f3d Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Tue, 10 Dec 2024 18:16:33 -0800 Subject: [PATCH 01/10] /* PR_START p--misc 08 */ Rename to `SqlStatement`. --- metricflow/engine/metricflow_engine.py | 8 ++++---- metricflow/execution/dataflow_to_execution.py | 6 +++--- metricflow/execution/execution_plan.py | 10 +++++----- tests_metricflow/execution/test_tasks.py | 6 +++--- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index d50d792b9a..2b7064bb0a 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -51,7 +51,7 @@ from metricflow.execution.dataflow_to_execution import ( DataflowToExecutionPlanConverter, ) -from metricflow.execution.execution_plan import ExecutionPlan, SqlQuery +from metricflow.execution.execution_plan import ExecutionPlan, SqlStatement from metricflow.execution.executor import SequentialPlanExecutor from metricflow.plan_conversion.dataflow_to_sql import DataflowToSqlQueryPlanConverter from metricflow.protocols.sql_client import SqlClient @@ -177,7 +177,7 @@ class MetricFlowExplainResult: output_table: Optional[SqlTable] = None @property - def rendered_sql(self) -> SqlQuery: + def rendered_sql(self) -> SqlStatement: """Return the SQL query that would be run for the given query.""" execution_plan = self.execution_plan if len(execution_plan.tasks) != 1: @@ -194,10 +194,10 @@ def rendered_sql(self) -> SqlQuery: return sql_query @property - def rendered_sql_without_descriptions(self) -> SqlQuery: + def rendered_sql_without_descriptions(self) -> SqlStatement: """Return the SQL query without the inline descriptions.""" sql_query = self.rendered_sql - return SqlQuery( + return SqlStatement( sql_query="\n".join( filter( lambda line: not line.strip().startswith("--"), diff --git a/metricflow/execution/dataflow_to_execution.py b/metricflow/execution/dataflow_to_execution.py index b5369f7350..7ceb0d1a01 100644 --- a/metricflow/execution/dataflow_to_execution.py +++ b/metricflow/execution/dataflow_to_execution.py @@ -36,7 +36,7 @@ ExecutionPlan, SelectSqlQueryToDataTableTask, SelectSqlQueryToTableTask, - SqlQuery, + SqlStatement, ) from metricflow.plan_conversion.convert_to_sql_plan import ConvertToSqlPlanResult from metricflow.plan_conversion.dataflow_to_sql import DataflowToSqlQueryPlanConverter @@ -91,7 +91,7 @@ def visit_write_to_result_data_table_node(self, node: WriteToResultDataTableNode leaf_tasks=( SelectSqlQueryToDataTableTask.create( sql_client=self._sql_client, - sql_query=SqlQuery(render_sql_result.sql, render_sql_result.bind_parameter_set), + sql_query=SqlStatement(render_sql_result.sql, render_sql_result.bind_parameter_set), ), ) ) @@ -109,7 +109,7 @@ def visit_write_to_result_table_node(self, node: WriteToResultTableNode) -> Conv leaf_tasks=( SelectSqlQueryToTableTask.create( sql_client=self._sql_client, - sql_query=SqlQuery( + sql_query=SqlStatement( sql_query=render_sql_result.sql, bind_parameter_set=render_sql_result.bind_parameter_set, ), diff --git a/metricflow/execution/execution_plan.py b/metricflow/execution/execution_plan.py index cae16bad9f..4d8a8b6bad 100644 --- a/metricflow/execution/execution_plan.py +++ b/metricflow/execution/execution_plan.py @@ -30,7 +30,7 @@ class ExecutionPlanTask(DagNode["ExecutionPlanTask"], Visitable, ABC): sql_query: If this runs a SQL query, return the associated SQL. """ - sql_query: Optional[SqlQuery] + sql_query: Optional[SqlStatement] @abstractmethod def execute(self) -> TaskExecutionResult: @@ -44,8 +44,8 @@ def task_id(self) -> NodeId: @dataclass(frozen=True) -class SqlQuery: - """A SQL query that can be run along with bind parameters.""" +class SqlStatement: + """Encapsulates a SQL statement along with the bind parameters that should be used.""" # This field will be renamed as it is confusing given the class name. sql_query: str @@ -90,7 +90,7 @@ class SelectSqlQueryToDataTableTask(ExecutionPlanTask): @staticmethod def create( # noqa: D102 sql_client: SqlClient, - sql_query: SqlQuery, + sql_query: SqlStatement, parent_nodes: Sequence[ExecutionPlanTask] = (), ) -> SelectSqlQueryToDataTableTask: return SelectSqlQueryToDataTableTask( @@ -154,7 +154,7 @@ class SelectSqlQueryToTableTask(ExecutionPlanTask): @staticmethod def create( # noqa: D102 sql_client: SqlClient, - sql_query: SqlQuery, + sql_query: SqlStatement, output_table: SqlTable, parent_nodes: Sequence[ExecutionPlanTask] = (), ) -> SelectSqlQueryToTableTask: diff --git a/tests_metricflow/execution/test_tasks.py b/tests_metricflow/execution/test_tasks.py index b6250d4693..9f071ca586 100644 --- a/tests_metricflow/execution/test_tasks.py +++ b/tests_metricflow/execution/test_tasks.py @@ -11,7 +11,7 @@ ExecutionPlan, SelectSqlQueryToDataTableTask, SelectSqlQueryToTableTask, - SqlQuery, + SqlStatement, ) from metricflow.execution.executor import SequentialPlanExecutor from metricflow.protocols.sql_client import SqlClient, SqlEngine @@ -19,7 +19,7 @@ def test_read_sql_task(sql_client: SqlClient) -> None: # noqa: D103 - task = SelectSqlQueryToDataTableTask.create(sql_client, SqlQuery("SELECT 1 AS foo", SqlBindParameterSet())) + task = SelectSqlQueryToDataTableTask.create(sql_client, SqlStatement("SELECT 1 AS foo", SqlBindParameterSet())) execution_plan = ExecutionPlan(leaf_tasks=[task], dag_id=DagId.from_str("plan0")) results = SequentialPlanExecutor().execute_plan(execution_plan) @@ -44,7 +44,7 @@ def test_write_table_task( # noqa: D103 output_table = SqlTable(schema_name=mf_test_configuration.mf_system_schema, table_name=f"test_table_{random_id()}") task = SelectSqlQueryToTableTask.create( sql_client=sql_client, - sql_query=SqlQuery( + sql_query=SqlStatement( sql_query=f"CREATE TABLE {output_table.sql} AS SELECT 1 AS foo", bind_parameter_set=SqlBindParameterSet(), ), From d0cfe17346673dd41ca14a6676fc44749a9efa5b Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Tue, 10 Dec 2024 18:18:24 -0800 Subject: [PATCH 02/10] Rename `SqlStatement.sql_query` to `SqlStatement.sql`. --- dbt-metricflow/dbt_metricflow/cli/main.py | 4 ++-- metricflow/engine/metricflow_engine.py | 4 ++-- metricflow/execution/dataflow_to_execution.py | 2 +- metricflow/execution/execution_plan.py | 14 +++++++------- .../validation/data_warehouse_model_validator.py | 2 +- tests_metricflow/engine/test_explain.py | 4 ++-- tests_metricflow/execution/test_tasks.py | 2 +- .../integration/test_rendered_query.py | 6 +++--- 8 files changed, 19 insertions(+), 19 deletions(-) diff --git a/dbt-metricflow/dbt_metricflow/cli/main.py b/dbt-metricflow/dbt_metricflow/cli/main.py index 2b8d3300c6..e5799fd07a 100644 --- a/dbt-metricflow/dbt_metricflow/cli/main.py +++ b/dbt-metricflow/dbt_metricflow/cli/main.py @@ -298,9 +298,9 @@ def query( if explain: assert explain_result sql = ( - explain_result.rendered_sql_without_descriptions.sql_query + explain_result.rendered_sql_without_descriptions.sql if not show_sql_descriptions - else explain_result.rendered_sql.sql_query + else explain_result.rendered_sql.sql ) if show_dataflow_plan: click.echo("🔎 Generated Dataflow Plan + SQL (remove --explain to see data):") diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index 2b7064bb0a..b0f4f729a6 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -198,10 +198,10 @@ def rendered_sql_without_descriptions(self) -> SqlStatement: """Return the SQL query without the inline descriptions.""" sql_query = self.rendered_sql return SqlStatement( - sql_query="\n".join( + sql="\n".join( filter( lambda line: not line.strip().startswith("--"), - sql_query.sql_query.split("\n"), + sql_query.sql.split("\n"), ) ), bind_parameter_set=sql_query.bind_parameter_set, diff --git a/metricflow/execution/dataflow_to_execution.py b/metricflow/execution/dataflow_to_execution.py index 7ceb0d1a01..5f45c0d56b 100644 --- a/metricflow/execution/dataflow_to_execution.py +++ b/metricflow/execution/dataflow_to_execution.py @@ -110,7 +110,7 @@ def visit_write_to_result_table_node(self, node: WriteToResultTableNode) -> Conv SelectSqlQueryToTableTask.create( sql_client=self._sql_client, sql_query=SqlStatement( - sql_query=render_sql_result.sql, + sql=render_sql_result.sql, bind_parameter_set=render_sql_result.bind_parameter_set, ), output_table=node.output_sql_table, diff --git a/metricflow/execution/execution_plan.py b/metricflow/execution/execution_plan.py index 4d8a8b6bad..63e59e69ee 100644 --- a/metricflow/execution/execution_plan.py +++ b/metricflow/execution/execution_plan.py @@ -48,7 +48,7 @@ class SqlStatement: """Encapsulates a SQL statement along with the bind parameters that should be used.""" # This field will be renamed as it is confusing given the class name. - sql_query: str + sql: str bind_parameter_set: SqlBindParameterSet @@ -111,7 +111,7 @@ def description(self) -> str: # noqa: D102 def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 sql_query = self.sql_query assert sql_query is not None, f"{self.sql_query=} should have been set during creation." - return tuple(super().displayed_properties) + (DisplayedProperty(key="sql_query", value=sql_query.sql_query),) + return tuple(super().displayed_properties) + (DisplayedProperty(key="sql_query", value=sql_query.sql),) def execute(self) -> TaskExecutionResult: # noqa: D102 start_time = time.time() @@ -119,7 +119,7 @@ def execute(self) -> TaskExecutionResult: # noqa: D102 assert sql_query is not None, f"{self.sql_query=} should have been set during creation." df = self.sql_client.query( - sql_query.sql_query, + sql_query.sql, sql_bind_parameter_set=sql_query.bind_parameter_set, ) @@ -127,7 +127,7 @@ def execute(self) -> TaskExecutionResult: # noqa: D102 return TaskExecutionResult( start_time=start_time, end_time=end_time, - sql=sql_query.sql_query, + sql=sql_query.sql, bind_params=sql_query.bind_parameter_set, df=df, ) @@ -178,7 +178,7 @@ def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 sql_query = self.sql_query assert sql_query is not None, f"{self.sql_query=} should have been set during creation." return tuple(super().displayed_properties) + ( - DisplayedProperty(key="sql_query", value=sql_query.sql_query), + DisplayedProperty(key="sql_query", value=sql_query.sql), DisplayedProperty(key="output_table", value=self.output_table), DisplayedProperty(key="bind_parameter_set", value=sql_query.bind_parameter_set), ) @@ -191,12 +191,12 @@ def execute(self) -> TaskExecutionResult: # noqa: D102 self.sql_client.execute(f"DROP TABLE IF EXISTS {self.output_table.sql}") logger.debug(LazyFormat(lambda: f"Creating table {self.output_table} using a query")) self.sql_client.execute( - sql_query.sql_query, + sql_query.sql, sql_bind_parameter_set=sql_query.bind_parameter_set, ) end_time = time.time() - return TaskExecutionResult(start_time=start_time, end_time=end_time, sql=sql_query.sql_query) + return TaskExecutionResult(start_time=start_time, end_time=end_time, sql=sql_query.sql) def __repr__(self) -> str: # noqa: D105 return f"{self.__class__.__name__}(sql_query='{self.sql_query}', output_table={self.output_table})" diff --git a/metricflow/validation/data_warehouse_model_validator.py b/metricflow/validation/data_warehouse_model_validator.py index 58b1055d38..b1c69d1d9b 100644 --- a/metricflow/validation/data_warehouse_model_validator.py +++ b/metricflow/validation/data_warehouse_model_validator.py @@ -453,7 +453,7 @@ def _gen_explain_query_task_query_and_params( ) -> Tuple[str, SqlBindParameterSet]: explain_result: MetricFlowExplainResult = mf_engine.explain(mf_request=mf_request) return ( - explain_result.rendered_sql_without_descriptions.sql_query, + explain_result.rendered_sql_without_descriptions.sql, explain_result.rendered_sql_without_descriptions.bind_parameter_set, ) diff --git a/tests_metricflow/engine/test_explain.py b/tests_metricflow/engine/test_explain.py index f23f419981..2b125caa59 100644 --- a/tests_metricflow/engine/test_explain.py +++ b/tests_metricflow/engine/test_explain.py @@ -21,7 +21,7 @@ def _explain_one_query(mf_engine: MetricFlowEngine) -> str: explain_result: MetricFlowExplainResult = mf_engine.explain( MetricFlowQueryRequest.create_with_random_request_id(saved_query_name="p0_booking") ) - return explain_result.rendered_sql.sql_query + return explain_result.rendered_sql.sql def test_concurrent_explain_consistency( @@ -64,7 +64,7 @@ def test_optimization_level( sql_optimization_level=optimization_level, ) ) - results[optimization_level.value] = explain_result.rendered_sql_without_descriptions.sql_query + results[optimization_level.value] = explain_result.rendered_sql_without_descriptions.sql assert_str_snapshot_equal( request=request, diff --git a/tests_metricflow/execution/test_tasks.py b/tests_metricflow/execution/test_tasks.py index 9f071ca586..3653bb93e2 100644 --- a/tests_metricflow/execution/test_tasks.py +++ b/tests_metricflow/execution/test_tasks.py @@ -45,7 +45,7 @@ def test_write_table_task( # noqa: D103 task = SelectSqlQueryToTableTask.create( sql_client=sql_client, sql_query=SqlStatement( - sql_query=f"CREATE TABLE {output_table.sql} AS SELECT 1 AS foo", + sql=f"CREATE TABLE {output_table.sql} AS SELECT 1 AS foo", bind_parameter_set=SqlBindParameterSet(), ), output_table=output_table, diff --git a/tests_metricflow/integration/test_rendered_query.py b/tests_metricflow/integration/test_rendered_query.py index ec5d1c6563..471a123b49 100644 --- a/tests_metricflow/integration/test_rendered_query.py +++ b/tests_metricflow/integration/test_rendered_query.py @@ -31,7 +31,7 @@ def test_render_query( # noqa: D103 request=request, mf_test_configuration=mf_test_configuration, snapshot_id="query0", - sql=result.rendered_sql.sql_query, + sql=result.rendered_sql.sql, sql_engine=it_helpers.sql_client.sql_engine_type, ) @@ -64,7 +64,7 @@ def test_id_enumeration( # noqa: D103 request=request, mf_test_configuration=mf_test_configuration, snapshot_id="query", - sql=result.rendered_sql.sql_query, + sql=result.rendered_sql.sql, sql_engine=sql_client.sql_engine_type, ) @@ -80,6 +80,6 @@ def test_id_enumeration( # noqa: D103 request=request, mf_test_configuration=mf_test_configuration, snapshot_id="query", - sql=result.rendered_sql.sql_query, + sql=result.rendered_sql.sql, sql_engine=sql_client.sql_engine_type, ) From 6469b846501fff0b4cf74a3ce2d822e2107bee68 Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Tue, 10 Dec 2024 18:21:47 -0800 Subject: [PATCH 03/10] Rename `rendered_sql` -> `sql_statement`. --- dbt-metricflow/dbt_metricflow/cli/main.py | 2 +- metricflow/engine/metricflow_engine.py | 4 ++-- tests_metricflow/engine/test_explain.py | 2 +- tests_metricflow/integration/test_rendered_query.py | 6 +++--- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dbt-metricflow/dbt_metricflow/cli/main.py b/dbt-metricflow/dbt_metricflow/cli/main.py index e5799fd07a..7def5c1464 100644 --- a/dbt-metricflow/dbt_metricflow/cli/main.py +++ b/dbt-metricflow/dbt_metricflow/cli/main.py @@ -300,7 +300,7 @@ def query( sql = ( explain_result.rendered_sql_without_descriptions.sql if not show_sql_descriptions - else explain_result.rendered_sql.sql + else explain_result.sql_statement.sql ) if show_dataflow_plan: click.echo("🔎 Generated Dataflow Plan + SQL (remove --explain to see data):") diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index b0f4f729a6..4d6319eca5 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -177,7 +177,7 @@ class MetricFlowExplainResult: output_table: Optional[SqlTable] = None @property - def rendered_sql(self) -> SqlStatement: + def sql_statement(self) -> SqlStatement: """Return the SQL query that would be run for the given query.""" execution_plan = self.execution_plan if len(execution_plan.tasks) != 1: @@ -196,7 +196,7 @@ def rendered_sql(self) -> SqlStatement: @property def rendered_sql_without_descriptions(self) -> SqlStatement: """Return the SQL query without the inline descriptions.""" - sql_query = self.rendered_sql + sql_query = self.sql_statement return SqlStatement( sql="\n".join( filter( diff --git a/tests_metricflow/engine/test_explain.py b/tests_metricflow/engine/test_explain.py index 2b125caa59..339c7bb2d2 100644 --- a/tests_metricflow/engine/test_explain.py +++ b/tests_metricflow/engine/test_explain.py @@ -21,7 +21,7 @@ def _explain_one_query(mf_engine: MetricFlowEngine) -> str: explain_result: MetricFlowExplainResult = mf_engine.explain( MetricFlowQueryRequest.create_with_random_request_id(saved_query_name="p0_booking") ) - return explain_result.rendered_sql.sql + return explain_result.sql_statement.sql def test_concurrent_explain_consistency( diff --git a/tests_metricflow/integration/test_rendered_query.py b/tests_metricflow/integration/test_rendered_query.py index 471a123b49..7bc004a0d5 100644 --- a/tests_metricflow/integration/test_rendered_query.py +++ b/tests_metricflow/integration/test_rendered_query.py @@ -31,7 +31,7 @@ def test_render_query( # noqa: D103 request=request, mf_test_configuration=mf_test_configuration, snapshot_id="query0", - sql=result.rendered_sql.sql, + sql=result.sql_statement.sql, sql_engine=it_helpers.sql_client.sql_engine_type, ) @@ -64,7 +64,7 @@ def test_id_enumeration( # noqa: D103 request=request, mf_test_configuration=mf_test_configuration, snapshot_id="query", - sql=result.rendered_sql.sql, + sql=result.sql_statement.sql, sql_engine=sql_client.sql_engine_type, ) @@ -80,6 +80,6 @@ def test_id_enumeration( # noqa: D103 request=request, mf_test_configuration=mf_test_configuration, snapshot_id="query", - sql=result.rendered_sql.sql, + sql=result.sql_statement.sql, sql_engine=sql_client.sql_engine_type, ) From 931b08719e9087b7aebfe5a11ac715118b31f75d Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Tue, 10 Dec 2024 18:24:40 -0800 Subject: [PATCH 04/10] Move and rename `rendered_sql_without_descriptions`. --- dbt-metricflow/dbt_metricflow/cli/main.py | 2 +- metricflow/execution/execution_plan.py | 13 +++++++++++++ .../validation/data_warehouse_model_validator.py | 4 ++-- tests_metricflow/engine/test_explain.py | 2 +- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/dbt-metricflow/dbt_metricflow/cli/main.py b/dbt-metricflow/dbt_metricflow/cli/main.py index 7def5c1464..7c248654bc 100644 --- a/dbt-metricflow/dbt_metricflow/cli/main.py +++ b/dbt-metricflow/dbt_metricflow/cli/main.py @@ -298,7 +298,7 @@ def query( if explain: assert explain_result sql = ( - explain_result.rendered_sql_without_descriptions.sql + explain_result.sql_statement.without_descriptions.sql if not show_sql_descriptions else explain_result.sql_statement.sql ) diff --git a/metricflow/execution/execution_plan.py b/metricflow/execution/execution_plan.py index 63e59e69ee..a29f956f2b 100644 --- a/metricflow/execution/execution_plan.py +++ b/metricflow/execution/execution_plan.py @@ -51,6 +51,19 @@ class SqlStatement: sql: str bind_parameter_set: SqlBindParameterSet + @property + def without_descriptions(self) -> SqlStatement: + """Return the SQL query without the inline descriptions.""" + return SqlStatement( + sql="\n".join( + filter( + lambda line: not line.strip().startswith("--"), + self.sql.split("\n"), + ) + ), + bind_parameter_set=self.bind_parameter_set, + ) + @dataclass(frozen=True) class TaskExecutionError(Exception): diff --git a/metricflow/validation/data_warehouse_model_validator.py b/metricflow/validation/data_warehouse_model_validator.py index b1c69d1d9b..95efadcb3e 100644 --- a/metricflow/validation/data_warehouse_model_validator.py +++ b/metricflow/validation/data_warehouse_model_validator.py @@ -453,8 +453,8 @@ def _gen_explain_query_task_query_and_params( ) -> Tuple[str, SqlBindParameterSet]: explain_result: MetricFlowExplainResult = mf_engine.explain(mf_request=mf_request) return ( - explain_result.rendered_sql_without_descriptions.sql, - explain_result.rendered_sql_without_descriptions.bind_parameter_set, + explain_result.sql_statement.without_descriptions.sql, + explain_result.sql_statement.without_descriptions.bind_parameter_set, ) @classmethod diff --git a/tests_metricflow/engine/test_explain.py b/tests_metricflow/engine/test_explain.py index 339c7bb2d2..92c5817332 100644 --- a/tests_metricflow/engine/test_explain.py +++ b/tests_metricflow/engine/test_explain.py @@ -64,7 +64,7 @@ def test_optimization_level( sql_optimization_level=optimization_level, ) ) - results[optimization_level.value] = explain_result.rendered_sql_without_descriptions.sql + results[optimization_level.value] = explain_result.sql_statement.without_descriptions.sql assert_str_snapshot_equal( request=request, From 27804870fa1e4417af671ff198399097ea723c88 Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Tue, 10 Dec 2024 18:47:21 -0800 Subject: [PATCH 05/10] Rename `ExecutionPlanTask.sql_query`. --- metricflow/engine/metricflow_engine.py | 2 +- metricflow/execution/dataflow_to_execution.py | 2 +- metricflow/execution/execution_plan.py | 42 +++++++++---------- tests_metricflow/execution/noop_task.py | 2 +- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index 4d6319eca5..f494a4b444 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -185,7 +185,7 @@ def sql_statement(self) -> SqlStatement: f"Multiple tasks in the execution plan not yet supported. Got tasks: {execution_plan.tasks}" ) - sql_query = execution_plan.tasks[0].sql_query + sql_query = execution_plan.tasks[0].sql_statement if not sql_query: raise NotImplementedError( f"Execution plan tasks without a SQL query not yet supported. Got tasks: {execution_plan.tasks}" diff --git a/metricflow/execution/dataflow_to_execution.py b/metricflow/execution/dataflow_to_execution.py index 5f45c0d56b..51710ba278 100644 --- a/metricflow/execution/dataflow_to_execution.py +++ b/metricflow/execution/dataflow_to_execution.py @@ -91,7 +91,7 @@ def visit_write_to_result_data_table_node(self, node: WriteToResultDataTableNode leaf_tasks=( SelectSqlQueryToDataTableTask.create( sql_client=self._sql_client, - sql_query=SqlStatement(render_sql_result.sql, render_sql_result.bind_parameter_set), + sql_statement=SqlStatement(render_sql_result.sql, render_sql_result.bind_parameter_set), ), ) ) diff --git a/metricflow/execution/execution_plan.py b/metricflow/execution/execution_plan.py index a29f956f2b..f35508a5e0 100644 --- a/metricflow/execution/execution_plan.py +++ b/metricflow/execution/execution_plan.py @@ -27,10 +27,10 @@ class ExecutionPlanTask(DagNode["ExecutionPlanTask"], Visitable, ABC): for these nodes as it seems more intuitive. Attributes: - sql_query: If this runs a SQL query, return the associated SQL. + sql_statement: If this runs a SQL query, return the associated SQL. """ - sql_query: Optional[SqlStatement] + sql_statement: Optional[SqlStatement] @abstractmethod def execute(self) -> TaskExecutionResult: @@ -93,7 +93,7 @@ class SelectSqlQueryToDataTableTask(ExecutionPlanTask): Attributes: sql_client: The SQL client used to run the query. - sql_query: The SQL query to run. + sql_statement: The SQL query to run. parent_nodes: The parent tasks for this execution plan task. """ @@ -103,12 +103,12 @@ class SelectSqlQueryToDataTableTask(ExecutionPlanTask): @staticmethod def create( # noqa: D102 sql_client: SqlClient, - sql_query: SqlStatement, + sql_statement: SqlStatement, parent_nodes: Sequence[ExecutionPlanTask] = (), ) -> SelectSqlQueryToDataTableTask: return SelectSqlQueryToDataTableTask( sql_client=sql_client, - sql_query=sql_query, + sql_statement=sql_statement, parent_nodes=tuple(parent_nodes), ) @@ -122,31 +122,31 @@ def description(self) -> str: # noqa: D102 @property def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 - sql_query = self.sql_query - assert sql_query is not None, f"{self.sql_query=} should have been set during creation." + sql_query = self.sql_statement + assert sql_query is not None, f"{self.sql_statement=} should have been set during creation." return tuple(super().displayed_properties) + (DisplayedProperty(key="sql_query", value=sql_query.sql),) def execute(self) -> TaskExecutionResult: # noqa: D102 start_time = time.time() - sql_query = self.sql_query - assert sql_query is not None, f"{self.sql_query=} should have been set during creation." + sql_statement = self.sql_statement + assert sql_statement is not None, f"{self.sql_statement=} should have been set during creation." df = self.sql_client.query( - sql_query.sql, - sql_bind_parameter_set=sql_query.bind_parameter_set, + sql_statement.sql, + sql_bind_parameter_set=sql_statement.bind_parameter_set, ) end_time = time.time() return TaskExecutionResult( start_time=start_time, end_time=end_time, - sql=sql_query.sql, - bind_params=sql_query.bind_parameter_set, + sql=sql_statement.sql, + bind_params=sql_statement.bind_parameter_set, df=df, ) def __repr__(self) -> str: # noqa: D105 - return f"{self.__class__.__name__}(sql_query='{self.sql_query}')" + return f"{self.__class__.__name__}(sql_statement={self.sql_statement!r})" @dataclass(frozen=True) @@ -157,7 +157,7 @@ class SelectSqlQueryToTableTask(ExecutionPlanTask): Attributes: sql_client: The SQL client used to run the query. - sql_query: The SQL query to run. + sql_statement: The SQL query to run. output_table: The table where the results will be written. """ @@ -173,7 +173,7 @@ def create( # noqa: D102 ) -> SelectSqlQueryToTableTask: return SelectSqlQueryToTableTask( sql_client=sql_client, - sql_query=sql_query, + sql_statement=sql_query, output_table=output_table, parent_nodes=tuple(parent_nodes), ) @@ -188,8 +188,8 @@ def description(self) -> str: # noqa: D102 @property def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 - sql_query = self.sql_query - assert sql_query is not None, f"{self.sql_query=} should have been set during creation." + sql_query = self.sql_statement + assert sql_query is not None, f"{self.sql_statement=} should have been set during creation." return tuple(super().displayed_properties) + ( DisplayedProperty(key="sql_query", value=sql_query.sql), DisplayedProperty(key="output_table", value=self.output_table), @@ -197,8 +197,8 @@ def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 ) def execute(self) -> TaskExecutionResult: # noqa: D102 - sql_query = self.sql_query - assert sql_query is not None, f"{self.sql_query=} should have been set during creation." + sql_query = self.sql_statement + assert sql_query is not None, f"{self.sql_statement=} should have been set during creation." start_time = time.time() logger.debug(LazyFormat(lambda: f"Dropping table {self.output_table} in case it already exists")) self.sql_client.execute(f"DROP TABLE IF EXISTS {self.output_table.sql}") @@ -212,7 +212,7 @@ def execute(self) -> TaskExecutionResult: # noqa: D102 return TaskExecutionResult(start_time=start_time, end_time=end_time, sql=sql_query.sql) def __repr__(self) -> str: # noqa: D105 - return f"{self.__class__.__name__}(sql_query='{self.sql_query}', output_table={self.output_table})" + return f"{self.__class__.__name__}(sql_query='{self.sql_statement}', output_table={self.output_table})" class ExecutionPlan(MetricFlowDag[ExecutionPlanTask]): diff --git a/tests_metricflow/execution/noop_task.py b/tests_metricflow/execution/noop_task.py index 8f65ec049a..ad06af4966 100644 --- a/tests_metricflow/execution/noop_task.py +++ b/tests_metricflow/execution/noop_task.py @@ -35,7 +35,7 @@ def create( # noqa: D102 ) -> NoOpExecutionPlanTask: return NoOpExecutionPlanTask( parent_nodes=tuple(parent_tasks), - sql_query=None, + sql_statement=None, should_error=should_error, ) From abff90cfbc9c8de2644e95aadad860392a525baf Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Thu, 12 Dec 2024 12:21:22 -0800 Subject: [PATCH 06/10] Update other execution plan tasks. --- metricflow/execution/dataflow_to_execution.py | 2 +- metricflow/execution/execution_plan.py | 24 +++++++++---------- tests_metricflow/execution/test_tasks.py | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/metricflow/execution/dataflow_to_execution.py b/metricflow/execution/dataflow_to_execution.py index 51710ba278..1c2aa2fd95 100644 --- a/metricflow/execution/dataflow_to_execution.py +++ b/metricflow/execution/dataflow_to_execution.py @@ -109,7 +109,7 @@ def visit_write_to_result_table_node(self, node: WriteToResultTableNode) -> Conv leaf_tasks=( SelectSqlQueryToTableTask.create( sql_client=self._sql_client, - sql_query=SqlStatement( + sql_statement=SqlStatement( sql=render_sql_result.sql, bind_parameter_set=render_sql_result.bind_parameter_set, ), diff --git a/metricflow/execution/execution_plan.py b/metricflow/execution/execution_plan.py index f35508a5e0..9da7d39a11 100644 --- a/metricflow/execution/execution_plan.py +++ b/metricflow/execution/execution_plan.py @@ -167,13 +167,13 @@ class SelectSqlQueryToTableTask(ExecutionPlanTask): @staticmethod def create( # noqa: D102 sql_client: SqlClient, - sql_query: SqlStatement, + sql_statement: SqlStatement, output_table: SqlTable, parent_nodes: Sequence[ExecutionPlanTask] = (), ) -> SelectSqlQueryToTableTask: return SelectSqlQueryToTableTask( sql_client=sql_client, - sql_statement=sql_query, + sql_statement=sql_statement, output_table=output_table, parent_nodes=tuple(parent_nodes), ) @@ -188,31 +188,31 @@ def description(self) -> str: # noqa: D102 @property def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 - sql_query = self.sql_statement - assert sql_query is not None, f"{self.sql_statement=} should have been set during creation." + sql_statement = self.sql_statement + assert sql_statement is not None, f"{self.sql_statement=} should have been set during creation." return tuple(super().displayed_properties) + ( - DisplayedProperty(key="sql_query", value=sql_query.sql), + DisplayedProperty(key="sql_statement", value=sql_statement.sql), DisplayedProperty(key="output_table", value=self.output_table), - DisplayedProperty(key="bind_parameter_set", value=sql_query.bind_parameter_set), + DisplayedProperty(key="bind_parameter_set", value=sql_statement.bind_parameter_set), ) def execute(self) -> TaskExecutionResult: # noqa: D102 - sql_query = self.sql_statement - assert sql_query is not None, f"{self.sql_statement=} should have been set during creation." + sql_statement = self.sql_statement + assert sql_statement is not None, f"{self.sql_statement=} should have been set during creation." start_time = time.time() logger.debug(LazyFormat(lambda: f"Dropping table {self.output_table} in case it already exists")) self.sql_client.execute(f"DROP TABLE IF EXISTS {self.output_table.sql}") logger.debug(LazyFormat(lambda: f"Creating table {self.output_table} using a query")) self.sql_client.execute( - sql_query.sql, - sql_bind_parameter_set=sql_query.bind_parameter_set, + sql_statement.sql, + sql_bind_parameter_set=sql_statement.bind_parameter_set, ) end_time = time.time() - return TaskExecutionResult(start_time=start_time, end_time=end_time, sql=sql_query.sql) + return TaskExecutionResult(start_time=start_time, end_time=end_time, sql=sql_statement.sql) def __repr__(self) -> str: # noqa: D105 - return f"{self.__class__.__name__}(sql_query='{self.sql_statement}', output_table={self.output_table})" + return f"{self.__class__.__name__}(sql_statement={self.sql_statement!r}', output_table={self.output_table})" class ExecutionPlan(MetricFlowDag[ExecutionPlanTask]): diff --git a/tests_metricflow/execution/test_tasks.py b/tests_metricflow/execution/test_tasks.py index 3653bb93e2..a88bcc1478 100644 --- a/tests_metricflow/execution/test_tasks.py +++ b/tests_metricflow/execution/test_tasks.py @@ -44,7 +44,7 @@ def test_write_table_task( # noqa: D103 output_table = SqlTable(schema_name=mf_test_configuration.mf_system_schema, table_name=f"test_table_{random_id()}") task = SelectSqlQueryToTableTask.create( sql_client=sql_client, - sql_query=SqlStatement( + sql_statement=SqlStatement( sql=f"CREATE TABLE {output_table.sql} AS SELECT 1 AS foo", bind_parameter_set=SqlBindParameterSet(), ), From ff9eafb08488061f5946c72a6b4f221ffe5463ea Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Thu, 12 Dec 2024 12:09:13 -0800 Subject: [PATCH 07/10] Update local variable names and fix logging. --- metricflow/engine/metricflow_engine.py | 20 +++++++++++++++----- metricflow/execution/execution_plan.py | 5 ++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index f494a4b444..f0c5825546 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -182,16 +182,26 @@ def sql_statement(self) -> SqlStatement: execution_plan = self.execution_plan if len(execution_plan.tasks) != 1: raise NotImplementedError( - f"Multiple tasks in the execution plan not yet supported. Got tasks: {execution_plan.tasks}" + str( + LazyFormat( + "Multiple tasks in the execution plan not yet supported.", + tasks=[task.task_id for task in execution_plan.tasks], + ) + ) ) - sql_query = execution_plan.tasks[0].sql_statement - if not sql_query: + sql_statement = execution_plan.tasks[0].sql_statement + if not sql_statement: raise NotImplementedError( - f"Execution plan tasks without a SQL query not yet supported. Got tasks: {execution_plan.tasks}" + str( + LazyFormat( + "Execution plan tasks without a SQL statement are not yet supported.", + tasks=[task.task_id for task in execution_plan.tasks], + ) + ) ) - return sql_query + return sql_statement @property def rendered_sql_without_descriptions(self) -> SqlStatement: diff --git a/metricflow/execution/execution_plan.py b/metricflow/execution/execution_plan.py index 9da7d39a11..c3e044a6fb 100644 --- a/metricflow/execution/execution_plan.py +++ b/metricflow/execution/execution_plan.py @@ -122,9 +122,8 @@ def description(self) -> str: # noqa: D102 @property def displayed_properties(self) -> Sequence[DisplayedProperty]: # noqa: D102 - sql_query = self.sql_statement - assert sql_query is not None, f"{self.sql_statement=} should have been set during creation." - return tuple(super().displayed_properties) + (DisplayedProperty(key="sql_query", value=sql_query.sql),) + assert self.sql_statement is not None, f"{self.sql_statement=} should have been set during creation." + return tuple(super().displayed_properties) + (DisplayedProperty(key="sql", value=self.sql_statement.sql),) def execute(self) -> TaskExecutionResult: # noqa: D102 start_time = time.time() From 907b19fa96e8553f7e220e7b1f066cd87c5abc5a Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Thu, 12 Dec 2024 12:09:31 -0800 Subject: [PATCH 08/10] Remove unused code. --- metricflow/engine/metricflow_engine.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index f0c5825546..b4343c527d 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -203,20 +203,6 @@ def sql_statement(self) -> SqlStatement: return sql_statement - @property - def rendered_sql_without_descriptions(self) -> SqlStatement: - """Return the SQL query without the inline descriptions.""" - sql_query = self.sql_statement - return SqlStatement( - sql="\n".join( - filter( - lambda line: not line.strip().startswith("--"), - sql_query.sql.split("\n"), - ) - ), - bind_parameter_set=sql_query.bind_parameter_set, - ) - @property def execution_plan(self) -> ExecutionPlan: # noqa: D102 return self.convert_to_execution_plan_result.execution_plan From 1c2be7c9645bc960df688f145c9c98bcc06b3cec Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Thu, 12 Dec 2024 12:15:51 -0800 Subject: [PATCH 09/10] Mark execution plan tests for only DuckDB. --- .../plan_conversion/test_dataflow_to_execution.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests_metricflow/plan_conversion/test_dataflow_to_execution.py b/tests_metricflow/plan_conversion/test_dataflow_to_execution.py index fa67ffd4c9..08db7d27e0 100644 --- a/tests_metricflow/plan_conversion/test_dataflow_to_execution.py +++ b/tests_metricflow/plan_conversion/test_dataflow_to_execution.py @@ -36,6 +36,7 @@ def make_execution_plan_converter( # noqa: D103 @pytest.mark.sql_engine_snapshot +@pytest.mark.duckdb_only def test_joined_plan( # noqa: D103 request: FixtureRequest, mf_test_configuration: MetricFlowTestConfiguration, @@ -75,6 +76,7 @@ def test_joined_plan( # noqa: D103 @pytest.mark.sql_engine_snapshot +@pytest.mark.duckdb_only def test_small_combined_metrics_plan( # noqa: D103 request: FixtureRequest, mf_test_configuration: MetricFlowTestConfiguration, @@ -112,6 +114,7 @@ def test_small_combined_metrics_plan( # noqa: D103 @pytest.mark.sql_engine_snapshot +@pytest.mark.duckdb_only def test_combined_metrics_plan( # noqa: D103 request: FixtureRequest, mf_test_configuration: MetricFlowTestConfiguration, @@ -151,6 +154,7 @@ def test_combined_metrics_plan( # noqa: D103 @pytest.mark.sql_engine_snapshot +@pytest.mark.duckdb_only def test_multihop_joined_plan( request: FixtureRequest, mf_test_configuration: MetricFlowTestConfiguration, From 1c4e7dd4c34b6f05e407a4ca36ffa03a254723cc Mon Sep 17 00:00:00 2001 From: Paul Yang Date: Thu, 12 Dec 2024 12:16:30 -0800 Subject: [PATCH 10/10] Update snapshots. --- .../ExecutionPlan/DuckDB/test_combined_metrics_plan__ep_0.xml | 2 +- .../ExecutionPlan/DuckDB/test_joined_plan__ep_0.xml | 2 +- .../ExecutionPlan/DuckDB/test_multihop_joined_plan__ep_0.xml | 2 +- .../DuckDB/test_small_combined_metrics_plan__ep_0.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_combined_metrics_plan__ep_0.xml b/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_combined_metrics_plan__ep_0.xml index 2d133f4c66..e6b630593b 100644 --- a/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_combined_metrics_plan__ep_0.xml +++ b/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_combined_metrics_plan__ep_0.xml @@ -5,7 +5,7 @@ test_filename: test_dataflow_to_execution.py - + diff --git a/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_joined_plan__ep_0.xml b/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_joined_plan__ep_0.xml index f13d263688..3744e5eccf 100644 --- a/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_joined_plan__ep_0.xml +++ b/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_joined_plan__ep_0.xml @@ -5,7 +5,7 @@ test_filename: test_dataflow_to_execution.py - + diff --git a/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_multihop_joined_plan__ep_0.xml b/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_multihop_joined_plan__ep_0.xml index 684ff89128..7988cd5e69 100644 --- a/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_multihop_joined_plan__ep_0.xml +++ b/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_multihop_joined_plan__ep_0.xml @@ -7,7 +7,7 @@ docstring: - + diff --git a/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_small_combined_metrics_plan__ep_0.xml b/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_small_combined_metrics_plan__ep_0.xml index 4da40aaf0e..d24a6bae71 100644 --- a/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_small_combined_metrics_plan__ep_0.xml +++ b/tests_metricflow/snapshots/test_dataflow_to_execution.py/ExecutionPlan/DuckDB/test_small_combined_metrics_plan__ep_0.xml @@ -5,7 +5,7 @@ test_filename: test_dataflow_to_execution.py - +