diff --git a/integration_tests/integration_tests/dbt_project/models/metrics/metrics_incremental.sql b/integration_tests/integration_tests/dbt_project/models/metrics/metrics_incremental.sql new file mode 100644 index 000000000..2b2bc99bd --- /dev/null +++ b/integration_tests/integration_tests/dbt_project/models/metrics/metrics_incremental.sql @@ -0,0 +1,3 @@ +{{ config(materialized="incremental") }} + +select * from {{ source("test_data", "metrics_seed2") }} \ No newline at end of file diff --git a/integration_tests/integration_tests/dbt_project/models/metrics/metrics_table.sql b/integration_tests/integration_tests/dbt_project/models/metrics/metrics_table.sql new file mode 100644 index 000000000..f1a94176c --- /dev/null +++ b/integration_tests/integration_tests/dbt_project/models/metrics/metrics_table.sql @@ -0,0 +1,3 @@ +{{ config(materialized="table") }} + +select * from {{ source("test_data", "metrics_seed1") }} diff --git a/integration_tests/integration_tests/dbt_project/models/metrics/metrics_view.sql b/integration_tests/integration_tests/dbt_project/models/metrics/metrics_view.sql new file mode 100644 index 000000000..737b83ad2 --- /dev/null +++ b/integration_tests/integration_tests/dbt_project/models/metrics/metrics_view.sql @@ -0,0 +1,5 @@ +{{ config(materialized="view") }} + +select * from {{ source("test_data", "metrics_seed1") }} +union all +select * from {{ source("test_data", "metrics_seed2") }} diff --git a/integration_tests/integration_tests/dbt_project/models/test_data.yaml b/integration_tests/integration_tests/dbt_project/models/test_data.yaml index 5a7d83f79..26c84b8f8 100644 --- a/integration_tests/integration_tests/dbt_project/models/test_data.yaml +++ b/integration_tests/integration_tests/dbt_project/models/test_data.yaml @@ -4,4 +4,5 @@ sources: - name: test_data schema: "{{ target.schema }}" tables: - - name: dummy + - name: metrics_seed1 + - name: metrics_seed2 diff --git a/integration_tests/integration_tests/tests/dbt_project.py b/integration_tests/integration_tests/tests/dbt_project.py index 9ffe40ed3..7d7ebb620 100644 --- a/integration_tests/integration_tests/tests/dbt_project.py +++ b/integration_tests/integration_tests/tests/dbt_project.py @@ -18,6 +18,7 @@ "disable_dbt_invocation_autoupload": True, "disable_dbt_artifacts_autoupload": True, "disable_run_results": True, + "collect_metrics": False, } logger = get_logger(__name__) diff --git a/integration_tests/integration_tests/tests/test_metrics.py b/integration_tests/integration_tests/tests/test_metrics.py new file mode 100644 index 000000000..ea78cd482 --- /dev/null +++ b/integration_tests/integration_tests/tests/test_metrics.py @@ -0,0 +1,39 @@ +import random +from datetime import datetime + +from data_generator import DATE_FORMAT, generate_dates +from dbt_project import DbtProject + + +def test_metrics(dbt_project: DbtProject): + now = datetime.utcnow() + dbt_project.dbt_runner.vars["collect_metrics"] = True + data1 = [ + {"updated_at": date.strftime(DATE_FORMAT)} + for date in generate_dates(base_date=now) + for _ in range(random.randint(-5, 20)) + ] + data2 = [ + {"created_at": date.strftime(DATE_FORMAT)} + for date in generate_dates(base_date=now) + for _ in range(random.randint(0, 20)) + ] + dbt_project.seed(data1, "metrics_seed1") + dbt_project.seed(data2, "metrics_seed2") + dbt_project.dbt_runner.run(select="metrics") + + remaining_models_to_row_count = { + "metrics_table": len(data1), + "metrics_incremental": len(data2), + } + for metric in dbt_project.read_table("data_monitoring_metrics"): + for model_name, row_count in remaining_models_to_row_count.items(): + if model_name.upper() in metric["full_table_name"]: + if metric["metric_name"] == "row_count": + assert metric["metric_value"] == row_count + elif metric["metric_name"] == "build_timestamp": + assert metric["metric_value"] > now.timestamp() + remaining_models_to_row_count.pop(model_name) + break + + assert not remaining_models_to_row_count diff --git a/macros/edr/materializations/model/incremental.sql b/macros/edr/materializations/model/incremental.sql new file mode 100644 index 000000000..4dde034ec --- /dev/null +++ b/macros/edr/materializations/model/incremental.sql @@ -0,0 +1,34 @@ +{% materialization incremental, default %} + {% set relations = dbt.materialization_incremental_default() %} + {% set metrics = elementary.query_metrics() %} + {% do elementary.cache_metrics(metrics) %} + {% do return(relations) %} +{% endmaterialization %} + +{% materialization incremental, adapter="snowflake" %} + {% set relations = dbt.materialization_incremental_snowflake() %} + {% set metrics = elementary.query_metrics() %} + {% do elementary.cache_metrics(metrics) %} + {% do return(relations) %} +{% endmaterialization %} + +{% materialization incremental, adapter="bigquery" %} + {% set relations = dbt.materialization_incremental_bigquery() %} + {% set metrics = elementary.query_metrics() %} + {% do elementary.cache_metrics(metrics) %} + {% do return(relations) %} +{% endmaterialization %} + +{% materialization incremental, adapter="spark" %} + {% set relations = dbt.materialization_incremental_spark() %} + {% set metrics = elementary.query_metrics() %} + {% do elementary.cache_metrics(metrics) %} + {% do return(relations) %} +{% endmaterialization %} + +{% materialization incremental, adapter="databricks" %} + {% set relations = dbt.materialization_incremental_databricks() %} + {% set metrics = elementary.query_metrics() %} + {% do elementary.cache_metrics(metrics) %} + {% do return(relations) %} +{% endmaterialization %} diff --git a/macros/edr/materializations/model/metrics.sql b/macros/edr/materializations/model/metrics.sql new file mode 100644 index 000000000..c5e89a6ba --- /dev/null +++ b/macros/edr/materializations/model/metrics.sql @@ -0,0 +1,35 @@ +{% macro query_table_metrics() %} + {% set query %} + select + {{ modules.datetime.datetime.utcnow().timestamp() }} as build_timestamp, + count(*) as row_count + from {{ this }} + {% endset %} + + {% set metrics = [] %} + {% for metric_column in elementary.run_query(query).columns %} + {% set metric_name = metric_column.name %} + {% set metric_value = metric_column[0] %} + {% do metrics.append({ + "id": "{}.{}".format(invocation_id, this), + "full_table_name": elementary.relation_to_full_name(this), + "column_name": none, + "metric_name": metric_name, + "metric_value": metric_value, + "updated_at": elementary.datetime_now_utc_as_string() + }) %} + {% endfor %} + {% do return(metrics) %} +{% endmacro %} + +{% macro query_metrics() %} + {% if not elementary.get_config_var('collect_metrics') %} + {% do return([]) %} + {% endif %} + + {% do return(elementary.query_table_metrics()) %} +{% endmacro %} + +{% macro cache_metrics(metrics) %} + {% do elementary.get_cache("tables").get("metrics").extend(metrics) %} +{% endmacro %} diff --git a/macros/edr/materializations/model/table.sql b/macros/edr/materializations/model/table.sql new file mode 100644 index 000000000..4fabcc5ab --- /dev/null +++ b/macros/edr/materializations/model/table.sql @@ -0,0 +1,34 @@ +{% materialization table, default %} + {% set relations = dbt.materialization_table_default() %} + {% set metrics = elementary.query_metrics() %} + {% do elementary.cache_metrics(metrics) %} + {% do return(relations) %} +{% endmaterialization %} + +{% materialization table, adapter="snowflake" %} + {% set relations = dbt.materialization_table_snowflake() %} + {% set metrics = elementary.query_metrics() %} + {% do elementary.cache_metrics(metrics) %} + {% do return(relations) %} +{% endmaterialization %} + +{% materialization table, adapter="bigquery" %} + {% set relations = dbt.materialization_table_bigquery() %} + {% set metrics = elementary.query_metrics() %} + {% do elementary.cache_metrics(metrics) %} + {% do return(relations) %} +{% endmaterialization %} + +{% materialization table, adapter="spark" %} + {% set relations = dbt.materialization_table_spark() %} + {% set metrics = elementary.query_metrics() %} + {% do elementary.cache_metrics(metrics) %} + {% do return(relations) %} +{% endmaterialization %} + +{% materialization table, adapter="databricks" %} + {% set relations = dbt.materialization_table_databricks() %} + {% set metrics = elementary.query_metrics() %} + {% do elementary.cache_metrics(metrics) %} + {% do return(relations) %} +{% endmaterialization %} diff --git a/macros/edr/materializations/tests/failed_row_count.sql b/macros/edr/materializations/test/failed_row_count.sql similarity index 100% rename from macros/edr/materializations/tests/failed_row_count.sql rename to macros/edr/materializations/test/failed_row_count.sql diff --git a/macros/edr/materializations/tests/test.sql b/macros/edr/materializations/test/test.sql similarity index 100% rename from macros/edr/materializations/tests/test.sql rename to macros/edr/materializations/test/test.sql diff --git a/macros/edr/system/hooks/on_run_end.sql b/macros/edr/system/hooks/on_run_end.sql index a8afe505e..73b2f1ebd 100644 --- a/macros/edr/system/hooks/on_run_end.sql +++ b/macros/edr/system/hooks/on_run_end.sql @@ -2,23 +2,27 @@ {%- if execute and flags.WHICH not in ['generate', 'serve'] %} {% set edr_cli_run = elementary.get_config_var('edr_cli_run') %} {% if not execute or edr_cli_run %} - {{ return('') }} + {% do return("") %} + {% endif %} + + {% if flags.WHICH in ['run', 'build'] %} + {% do elementary.insert_metrics() %} {% endif %} {% if not elementary.get_config_var('disable_dbt_artifacts_autoupload') %} - {{ elementary.upload_dbt_artifacts() }} + {% do elementary.upload_dbt_artifacts() %} {% endif %} {% if not elementary.get_config_var('disable_run_results') %} - {{ elementary.upload_run_results() }} + {% do elementary.upload_run_results() %} {% endif %} {% if flags.WHICH in ['test', 'build'] and not elementary.get_config_var('disable_tests_results') %} - {{ elementary.handle_tests_results() }} + {% do elementary.handle_tests_results() %} {% endif %} {% if not elementary.get_config_var('disable_dbt_invocation_autoupload') %} - {{ elementary.upload_dbt_invocation() }} + {% do elementary.upload_dbt_invocation() %} {% endif %} {% endif %} {% endmacro %} diff --git a/macros/edr/system/system_utils/get_config_var.sql b/macros/edr/system/system_utils/get_config_var.sql index 64ef8a6e1..4dbb5707d 100644 --- a/macros/edr/system/system_utils/get_config_var.sql +++ b/macros/edr/system/system_utils/get_config_var.sql @@ -55,6 +55,7 @@ 'mute_dbt_upgrade_recommendation': false, 'calculate_failed_count': true, 'tests_use_temp_tables': true, + 'collect_metrics': true, 'upload_dbt_columns': false } %} {{- return(default_config) -}} diff --git a/macros/edr/tests/on_run_end/insert_metrics.sql b/macros/edr/tests/on_run_end/insert_metrics.sql new file mode 100644 index 000000000..f81c2a9ed --- /dev/null +++ b/macros/edr/tests/on_run_end/insert_metrics.sql @@ -0,0 +1,12 @@ +{% macro insert_metrics() %} + {% set metrics = elementary.get_cache("tables").get("metrics") %} + {% set database_name, schema_name = elementary.get_package_database_and_schema() %} + {%- set target_relation = adapter.get_relation(database=database_name, schema=schema_name, identifier='data_monitoring_metrics') -%} + {% if not target_relation %} + {% do exceptions.warn("Couldn't find Elementary's models. Please run `dbt run -s elementary`.") %} + {% do return(none) %} + {% endif %} + + {{ elementary.file_log("Inserting {} metrics into {}.".format(metrics | length, target_relation)) }} + {% do elementary.insert_rows(target_relation, metrics, should_commit=true) %} +{% endmacro %}