Replace uses of DataFrame with MetricflowDataTable #1235

Merged · +4,119 −4,027 · 15 commits (the diff below shows changes from 1 commit)

Commits
91bb0fb  /* PR_START p--py312 04 */ Add assertion for integration tests to ve… (plypaul)
b291444  Use `.text_format()` for data table to string conversion. (plypaul)
e464d74  Migrate various uses of dataframes to the data table. (plypaul)
eeb57c1  Update `SqlTableSnapshot` to use `MetricFlowDataTable`. (plypaul)
152b6ce  Update snapshots for new test-table format. (plypaul)
3ed8f0c  Rename strings similar to "dataframe". (plypaul)
95e0e7e  Update snapshots for `WriteToResultDataframeNode` rename. (plypaul)
ae53659  Rename `write_to_dataframe.py` to `write_to_data_table.py`. (plypaul)
47f4866  Rename `SelectSqlQueryToDataFrameTask` to `SelectSqlQueryToDataTableT…` (plypaul)
1808631  Update snapshots for rename to `SelectSqlQueryToDataTableTask`. (plypaul)
b6a313a  Rename `as_df` to `as_data_table`. (plypaul)
a3c2110  Update `SnowflakeInferenceContextProvider` to use correct types. (plypaul)
0ece9ef  Update the CLI to use corresponding methods for the data table. (plypaul)
19beaba  Address comments. (plypaul)
027afbb  Update snapshots. (plypaul)
Diff (the `AdapterBackedDDLSqlClient` DDL client):

```diff
@@ -1,12 +1,13 @@
 from __future__ import annotations

+import datetime
 import logging
 import time
 from typing import Optional

-import pandas as pd
-
 from dbt_metricflow.cli.dbt_connectors.adapter_backed_client import AdapterBackedSqlClient
+from metricflow.data_table.mf_column import ColumnDescription
+from metricflow.data_table.mf_table import MetricFlowDataTable
 from metricflow.protocols.sql_client import SqlEngine
 from metricflow.sql.sql_table import SqlTable
```
```diff
@@ -19,7 +20,7 @@ class AdapterBackedDDLSqlClient(AdapterBackedSqlClient):
     def create_table_from_dataframe(
         self,
         sql_table: SqlTable,
-        df: pd.DataFrame,
+        df: MetricFlowDataTable,
         chunk_size: Optional[int] = None,
     ) -> None:
         """Create a table in the data warehouse containing the contents of the dataframe.
```
```diff
@@ -31,21 +32,16 @@ def create_table_from_dataframe(
             df: The Pandas DataFrame object containing the column schema and data to load
             chunk_size: The number of rows to insert per transaction
         """
-        logger.info(f"Creating table '{sql_table.sql}' from a DataFrame with {df.shape[0]} row(s)")
+        logger.info(f"Creating table '{sql_table.sql}' from a DataFrame with {df.row_count} row(s)")
         start_time = time.time()

         with self._adapter.connection_named("MetricFlow_create_from_dataframe"):
             # Create table
-            # update dtypes to convert None to NA in boolean columns.
-            # This mirrors the SQLAlchemy schema detection logic in pandas.io.sql
-            df = df.convert_dtypes()
-            columns = df.columns
-
             columns_to_insert = []
-            for i in range(len(df.columns)):
+            for column_description in df.column_descriptions:
                 # Format as "column_name column_type"
-                columns_to_insert.append(
-                    f"{columns[i]} {self._get_type_from_pandas_dtype(str(df[columns[i]].dtype).lower())}"
-                )
+                columns_to_insert.append(f"{column_description.column_name} {self._get_sql_type(column_description)}")

             self._adapter.execute(
                 f"CREATE TABLE IF NOT EXISTS {sql_table.sql} ({', '.join(columns_to_insert)})",
                 auto_begin=True,
```
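To make the effect of the new column loop concrete, here is a minimal runnable sketch of how `columns_to_insert` becomes a CREATE TABLE statement. The flat type map, table name, and columns are made up for illustration; the real lookup is the engine-aware `_get_sql_type` shown further down.

```python
import datetime

# Simplified stand-in for the engine-aware _get_sql_type() lookup; the
# column names and types below are invented for illustration only.
SIMPLE_SQL_TYPES = {int: "bigint", str: "text", bool: "boolean", datetime.datetime: "timestamp"}

columns = [("id", int), ("name", str), ("created_at", datetime.datetime)]
# Each entry is rendered as "column_name column_type", as in the diff above.
columns_to_insert = [f"{name} {SIMPLE_SQL_TYPES[py_type]}" for name, py_type in columns]

print(f"CREATE TABLE IF NOT EXISTS demo_schema.demo_table ({', '.join(columns_to_insert)})")
# Output:
# CREATE TABLE IF NOT EXISTS demo_schema.demo_table (id bigint, name text, created_at timestamp)
```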
```diff
@@ -55,18 +51,18 @@ def create_table_from_dataframe(

             # Insert rows
             values = []
-            for row in df.itertuples(index=False, name=None):
+            for row in df.rows:
                 cells = []
                 for cell in row:
-                    if pd.isnull(cell):
+                    if cell is None:
                         # use null keyword instead of isNA/None/etc.
                         cells.append("null")
-                    elif type(cell) in [str, pd.Timestamp]:
+                    elif type(cell) in [str, datetime.datetime]:
                         # Wrap cell in quotes & escape existing single quotes
                         escaped_cell = self._quote_escape_value(str(cell))
                         # Trino requires timestamp literals to be wrapped in a timestamp() function.
                         # There is probably a better way to handle this.
-                        if self.sql_engine_type is SqlEngine.TRINO and type(cell) is pd.Timestamp:
+                        if self.sql_engine_type is SqlEngine.TRINO and type(cell) is datetime.datetime:
                             cells.append(f"timestamp '{escaped_cell}'")
                         else:
                             cells.append(f"'{escaped_cell}'")
```
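The literal-rendering rules above (the null keyword, quote escaping, and the Trino timestamp wrapper) can be sketched as a standalone function. This is an illustration, not the project's code: the `is_trino` flag stands in for the `self.sql_engine_type is SqlEngine.TRINO` check, and the quote-doubling stands in for `_quote_escape_value`, whose body isn't shown in this diff.

```python
import datetime


def render_cell(cell: object, is_trino: bool = False) -> str:
    """Render one Python value as a SQL literal, mirroring the rules above."""
    # None becomes the SQL null keyword rather than a quoted string.
    if cell is None:
        return "null"
    # Strings and datetimes are quoted, with single quotes doubled
    # (an assumed stand-in for _quote_escape_value()).
    if type(cell) in [str, datetime.datetime]:
        escaped = str(cell).replace("'", "''")
        # Trino requires an explicit timestamp literal for datetime values.
        if is_trino and type(cell) is datetime.datetime:
            return f"timestamp '{escaped}'"
        return f"'{escaped}'"
    # Everything else (ints, floats, bools) is emitted bare.
    return str(cell)


row = (1, "O'Brien", None, datetime.datetime(2024, 5, 1))
print(f"({', '.join(render_cell(cell, is_trino=True) for cell in row)})")
# Output:
# (1, 'O''Brien', null, timestamp '2024-05-01 00:00:00')
```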
```diff
@@ -88,30 +84,31 @@ def create_table_from_dataframe(
             # Commit all insert transaction at once
             self._adapter.commit_if_has_connection()

-        logger.info(f"Created table '{sql_table.sql}' from a DataFrame in {time.time() - start_time:.2f}s")
+        logger.info(f"Created SQL table '{sql_table.sql}' from an in-memory table in {time.time() - start_time:.2f}s")

-    def _get_type_from_pandas_dtype(self, dtype: str) -> str:
+    def _get_sql_type(self, column_description: ColumnDescription) -> str:
         """Helper method to get the engine-specific type value.

         The dtype dict here is non-exhaustive but should be adequate for our needs.
         """
         # TODO: add type handling for string/bool/bigint types for all engines
-        if dtype == "string" or dtype == "object":
+        column_type = column_description.column_type
+        if column_type is str:
             if self.sql_engine_type is SqlEngine.DATABRICKS or self.sql_engine_type is SqlEngine.BIGQUERY:
                 return "string"
             if self.sql_engine_type is SqlEngine.TRINO:
                 return "varchar"
             return "text"
-        elif dtype == "boolean" or dtype == "bool":
+        elif column_type is bool:
             return "boolean"
-        elif dtype == "int64":
+        elif column_type is int:
             return "bigint"
-        elif dtype == "float64":
+        elif column_type is float:
             return self._sql_query_plan_renderer.expr_renderer.double_data_type
-        elif dtype == "datetime64[ns]":
+        elif column_type is datetime.datetime:
             return self._sql_query_plan_renderer.expr_renderer.timestamp_data_type
         else:
-            raise ValueError(f"Encountered unexpected Pandas dtype ({dtype})!")
+            raise ValueError(f"Encountered unexpected {column_type=}!")

     def _quote_escape_value(self, value: str) -> str:
         """Escape single quotes in string-like values for create_table_from_dataframe.
```

Inline review comment on `if column_type is str:`: Oh, this is so much better than the magic string comparisons.
Conversations

Review comment: Oh nice. Do we know for sure the dimension will always come first, or should we get rid of the magic number and do something like `query_result.result_df.column_values_iterator(query_result.result_df.column_name_index(get_group_by_values))`? The pandas operation was skipping all of the metric columns.
Reply: Yeah, there's a specified order when the SQL is rendered to have the dimension values first (see metricflow/metricflow/plan_conversion/select_column_gen.py, line 36 in bbd2901). It would be better to do a lookup, but mapping the name to the output column is not as straightforward as it should be, since the output column name can be different from the input.
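For illustration, here is a runnable sketch contrasting the positional access with the name-based lookup suggested above, using a hypothetical stand-in table. The method names `column_name_index` and `column_values_iterator` are taken from the comment; their real signatures are not confirmed here.

```python
from typing import Iterator, Sequence, Tuple


class FakeResultTable:
    """Hypothetical stand-in so the sketch runs; not the real result table."""

    def __init__(self, column_names: Sequence[str], rows: Sequence[Tuple[object, ...]]) -> None:
        self._column_names = list(column_names)
        self._rows = list(rows)

    def column_name_index(self, column_name: str) -> int:
        # Map an output column name to its position.
        return self._column_names.index(column_name)

    def column_values_iterator(self, column_index: int) -> Iterator[object]:
        # Yield the values of one column across all rows.
        return (row[column_index] for row in self._rows)


result_df = FakeResultTable(
    column_names=["metric_time__day", "orders"],
    rows=[("2024-05-01", 3), ("2024-05-02", 5)],
)

# Positional access relies on the dimension column being rendered first:
assert list(result_df.column_values_iterator(0)) == ["2024-05-01", "2024-05-02"]

# Name-based access avoids the magic number, at the cost of the
# input-name -> output-column mapping problem described in the reply:
index = result_df.column_name_index("metric_time__day")
assert list(result_df.column_values_iterator(index)) == ["2024-05-01", "2024-05-02"]
```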