Skip to content

Commit

Permalink
fixed merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
DevonFulcher committed Sep 19, 2023
2 parents aafc418 + e285189 commit 27a8b19
Show file tree
Hide file tree
Showing 526 changed files with 92,692 additions and 12,146 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230911-190924.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable DATE PART aggregation for time dimensions
time: 2023-09-11T19:09:24.960342-07:00
custom:
Author: courtneyholcomb
Issue: "770"
110 changes: 110 additions & 0 deletions .github/workflows/cd-sql-engine-populate-persistent-source-schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# See [Persistent Source Schema](/GLOSSARY.md#persistent-source-schema)
# Populating the source schema via this workflow ensures that it's done with the same settings as the tests.

name: Reload Test Data in SQL Engines

# We don't want multiple workflows trying to create the same table.
concurrency:
group: POPULATE_PERSISTENT_SOURCE_SCHEMA
cancel-in-progress: true

on:
pull_request:
types: [labeled]
workflow_dispatch:

env:
# Unclear on how to make 'Reload Test Data in SQL Engines' a constant here as it does not work here.
PYTHON_VERSION: "3.8"

jobs:
snowflake-populate:
environment: DW_INTEGRATION_TESTS
if: >
github.event.action == 'workflow_dispatch'
|| (github.event.action == 'labeled' && github.event.label.name == 'Reload Test Data in SQL Engines')
name: Snowflake
runs-on: ubuntu-latest
steps:
- name: Check-out the repo
uses: actions/checkout@v3

- name: Populate w/Python ${{ env.PYTHON_VERSION }}
uses: ./.github/actions/run-mf-tests
with:
python-version: ${{ env.PYTHON_VERSION }}
mf_sql_engine_url: ${{ secrets.MF_SNOWFLAKE_URL }}
mf_sql_engine_password: ${{ secrets.MF_SNOWFLAKE_PWD }}
parallelism: 1
make-target: "populate-persistent-source-schema-snowflake"

redshift-populate:
environment: DW_INTEGRATION_TESTS
name: Redshift
if: >
github.event.action == 'workflow_dispatch'
|| (github.event.action == 'labeled' && github.event.label.name == 'Reload Test Data in SQL Engines')
runs-on: ubuntu-latest
steps:
- name: Check-out the repo
uses: actions/checkout@v3

- name: Populate w/Python ${{ env.PYTHON_VERSION }}
uses: ./.github/actions/run-mf-tests
with:
python-version: ${{ env.PYTHON_VERSION }}
mf_sql_engine_url: ${{ secrets.MF_REDSHIFT_URL }}
mf_sql_engine_password: ${{ secrets.MF_REDSHIFT_PWD }}
parallelism: 1
make-target: "populate-persistent-source-schema-redshift"

bigquery-populate:
environment: DW_INTEGRATION_TESTS
name: BigQuery
if: >
github.event.action == 'workflow_dispatch'
|| (github.event.action == 'labeled' && github.event.label.name == 'Reload Test Data in SQL Engines')
runs-on: ubuntu-latest
steps:
- name: Check-out the repo
uses: actions/checkout@v3

- name: Populate w/Python ${{ env.PYTHON_VERSION }}
uses: ./.github/actions/run-mf-tests
with:
python-version: ${{ env.PYTHON_VERSION }}
MF_SQL_ENGINE_URL: ${{ secrets.MF_BIGQUERY_URL }}
MF_SQL_ENGINE_PASSWORD: ${{ secrets.MF_BIGQUERY_PWD }}
parallelism: 1
make-target: "populate-persistent-source-schema-bigquery"

databricks-populate:
environment: DW_INTEGRATION_TESTS
name: Databricks SQL Warehouse
if: >
github.event.action == 'workflow_dispatch'
|| (github.event.action == 'labeled' && github.event.label.name == 'Reload Test Data in SQL Engines')
runs-on: ubuntu-latest
steps:
- name: Check-out the repo
uses: actions/checkout@v3

- name: Populate w/Python ${{ env.PYTHON_VERSION }}
uses: ./.github/actions/run-mf-tests
with:
python-version: ${{ env.PYTHON_VERSION }}
mf_sql_engine_url: ${{ secrets.MF_DATABRICKS_SQL_WAREHOUSE_URL }}
mf_sql_engine_password: ${{ secrets.MF_DATABRICKS_PWD }}
parallelism: 1
make-target: "populate-persistent-source-schema-databricks"

remove-label:
name: Remove Label After Populating Test Data
runs-on: ubuntu-latest
needs: [ snowflake-populate, redshift-populate, bigquery-populate, databricks-populate]
if: github.event.action == 'labeled' && github.event.label.name == 'Reload Test Data in SQL Engines'
steps:
- name: Remove Label
uses: actions-ecosystem/action-remove-labels@v1
with:
labels: 'Reload Test Data in SQL Engines'
13 changes: 13 additions & 0 deletions GLOSSARY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Glossary

## Persistent source schema
Many tests generate and execute SQL that depend on tables containing test data. By default, a
pytest fixture creates a temporary schema and populates it with the tables that are required by
the tests. This schema is referred to the source schema. Creating the source schema (and
the associated tables) can be a slow process for some SQL engines. Since these tables generally
do not change often, functionality was added to use a source schema that is assumed to already
exist when running tests and persists between runs (a persistent source schema). In addition,
functionality was added to create the persistent source schema based on table definitions in the
repo. Because the name of the source schema is generated based on the hash of the data that's
supposed to be in the schema, the creating and populating the persistent source schema should
not be done concurrently as there are race conditions when creating tables and inserting data.
25 changes: 23 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ PARALLELISM = "auto"
# Additional command line options to pass to pytest.
ADDITIONAL_PYTEST_OPTIONS = ""

# Pytest that can populate the persistent source schema
USE_PERSISTENT_SOURCE_SCHEMA = "--use-persistent-source-schema"
POPULATE_PERSISTENT_SOURCE_SCHEMA = "metricflow/test/source_schema_tools.py::populate_source_schema"

# Install Hatch package / project manager
.PHONY: install-hatch
install-hatch:
Expand All @@ -21,24 +25,41 @@ test:
test-postgresql:
hatch -v run postgres-env:pytest -vv -n $(PARALLELISM) $(ADDITIONAL_PYTEST_OPTIONS) metricflow/test/

# Engine-specific test environments. In most cases you should run these with
# `make -e ADDITIONAL_PYTEST_OPTIONS="--use-persistent-source-schema" test-<engine_type>`
# Engine-specific test environments.
.PHONY: test-bigquery
test-bigquery:
hatch -v run bigquery-env:pytest -vv -n $(PARALLELISM) $(ADDITIONAL_PYTEST_OPTIONS) metricflow/test/

.PHONY: populate-persistent-source-schema-bigquery
populate-persistent-source-schema-bigquery:
hatch -v run bigquery-env:pytest -vv $(ADDITIONAL_PYTEST_OPTIONS) $(USE_PERSISTENT_SOURCE_SCHEMA) $(POPULATE_PERSISTENT_SOURCE_SCHEMA)

.PHONY: test-databricks
test-databricks:
hatch -v run databricks-env:pytest -vv -n $(PARALLELISM) $(ADDITIONAL_PYTEST_OPTIONS) metricflow/test/

.PHONY: populate-persistent-source-schema-databricks
populate-persistent-source-schema-databricks:
hatch -v run databricks-env:pytest -vv $(ADDITIONAL_PYTEST_OPTIONS) $(USE_PERSISTENT_SOURCE_SCHEMA) $(POPULATE_PERSISTENT_SOURCE_SCHEMA)

.PHONY: test-redshift
test-redshift:
hatch -v run redshift-env:pytest -vv -n $(PARALLELISM) $(ADDITIONAL_PYTEST_OPTIONS) metricflow/test/

.PHONY: populate-persistent-source-schema-redshift
populate-persistent-source-schema-redshift:
hatch -v run redshift-env:pytest -vv $(ADDITIONAL_PYTEST_OPTIONS) $(USE_PERSISTENT_SOURCE_SCHEMA) $(POPULATE_PERSISTENT_SOURCE_SCHEMA)


.PHONY: test-snowflake
test-snowflake:
hatch -v run snowflake-env:pytest -vv -n $(PARALLELISM) $(ADDITIONAL_PYTEST_OPTIONS) metricflow/test/

.PHONY: populate-persistent-source-schema-snowflake
populate-persistent-source-schema-snowflake:
hatch -v run snowflake-env:pytest -vv $(ADDITIONAL_PYTEST_OPTIONS) $(USE_PERSISTENT_SOURCE_SCHEMA) $(POPULATE_PERSISTENT_SOURCE_SCHEMA)


.PHONY: lint
lint:
hatch -v run dev-env:pre-commit run --all-files
Expand Down
1 change: 1 addition & 0 deletions metricflow/dag/id_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
SQL_EXPR_IS_NULL_PREFIX = "isn"
SQL_EXPR_CAST_TO_TIMESTAMP_PREFIX = "ctt"
SQL_EXPR_DATE_TRUNC = "dt"
SQL_EXPR_EXTRACT = "ex"
SQL_EXPR_RATIO_COMPUTATION = "rc"
SQL_EXPR_BETWEEN_PREFIX = "betw"
SQL_EXPR_WINDOW_FUNCTION_ID_PREFIX = "wfnc"
Expand Down
48 changes: 32 additions & 16 deletions metricflow/dataset/convert_semantic_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
SqlColumnReferenceExpression,
SqlDateTruncExpression,
SqlExpressionNode,
SqlExtractExpression,
SqlStringExpression,
)
from metricflow.sql.sql_plan import (
Expand All @@ -46,6 +47,7 @@
SqlSelectStatementNode,
SqlTableFromClauseNode,
)
from metricflow.time.date_part import DatePart

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -102,12 +104,14 @@ def _create_time_dimension_instance(
time_dimension: Dimension,
entity_links: Tuple[EntityReference, ...],
time_granularity: TimeGranularity = DEFAULT_TIME_GRANULARITY,
date_part: Optional[DatePart] = None,
) -> TimeDimensionInstance:
"""Create a time dimension instance from the dimension object from a semantic model in the model."""
time_dimension_spec = TimeDimensionSpec(
element_name=time_dimension.reference.element_name,
entity_links=entity_links,
time_granularity=time_granularity,
date_part=date_part,
)

return TimeDimensionInstance(
Expand Down Expand Up @@ -219,6 +223,11 @@ def _convert_dimensions(
select_columns = []

for dimension in dimensions or []:
dimension_select_expr = SemanticModelToDataSetConverter._make_element_sql_expr(
table_alias=table_alias,
element_name=dimension.reference.element_name,
element_expr=dimension.expr,
)
if dimension.type == DimensionType.CATEGORICAL:
dimension_instance = self._create_dimension_instance(
semantic_model_name=semantic_model_name,
Expand All @@ -228,11 +237,7 @@ def _convert_dimensions(
dimension_instances.append(dimension_instance)
select_columns.append(
SqlSelectColumn(
expr=SemanticModelToDataSetConverter._make_element_sql_expr(
table_alias=table_alias,
element_name=dimension.reference.element_name,
element_expr=dimension.expr,
),
expr=dimension_select_expr,
column_alias=dimension_instance.associated_column.column_name,
)
)
Expand All @@ -251,11 +256,7 @@ def _convert_dimensions(
time_dimension_instances.append(time_dimension_instance)
select_columns.append(
SqlSelectColumn(
expr=SemanticModelToDataSetConverter._make_element_sql_expr(
table_alias=table_alias,
element_name=dimension.reference.element_name,
element_expr=dimension.expr,
),
expr=dimension_select_expr,
column_alias=time_dimension_instance.associated_column.column_name,
)
)
Expand All @@ -274,16 +275,31 @@ def _convert_dimensions(
select_columns.append(
SqlSelectColumn(
expr=SqlDateTruncExpression(
time_granularity=time_granularity,
arg=SemanticModelToDataSetConverter._make_element_sql_expr(
table_alias=table_alias,
element_name=dimension.reference.element_name,
element_expr=dimension.expr,
),
time_granularity=time_granularity, arg=dimension_select_expr
),
column_alias=time_dimension_instance.associated_column.column_name,
)
)

# Add all date part options for easy query resolution
for date_part in DatePart:
if date_part.to_int() >= defined_time_granularity.to_int():
time_dimension_instance = self._create_time_dimension_instance(
semantic_model_name=semantic_model_name,
time_dimension=dimension,
entity_links=entity_links,
time_granularity=defined_time_granularity,
date_part=date_part,
)
time_dimension_instances.append(time_dimension_instance)

select_columns.append(
SqlSelectColumn(
expr=SqlExtractExpression(date_part=date_part, arg=dimension_select_expr),
column_alias=time_dimension_instance.associated_column.column_name,
)
)

else:
assert False, f"Unhandled dimension type: {dimension.type}"

Expand Down
8 changes: 6 additions & 2 deletions metricflow/dataset/dataset.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from __future__ import annotations

import logging
from typing import Sequence
from typing import Optional, Sequence

from dbt_semantic_interfaces.references import TimeDimensionReference
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity
from dbt_semantic_interfaces.validations.unique_valid_name import MetricFlowReservedKeywords

from metricflow.instances import InstanceSet, TimeDimensionInstance
from metricflow.specs.specs import TimeDimensionSpec
from metricflow.time.date_part import DatePart

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,12 +49,15 @@ def metric_time_dimension_name() -> str:
return DataSet.metric_time_dimension_reference().element_name

@staticmethod
def metric_time_dimension_spec(time_granularity: TimeGranularity) -> TimeDimensionSpec:
def metric_time_dimension_spec(
time_granularity: TimeGranularity, date_part: Optional[DatePart] = None
) -> TimeDimensionSpec:
"""Spec that corresponds to DataSet.metric_time_dimension_reference."""
return TimeDimensionSpec(
element_name=DataSet.metric_time_dimension_reference().element_name,
entity_links=(),
time_granularity=time_granularity,
date_part=date_part,
)

def __repr__(self) -> str: # noqa: D
Expand Down
1 change: 1 addition & 0 deletions metricflow/engine/metricflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ def _create_execution_plan(self, mf_query_request: MetricFlowQueryRequest) -> Me
query_spec = self._query_parser.parse_and_validate_query(
metric_names=mf_query_request.metric_names,
group_by_names=mf_query_request.group_by_names,
group_by=mf_query_request.group_by,
limit=mf_query_request.limit,
time_constraint_start=mf_query_request.time_constraint_start,
time_constraint_end=mf_query_request.time_constraint_end,
Expand Down
Loading

0 comments on commit 27a8b19

Please sign in to comment.