From f58fc233fbbc7b9289299313807e706c57360613 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Thu, 28 Jul 2022 13:52:18 -0700 Subject: [PATCH 01/14] Feature/python model beta (#377) Co-authored-by: Jeremy Cohen Co-authored-by: Ian Knox --- .github/workflows/main.yml | 1 + CHANGELOG.md | 3 + dbt/adapters/spark/impl.py | 127 +++++++++++++++++- dbt/include/spark/macros/adapters.sql | 61 +++++---- .../incremental/incremental.sql | 77 +++++++---- .../macros/materializations/snapshot.sql | 2 +- .../spark/macros/materializations/table.sql | 24 +++- dev-requirements.txt | 2 + requirements.txt | 2 + tests/conftest.py | 2 + tests/functional/adapter/test_basic.py | 1 - tests/functional/adapter/test_python_model.py | 59 ++++++++ .../test_incremental_strategies.py | 2 + 13 files changed, 297 insertions(+), 66 deletions(-) create mode 100644 tests/functional/adapter/test_python_model.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index b45f93776..bf607c379 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -60,6 +60,7 @@ jobs: python -m pip install pre-commit pre-commit --version python -m pip install mypy==0.942 + python -m pip install types-requests mypy --version python -m pip install -r requirements.txt python -m pip install -r dev-requirements.txt diff --git a/CHANGELOG.md b/CHANGELOG.md index 28f7e138b..d015a26c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## dbt-spark 1.3.0b1 (Release TBD) +### Features +- support python model through notebook, currently supported materializations are table and incremental. ([#377](https://github.com/dbt-labs/dbt-spark/pull/377)) + ### Fixes - Pin `pyodbc` to version 4.0.32 to prevent overwriting `libodbc.so` and `libltdl.so` on Linux ([#397](https://github.com/dbt-labs/dbt-spark/issues/397/), [#398](https://github.com/dbt-labs/dbt-spark/pull/398/)) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 3fb9978d8..12c42ab98 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -1,4 +1,7 @@ import re +import requests +import time +import base64 from concurrent.futures import Future from dataclasses import dataclass from typing import Any, Dict, Iterable, List, Optional, Union @@ -11,7 +14,8 @@ import dbt.exceptions from dbt.adapters.base import AdapterConfig -from dbt.adapters.base.impl import catch_as_completed +from dbt.adapters.base.impl import catch_as_completed, log_code_execution +from dbt.adapters.base.meta import available from dbt.adapters.sql import SQLAdapter from dbt.adapters.spark import SparkConnectionManager from dbt.adapters.spark import SparkRelation @@ -159,11 +163,9 @@ def list_relations_without_caching( return relations - def get_relation( - self, database: Optional[str], schema: str, identifier: str - ) -> Optional[BaseRelation]: + def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]: if not self.Relation.include_policy.database: - database = None + database = None # type: ignore return super().get_relation(database, schema, identifier) @@ -296,7 +298,12 @@ def get_catalog(self, manifest): for schema in schemas: futures.append( tpe.submit_connected( - self, schema, self._get_one_catalog, info, [schema], manifest + self, + schema, + self._get_one_catalog, + info, + [schema], + manifest, ) ) catalogs, exceptions = catch_as_completed(futures) @@ -380,6 +387,114 @@ def run_sql_for_tests(self, sql, fetch, conn): finally: conn.transaction_open = False + @available.parse_none + @log_code_execution + def 
submit_python_job(self, parsed_model: dict, compiled_code: str, timeout=None): + # TODO improve the typing here. N.B. Jinja returns a `jinja2.runtime.Undefined` instead + # of `None` which evaluates to True! + + # TODO limit this function to run only when doing the materialization of python nodes + + # assuming that for python job running over 1 day user would mannually overwrite this + schema = getattr(parsed_model, "schema", self.config.credentials.schema) + identifier = parsed_model["alias"] + if not timeout: + timeout = 60 * 60 * 24 + if timeout <= 0: + raise ValueError("Timeout must larger than 0") + + auth_header = {"Authorization": f"Bearer {self.connections.profile.credentials.token}"} + + # create new dir + if not self.connections.profile.credentials.user: + raise ValueError("Need to supply user in profile to submit python job") + # it is safe to call mkdirs even if dir already exists and have content inside + work_dir = f"/Users/{self.connections.profile.credentials.user}/{schema}" + response = requests.post( + f"https://{self.connections.profile.credentials.host}/api/2.0/workspace/mkdirs", + headers=auth_header, + json={ + "path": work_dir, + }, + ) + if response.status_code != 200: + raise dbt.exceptions.RuntimeException( + f"Error creating work_dir for python notebooks\n {response.content!r}" + ) + + # add notebook + b64_encoded_content = base64.b64encode(compiled_code.encode()).decode() + response = requests.post( + f"https://{self.connections.profile.credentials.host}/api/2.0/workspace/import", + headers=auth_header, + json={ + "path": f"{work_dir}/{identifier}", + "content": b64_encoded_content, + "language": "PYTHON", + "overwrite": True, + "format": "SOURCE", + }, + ) + if response.status_code != 200: + raise dbt.exceptions.RuntimeException( + f"Error creating python notebook.\n {response.content!r}" + ) + + # submit job + submit_response = requests.post( + f"https://{self.connections.profile.credentials.host}/api/2.1/jobs/runs/submit", + headers=auth_header, + json={ + "run_name": "debug task", + "existing_cluster_id": self.connections.profile.credentials.cluster, + "notebook_task": { + "notebook_path": f"{work_dir}/{identifier}", + }, + }, + ) + if submit_response.status_code != 200: + raise dbt.exceptions.RuntimeException( + f"Error creating python run.\n {response.content!r}" + ) + + # poll until job finish + state = None + start = time.time() + run_id = submit_response.json()["run_id"] + terminal_states = ["TERMINATED", "SKIPPED", "INTERNAL_ERROR"] + while state not in terminal_states and time.time() - start < timeout: + time.sleep(1) + resp = requests.get( + f"https://{self.connections.profile.credentials.host}" + f"/api/2.1/jobs/runs/get?run_id={run_id}", + headers=auth_header, + ) + json_resp = resp.json() + state = json_resp["state"]["life_cycle_state"] + # logger.debug(f"Polling.... 
in state: {state}") + if state != "TERMINATED": + raise dbt.exceptions.RuntimeException( + "python model run ended in state" + f"{state} with state_message\n{json_resp['state']['state_message']}" + ) + + # get end state to return to user + run_output = requests.get( + f"https://{self.connections.profile.credentials.host}" + f"/api/2.1/jobs/runs/get-output?run_id={run_id}", + headers=auth_header, + ) + json_run_output = run_output.json() + result_state = json_run_output["metadata"]["state"]["result_state"] + if result_state != "SUCCESS": + raise dbt.exceptions.RuntimeException( + "Python model failed with traceback as:\n" + "(Note that the line number here does not " + "match the line number in your code due to dbt templating)\n" + f"{json_run_output['error_trace']}" + ) + return self.connections.get_response(None) + def standardize_grants_dict(self, grants_table: agate.Table) -> dict: grants_dict: Dict[str, List[str]] = {} for row in grants_table: diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index abdeacb7f..05630ede5 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -117,35 +117,46 @@ {%- endmacro %} -{% macro create_temporary_view(relation, sql) -%} - {{ return(adapter.dispatch('create_temporary_view', 'dbt')(relation, sql)) }} +{% macro create_temporary_view(relation, compiled_code) -%} + {{ return(adapter.dispatch('create_temporary_view', 'dbt')(relation, compiled_code)) }} {%- endmacro -%} -{#-- We can't use temporary tables with `create ... as ()` syntax #} -{% macro spark__create_temporary_view(relation, sql) -%} - create temporary view {{ relation.include(schema=false) }} as - {{ sql }} -{% endmacro %} +{#-- We can't use temporary tables with `create ... as ()` syntax --#} +{% macro spark__create_temporary_view(relation, compiled_code) -%} + create temporary view {{ relation.include(schema=false) }} as + {{ compiled_code }} +{%- endmacro -%} -{% macro spark__create_table_as(temporary, relation, sql) -%} - {% if temporary -%} - {{ create_temporary_view(relation, sql) }} - {%- else -%} - {% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %} - create or replace table {{ relation }} - {% else %} - create table {{ relation }} - {% endif %} - {{ file_format_clause() }} - {{ options_clause() }} - {{ partition_cols(label="partitioned by") }} - {{ clustered_cols(label="clustered by") }} - {{ location_clause() }} - {{ comment_clause() }} - as - {{ sql }} - {%- endif %} +{%- macro spark__create_table_as(temporary, relation, compiled_code, language='sql') -%} + {%- if language == 'sql' -%} + {%- if temporary -%} + {{ create_temporary_view(relation, compiled_code) }} + {%- else -%} + {% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %} + create or replace table {{ relation }} + {% else %} + create table {{ relation }} + {% endif %} + {{ file_format_clause() }} + {{ options_clause() }} + {{ partition_cols(label="partitioned by") }} + {{ clustered_cols(label="clustered by") }} + {{ location_clause() }} + {{ comment_clause() }} + as + {{ compiled_code }} + {%- endif -%} + {%- elif language == 'python' -%} + {#-- + N.B. Python models _can_ write to temp views HOWEVER they use a different session + and have already expired by the time they need to be used (I.E. in merges for incremental models) + + TODO: Deep dive into spark sessions to see if we can reuse a single session for an entire + dbt invocation. 
+ --#} + {{ py_write_table(compiled_code=compiled_code, target_relation=relation) }} + {%- endif -%} {%- endmacro -%} diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql index 1ca2c149a..91cba9e5f 100644 --- a/dbt/include/spark/macros/materializations/incremental/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql @@ -1,5 +1,4 @@ {% materialization incremental, adapter='spark' -%} - {#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#} {%- set raw_file_format = config.get('file_format', default='parquet') -%} {%- set raw_strategy = config.get('incremental_strategy') or 'append' -%} @@ -8,43 +7,63 @@ {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%} {%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%} + {#-- Set vars --#} + {%- set unique_key = config.get('unique_key', none) -%} {%- set partition_by = config.get('partition_by', none) -%} - - {%- set full_refresh_mode = (should_full_refresh()) -%} - - {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} - - {% set target_relation = this %} - {% set existing_relation = load_relation(this) %} - {% set tmp_relation = make_temp_relation(this) %} - - {% if strategy == 'insert_overwrite' and partition_by %} - {% call statement() %} + {%- set language = model['language'] -%} + {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%} + {%- set target_relation = this -%} + {%- set existing_relation = load_relation(this) -%} + {%- set tmp_relation = make_temp_relation(this) -%} + + {#-- Set Overwrite Mode --#} + {%- if strategy == 'insert_overwrite' and partition_by -%} + {%- call statement() -%} set spark.sql.sources.partitionOverwriteMode = DYNAMIC - {% endcall %} - {% endif %} + {%- endcall -%} + {%- endif -%} + {#-- Run pre-hooks --#} {{ run_hooks(pre_hooks) }} - {% set is_delta = (file_format == 'delta' and existing_relation.is_delta) %} - - {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% elif existing_relation.is_view or full_refresh_mode %} + {#-- Incremental run logic --#} + {%- if existing_relation is none -%} + {#-- Relation must be created --#} + {%- call statement('main', language=language) -%} + {{ create_table_as(False, target_relation, compiled_code, language) }} + {%- endcall -%} + {%- elif existing_relation.is_view or should_full_refresh() -%} + {#-- Relation must be dropped & recreated --#} + {% set is_delta = (file_format == 'delta' and existing_relation.is_delta) %} {% if not is_delta %} {#-- If Delta, we will `create or replace` below, so no need to drop --#} {% do adapter.drop_relation(existing_relation) %} {% endif %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% else %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} - {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} - {% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %} - {% endif %} - - {%- call statement('main') -%} - {{ build_sql }} - {%- endcall -%} + {%- call statement('main', language=language) -%} + {{ create_table_as(False, target_relation, compiled_code, language) }} + {%- endcall -%} + {%- else -%} + {#-- 
Relation must be merged --#} + {%- call statement('create_tmp_relation', language=language) -%} + {{ create_table_as(True, tmp_relation, compiled_code, language) }} + {%- endcall -%} + {%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%} + {%- call statement('main') -%} + {{ dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) }} + {%- endcall -%} + {%- if language == 'python' -%} + {#-- + This is yucky. + See note in dbt-spark/dbt/include/spark/macros/adapters.sql + re: python models and temporary views. + + Also, why doesn't either drop_relation or adapter.drop_relation work here?! + --#} + {% call statement('drop_relation') -%} + drop table if exists {{ tmp_relation }} + {%- endcall %} + {%- endif -%} + {%- endif -%} {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} {% do apply_grants(target_relation, grant_config, should_revoke) %} diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql index a5304682e..6cf2358fe 100644 --- a/dbt/include/spark/macros/materializations/snapshot.sql +++ b/dbt/include/spark/macros/materializations/snapshot.sql @@ -117,7 +117,7 @@ {% if not target_relation_exists %} - {% set build_sql = build_snapshot_table(strategy, model['compiled_sql']) %} + {% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %} {% set final_sql = create_table_as(False, target_relation, build_sql) %} {% else %} diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql index 3462d3332..6a02ea164 100644 --- a/dbt/include/spark/macros/materializations/table.sql +++ b/dbt/include/spark/macros/materializations/table.sql @@ -1,5 +1,5 @@ {% materialization table, adapter = 'spark' %} - + {%- set language = model['language'] -%} {%- set identifier = model['alias'] -%} {%- set grant_config = config.get('grants') -%} @@ -19,9 +19,10 @@ {%- endif %} -- build model - {% call statement('main') -%} - {{ create_table_as(False, target_relation, sql) }} - {%- endcall %} + + {%- call statement('main', language=language) -%} + {{ create_table_as(False, target_relation, compiled_code, language) }} + {%- endcall -%} {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %} {% do apply_grants(target_relation, grant_config, should_revoke) %} @@ -33,3 +34,18 @@ {{ return({'relations': [target_relation]})}} {% endmaterialization %} + + +{% macro py_write_table(compiled_code, target_relation) %} +{{ compiled_code }} +# --- Autogenerated dbt materialization code. 
--- # +dbt = dbtObj(spark.table) +df = model(dbt, spark) +df.write.mode("overwrite").format("delta").saveAsTable("{{ target_relation }}") +{%- endmacro -%} + +{%macro py_script_comment()%} +# how to execute python model in notebook +# dbt = dbtObj(spark.table) +# df = model(dbt, spark) +{%endmacro%} diff --git a/dev-requirements.txt b/dev-requirements.txt index b94cb8b6b..5b29e5e9d 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -3,6 +3,8 @@ git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter + + black==22.3.0 bumpversion click~=8.0.4 diff --git a/requirements.txt b/requirements.txt index c64512aeb..5d774e4f7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ PyHive[hive]>=0.6.0,<0.7.0 +requests[python]>=2.28.1 + pyodbc==4.0.32 sqlparams>=3.0.0 thrift>=0.13.0 diff --git a/tests/conftest.py b/tests/conftest.py index 0771566b7..2fa50d6c7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -60,6 +60,7 @@ def databricks_cluster_target(): "connect_retries": 3, "connect_timeout": 5, "retry_all": True, + "user": os.getenv('DBT_DATABRICKS_USER'), } @@ -91,6 +92,7 @@ def databricks_http_cluster_target(): "connect_retries": 5, "connect_timeout": 60, "retry_all": bool(os.getenv('DBT_DATABRICKS_RETRY_ALL', False)), + "user": os.getenv('DBT_DATABRICKS_USER'), } diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py index e1a57fd3f..bdccf169d 100644 --- a/tests/functional/adapter/test_basic.py +++ b/tests/functional/adapter/test_basic.py @@ -79,7 +79,6 @@ def project_config_update(self): } } - @pytest.mark.skip_profile('spark_session') class TestBaseAdapterMethod(BaseAdapterMethod): pass diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py new file mode 100644 index 000000000..059412f10 --- /dev/null +++ b/tests/functional/adapter/test_python_model.py @@ -0,0 +1,59 @@ +import os +import pytest +from dbt.tests.util import run_dbt, write_file, run_dbt_and_capture +from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests, BasePythonIncrementalTests + +@pytest.mark.skip_profile("apache_spark", "spark_session", "databricks_sql_endpoint") +class TestPythonModelSpark(BasePythonModelTests): + pass + +@pytest.mark.skip_profile("apache_spark", "spark_session", "databricks_sql_endpoint") +class TestPythonIncrementalModelSpark(BasePythonIncrementalTests): + @pytest.fixture(scope="class") + def project_config_update(self): + return {} + + +models__simple_python_model = """ +import pandas + +def model(dbt, spark): + dbt.config( + materialized='table', + ) + data = [[1,2]] * 10 + return spark.createDataFrame(data, schema=['test', 'test2']) +""" +models__simple_python_model_v2 = """ +import pandas + +def model(dbt, spark): + dbt.config( + materialized='table', + ) + data = [[1,2]] * 10 + return spark.createDataFrame(data, schema=['test1', 'test3']) +""" + + +@pytest.mark.skip_profile("apache_spark", "spark_session", "databricks_sql_endpoint") +class TestChangingSchemaSpark: + @pytest.fixture(scope="class") + def models(self): + return {"simple_python_model.py": models__simple_python_model} + + def test_changing_schema_with_log_validation(self, project, logs_dir): + run_dbt(["run"]) + write_file( + models__simple_python_model_v2, + project.project_root + "/models", + "simple_python_model.py", + ) + run_dbt(["run"]) + log_file = os.path.join(logs_dir, 
"dbt.log") + with open(log_file, "r") as f: + log = f.read() + # validate #5510 log_code_execution works + assert "On model.test.simple_python_model:" in log + assert "spark.createDataFrame(data, schema=['test1', 'test3'])" in log + assert "Execution status: OK in" in log diff --git a/tests/integration/incremental_strategies/test_incremental_strategies.py b/tests/integration/incremental_strategies/test_incremental_strategies.py index 839f167e6..3848d11ae 100644 --- a/tests/integration/incremental_strategies/test_incremental_strategies.py +++ b/tests/integration/incremental_strategies/test_incremental_strategies.py @@ -60,6 +60,8 @@ def run_and_test(self): def test_insert_overwrite_apache_spark(self): self.run_and_test() + # This test requires settings on the test cluster + # more info at https://docs.getdbt.com/reference/resource-configs/spark-configs#the-insert_overwrite-strategy @use_profile("databricks_cluster") def test_insert_overwrite_databricks_cluster(self): self.run_and_test() From 7f6cffecf38b7c41aa441eb020d464ba1e20bf9e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 29 Jul 2022 15:27:56 -0400 Subject: [PATCH 02/14] Bumping version to 1.3.0b1 (#412) * Bumping version to 1.3.0b1 * Update CHANGELOG.md * Fix whitespace * Fixing whitespace Co-authored-by: Github Build Bot Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com> --- .bumpversion.cfg | 2 +- CHANGELOG.md | 9 +++++++-- dbt/adapters/spark/__version__.py | 2 +- setup.py | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 605b6f378..ef3954f4c 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.3.0a1 +current_version = 1.3.0b1 parse = (?P\d+) \.(?P\d+) \.(?P\d+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d015a26c7..5948429a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,16 @@ -## dbt-spark 1.3.0b1 (Release TBD) +## dbt-spark 1.3.0b2 (Release TBD) + +## dbt-spark 1.3.0b1 (July 29, 2022) ### Features -- support python model through notebook, currently supported materializations are table and incremental. ([#377](https://github.com/dbt-labs/dbt-spark/pull/377)) +- Support python model through notebook, currently supported materializations are table and incremental. 
([#377](https://github.com/dbt-labs/dbt-spark/pull/377)) ### Fixes - Pin `pyodbc` to version 4.0.32 to prevent overwriting `libodbc.so` and `libltdl.so` on Linux ([#397](https://github.com/dbt-labs/dbt-spark/issues/397/), [#398](https://github.com/dbt-labs/dbt-spark/pull/398/)) +### Under the hood +- Support core incremental refactor ([#394](https://github.com/dbt-labs/dbt-spark/issues/394)) + ### Contributors - [@barberscott](https://github.com/barberscott) ([#398](https://github.com/dbt-labs/dbt-spark/pull/398/)) diff --git a/dbt/adapters/spark/__version__.py b/dbt/adapters/spark/__version__.py index a9fe3c3ee..4b49b750d 100644 --- a/dbt/adapters/spark/__version__.py +++ b/dbt/adapters/spark/__version__.py @@ -1 +1 @@ -version = "1.3.0a1" +version = "1.3.0b1" diff --git a/setup.py b/setup.py index cb0c40aec..229e89a17 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ def _get_dbt_core_version(): package_name = "dbt-spark" -package_version = "1.3.0a1" +package_version = "1.3.0b1" dbt_core_version = _get_dbt_core_version() description = """The Apache Spark adapter plugin for dbt""" From 9b00895a51d14745d896ac17d08e6c2423a4703a Mon Sep 17 00:00:00 2001 From: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com> Date: Tue, 9 Aug 2022 15:34:57 -0500 Subject: [PATCH 03/14] init pr push for ct-1005 (#418) * init pr push for ct-1005 * add changelog * change pointer to spark * eof fix * remove ref to dbt-core in changelog existence * typo fix * typo and change of ref * add condtional logic for kinds security and dependency and change NO ISSUE ref to spark issue #417 --- .changes/0.0.0.md | 5 + .changes/1.3.0-b1.md | 11 + .changes/1.3.0/Features-20220808-141141.yaml | 8 + .changes/1.3.0/Fixes-20220808-141623.yaml | 8 + .../1.3.0/Under the Hood-20220808-141320.yaml | 7 + .changes/README.md | 3 + .changes/header.tpl.md | 6 + .changes/unreleased/.gitkeep | 0 .../unreleased/Features-20220808-142118.yaml | 7 + .changie.yaml | 62 ++++ .github/pull_request_template.md | 2 +- .github/workflows/bot-changelog.yml | 61 ++++ .github/workflows/changelog-existence.yml | 41 +++ CHANGELOG.md | 275 +----------------- CONTRIBUTING.md | 10 + 15 files changed, 246 insertions(+), 260 deletions(-) create mode 100644 .changes/0.0.0.md create mode 100644 .changes/1.3.0-b1.md create mode 100644 .changes/1.3.0/Features-20220808-141141.yaml create mode 100644 .changes/1.3.0/Fixes-20220808-141623.yaml create mode 100644 .changes/1.3.0/Under the Hood-20220808-141320.yaml create mode 100644 .changes/README.md create mode 100644 .changes/header.tpl.md create mode 100644 .changes/unreleased/.gitkeep create mode 100644 .changes/unreleased/Features-20220808-142118.yaml create mode 100644 .changie.yaml create mode 100644 .github/workflows/bot-changelog.yml create mode 100644 .github/workflows/changelog-existence.yml diff --git a/.changes/0.0.0.md b/.changes/0.0.0.md new file mode 100644 index 000000000..5acfb3dbc --- /dev/null +++ b/.changes/0.0.0.md @@ -0,0 +1,5 @@ +## Previous Releases +For information on prior major and minor releases, see their changelogs: +- [1.2](https://github.com/dbt-labs/dbt-spark/blob/1.2.latest/CHANGELOG.md) +- [1.1](https://github.com/dbt-labs/dbt-spark/blob/1.1.latest/CHANGELOG.md) +- [1.0](https://github.com/dbt-labs/dbt-spark/blob/1.0.latest/CHANGELOG.md) diff --git a/.changes/1.3.0-b1.md b/.changes/1.3.0-b1.md new file mode 100644 index 000000000..ef64f4395 --- /dev/null +++ b/.changes/1.3.0-b1.md @@ -0,0 +1,11 @@ +## dbt-spark 1.3.0-b1 - July 29, 2022 + +### Features +- Support 
python model through notebook, currently supported materializations are table and incremental ([#417](https://github.com/dbt-labs/dbt-spark/issues/417), [#377](https://github.com/dbt-labs/dbt-spark/pull/377)) +### Fixes +- Pin pyodbc to version 4.0.32 to prevent overwriting libodbc.so and libltdl.so on Linux ([#397](https://github.com/dbt-labs/dbt-spark/issues/397), [#398](https://github.com/dbt-labs/dbt-spark/pull/398)) +### Under the Hood +- Support core incremental refactor ([#4402](https://github.com/dbt-labs/dbt-spark/issues/4402), [#394](https://github.com/dbt-labs/dbt-spark/pull/394)) + +### Contributors +- [@barberscot](https://github.com/barberscot) ([#398](https://github.com/dbt-labs/dbt-spark/pull/398)) diff --git a/.changes/1.3.0/Features-20220808-141141.yaml b/.changes/1.3.0/Features-20220808-141141.yaml new file mode 100644 index 000000000..444a3062b --- /dev/null +++ b/.changes/1.3.0/Features-20220808-141141.yaml @@ -0,0 +1,8 @@ +kind: Features +body: Support python model through notebook, currently supported materializations + are table and incremental +time: 2022-08-08T14:11:41.906131-05:00 +custom: + Author: ChenyuLInx + Issue: "417" + PR: "377" diff --git a/.changes/1.3.0/Fixes-20220808-141623.yaml b/.changes/1.3.0/Fixes-20220808-141623.yaml new file mode 100644 index 000000000..793e3e5b2 --- /dev/null +++ b/.changes/1.3.0/Fixes-20220808-141623.yaml @@ -0,0 +1,8 @@ +kind: Fixes +body: Pin pyodbc to version 4.0.32 to prevent overwriting libodbc.so and libltdl.so + on Linux +time: 2022-08-08T14:16:23.846876-05:00 +custom: + Author: barberscot + Issue: "397" + PR: "398" diff --git a/.changes/1.3.0/Under the Hood-20220808-141320.yaml b/.changes/1.3.0/Under the Hood-20220808-141320.yaml new file mode 100644 index 000000000..82535f926 --- /dev/null +++ b/.changes/1.3.0/Under the Hood-20220808-141320.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: Support core incremental refactor +time: 2022-08-08T14:13:20.576155-05:00 +custom: + Author: gshank + Issue: "4402" + PR: "394" diff --git a/.changes/README.md b/.changes/README.md new file mode 100644 index 000000000..dc6106dfe --- /dev/null +++ b/.changes/README.md @@ -0,0 +1,3 @@ +# CHANGELOG + +To view information about the changelog operation we suggest reading this [README](https://github.com/dbt-labs/dbt-spark/blob/main/.changes/README.md) found in `dbt-spark`. diff --git a/.changes/header.tpl.md b/.changes/header.tpl.md new file mode 100644 index 000000000..251ea5d51 --- /dev/null +++ b/.changes/header.tpl.md @@ -0,0 +1,6 @@ +# dbt-spark Changelog + +- This file provides a full account of all changes to `dbt-spark`. +- Changes are listed under the (pre)release in which they first appear. Subsequent releases include changes from previous releases. +- "Breaking changes" listed under a version may require action from end users or external maintainers when upgrading to that version. +- Do not edit this file directly. This file is auto-generated using [changie](https://github.com/miniscruff/changie). 
For details on how to document a change, see [the contributing guide](https://github.com/dbt-labs/dbt-spark/blob/main/CONTRIBUTING.md#adding-changelog-entry) diff --git a/.changes/unreleased/.gitkeep b/.changes/unreleased/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/.changes/unreleased/Features-20220808-142118.yaml b/.changes/unreleased/Features-20220808-142118.yaml new file mode 100644 index 000000000..9c110e937 --- /dev/null +++ b/.changes/unreleased/Features-20220808-142118.yaml @@ -0,0 +1,7 @@ +kind: Features +body: Add changie to dbt-spark +time: 2022-08-08T14:21:18.569756-05:00 +custom: + Author: mcknight-42 + Issue: "416" + PR: "418" diff --git a/.changie.yaml b/.changie.yaml new file mode 100644 index 000000000..f5800f324 --- /dev/null +++ b/.changie.yaml @@ -0,0 +1,62 @@ +changesDir: .changes +unreleasedDir: unreleased +headerPath: header.tpl.md +versionHeaderPath: "" +changelogPath: CHANGELOG.md +versionExt: md +versionFormat: '## dbt-spark {{.Version}} - {{.Time.Format "January 02, 2006"}}' +kindFormat: '### {{.Kind}}' +changeFormat: '- {{.Body}} ([#{{.Custom.Issue}}](https://github.com/dbt-labs/dbt-spark/issues/{{.Custom.Issue}}), [#{{.Custom.PR}}](https://github.com/dbt-labs/dbt-spark/pull/{{.Custom.PR}}))' +kinds: +- label: Breaking Changes +- label: Features +- label: Fixes +- label: Under the Hood +- label: Dependencies + changeFormat: '- {{.Body}} ({{if ne .Custom.Issue ""}}[#{{.Custom.Issue}}](https://github.com/dbt-labs/dbt-spark/issues/{{.Custom.Issue}}), {{end}}[#{{.Custom.PR}}](https://github.com/dbt-labs/dbt-spark/pull/{{.Custom.PR}}))' +- label: Security + changeFormat: '- {{.Body}} ({{if ne .Custom.Issue ""}}[#{{.Custom.Issue}}](https://github.com/dbt-labs/dbt-spark/issues/{{.Custom.Issue}}), {{end}}[#{{.Custom.PR}}](https://github.com/dbt-labs/dbt-spark/pull/{{.Custom.PR}}))' +custom: +- key: Author + label: GitHub Username(s) (separated by a single space if multiple) + type: string + minLength: 3 +- key: Issue + label: GitHub Issue Number + type: int + minLength: 4 +- key: PR + label: GitHub Pull Request Number + type: int + minLength: 4 +footerFormat: | + {{- $contributorDict := dict }} + {{- /* any names added to this list should be all lowercase for later matching purposes */}} + {{- $core_team := list "emmyoop" "nathaniel-may" "gshank" "leahwicz" "chenyulinx" "stu-k" "iknox-fa" "versusfacit" "mcknight-42" "jtcohen6" "dependabot[bot]" "snyk-bot" }} + {{- range $change := .Changes }} + {{- $authorList := splitList " " $change.Custom.Author }} + {{- /* loop through all authors for a PR */}} + {{- range $author := $authorList }} + {{- $authorLower := lower $author }} + {{- /* we only want to include non-core team contributors */}} + {{- if not (has $authorLower $core_team)}} + {{- $pr := $change.Custom.PR }} + {{- /* check if this contributor has other PRs associated with them already */}} + {{- if hasKey $contributorDict $author }} + {{- $prList := get $contributorDict $author }} + {{- $prList = append $prList $pr }} + {{- $contributorDict := set $contributorDict $author $prList }} + {{- else }} + {{- $prList := list $change.Custom.PR }} + {{- $contributorDict := set $contributorDict $author $prList }} + {{- end }} + {{- end}} + {{- end}} + {{- end }} + {{- /* no indentation here for formatting so the final markdown doesn't have unneeded indentations */}} + {{- if $contributorDict}} + ### Contributors + {{- range $k,$v := $contributorDict }} + - [@{{$k}}](https://github.com/{{$k}}) ({{ range $index, $element := $v }}{{if $index}}, 
{{end}}[#{{$element}}](https://github.com/dbt-labs/dbt-spark/pull/{{$element}}){{end}}) + {{- end }} + {{- end }} diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 5928b1cbf..c4a5c53b4 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -18,4 +18,4 @@ resolves # - [ ] I have signed the [CLA](https://docs.getdbt.com/docs/contributor-license-agreements) - [ ] I have run this code in development and it appears to resolve the stated issue - [ ] This PR includes tests, or tests are not required/relevant for this PR -- [ ] I have updated the `CHANGELOG.md` and added information about my change to the "dbt-spark next" section. +- [ ] I have run `changie new` to [create a changelog entry](https://github.com/dbt-labs/dbt-spark/blob/main/CONTRIBUTING.md#Adding-CHANGELOG-Entry) diff --git a/.github/workflows/bot-changelog.yml b/.github/workflows/bot-changelog.yml new file mode 100644 index 000000000..d8056efe4 --- /dev/null +++ b/.github/workflows/bot-changelog.yml @@ -0,0 +1,61 @@ +# **what?** +# When bots create a PR, this action will add a corresponding changie yaml file to that +# PR when a specific label is added. +# +# The file is created off a template: +# +# kind: +# body: +# time: +# custom: +# Author: +# Issue: 4904 +# PR: +# +# **why?** +# Automate changelog generation for more visability with automated bot PRs. +# +# **when?** +# Once a PR is created, label should be added to PR before or after creation. You can also +# manually trigger this by adding the appropriate label at any time. +# +# **how to add another bot?** +# Add the label and changie kind to the include matrix. That's it! +# + +name: Bot Changelog + +on: + pull_request: + # catch when the PR is opened with the label or when the label is added + types: [opened, labeled] + +permissions: + contents: write + pull-requests: read + +jobs: + generate_changelog: + strategy: + matrix: + include: + - label: "dependencies" + changie_kind: "Dependency" + - label: "snyk" + changie_kind: "Security" + runs-on: ubuntu-latest + + steps: + + - name: Create and commit changelog on bot PR + if: "contains(github.event.pull_request.labels.*.name, ${{ matrix.label }})" + id: bot_changelog + uses: emmyoop/changie_bot@v1.0 + with: + GITHUB_TOKEN: ${{ secrets.FISHTOWN_BOT_PAT }} + commit_author_name: "Github Build Bot" + commit_author_email: "" + commit_message: "Add automated changelog yaml from template for bot PR" + changie_kind: ${{ matrix.changie_kind }} + label: ${{ matrix.label }} + custom_changelog_string: "custom:\n Author: ${{ github.event.pull_request.user.login }}\n Issue: 417\n PR: ${{ github.event.pull_request.number }}\n" diff --git a/.github/workflows/changelog-existence.yml b/.github/workflows/changelog-existence.yml new file mode 100644 index 000000000..6e51e8afc --- /dev/null +++ b/.github/workflows/changelog-existence.yml @@ -0,0 +1,41 @@ +# **what?** +# Checks that a file has been committed under the /.changes directory +# as a new CHANGELOG entry. Cannot check for a specific filename as +# it is dynamically generated by change type and timestamp. +# This workflow should not require any secrets since it runs for PRs +# from forked repos. +# By default, secrets are not passed to workflows running from +# a forked repo. + +# **why?** +# Ensure code change gets reflected in the CHANGELOG. + +# **when?** +# This will run for all PRs going into main and *.latest. 
It will +# run when they are opened, reopened, when any label is added or removed +# and when new code is pushed to the branch. The action will then get +# skipped if the 'Skip Changelog' label is present is any of the labels. + +name: Check Changelog Entry + +on: + pull_request: + types: [opened, reopened, labeled, unlabeled, synchronize] + workflow_dispatch: + +defaults: + run: + shell: bash + +permissions: + contents: read + pull-requests: write + + +jobs: + changelog: + uses: dbt-labs/actions/.github/workflows/changelog-existence.yml@main + with: + changelog_comment: 'Thank you for your pull request! We could not find a changelog entry for this change. For details on how to document a change, see the [dbt-spark contributing guide](https://github.com/dbt-labs/dbt-spark/blob/main/CONTRIBUTING.MD).' + skip_label: 'Skip Changelog' + secrets: inherit # this is only acceptable because we own the action we're calling diff --git a/CHANGELOG.md b/CHANGELOG.md index 5948429a7..4f187e31e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,266 +1,23 @@ -## dbt-spark 1.3.0b2 (Release TBD) +# dbt-spark Changelog -## dbt-spark 1.3.0b1 (July 29, 2022) +- This file provides a full account of all changes to `dbt-spark`. +- Changes are listed under the (pre)release in which they first appear. Subsequent releases include changes from previous releases. +- "Breaking changes" listed under a version may require action from end users or external maintainers when upgrading to that version. +- Do not edit this file directly. This file is auto-generated using [changie](https://github.com/miniscruff/changie). For details on how to document a change, see [the contributing guide](https://github.com/dbt-labs/dbt-spark/blob/main/CONTRIBUTING.md#adding-changelog-entry) -### Features -- Support python model through notebook, currently supported materializations are table and incremental. 
([#377](https://github.com/dbt-labs/dbt-spark/pull/377)) - -### Fixes -- Pin `pyodbc` to version 4.0.32 to prevent overwriting `libodbc.so` and `libltdl.so` on Linux ([#397](https://github.com/dbt-labs/dbt-spark/issues/397/), [#398](https://github.com/dbt-labs/dbt-spark/pull/398/)) - -### Under the hood -- Support core incremental refactor ([#394](https://github.com/dbt-labs/dbt-spark/issues/394)) - -### Contributors -- [@barberscott](https://github.com/barberscott) ([#398](https://github.com/dbt-labs/dbt-spark/pull/398/)) - -## dbt-spark 1.2.0rc1 (July 12, 2022) - -### Fixes -- Incremental materialization updated to not drop table first if full refresh for delta lake format, as it already runs _create or replace table_ ([#286](https://github.com/dbt-labs/dbt-spark/issues/286), [#287](https://github.com/dbt-labs/dbt-spark/pull/287/)) -- Apache Spark version upgraded to 3.1.1 ([#348](https://github.com/dbt-labs/dbt-spark/issues/348), [#349](https://github.com/dbt-labs/dbt-spark/pull/349)) - -### Features -- Add grants to materializations ([#366](https://github.com/dbt-labs/dbt-spark/issues/366), [#381](https://github.com/dbt-labs/dbt-spark/pull/381)) - -### Under the hood -- Update `SparkColumn.numeric_type` to return `decimal` instead of `numeric`, since SparkSQL exclusively supports the former ([#380](https://github.com/dbt-labs/dbt-spark/pull/380)) -- Make minimal changes to support dbt Core incremental materialization refactor ([#402](https://github.com/dbt-labs/dbt-spark/issue/402), [#394](httpe://github.com/dbt-labs/dbt-spark/pull/394)) - -### Contributors -- [@grindheim](https://github.com/grindheim) ([#287](https://github.com/dbt-labs/dbt-spark/pull/287/)) -- [@nssalian](https://github.com/nssalian) ([#349](https://github.com/dbt-labs/dbt-spark/pull/349)) - -## dbt-spark 1.2.0b1 (June 24, 2022) - -### Fixes -- `adapter.get_columns_in_relation` (method) and `get_columns_in_relation` (macro) now return identical responses. 
The previous behavior of `get_columns_in_relation` (macro) is now represented by a new macro, `get_columns_in_relation_raw` ([#354](https://github.com/dbt-labs/dbt-spark/issues/354), [#355](https://github.com/dbt-labs/dbt-spark/pull/355)) - -### Under the hood -- Initialize lift + shift for cross-db macros ([#359](https://github.com/dbt-labs/dbt-spark/pull/359)) -- Add invocation env to user agent string ([#367](https://github.com/dbt-labs/dbt-spark/pull/367)) -- Use dispatch pattern for get_columns_in_relation_raw macro ([#365](https://github.com/dbt-labs/dbt-spark/pull/365)) - -### Contributors -- [@ueshin](https://github.com/ueshin) ([#365](https://github.com/dbt-labs/dbt-spark/pull/365)) -- [@dbeatty10](https://github.com/dbeatty10) ([#359](https://github.com/dbt-labs/dbt-spark/pull/359)) - -## dbt-spark 1.1.0 (April 28, 2022) - -### Features -- Add session connection method ([#272](https://github.com/dbt-labs/dbt-spark/issues/272), [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) -- rename file to match reference to dbt-core ([#344](https://github.com/dbt-labs/dbt-spark/pull/344)) - -### Under the hood -- Add precommit tooling to this repo ([#356](https://github.com/dbt-labs/dbt-spark/pull/356)) -- Use dbt.tests.adapter.basic in test suite ([#298](https://github.com/dbt-labs/dbt-spark/issues/298), [#299](https://github.com/dbt-labs/dbt-spark/pull/299)) -- Make internal macros use macro dispatch to be overridable in child adapters ([#319](https://github.com/dbt-labs/dbt-spark/issues/319), [#320](https://github.com/dbt-labs/dbt-spark/pull/320)) -- Override adapter method 'run_sql_for_tests' ([#323](https://github.com/dbt-labs/dbt-spark/issues/323), [#324](https://github.com/dbt-labs/dbt-spark/pull/324)) -- when a table or view doesn't exist, 'adapter.get_columns_in_relation' will return empty list instead of fail ([#328]https://github.com/dbt-labs/dbt-spark/pull/328) - -### Contributors -- [@JCZuurmond](https://github.com/dbt-labs/dbt-spark/pull/279) ( [#279](https://github.com/dbt-labs/dbt-spark/pull/279)) -- [@ueshin](https://github.com/ueshin) ([#320](https://github.com/dbt-labs/dbt-spark/pull/320)) - -## dbt-spark 1.1.0b1 (March 23, 2022) - -### Features -- Adds new integration test to check against new ability to allow unique_key to be a list. 
([#282](https://github.com/dbt-labs/dbt-spark/issues/282)), [#291](https://github.com/dbt-labs/dbt-spark/pull/291)) - -### Fixes -- Closes the connection properly ([#280](https://github.com/dbt-labs/dbt-spark/issues/280), [#285](https://github.com/dbt-labs/dbt-spark/pull/285)) - -### Under the hood -- get_response -> AdapterResponse ([#265](https://github.com/dbt-labs/dbt-spark/pull/265)) -- Adding stale Actions workflow ([#275](https://github.com/dbt-labs/dbt-spark/pull/275)) -- Update plugin author name (`fishtown-analytics` → `dbt-labs`) in ODBC user agent ([#288](https://github.com/dbt-labs/dbt-spark/pull/288)) -- Configure insert_overwrite models to use parquet ([#301](https://github.com/dbt-labs/dbt-spark/pull/301)) - -### Contributors -- [@amychen1776](https://github.com/amychen1776) ([#288](https://github.com/dbt-labs/dbt-spark/pull/288)) -- [@ueshin](https://github.com/ueshin) ([#285](https://github.com/dbt-labs/dbt-spark/pull/285)) - -## dbt-spark 1.0.1rc0 (Release TBD) - -### Fixes -- Closes the connection properly ([#280](https://github.com/dbt-labs/dbt-spark/issues/280), [#285](https://github.com/dbt-labs/dbt-spark/pull/285)) - -### Contributors -- [@ueshin](https://github.com/ueshin) ([#285](https://github.com/dbt-labs/dbt-spark/pull/285)) - -## dbt-spark 1.0.0 (December 3, 2021) - -### Fixes -- Incremental materialization corrected to respect `full_refresh` config, by using `should_full_refresh()` macro ([#260](https://github.com/dbt-labs/dbt-spark/issues/260), [#262](https://github.com/dbt-labs/dbt-spark/pull/262/)) - -### Contributors -- [@grindheim](https://github.com/grindheim) ([#262](https://github.com/dbt-labs/dbt-spark/pull/262/)) - -## dbt-spark 1.0.0rc2 (November 24, 2021) - -### Features -- Add support for Apache Hudi (hudi file format) which supports incremental merge strategies ([#187](https://github.com/dbt-labs/dbt-spark/issues/187), [#210](https://github.com/dbt-labs/dbt-spark/pull/210)) - -### Under the hood -- Refactor seed macros: remove duplicated code from dbt-core, and provide clearer logging of SQL parameters that differ by connection method ([#249](https://github.com/dbt-labs/dbt-spark/issues/249), [#250](https://github.com/dbt-labs/dbt-snowflake/pull/250)) -- Replace `sample_profiles.yml` with `profile_template.yml`, for use with new `dbt init` ([#247](https://github.com/dbt-labs/dbt-spark/pull/247)) - -### Contributors -- [@vingov](https://github.com/vingov) ([#210](https://github.com/dbt-labs/dbt-spark/pull/210)) - -## dbt-spark 1.0.0rc1 (November 10, 2021) - -### Under the hood -- Remove official support for python 3.6, which is reaching end of life on December 23, 2021 ([dbt-core#4134](https://github.com/dbt-labs/dbt-core/issues/4134), [#253](https://github.com/dbt-labs/dbt-snowflake/pull/253)) -- Add support for structured logging ([#251](https://github.com/dbt-labs/dbt-spark/pull/251)) - -## dbt-spark 0.21.1 (Release TBD) - -## dbt-spark 0.21.1rc1 (November 3, 2021) - -### Fixes -- Fix `--store-failures` for tests, by suppressing irrelevant error in `comment_clause()` macro ([#232](https://github.com/dbt-labs/dbt-spark/issues/232), [#233](https://github.com/dbt-labs/dbt-spark/pull/233)) -- Add support for `on_schema_change` config in incremental models: `ignore`, `fail`, `append_new_columns`. 
For `sync_all_columns`, removing columns is not supported by Apache Spark or Delta Lake ([#198](https://github.com/dbt-labs/dbt-spark/issues/198), [#226](https://github.com/dbt-labs/dbt-spark/issues/226), [#229](https://github.com/dbt-labs/dbt-spark/pull/229)) -- Add `persist_docs` call to incremental model ([#224](https://github.com/dbt-labs/dbt-spark/issues/224), [#234](https://github.com/dbt-labs/dbt-spark/pull/234)) - -### Contributors -- [@binhnefits](https://github.com/binhnefits) ([#234](https://github.com/dbt-labs/dbt-spark/pull/234)) - -## dbt-spark 0.21.0 (October 4, 2021) - -### Fixes -- Enhanced get_columns_in_relation method to handle a bug in open source deltalake which doesnt return schema details in `show table extended in databasename like '*'` query output. This impacts dbt snapshots if file format is open source deltalake ([#207](https://github.com/dbt-labs/dbt-spark/pull/207)) -- Parse properly columns when there are struct fields to avoid considering inner fields: Issue ([#202](https://github.com/dbt-labs/dbt-spark/issues/202)) - -### Under the hood -- Add `unique_field` to better understand adapter adoption in anonymous usage tracking ([#211](https://github.com/dbt-labs/dbt-spark/pull/211)) - -### Contributors -- [@harryharanb](https://github.com/harryharanb) ([#207](https://github.com/dbt-labs/dbt-spark/pull/207)) -- [@SCouto](https://github.com/Scouto) ([#204](https://github.com/dbt-labs/dbt-spark/pull/204)) - -## dbt-spark 0.21.0b2 (August 20, 2021) - -### Fixes -- Add pyodbc import error message to dbt.exceptions.RuntimeException to get more detailed information when running `dbt debug` ([#192](https://github.com/dbt-labs/dbt-spark/pull/192)) -- Add support for ODBC Server Side Parameters, allowing options that need to be set with the `SET` statement to be used ([#201](https://github.com/dbt-labs/dbt-spark/pull/201)) -- Add `retry_all` configuration setting to retry all connection issues, not just when the `_is_retryable_error` function determines ([#194](https://github.com/dbt-labs/dbt-spark/pull/194)) - -### Contributors -- [@JCZuurmond](https://github.com/JCZuurmond) ([#192](https://github.com/fishtown-analytics/dbt-spark/pull/192)) -- [@jethron](https://github.com/jethron) ([#201](https://github.com/fishtown-analytics/dbt-spark/pull/201)) -- [@gregingenii](https://github.com/gregingenii) ([#194](https://github.com/dbt-labs/dbt-spark/pull/194)) - -## dbt-spark 0.21.0b1 (August 3, 2021) - -## dbt-spark 0.20.1 (August 2, 2021) - -## dbt-spark 0.20.1rc1 (August 2, 2021) - -### Fixes -- Fix `get_columns_in_relation` when called on models created in the same run ([#196](https://github.com/dbt-labs/dbt-spark/pull/196), [#197](https://github.com/dbt-labs/dbt-spark/pull/197)) - -### Contributors -- [@ali-tny](https://github.com/ali-tny) ([#197](https://github.com/fishtown-analytics/dbt-spark/pull/197)) - - -## dbt-spark 0.20.0 (July 12, 2021) - -## dbt-spark 0.20.0rc2 (July 7, 2021) +## dbt-spark 1.3.0-b1 - July 29, 2022 ### Features - -- Add support for `merge_update_columns` config in `merge`-strategy incremental models ([#183](https://github.com/fishtown-analytics/dbt-spark/pull/183), [#184](https://github.com/fishtown-analytics/dbt-spark/pull/184)) - +- Support python model through notebook, currently supported materializations are table and incremental ([#417](https://github.com/dbt-labs/dbt-spark/issues/417), [#377](https://github.com/dbt-labs/dbt-spark/pull/377)) ### Fixes - -- Fix column-level `persist_docs` on Delta tables, add tests 
([#180](https://github.com/fishtown-analytics/dbt-spark/pull/180)) - -## dbt-spark 0.20.0rc1 (June 8, 2021) - -### Features - -- Allow user to specify `use_ssl` ([#169](https://github.com/fishtown-analytics/dbt-spark/pull/169)) -- Allow setting table `OPTIONS` using `config` ([#171](https://github.com/fishtown-analytics/dbt-spark/pull/171)) -- Add support for column-level `persist_docs` on Delta tables ([#84](https://github.com/fishtown-analytics/dbt-spark/pull/84), [#170](https://github.com/fishtown-analytics/dbt-spark/pull/170)) - -### Fixes -- Cast `table_owner` to string to avoid errors generating docs ([#158](https://github.com/fishtown-analytics/dbt-spark/pull/158), [#159](https://github.com/fishtown-analytics/dbt-spark/pull/159)) -- Explicitly cast column types when inserting seeds ([#139](https://github.com/fishtown-analytics/dbt-spark/pull/139), [#166](https://github.com/fishtown-analytics/dbt-spark/pull/166)) - -### Under the hood -- Parse information returned by `list_relations_without_caching` macro to speed up catalog generation ([#93](https://github.com/fishtown-analytics/dbt-spark/issues/93), [#160](https://github.com/fishtown-analytics/dbt-spark/pull/160)) -- More flexible host passing, https:// can be omitted ([#153](https://github.com/fishtown-analytics/dbt-spark/issues/153)) +- Pin pyodbc to version 4.0.32 to prevent overwriting libodbc.so and libltdl.so on Linux ([#397](https://github.com/dbt-labs/dbt-spark/issues/397), [#398](https://github.com/dbt-labs/dbt-spark/pull/398)) +### Under the Hood +- Support core incremental refactor ([#4402](https://github.com/dbt-labs/dbt-spark/issues/4402), [#394](https://github.com/dbt-labs/dbt-spark/pull/394)) ### Contributors -- [@friendofasquid](https://github.com/friendofasquid) ([#159](https://github.com/fishtown-analytics/dbt-spark/pull/159)) -- [@franloza](https://github.com/franloza) ([#160](https://github.com/fishtown-analytics/dbt-spark/pull/160)) -- [@Fokko](https://github.com/Fokko) ([#165](https://github.com/fishtown-analytics/dbt-spark/pull/165)) -- [@rahulgoyal2987](https://github.com/rahulgoyal2987) ([#169](https://github.com/fishtown-analytics/dbt-spark/pull/169)) -- [@JCZuurmond](https://github.com/JCZuurmond) ([#171](https://github.com/fishtown-analytics/dbt-spark/pull/171)) -- [@cristianoperez](https://github.com/cristianoperez) ([#170](https://github.com/fishtown-analytics/dbt-spark/pull/170)) - - -## dbt-spark 0.19.1 (April 2, 2021) - -## dbt-spark 0.19.1b2 (February 26, 2021) - -### Under the hood -- Update serialization calls to use new API in dbt-core `0.19.1b2` ([#150](https://github.com/fishtown-analytics/dbt-spark/pull/150)) - -## dbt-spark 0.19.0.1 (February 26, 2021) - -### Fixes -- Fix package distribution to include incremental model materializations ([#151](https://github.com/fishtown-analytics/dbt-spark/pull/151), [#152](https://github.com/fishtown-analytics/dbt-spark/issues/152)) - -## dbt-spark 0.19.0 (February 21, 2021) - -### Breaking changes -- Incremental models have `incremental_strategy: append` by default. This strategy adds new records without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error if you configure a strategy that is not supported for a given file format or connection. 
([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141)) - -### Fixes -- Capture hard-deleted records in snapshot merge, when `invalidate_hard_deletes` config is set ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/143), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/144)) - -## dbt-spark 0.19.0rc1 (January 8, 2021) - -### Breaking changes -- Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/109), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/126)) - -### Under the hood -- Enable `CREATE OR REPLACE` support when using Delta. Instead of dropping and recreating the table, it will keep the existing table, and add a new version as supported by Delta. This will ensure that the table stays available when running the pipeline, and you can track the history. -- Add changelog, issue templates ([#119](https://github.com/fishtown-analytics/dbt-spark/pull/119), [#120](https://github.com/fishtown-analytics/dbt-spark/pull/120)) - -### Fixes -- Handle case of 0 retries better for HTTP Spark Connections ([#132](https://github.com/fishtown-analytics/dbt-spark/pull/132)) - -### Contributors -- [@danielvdende](https://github.com/danielvdende) ([#132](https://github.com/fishtown-analytics/dbt-spark/pull/132)) -- [@Fokko](https://github.com/Fokko) ([#125](https://github.com/fishtown-analytics/dbt-spark/pull/125)) - -## dbt-spark 0.18.1.1 (November 13, 2020) - -### Fixes -- Fix `extras_require` typo to enable `pip install dbt-spark[ODBC]` (([#121](https://github.com/fishtown-analytics/dbt-spark/pull/121)), ([#122](https://github.com/fishtown-analytics/dbt-spark/pull/122))) - -## dbt-spark 0.18.1 (November 6, 2020) - -### Features -- Allows users to specify `auth` and `kerberos_service_name` ([#107](https://github.com/fishtown-analytics/dbt-spark/pull/107)) -- Add support for ODBC driver connections to Databricks clusters and endpoints ([#116](https://github.com/fishtown-analytics/dbt-spark/pull/116)) - -### Under the hood -- Updated README links ([#115](https://github.com/fishtown-analytics/dbt-spark/pull/115)) -- Support complete atomic overwrite of non-partitioned incremental models ([#117](https://github.com/fishtown-analytics/dbt-spark/pull/117)) -- Update to support dbt-core 0.18.1 ([#110](https://github.com/fishtown-analytics/dbt-spark/pull/110), [#118](https://github.com/fishtown-analytics/dbt-spark/pull/118)) - -### Contributors -- [@danielhstahl](https://github.com/danielhstahl) ([#107](https://github.com/fishtown-analytics/dbt-spark/pull/107)) -- [@collinprather](https://github.com/collinprather) ([#115](https://github.com/fishtown-analytics/dbt-spark/pull/115)) -- [@charlottevdscheun](https://github.com/charlottevdscheun) ([#117](https://github.com/fishtown-analytics/dbt-spark/pull/117)) -- [@Fokko](https://github.com/Fokko) ([#117](https://github.com/fishtown-analytics/dbt-spark/pull/117)) - -## dbt-spark 0.18.0 (September 18, 2020) - -### Under the hood -- Make a number of changes to support dbt-adapter-tests ([#103](https://github.com/fishtown-analytics/dbt-spark/pull/103)) -- Update to support dbt-core 0.18.0. 
Run CI tests against local Spark, Databricks ([#105](https://github.com/fishtown-analytics/dbt-spark/pull/105)) +- [@barberscot](https://github.com/barberscot) ([#398](https://github.com/dbt-labs/dbt-spark/pull/398)) +## Previous Releases +For information on prior major and minor releases, see their changelogs: +- [1.2](https://github.com/dbt-labs/dbt-spark/blob/1.2.latest/CHANGELOG.md) +- [1.1](https://github.com/dbt-labs/dbt-spark/blob/1.1.latest/CHANGELOG.md) +- [1.0](https://github.com/dbt-labs/dbt-spark/blob/1.0.latest/CHANGELOG.md) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c0d9bb3d2..1d6e76d31 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -90,6 +90,16 @@ Many changes will require and update to the `dbt-spark` docs here are some usefu - The changes made are likely to impact one or both of [Spark Profile](https://docs.getdbt.com/reference/warehouse-profiles/spark-profile), or [Saprk Configs](https://docs.getdbt.com/reference/resource-configs/spark-configs). - We ask every community member who makes a user-facing change to open an issue or PR regarding doc changes. +## Adding CHANGELOG Entry + +We use [changie](https://changie.dev) to generate `CHANGELOG` entries. **Note:** Do not edit the `CHANGELOG.md` directly. Your modifications will be lost. + +Follow the steps to [install `changie`](https://changie.dev/guide/installation/) for your system. + +Once changie is installed and your PR is created, simply run `changie new` and changie will walk you through the process of creating a changelog entry. Commit the file that's created and your changelog entry is complete! + +You don't need to worry about which `dbt-spark` version your change will go into. Just create the changelog entry with `changie`, and open your PR against the `main` branch. All merged changes will be included in the next minor version of `dbt-spark`. The Core maintainers _may_ choose to "backport" specific changes in order to patch older minor versions. In that case, a maintainer will take care of that backport after merging your PR, before releasing the new version of `dbt-spark`. + ## Submitting a Pull Request dbt Labs provides a CI environment to test changes to the `dbt-spark` adapter, and periodic checks against the development version of `dbt-core` through Github Actions. 
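For reference, the `changie new` workflow described in the CONTRIBUTING.md addition above produces a small YAML file under `.changes/unreleased/`, in the same shape as the entries added earlier in this patch series. A minimal sketch follows; the kind must be one of the labels declared in `.changie.yaml`, and the author, issue, and PR values here are placeholders rather than real metadata:

kind: Fixes
body: One-line, user-facing description of the change
time: 2022-08-25T00:00:00.000000-05:00
custom:
  Author: your-github-handle  # placeholder
  Issue: "123"                # placeholder
  PR: "456"                   # placeholder

Committing the generated file alongside the code change is sufficient; CHANGELOG.md itself is assembled from these entries by changie and should not be edited by hand.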
From 24e796d52d0201bdb4c45fac2e99a2a848cbe853 Mon Sep 17 00:00:00 2001 From: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com> Date: Mon, 22 Aug 2022 10:23:05 -0500 Subject: [PATCH 04/14] Add ref to pre 1.0.0 in 0.0.0.md (#428) * init pr push for ct-1005 * add changelog * change pointer to spark * eof fix * remove ref to dbt-core in changelog existence * typo fix * typo and change of ref * add condtional logic for kinds security and dependency and change NO ISSUE ref to spark issue #417 * add ref to pre 1.0.0 changes * add ref to pre 1.0.0 changes * fix eof fail on test * fix eof fail on test * expand out ref to past 1.0.0 * run changie merge * repush changes * remove excess spacing --- .changes/0.0.0.md | 3 +++ CHANGELOG.md | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/.changes/0.0.0.md b/.changes/0.0.0.md index 5acfb3dbc..14c2cf9e7 100644 --- a/.changes/0.0.0.md +++ b/.changes/0.0.0.md @@ -3,3 +3,6 @@ For information on prior major and minor releases, see their changelogs: - [1.2](https://github.com/dbt-labs/dbt-spark/blob/1.2.latest/CHANGELOG.md) - [1.1](https://github.com/dbt-labs/dbt-spark/blob/1.1.latest/CHANGELOG.md) - [1.0](https://github.com/dbt-labs/dbt-spark/blob/1.0.latest/CHANGELOG.md) +- [0.21](https://github.com/dbt-labs/dbt-spark/blob/0.21.latest/CHANGELOG.md) +- [0.20](https://github.com/dbt-labs/dbt-spark/blob/0.20.latest/CHANGELOG.md) +- [0.19 and earlier](https://github.com/dbt-labs/dbt-spark/blob/0.19.latest/CHANGELOG.md) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f187e31e..0491a7b5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,15 +9,21 @@ ### Features - Support python model through notebook, currently supported materializations are table and incremental ([#417](https://github.com/dbt-labs/dbt-spark/issues/417), [#377](https://github.com/dbt-labs/dbt-spark/pull/377)) + ### Fixes - Pin pyodbc to version 4.0.32 to prevent overwriting libodbc.so and libltdl.so on Linux ([#397](https://github.com/dbt-labs/dbt-spark/issues/397), [#398](https://github.com/dbt-labs/dbt-spark/pull/398)) + ### Under the Hood - Support core incremental refactor ([#4402](https://github.com/dbt-labs/dbt-spark/issues/4402), [#394](https://github.com/dbt-labs/dbt-spark/pull/394)) ### Contributors - [@barberscot](https://github.com/barberscot) ([#398](https://github.com/dbt-labs/dbt-spark/pull/398)) + ## Previous Releases For information on prior major and minor releases, see their changelogs: - [1.2](https://github.com/dbt-labs/dbt-spark/blob/1.2.latest/CHANGELOG.md) - [1.1](https://github.com/dbt-labs/dbt-spark/blob/1.1.latest/CHANGELOG.md) - [1.0](https://github.com/dbt-labs/dbt-spark/blob/1.0.latest/CHANGELOG.md) +- [0.21](https://github.com/dbt-labs/dbt-spark/blob/0.21.latest/CHANGELOG.md) +- [0.20](https://github.com/dbt-labs/dbt-spark/blob/0.20.latest/CHANGELOG.md) +- [0.19 and earlier](https://github.com/dbt-labs/dbt-spark/blob/0.19.latest/CHANGELOG.md) From c9698f62118b9c5408b53bb8cc3be03ae5d3d8a4 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Thu, 25 Aug 2022 10:02:50 -0700 Subject: [PATCH 05/14] add supported language (#440) * add supported language * add changelog --- .changes/unreleased/Under the Hood-20220825-073413.yaml | 7 +++++++ .../macros/materializations/incremental/incremental.sql | 2 +- dbt/include/spark/macros/materializations/table.sql | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20220825-073413.yaml diff --git a/.changes/unreleased/Under the Hood-20220825-073413.yaml 
b/.changes/unreleased/Under the Hood-20220825-073413.yaml new file mode 100644 index 000000000..71e187ca7 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20220825-073413.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: specify supported_languages for materialization that support python models +time: 2022-08-25T07:34:13.397367-07:00 +custom: + Author: ChenyuLInx + Issue: "437" + PR: "440" diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql index 91cba9e5f..1a92351ce 100644 --- a/dbt/include/spark/macros/materializations/incremental/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql @@ -1,4 +1,4 @@ -{% materialization incremental, adapter='spark' -%} +{% materialization incremental, adapter='spark', supported_languages=['sql', 'python'] -%} {#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#} {%- set raw_file_format = config.get('file_format', default='parquet') -%} {%- set raw_strategy = config.get('incremental_strategy') or 'append' -%} diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql index 6a02ea164..d39ba0b44 100644 --- a/dbt/include/spark/macros/materializations/table.sql +++ b/dbt/include/spark/macros/materializations/table.sql @@ -1,4 +1,4 @@ -{% materialization table, adapter = 'spark' %} +{% materialization table, adapter = 'spark', supported_languages=['sql', 'python'] %} {%- set language = model['language'] -%} {%- set identifier = model['alias'] -%} {%- set grant_config = config.get('grants') -%} From 5297b9225263fb33338fc54f004365ec1ad47104 Mon Sep 17 00:00:00 2001 From: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com> Date: Thu, 25 Aug 2022 15:27:29 -0500 Subject: [PATCH 06/14] version bump, changie. and backports (#434) --- .github/workflows/backport.yml | 42 +++++++++++++ .github/workflows/version-bump.yml | 97 ++++-------------------------- 2 files changed, 53 insertions(+), 86 deletions(-) create mode 100644 .github/workflows/backport.yml diff --git a/.github/workflows/backport.yml b/.github/workflows/backport.yml new file mode 100644 index 000000000..8c0355bda --- /dev/null +++ b/.github/workflows/backport.yml @@ -0,0 +1,42 @@ + + +# **what?** +# When a PR is merged, if it has the backport label, it will create +# a new PR to backport those changes to the given branch. If it can't +# cleanly do a backport, it will comment on the merged PR of the failure. +# +# Label naming convention: "backport " +# Example: backport 1.0.latest +# +# You MUST "Squash and merge" the original PR or this won't work. + +# **why?** +# Changes sometimes need to be backported to release branches. +# This automates the backporting process + +# **when?** +# Once a PR is "Squash and merge"'d, by adding a backport label, this is triggered + +name: Backport +on: + pull_request: + types: + - labeled + +permissions: + contents: write + pull-requests: write + +jobs: + backport: + name: Backport + runs-on: ubuntu-latest + # Only react to merged PRs for security reasons. + # See https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#pull_request_target. 
+ if: > + github.event.pull_request.merged + && contains(github.event.label.name, 'backport') + steps: + - uses: tibdex/backport@v2.0.2 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/version-bump.yml b/.github/workflows/version-bump.yml index a8b3236ce..bde34d683 100644 --- a/.github/workflows/version-bump.yml +++ b/.github/workflows/version-bump.yml @@ -1,18 +1,15 @@ # **what?** -# This workflow will take a version number and a dry run flag. With that +# This workflow will take the new version number to bump to. With that # it will run versionbump to update the version number everywhere in the -# code base and then generate an update Docker requirements file. If this -# is a dry run, a draft PR will open with the changes. If this isn't a dry -# run, the changes will be committed to the branch this is run on. +# code base and then run changie to create the corresponding changelog. +# A PR will be created with the changes that can be reviewed before committing. # **why?** # This is to aid in releasing dbt and making sure we have updated -# the versions and Docker requirements in all places. +# the version in all places and generated the changelog. # **when?** -# This is triggered either manually OR -# from the repository_dispatch event "version-bump" which is sent from -# the dbt-release repo Action +# This is triggered manually name: Version Bump @@ -20,84 +17,12 @@ on: workflow_dispatch: inputs: version_number: - description: 'The version number to bump to' + description: 'The version number to bump to (ex. 1.2.0, 1.3.0b1)' required: true - is_dry_run: - description: 'Creates a draft PR to allow testing instead of committing to a branch' - required: true - default: 'true' - repository_dispatch: - types: [version-bump] jobs: - bump: - runs-on: ubuntu-latest - steps: - - name: Check out the repository - uses: actions/checkout@v2 - - - name: Set version and dry run values - id: variables - env: - VERSION_NUMBER: "${{ github.event.client_payload.version_number == '' && github.event.inputs.version_number || github.event.client_payload.version_number }}" - IS_DRY_RUN: "${{ github.event.client_payload.is_dry_run == '' && github.event.inputs.is_dry_run || github.event.client_payload.is_dry_run }}" - run: | - echo Repository dispatch event version: ${{ github.event.client_payload.version_number }} - echo Repository dispatch event dry run: ${{ github.event.client_payload.is_dry_run }} - echo Workflow dispatch event version: ${{ github.event.inputs.version_number }} - echo Workflow dispatch event dry run: ${{ github.event.inputs.is_dry_run }} - echo ::set-output name=VERSION_NUMBER::$VERSION_NUMBER - echo ::set-output name=IS_DRY_RUN::$IS_DRY_RUN - - - uses: actions/setup-python@v2 - with: - python-version: "3.8" - - - name: Install python dependencies - run: | - sudo apt-get install libsasl2-dev - python3 -m venv env - source env/bin/activate - pip install --upgrade pip - - - name: Create PR branch - if: ${{ steps.variables.outputs.IS_DRY_RUN == 'true' }} - run: | - git checkout -b bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_$GITHUB_RUN_ID - git push origin bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_$GITHUB_RUN_ID - git branch --set-upstream-to=origin/bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_$GITHUB_RUN_ID bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_$GITHUB_RUN_ID - - - name: Bumping version - run: | - source env/bin/activate - pip install -r dev-requirements.txt - env/bin/bumpversion 
--allow-dirty --new-version ${{steps.variables.outputs.VERSION_NUMBER}} major - git status - - - name: Commit version bump directly - uses: EndBug/add-and-commit@v7 - if: ${{ steps.variables.outputs.IS_DRY_RUN == 'false' }} - with: - author_name: 'Github Build Bot' - author_email: 'buildbot@fishtownanalytics.com' - message: 'Bumping version to ${{steps.variables.outputs.VERSION_NUMBER}}' - - - name: Commit version bump to branch - uses: EndBug/add-and-commit@v7 - if: ${{ steps.variables.outputs.IS_DRY_RUN == 'true' }} - with: - author_name: 'Github Build Bot' - author_email: 'buildbot@fishtownanalytics.com' - message: 'Bumping version to ${{steps.variables.outputs.VERSION_NUMBER}}' - branch: 'bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_${{GITHUB.RUN_ID}}' - push: 'origin origin/bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_${{GITHUB.RUN_ID}}' - - - name: Create Pull Request - uses: peter-evans/create-pull-request@v3 - if: ${{ steps.variables.outputs.IS_DRY_RUN == 'true' }} - with: - author: 'Github Build Bot ' - draft: true - base: ${{github.ref}} - title: 'Bumping version to ${{steps.variables.outputs.VERSION_NUMBER}}' - branch: 'bumping-version/${{steps.variables.outputs.VERSION_NUMBER}}_${{GITHUB.RUN_ID}}' + version_bump_and_changie: + uses: dbt-labs/actions/.github/workflows/version-bump.yml@main + with: + version_number: ${{ inputs.version_number }} + secrets: inherit # ok since what we are calling is internally maintained From 224cc28004122f478a965acb9f5deff788bbdd72 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 30 Aug 2022 11:20:35 -0400 Subject: [PATCH 07/14] Bumping version to 1.3.0b2 and generate changelog (#443) * Bumping version to 1.3.0b2 and generate CHANGELOG * Remove newline Co-authored-by: Github Build Bot Co-authored-by: leahwicz <60146280+leahwicz@users.noreply.github.com> --- .bumpversion.cfg | 2 +- .changes/1.3.0-b2.md | 5 +++++ .../{unreleased => 1.3.0}/Features-20220808-142118.yaml | 0 .../Under the Hood-20220825-073413.yaml | 0 CHANGELOG.md | 8 +++++--- dbt/adapters/spark/__version__.py | 2 +- setup.py | 2 +- 7 files changed, 13 insertions(+), 6 deletions(-) create mode 100644 .changes/1.3.0-b2.md rename .changes/{unreleased => 1.3.0}/Features-20220808-142118.yaml (100%) rename .changes/{unreleased => 1.3.0}/Under the Hood-20220825-073413.yaml (100%) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index ef3954f4c..f93a02ae6 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.3.0b1 +current_version = 1.3.0b2 parse = (?P\d+) \.(?P\d+) \.(?P\d+) diff --git a/.changes/1.3.0-b2.md b/.changes/1.3.0-b2.md new file mode 100644 index 000000000..8f7ea1e62 --- /dev/null +++ b/.changes/1.3.0-b2.md @@ -0,0 +1,5 @@ +## dbt-spark 1.3.0-b2 - August 30, 2022 +### Features +- Add changie to dbt-spark ([#416](https://github.com/dbt-labs/dbt-spark/issues/416), [#418](https://github.com/dbt-labs/dbt-spark/pull/418)) +### Under the Hood +- specify supported_languages for materialization that support python models ([#437](https://github.com/dbt-labs/dbt-spark/issues/437), [#440](https://github.com/dbt-labs/dbt-spark/pull/440)) diff --git a/.changes/unreleased/Features-20220808-142118.yaml b/.changes/1.3.0/Features-20220808-142118.yaml similarity index 100% rename from .changes/unreleased/Features-20220808-142118.yaml rename to .changes/1.3.0/Features-20220808-142118.yaml diff --git a/.changes/unreleased/Under the 
Hood-20220825-073413.yaml b/.changes/1.3.0/Under the Hood-20220825-073413.yaml similarity index 100% rename from .changes/unreleased/Under the Hood-20220825-073413.yaml rename to .changes/1.3.0/Under the Hood-20220825-073413.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 0491a7b5f..de20a0738 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,21 +4,23 @@ - Changes are listed under the (pre)release in which they first appear. Subsequent releases include changes from previous releases. - "Breaking changes" listed under a version may require action from end users or external maintainers when upgrading to that version. - Do not edit this file directly. This file is auto-generated using [changie](https://github.com/miniscruff/changie). For details on how to document a change, see [the contributing guide](https://github.com/dbt-labs/dbt-spark/blob/main/CONTRIBUTING.md#adding-changelog-entry) +## dbt-spark 1.3.0-b2 - August 30, 2022 +### Features +- Add changie to dbt-spark ([#416](https://github.com/dbt-labs/dbt-spark/issues/416), [#418](https://github.com/dbt-labs/dbt-spark/pull/418)) +### Under the Hood +- specify supported_languages for materialization that support python models ([#437](https://github.com/dbt-labs/dbt-spark/issues/437), [#440](https://github.com/dbt-labs/dbt-spark/pull/440)) ## dbt-spark 1.3.0-b1 - July 29, 2022 ### Features - Support python model through notebook, currently supported materializations are table and incremental ([#417](https://github.com/dbt-labs/dbt-spark/issues/417), [#377](https://github.com/dbt-labs/dbt-spark/pull/377)) - ### Fixes - Pin pyodbc to version 4.0.32 to prevent overwriting libodbc.so and libltdl.so on Linux ([#397](https://github.com/dbt-labs/dbt-spark/issues/397), [#398](https://github.com/dbt-labs/dbt-spark/pull/398)) - ### Under the Hood - Support core incremental refactor ([#4402](https://github.com/dbt-labs/dbt-spark/issues/4402), [#394](https://github.com/dbt-labs/dbt-spark/pull/394)) ### Contributors - [@barberscot](https://github.com/barberscot) ([#398](https://github.com/dbt-labs/dbt-spark/pull/398)) - ## Previous Releases For information on prior major and minor releases, see their changelogs: - [1.2](https://github.com/dbt-labs/dbt-spark/blob/1.2.latest/CHANGELOG.md) diff --git a/dbt/adapters/spark/__version__.py b/dbt/adapters/spark/__version__.py index 4b49b750d..e2c1a233c 100644 --- a/dbt/adapters/spark/__version__.py +++ b/dbt/adapters/spark/__version__.py @@ -1 +1 @@ -version = "1.3.0b1" +version = "1.3.0b2" diff --git a/setup.py b/setup.py index 229e89a17..05e814490 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ def _get_dbt_core_version(): package_name = "dbt-spark" -package_version = "1.3.0b1" +package_version = "1.3.0b2" dbt_core_version = _get_dbt_core_version() description = """The Apache Spark adapter plugin for dbt""" From cef098f5181c51e9a6ae06c157ec6863852bcd22 Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Tue, 30 Aug 2022 16:49:12 -0700 Subject: [PATCH 08/14] refactor submission method and add command API as defualt (#442) * refactor submission method and add command API as defualt * update run_name and add changelog * fix format * pr feedback --- .../Under the Hood-20220829-164426.yaml | 7 + dbt/adapters/spark/impl.py | 108 +------ dbt/adapters/spark/python_submissions.py | 284 ++++++++++++++++++ 3 files changed, 300 insertions(+), 99 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20220829-164426.yaml create mode 100644 dbt/adapters/spark/python_submissions.py diff --git 
a/.changes/unreleased/Under the Hood-20220829-164426.yaml b/.changes/unreleased/Under the Hood-20220829-164426.yaml new file mode 100644 index 000000000..bf58971f2 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20220829-164426.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: Submit python model with Command API by default. Adjusted run name +time: 2022-08-29T16:44:26.509138-07:00 +custom: + Author: ChenyuLInx + Issue: "424" + PR: "442" diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 12c42ab98..6e97ce1f5 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -1,7 +1,4 @@ import re -import requests -import time -import base64 from concurrent.futures import Future from dataclasses import dataclass from typing import Any, Dict, Iterable, List, Optional, Union @@ -20,6 +17,7 @@ from dbt.adapters.spark import SparkConnectionManager from dbt.adapters.spark import SparkRelation from dbt.adapters.spark import SparkColumn +from dbt.adapters.spark.python_submissions import PYTHON_SUBMISSION_HELPERS from dbt.adapters.base import BaseRelation from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER from dbt.events import AdapterLogger @@ -394,105 +392,17 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str, timeout=None # of `None` which evaluates to True! # TODO limit this function to run only when doing the materialization of python nodes - # assuming that for python job running over 1 day user would mannually overwrite this - schema = getattr(parsed_model, "schema", self.config.credentials.schema) - identifier = parsed_model["alias"] - if not timeout: - timeout = 60 * 60 * 24 - if timeout <= 0: - raise ValueError("Timeout must larger than 0") - - auth_header = {"Authorization": f"Bearer {self.connections.profile.credentials.token}"} - - # create new dir - if not self.connections.profile.credentials.user: - raise ValueError("Need to supply user in profile to submit python job") - # it is safe to call mkdirs even if dir already exists and have content inside - work_dir = f"/Users/{self.connections.profile.credentials.user}/{schema}" - response = requests.post( - f"https://{self.connections.profile.credentials.host}/api/2.0/workspace/mkdirs", - headers=auth_header, - json={ - "path": work_dir, - }, - ) - if response.status_code != 200: - raise dbt.exceptions.RuntimeException( - f"Error creating work_dir for python notebooks\n {response.content!r}" + submission_method = parsed_model["config"].get("submission_method", "commands") + if submission_method not in PYTHON_SUBMISSION_HELPERS: + raise NotImplementedError( + "Submission method {} is not supported".format(submission_method) ) - - # add notebook - b64_encoded_content = base64.b64encode(compiled_code.encode()).decode() - response = requests.post( - f"https://{self.connections.profile.credentials.host}/api/2.0/workspace/import", - headers=auth_header, - json={ - "path": f"{work_dir}/{identifier}", - "content": b64_encoded_content, - "language": "PYTHON", - "overwrite": True, - "format": "SOURCE", - }, + job_helper = PYTHON_SUBMISSION_HELPERS[submission_method]( + parsed_model, self.connections.profile.credentials ) - if response.status_code != 200: - raise dbt.exceptions.RuntimeException( - f"Error creating python notebook.\n {response.content!r}" - ) - - # submit job - submit_response = requests.post( - f"https://{self.connections.profile.credentials.host}/api/2.1/jobs/runs/submit", - headers=auth_header, - json={ - "run_name": "debug task", - "existing_cluster_id": 
self.connections.profile.credentials.cluster, - "notebook_task": { - "notebook_path": f"{work_dir}/{identifier}", - }, - }, - ) - if submit_response.status_code != 200: - raise dbt.exceptions.RuntimeException( - f"Error creating python run.\n {response.content!r}" - ) - - # poll until job finish - state = None - start = time.time() - run_id = submit_response.json()["run_id"] - terminal_states = ["TERMINATED", "SKIPPED", "INTERNAL_ERROR"] - while state not in terminal_states and time.time() - start < timeout: - time.sleep(1) - resp = requests.get( - f"https://{self.connections.profile.credentials.host}" - f"/api/2.1/jobs/runs/get?run_id={run_id}", - headers=auth_header, - ) - json_resp = resp.json() - state = json_resp["state"]["life_cycle_state"] - # logger.debug(f"Polling.... in state: {state}") - if state != "TERMINATED": - raise dbt.exceptions.RuntimeException( - "python model run ended in state" - f"{state} with state_message\n{json_resp['state']['state_message']}" - ) - - # get end state to return to user - run_output = requests.get( - f"https://{self.connections.profile.credentials.host}" - f"/api/2.1/jobs/runs/get-output?run_id={run_id}", - headers=auth_header, - ) - json_run_output = run_output.json() - result_state = json_run_output["metadata"]["state"]["result_state"] - if result_state != "SUCCESS": - raise dbt.exceptions.RuntimeException( - "Python model failed with traceback as:\n" - "(Note that the line number here does not " - "match the line number in your code due to dbt templating)\n" - f"{json_run_output['error_trace']}" - ) + job_helper.submit(compiled_code) + # we don't really get any useful information back from the job submission other than success return self.connections.get_response(None) def standardize_grants_dict(self, grants_table: agate.Table) -> dict: diff --git a/dbt/adapters/spark/python_submissions.py b/dbt/adapters/spark/python_submissions.py new file mode 100644 index 000000000..ea172ef03 --- /dev/null +++ b/dbt/adapters/spark/python_submissions.py @@ -0,0 +1,284 @@ +import base64 +import time +import requests +from typing import Any, Dict +import uuid + +import dbt.exceptions + +DEFAULT_POLLING_INTERVAL = 3 +SUBMISSION_LANGUAGE = "python" +DEFAULT_TIMEOUT = 60 * 60 * 24 + + +class BasePythonJobHelper: + def __init__(self, parsed_model, credentials): + self.check_credentials(credentials) + self.credentials = credentials + self.identifier = parsed_model["alias"] + self.schema = getattr(parsed_model, "schema", self.credentials.schema) + self.parsed_model = parsed_model + self.timeout = self.get_timeout() + self.polling_interval = DEFAULT_POLLING_INTERVAL + + def get_timeout(self): + timeout = self.parsed_model["config"].get("timeout", DEFAULT_TIMEOUT) + if timeout <= 0: + raise ValueError("Timeout must be a positive integer") + return timeout + + def check_credentials(self, credentials): + raise NotImplementedError( + "Overwrite this method to check specific requirement for current submission method" + ) + + def submit(self, compiled_code): + raise NotImplementedError( + "BasePythonJobHelper is an abstract class and you should implement submit method." + ) + + def polling( + self, + status_func, + status_func_kwargs, + get_state_func, + terminal_states, + expected_end_state, + get_state_msg_func, + ): + state = None + start = time.time() + exceeded_timeout = False + response = {} + while state not in terminal_states: + if time.time() - start > self.timeout: + exceeded_timeout = True + break + # TODO should we do exponential backoff? 
+ time.sleep(self.polling_interval) + response = status_func(**status_func_kwargs) + state = get_state_func(response) + if exceeded_timeout: + raise dbt.exceptions.RuntimeException("python model run timed out") + if state != expected_end_state: + raise dbt.exceptions.RuntimeException( + "python model run ended in state" + f"{state} with state_message\n{get_state_msg_func(response)}" + ) + return response + + +class DBNotebookPythonJobHelper(BasePythonJobHelper): + def __init__(self, parsed_model, credentials): + super().__init__(parsed_model, credentials) + self.auth_header = {"Authorization": f"Bearer {self.credentials.token}"} + + def check_credentials(self, credentials): + if not credentials.user: + raise ValueError("Databricks user is required for notebook submission method.") + + def _create_work_dir(self, path): + response = requests.post( + f"https://{self.credentials.host}/api/2.0/workspace/mkdirs", + headers=self.auth_header, + json={ + "path": path, + }, + ) + if response.status_code != 200: + raise dbt.exceptions.RuntimeException( + f"Error creating work_dir for python notebooks\n {response.content!r}" + ) + + def _upload_notebook(self, path, compiled_code): + b64_encoded_content = base64.b64encode(compiled_code.encode()).decode() + response = requests.post( + f"https://{self.credentials.host}/api/2.0/workspace/import", + headers=self.auth_header, + json={ + "path": path, + "content": b64_encoded_content, + "language": "PYTHON", + "overwrite": True, + "format": "SOURCE", + }, + ) + if response.status_code != 200: + raise dbt.exceptions.RuntimeException( + f"Error creating python notebook.\n {response.content!r}" + ) + + def _submit_notebook(self, path): + submit_response = requests.post( + f"https://{self.credentials.host}/api/2.1/jobs/runs/submit", + headers=self.auth_header, + json={ + "run_name": f"{self.schema}-{self.identifier}-{uuid.uuid4()}", + "existing_cluster_id": self.credentials.cluster, + "notebook_task": { + "notebook_path": path, + }, + }, + ) + if submit_response.status_code != 200: + raise dbt.exceptions.RuntimeException( + f"Error creating python run.\n {submit_response.content!r}" + ) + return submit_response.json()["run_id"] + + def submit(self, compiled_code): + # it is safe to call mkdirs even if dir already exists and have content inside + work_dir = f"/Users/{self.credentials.user}/{self.schema}/" + self._create_work_dir(work_dir) + + # add notebook + whole_file_path = f"{work_dir}{self.identifier}" + self._upload_notebook(whole_file_path, compiled_code) + + # submit job + run_id = self._submit_notebook(whole_file_path) + + self.polling( + status_func=requests.get, + status_func_kwargs={ + "url": f"https://{self.credentials.host}/api/2.1/jobs/runs/get?run_id={run_id}", + "headers": self.auth_header, + }, + get_state_func=lambda response: response.json()["state"]["life_cycle_state"], + terminal_states=("TERMINATED", "SKIPPED", "INTERNAL_ERROR"), + expected_end_state="TERMINATED", + get_state_msg_func=lambda response: response.json()["state"]["state_message"], + ) + + # get end state to return to user + run_output = requests.get( + f"https://{self.credentials.host}" f"/api/2.1/jobs/runs/get-output?run_id={run_id}", + headers=self.auth_header, + ) + json_run_output = run_output.json() + result_state = json_run_output["metadata"]["state"]["result_state"] + if result_state != "SUCCESS": + raise dbt.exceptions.RuntimeException( + "Python model failed with traceback as:\n" + "(Note that the line number here does not " + "match the line number in your code due to 
dbt templating)\n" + f"{json_run_output['error_trace']}" + ) + + +class DBContext: + def __init__(self, credentials): + self.auth_header = {"Authorization": f"Bearer {credentials.token}"} + self.cluster = credentials.cluster + self.host = credentials.host + + def create(self) -> str: + # https://docs.databricks.com/dev-tools/api/1.2/index.html#create-an-execution-context + response = requests.post( + f"https://{self.host}/api/1.2/contexts/create", + headers=self.auth_header, + json={ + "clusterId": self.cluster, + "language": SUBMISSION_LANGUAGE, + }, + ) + if response.status_code != 200: + raise dbt.exceptions.RuntimeException( + f"Error creating an execution context.\n {response.content!r}" + ) + return response.json()["id"] + + def destroy(self, context_id: str) -> str: + # https://docs.databricks.com/dev-tools/api/1.2/index.html#delete-an-execution-context + response = requests.post( + f"https://{self.host}/api/1.2/contexts/destroy", + headers=self.auth_header, + json={ + "clusterId": self.cluster, + "contextId": context_id, + }, + ) + if response.status_code != 200: + raise dbt.exceptions.RuntimeException( + f"Error deleting an execution context.\n {response.content!r}" + ) + return response.json()["id"] + + +class DBCommand: + def __init__(self, credentials): + self.auth_header = {"Authorization": f"Bearer {credentials.token}"} + self.cluster = credentials.cluster + self.host = credentials.host + + def execute(self, context_id: str, command: str) -> str: + # https://docs.databricks.com/dev-tools/api/1.2/index.html#run-a-command + response = requests.post( + f"https://{self.host}/api/1.2/commands/execute", + headers=self.auth_header, + json={ + "clusterId": self.cluster, + "contextId": context_id, + "language": SUBMISSION_LANGUAGE, + "command": command, + }, + ) + if response.status_code != 200: + raise dbt.exceptions.RuntimeException( + f"Error creating a command.\n {response.content!r}" + ) + return response.json()["id"] + + def status(self, context_id: str, command_id: str) -> Dict[str, Any]: + # https://docs.databricks.com/dev-tools/api/1.2/index.html#get-information-about-a-command + response = requests.get( + f"https://{self.host}/api/1.2/commands/status", + headers=self.auth_header, + params={ + "clusterId": self.cluster, + "contextId": context_id, + "commandId": command_id, + }, + ) + if response.status_code != 200: + raise dbt.exceptions.RuntimeException( + f"Error getting status of command.\n {response.content!r}" + ) + return response.json() + + +class DBCommandsApiPythonJobHelper(BasePythonJobHelper): + def check_credentials(self, credentials): + if not credentials.cluster: + raise ValueError("Databricks cluster is required for commands submission method.") + + def submit(self, compiled_code): + context = DBContext(self.credentials) + command = DBCommand(self.credentials) + context_id = context.create() + try: + command_id = command.execute(context_id, compiled_code) + # poll until job finish + response = self.polling( + status_func=command.status, + status_func_kwargs={ + "context_id": context_id, + "command_id": command_id, + }, + get_state_func=lambda response: response["status"], + terminal_states=("Cancelled", "Error", "Finished"), + expected_end_state="Finished", + get_state_msg_func=lambda response: response.json()["results"]["data"], + ) + if response["results"]["resultType"] == "error": + raise dbt.exceptions.RuntimeException( + f"Python model failed with traceback as:\n" f"{response['results']['cause']}" + ) + finally: + context.destroy(context_id) + + 
+PYTHON_SUBMISSION_HELPERS = { + "notebook": DBNotebookPythonJobHelper, + "commands": DBCommandsApiPythonJobHelper, +} From ebd011ea71ba533e065cb167f6c8213753fa6e9e Mon Sep 17 00:00:00 2001 From: Chenyu Li Date: Wed, 31 Aug 2022 07:21:14 -0700 Subject: [PATCH 09/14] set tmp relation with proper schema (#445) * set tmp relation with proper schema * add changelog --- .changes/unreleased/Fixes-20220830-140224.yaml | 7 +++++++ dbt/include/spark/macros/adapters.sql | 7 +++---- .../macros/materializations/incremental/incremental.sql | 5 +++++ .../macros/materializations/incremental/strategies.sql | 6 +++--- 4 files changed, 18 insertions(+), 7 deletions(-) create mode 100644 .changes/unreleased/Fixes-20220830-140224.yaml diff --git a/.changes/unreleased/Fixes-20220830-140224.yaml b/.changes/unreleased/Fixes-20220830-140224.yaml new file mode 100644 index 000000000..9e3da3ea6 --- /dev/null +++ b/.changes/unreleased/Fixes-20220830-140224.yaml @@ -0,0 +1,7 @@ +kind: Fixes +body: python incremental model tmp table using correct schema +time: 2022-08-30T14:02:24.603033-07:00 +custom: + Author: ChenyuLInx + Issue: "441" + PR: "445" diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index 05630ede5..88190cc04 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -123,7 +123,7 @@ {#-- We can't use temporary tables with `create ... as ()` syntax --#} {% macro spark__create_temporary_view(relation, compiled_code) -%} - create temporary view {{ relation.include(schema=false) }} as + create temporary view {{ relation }} as {{ compiled_code }} {%- endmacro -%} @@ -185,7 +185,7 @@ {% macro spark__get_columns_in_relation_raw(relation) -%} {% call statement('get_columns_in_relation_raw', fetch_result=True) %} - describe extended {{ relation.include(schema=(schema is not none)) }} + describe extended {{ relation }} {% endcall %} {% do return(load_result('get_columns_in_relation_raw').table) %} {% endmacro %} @@ -263,8 +263,7 @@ {% macro spark__make_temp_relation(base_relation, suffix) %} {% set tmp_identifier = base_relation.identifier ~ suffix %} {% set tmp_relation = base_relation.incorporate(path = { - "identifier": tmp_identifier, - "schema": None + "identifier": tmp_identifier }) -%} {% do return(tmp_relation) %} diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql index 1a92351ce..e293441b8 100644 --- a/dbt/include/spark/macros/materializations/incremental/incremental.sql +++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql @@ -17,6 +17,11 @@ {%- set existing_relation = load_relation(this) -%} {%- set tmp_relation = make_temp_relation(this) -%} + {#-- for SQL model we will create temp view that doesn't have database and schema --#} + {%- if language == 'sql'-%} + {%- set tmp_relation = tmp_relation.include(database=false, schema=false) -%} + {%- endif -%} + {#-- Set Overwrite Mode --#} {%- if strategy == 'insert_overwrite' and partition_by -%} {%- call statement() -%} diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql index 28b8f2001..d98e1f692 100644 --- a/dbt/include/spark/macros/materializations/incremental/strategies.sql +++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql @@ -4,7 +4,7 @@ {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} insert 
overwrite table {{ target_relation }} {{ partition_cols(label="partition") }} - select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} + select {{dest_cols_csv}} from {{ source_relation }} {% endmacro %} @@ -14,7 +14,7 @@ {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} insert into table {{ target_relation }} - select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} + select {{dest_cols_csv}} from {{ source_relation }} {% endmacro %} @@ -45,7 +45,7 @@ {{ sql_header if sql_header is not none }} merge into {{ target }} as DBT_INTERNAL_DEST - using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE + using {{ source }} as DBT_INTERNAL_SOURCE on {{ predicates | join(' and ') }} when matched then update set From a9c1d8c256956d40e5b6d25e6f0e7b7f7c9b5700 Mon Sep 17 00:00:00 2001 From: leahwicz <60146280+leahwicz@users.noreply.github.com> Date: Tue, 13 Sep 2022 15:44:11 -0400 Subject: [PATCH 10/14] Update repo templates (#448) --- .github/ISSUE_TEMPLATE/bug-report.yml | 84 ++++++++++++++++++++ .github/ISSUE_TEMPLATE/bug_report.md | 33 -------- .github/ISSUE_TEMPLATE/config.yml | 14 ++++ .github/ISSUE_TEMPLATE/feature-request.yml | 59 ++++++++++++++ .github/ISSUE_TEMPLATE/feature_request.md | 23 ------ .github/ISSUE_TEMPLATE/regression-report.yml | 82 +++++++++++++++++++ .github/ISSUE_TEMPLATE/release.md | 10 --- .github/{ISSUE_TEMPLATE => }/dependabot.yml | 0 .github/pull_request_template.md | 7 +- 9 files changed, 245 insertions(+), 67 deletions(-) create mode 100644 .github/ISSUE_TEMPLATE/bug-report.yml delete mode 100644 .github/ISSUE_TEMPLATE/bug_report.md create mode 100644 .github/ISSUE_TEMPLATE/config.yml create mode 100644 .github/ISSUE_TEMPLATE/feature-request.yml delete mode 100644 .github/ISSUE_TEMPLATE/feature_request.md create mode 100644 .github/ISSUE_TEMPLATE/regression-report.yml delete mode 100644 .github/ISSUE_TEMPLATE/release.md rename .github/{ISSUE_TEMPLATE => }/dependabot.yml (100%) diff --git a/.github/ISSUE_TEMPLATE/bug-report.yml b/.github/ISSUE_TEMPLATE/bug-report.yml new file mode 100644 index 000000000..f5494b313 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug-report.yml @@ -0,0 +1,84 @@ +name: 🐞 Bug +description: Report a bug or an issue you've found with dbt-spark +title: "[Bug] " +labels: ["bug", "triage"] +body: + - type: markdown + attributes: + value: | + Thanks for taking the time to fill out this bug report! + - type: checkboxes + attributes: + label: Is this a new bug in dbt-spark? + description: > + In other words, is this an error, flaw, failure or fault in our software? + + If this is a bug that broke existing functionality that used to work, please open a regression issue. + If this is a bug in the dbt-core logic, please open an issue in the dbt-core repository. + If this is a bug experienced while using dbt Cloud, please report to [support](mailto:support@getdbt.com). + If this is a request for help or troubleshooting code in your own dbt project, please join our [dbt Community Slack](https://www.getdbt.com/community/join-the-community/) or open a [Discussion question](https://github.com/dbt-labs/docs.getdbt.com/discussions). + + Please search to see if an issue already exists for the bug you encountered. 
+ options: + - label: I believe this is a new bug in dbt-spark + required: true + - label: I have searched the existing issues, and I could not find an existing issue for this bug + required: true + - type: textarea + attributes: + label: Current Behavior + description: A concise description of what you're experiencing. + validations: + required: true + - type: textarea + attributes: + label: Expected Behavior + description: A concise description of what you expected to happen. + validations: + required: true + - type: textarea + attributes: + label: Steps To Reproduce + description: Steps to reproduce the behavior. + placeholder: | + 1. In this environment... + 2. With this config... + 3. Run '...' + 4. See error... + validations: + required: true + - type: textarea + id: logs + attributes: + label: Relevant log output + description: | + If applicable, log output to help explain your problem. + render: shell + validations: + required: false + - type: textarea + attributes: + label: Environment + description: | + examples: + - **OS**: Ubuntu 20.04 + - **Python**: 3.9.12 (`python3 --version`) + - **dbt-core**: 1.1.1 (`dbt --version`) + - **dbt-spark**: 1.1.0 (`dbt --version`) + value: | + - OS: + - Python: + - dbt-core: + - dbt-spark: + render: markdown + validations: + required: false + - type: textarea + attributes: + label: Additional Context + description: | + Links? References? Anything that will give us more context about the issue you are encountering! + + Tip: You can attach images or log files by clicking this area to highlight it and then dragging files in. + validations: + required: false diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md deleted file mode 100644 index 43f19a154..000000000 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ /dev/null @@ -1,33 +0,0 @@ ---- -name: Bug report -about: Report a bug or an issue you've found with dbt-spark -title: '' -labels: bug, triage -assignees: '' - ---- - -### Describe the bug -A clear and concise description of what the bug is. What command did you run? What happened? - -### Steps To Reproduce -In as much detail as possible, please provide steps to reproduce the issue. Sample data that triggers the issue, example model code, etc is all very helpful here. - -### Expected behavior -A clear and concise description of what you expected to happen. - -### Screenshots and log output -If applicable, add screenshots or log output to help explain your problem. - -### System information -**The output of `dbt --version`:** -``` -<output goes here> -``` - -**The operating system you're using:** - -**The output of `python --version`:** - -### Additional context -Add any other context about the problem here. diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml new file mode 100644 index 000000000..129ea7779 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -0,0 +1,14 @@ +blank_issues_enabled: false +contact_links: + - name: Ask the community for help + url: https://github.com/dbt-labs/docs.getdbt.com/discussions + about: Need help troubleshooting? Check out our guide on how to ask + - name: Contact dbt Cloud support + url: mailto:support@getdbt.com + about: Are you using dbt Cloud? Contact our support team for help! + - name: Participate in Discussions + url: https://github.com/dbt-labs/dbt-spark/discussions + about: Do you have a Big Idea for dbt-spark? 
Read open discussions, or start a new one + - name: Create an issue for dbt-core + url: https://github.com/dbt-labs/dbt-core/issues/new/choose + about: Report a bug or request a feature for dbt-core diff --git a/.github/ISSUE_TEMPLATE/feature-request.yml b/.github/ISSUE_TEMPLATE/feature-request.yml new file mode 100644 index 000000000..8c123ba51 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature-request.yml @@ -0,0 +1,59 @@ +name: ✨ Feature +description: Propose a straightforward extension of dbt-spark functionality +title: "[Feature] <title>" +labels: ["enhancement", "triage"] +body: + - type: markdown + attributes: + value: | + Thanks for taking the time to fill out this feature request! + - type: checkboxes + attributes: + label: Is this your first time submitting a feature request? + description: > + We want to make sure that features are distinct and discoverable, + so that other members of the community can find them and offer their thoughts. + + Issues are the right place to request straightforward extensions of existing dbt-spark functionality. + For "big ideas" about future capabilities of dbt-spark, we ask that you open a + [discussion](https://github.com/dbt-labs/dbt-spark/discussions) in the "Ideas" category instead. + options: + - label: I have read the [expectations for open source contributors](https://docs.getdbt.com/docs/contributing/oss-expectations) + required: true + - label: I have searched the existing issues, and I could not find an existing issue for this feature + required: true + - label: I am requesting a straightforward extension of existing dbt-spark functionality, rather than a Big Idea better suited to a discussion + required: true + - type: textarea + attributes: + label: Describe the feature + description: A clear and concise description of what you want to happen. + validations: + required: true + - type: textarea + attributes: + label: Describe alternatives you've considered + description: | + A clear and concise description of any alternative solutions or features you've considered. + validations: + required: false + - type: textarea + attributes: + label: Who will this benefit? + description: | + What kind of use case will this feature be useful for? Please be specific and provide examples, this will help us prioritize properly. + validations: + required: false + - type: input + attributes: + label: Are you interested in contributing this feature? + description: Let us know if you want to write some code, and how we can help. + validations: + required: false + - type: textarea + attributes: + label: Anything else? + description: | + Links? References? Anything that will give us more context about the feature you are suggesting! + validations: + required: false diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md deleted file mode 100644 index 5edc9f6ca..000000000 --- a/.github/ISSUE_TEMPLATE/feature_request.md +++ /dev/null @@ -1,23 +0,0 @@ ---- -name: Feature request -about: Suggest an idea for dbt-spark -title: '' -labels: enhancement, triage -assignees: '' - ---- - -### Describe the feature -A clear and concise description of what you want to happen. - -### Describe alternatives you've considered -A clear and concise description of any alternative solutions or features you've considered. - -### Additional context -Please include any other relevant context here. - -### Who will this benefit? -What kind of use case will this feature be useful for? 
Please be specific and provide examples, this will help us prioritize properly. - -### Are you interested in contributing this feature? -Let us know if you want to write some code, and how we can help. diff --git a/.github/ISSUE_TEMPLATE/regression-report.yml b/.github/ISSUE_TEMPLATE/regression-report.yml new file mode 100644 index 000000000..8b65d6a26 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/regression-report.yml @@ -0,0 +1,82 @@ +name: ☣️ Regression +description: Report a regression you've observed in a newer version of dbt-spark +title: "[Regression] <title>" +labels: ["bug", "regression", "triage"] +body: + - type: markdown + attributes: + value: | + Thanks for taking the time to fill out this regression report! + - type: checkboxes + attributes: + label: Is this a regression in a recent version of dbt-spark? + description: > + A regression is when documented functionality works as expected in an older version of dbt-spark, + and no longer works after upgrading to a newer version of dbt-spark + options: + - label: I believe this is a regression in dbt-spark functionality + required: true + - label: I have searched the existing issues, and I could not find an existing issue for this regression + required: true + - type: textarea + attributes: + label: Current Behavior + description: A concise description of what you're experiencing. + validations: + required: true + - type: textarea + attributes: + label: Expected/Previous Behavior + description: A concise description of what you expected to happen. + validations: + required: true + - type: textarea + attributes: + label: Steps To Reproduce + description: Steps to reproduce the behavior. + placeholder: | + 1. In this environment... + 2. With this config... + 3. Run '...' + 4. See error... + validations: + required: true + - type: textarea + id: logs + attributes: + label: Relevant log output + description: | + If applicable, log output to help explain your problem. + render: shell + validations: + required: false + - type: textarea + attributes: + label: Environment + description: | + examples: + - **OS**: Ubuntu 20.04 + - **Python**: 3.9.12 (`python3 --version`) + - **dbt-core (working version)**: 1.1.1 (`dbt --version`) + - **dbt-spark (working version)**: 1.1.0 (`dbt --version`) + - **dbt-core (regression version)**: 1.2.0 (`dbt --version`) + - **dbt-spark (regression version)**: 1.2.0 (`dbt --version`) + value: | + - OS: + - Python: + - dbt-core (working version): + - dbt-spark (working version): + - dbt-core (regression version): + - dbt-spark (regression version): + render: markdown + validations: + required: true + - type: textarea + attributes: + label: Additional Context + description: | + Links? References? Anything that will give us more context about the issue you are encountering! + + Tip: You can attach images or log files by clicking this area to highlight it and then dragging files in. 
+ validations: + required: false diff --git a/.github/ISSUE_TEMPLATE/release.md b/.github/ISSUE_TEMPLATE/release.md deleted file mode 100644 index a69349f54..000000000 --- a/.github/ISSUE_TEMPLATE/release.md +++ /dev/null @@ -1,10 +0,0 @@ ---- -name: Release -about: Release a new version of dbt-spark -title: '' -labels: release -assignees: '' - ---- - -### TBD diff --git a/.github/ISSUE_TEMPLATE/dependabot.yml b/.github/dependabot.yml similarity index 100% rename from .github/ISSUE_TEMPLATE/dependabot.yml rename to .github/dependabot.yml diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index c4a5c53b4..11381456a 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -11,11 +11,16 @@ resolves # ### Description -<!--- Describe the Pull Request here --> +<!--- + Describe the Pull Request here. Add any references and info to help reviewers + understand your changes. Include any tradeoffs you considered. +--> ### Checklist +- [ ] I have read [the contributing guide](https://github.com/dbt-labs/dbt-spark/blob/main/CONTRIBUTING.md) and understand what's expected of me - [ ] I have signed the [CLA](https://docs.getdbt.com/docs/contributor-license-agreements) - [ ] I have run this code in development and it appears to resolve the stated issue - [ ] This PR includes tests, or tests are not required/relevant for this PR +- [ ] I have [opened an issue to add/update docs](https://github.com/dbt-labs/docs.getdbt.com/issues/new/choose), or docs changes are not required/relevant for this PR - [ ] I have run `changie new` to [create a changelog entry](https://github.com/dbt-labs/dbt-spark/blob/main/CONTRIBUTING.md#Adding-CHANGELOG-Entry) From b310c4c81a8aeedc6ed188854d92c36410ba70ba Mon Sep 17 00:00:00 2001 From: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com> Date: Tue, 13 Sep 2022 15:30:30 -0700 Subject: [PATCH 11/14] add mypy ignore to column, connections and init (#462) * add mypy ignore to column, connections and init * changie file --- .changes/unreleased/Under the Hood-20220913-152004.yaml | 7 +++++++ dbt/adapters/spark/__init__.py | 2 +- dbt/adapters/spark/column.py | 4 ++-- dbt/adapters/spark/connections.py | 2 +- dbt/adapters/spark/impl.py | 2 +- 5 files changed, 12 insertions(+), 5 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20220913-152004.yaml diff --git a/.changes/unreleased/Under the Hood-20220913-152004.yaml b/.changes/unreleased/Under the Hood-20220913-152004.yaml new file mode 100644 index 000000000..4c372db01 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20220913-152004.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: ignore mypy typing issues +time: 2022-09-13T15:20:04.459783-07:00 +custom: + Author: colin-rogers-dbt + Issue: "461" + PR: "462" diff --git a/dbt/adapters/spark/__init__.py b/dbt/adapters/spark/__init__.py index 6ecc5eccf..91ad54768 100644 --- a/dbt/adapters/spark/__init__.py +++ b/dbt/adapters/spark/__init__.py @@ -5,7 +5,7 @@ from dbt.adapters.spark.impl import SparkAdapter from dbt.adapters.base import AdapterPlugin -from dbt.include import spark +from dbt.include import spark # type: ignore Plugin = AdapterPlugin( adapter=SparkAdapter, credentials=SparkCredentials, include_path=spark.PACKAGE_PATH diff --git a/dbt/adapters/spark/column.py b/dbt/adapters/spark/column.py index dcf7590e9..8100fa450 100644 --- a/dbt/adapters/spark/column.py +++ b/dbt/adapters/spark/column.py @@ -9,7 +9,7 @@ @dataclass -class SparkColumn(dbtClassMixin, Column): +class 
SparkColumn(dbtClassMixin, Column): # type: ignore table_database: Optional[str] = None table_schema: Optional[str] = None table_name: Optional[str] = None @@ -22,7 +22,7 @@ class SparkColumn(dbtClassMixin, Column): def translate_type(cls, dtype: str) -> str: return dtype - def can_expand_to(self: Self, other_column: Self) -> bool: + def can_expand_to(self: Self, other_column: Self) -> bool: # type: ignore """returns True if both columns are strings""" return self.is_string() and other_column.is_string() diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index 59ceb9dd8..80e014a2f 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -59,7 +59,7 @@ class SparkConnectionMethod(StrEnum): class SparkCredentials(Credentials): host: str method: SparkConnectionMethod - database: Optional[str] + database: Optional[str] # type: ignore driver: Optional[str] = None cluster: Optional[str] = None endpoint: Optional[str] = None diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 6e97ce1f5..b89793805 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -118,7 +118,7 @@ def add_schema_to_cache(self, schema) -> str: dbt.exceptions.raise_compiler_error( "Attempted to cache a null schema for {}".format(name) ) - if dbt.flags.USE_CACHE: + if dbt.flags.USE_CACHE: # type: ignore self.cache.add_schema(None, schema) # so jinja doesn't render things return "" From 571a6ef43763d0ae37d84e2a6eba7c32028e21dd Mon Sep 17 00:00:00 2001 From: leahwicz <60146280+leahwicz@users.noreply.github.com> Date: Wed, 14 Sep 2022 10:23:53 -0400 Subject: [PATCH 12/14] Update changelog bot (#463) * Update changelog bot * Updating correct issue number --- .github/workflows/bot-changelog.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/bot-changelog.yml b/.github/workflows/bot-changelog.yml index d8056efe4..39eacf9a6 100644 --- a/.github/workflows/bot-changelog.yml +++ b/.github/workflows/bot-changelog.yml @@ -28,7 +28,7 @@ name: Bot Changelog on: pull_request: # catch when the PR is opened with the label or when the label is added - types: [opened, labeled] + types: [labeled] permissions: contents: write @@ -48,9 +48,9 @@ jobs: steps: - name: Create and commit changelog on bot PR - if: "contains(github.event.pull_request.labels.*.name, ${{ matrix.label }})" + if: ${{ contains(github.event.pull_request.labels.*.name, matrix.label) }} id: bot_changelog - uses: emmyoop/changie_bot@v1.0 + uses: emmyoop/changie_bot@v1.0.1 with: GITHUB_TOKEN: ${{ secrets.FISHTOWN_BOT_PAT }} commit_author_name: "Github Build Bot" @@ -58,4 +58,4 @@ jobs: commit_message: "Add automated changelog yaml from template for bot PR" changie_kind: ${{ matrix.changie_kind }} label: ${{ matrix.label }} - custom_changelog_string: "custom:\n Author: ${{ github.event.pull_request.user.login }}\n Issue: 417\n PR: ${{ github.event.pull_request.number }}\n" + custom_changelog_string: "custom:\n Author: ${{ github.event.pull_request.user.login }}\n Issue: 417\n PR: ${{ github.event.pull_request.number }}" From 60f47d5acf7e2788725529e4bf349120551eb66b Mon Sep 17 00:00:00 2001 From: Matthew McKnight <91097623+McKnight-42@users.noreply.github.com> Date: Thu, 15 Sep 2022 09:37:05 -0500 Subject: [PATCH 13/14] [CT-1114] remove Cache call from get_columns_in_relation (#451) * init push for change to get_columns_in_relation to fix cache inconsistencies during on_schema_change * trying to clear mypy issues * changelog * add ref 
to columns before called on by macro --- .../unreleased/Fixes-20220914-010520.yaml | 8 ++++ .pre-commit-config.yaml | 2 +- dbt/adapters/spark/impl.py | 42 ++++++------------- 3 files changed, 22 insertions(+), 30 deletions(-) create mode 100644 .changes/unreleased/Fixes-20220914-010520.yaml diff --git a/.changes/unreleased/Fixes-20220914-010520.yaml b/.changes/unreleased/Fixes-20220914-010520.yaml new file mode 100644 index 000000000..f8584f05f --- /dev/null +++ b/.changes/unreleased/Fixes-20220914-010520.yaml @@ -0,0 +1,8 @@ +kind: Fixes +body: change to get_columns_in_relation to fix cache inconsistencies to fix cache + issues in incremental models causing failure on on_schema_change +time: 2022-09-14T01:05:20.312981-05:00 +custom: + Author: McKnight-42 + Issue: "447" + PR: "451" diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e70156dcd..e85b1dc8b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,7 +5,7 @@ exclude: '^tests/.*' # Force all unspecified python hooks to run python 3.8 default_language_version: - python: python3.8 + python: python3 repos: - repo: https://github.com/pre-commit/pre-commit-hooks diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index b89793805..c228fc03d 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -207,36 +207,20 @@ def find_table_information_separator(rows: List[dict]) -> int: return pos def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: - cached_relations = self.cache.get_relations(relation.database, relation.schema) - cached_relation = next( - ( - cached_relation - for cached_relation in cached_relations - if str(cached_relation) == str(relation) - ), - None, - ) columns = [] - if cached_relation and cached_relation.information: - columns = self.parse_columns_from_information(cached_relation) - if not columns: - # in open source delta 'show table extended' query output doesnt - # return relation's schema. if columns are empty from cache, - # use get_columns_in_relation spark macro - # which would execute 'describe extended tablename' query - try: - rows: List[agate.Row] = self.execute_macro( - GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation} - ) - columns = self.parse_describe_extended(relation, rows) - except dbt.exceptions.RuntimeException as e: - # spark would throw error when table doesn't exist, where other - # CDW would just return and empty list, normalizing the behavior here - errmsg = getattr(e, "msg", "") - if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg: - pass - else: - raise e + try: + rows: List[agate.Row] = self.execute_macro( + GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation} + ) + columns = self.parse_describe_extended(relation, rows) + except dbt.exceptions.RuntimeException as e: + # spark would throw error when table doesn't exist, where other + # CDW would just return and empty list, normalizing the behavior here + errmsg = getattr(e, "msg", "") + if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg: + pass + else: + raise e # strip hudi metadata columns. 
         columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]

From 36bbe0de5ec6069384c9a754ada85588fe032511 Mon Sep 17 00:00:00 2001
From: Chenyu Li <chenyu.li@dbtlabs.com>
Date: Thu, 15 Sep 2022 08:20:42 -0700
Subject: [PATCH 14/14] Enhancement/refactor python submission (#452)

* refactor and move common logic to core
---
 .../Under the Hood-20220912-104517.yaml  |  7 +++
 dbt/adapters/spark/impl.py               | 43 ++++++++---------
 dbt/adapters/spark/python_submissions.py | 48 +++++++++----------
 3 files changed, 49 insertions(+), 49 deletions(-)
 create mode 100644 .changes/unreleased/Under the Hood-20220912-104517.yaml

diff --git a/.changes/unreleased/Under the Hood-20220912-104517.yaml b/.changes/unreleased/Under the Hood-20220912-104517.yaml
new file mode 100644
index 000000000..e45c97bf0
--- /dev/null
+++ b/.changes/unreleased/Under the Hood-20220912-104517.yaml
@@ -0,0 +1,7 @@
+kind: Under the Hood
+body: Better interface for python submission
+time: 2022-09-12T10:45:17.226481-07:00
+custom:
+  Author: ChenyuLInx
+  Issue: "452"
+  PR: "452"
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index c228fc03d..77b1e4b5a 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -1,7 +1,7 @@
 import re
 from concurrent.futures import Future
 from dataclasses import dataclass
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional, Union, Type
 from typing_extensions import TypeAlias

 import agate
@@ -10,14 +10,17 @@
 import dbt
 import dbt.exceptions

-from dbt.adapters.base import AdapterConfig
-from dbt.adapters.base.impl import catch_as_completed, log_code_execution
-from dbt.adapters.base.meta import available
+from dbt.adapters.base import AdapterConfig, PythonJobHelper
+from dbt.adapters.base.impl import catch_as_completed
+from dbt.contracts.connection import AdapterResponse
 from dbt.adapters.sql import SQLAdapter
 from dbt.adapters.spark import SparkConnectionManager
 from dbt.adapters.spark import SparkRelation
 from dbt.adapters.spark import SparkColumn
-from dbt.adapters.spark.python_submissions import PYTHON_SUBMISSION_HELPERS
+from dbt.adapters.spark.python_submissions import (
+    DBNotebookPythonJobHelper,
+    DBCommandsApiPythonJobHelper,
+)
 from dbt.adapters.base import BaseRelation
 from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER
 from dbt.events import AdapterLogger
@@ -369,26 +372,20 @@ def run_sql_for_tests(self, sql, fetch, conn):
         finally:
             conn.transaction_open = False

-    @available.parse_none
-    @log_code_execution
-    def submit_python_job(self, parsed_model: dict, compiled_code: str, timeout=None):
-        # TODO improve the typing here. N.B. Jinja returns a `jinja2.runtime.Undefined` instead
-        # of `None` which evaluates to True!
-
-        # TODO limit this function to run only when doing the materialization of python nodes
-        # assuming that for python job running over 1 day user would mannually overwrite this
-        submission_method = parsed_model["config"].get("submission_method", "commands")
-        if submission_method not in PYTHON_SUBMISSION_HELPERS:
-            raise NotImplementedError(
-                "Submission method {} is not supported".format(submission_method)
-            )
-        job_helper = PYTHON_SUBMISSION_HELPERS[submission_method](
-            parsed_model, self.connections.profile.credentials
-        )
-        job_helper.submit(compiled_code)
-        # we don't really get any useful information back from the job submission other than success
+    def generate_python_submission_response(self, submission_result: Any) -> AdapterResponse:
         return self.connections.get_response(None)

+    @property
+    def default_python_submission_method(self) -> str:
+        return "commands"
+
+    @property
+    def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
+        return {
+            "notebook": DBNotebookPythonJobHelper,
+            "commands": DBCommandsApiPythonJobHelper,
+        }
+
     def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
         grants_dict: Dict[str, List[str]] = {}
         for row in grants_table:
diff --git a/dbt/adapters/spark/python_submissions.py b/dbt/adapters/spark/python_submissions.py
index ea172ef03..5ee4adb18 100644
--- a/dbt/adapters/spark/python_submissions.py
+++ b/dbt/adapters/spark/python_submissions.py
@@ -5,14 +5,16 @@
 import uuid

 import dbt.exceptions
+from dbt.adapters.base import PythonJobHelper
+from dbt.adapters.spark import SparkCredentials

-DEFAULT_POLLING_INTERVAL = 3
+DEFAULT_POLLING_INTERVAL = 5
 SUBMISSION_LANGUAGE = "python"
 DEFAULT_TIMEOUT = 60 * 60 * 24


-class BasePythonJobHelper:
-    def __init__(self, parsed_model, credentials):
+class BaseDatabricksHelper(PythonJobHelper):
+    def __init__(self, parsed_model: Dict, credentials: SparkCredentials) -> None:
         self.check_credentials(credentials)
         self.credentials = credentials
         self.identifier = parsed_model["alias"]
@@ -21,18 +23,18 @@ def __init__(self, parsed_model, credentials):
         self.timeout = self.get_timeout()
         self.polling_interval = DEFAULT_POLLING_INTERVAL

-    def get_timeout(self):
+    def get_timeout(self) -> int:
         timeout = self.parsed_model["config"].get("timeout", DEFAULT_TIMEOUT)
         if timeout <= 0:
             raise ValueError("Timeout must be a positive integer")
         return timeout

-    def check_credentials(self, credentials):
+    def check_credentials(self, credentials: SparkCredentials) -> None:
         raise NotImplementedError(
             "Overwrite this method to check specific requirement for current submission method"
         )

-    def submit(self, compiled_code):
+    def submit(self, compiled_code: str) -> None:
         raise NotImplementedError(
             "BasePythonJobHelper is an abstract class and you should implement submit method."
         )
@@ -45,7 +47,7 @@ def polling(
         terminal_states,
         expected_end_state,
         get_state_msg_func,
-    ):
+    ) -> Dict:
         state = None
         start = time.time()
         exceeded_timeout = False
@@ -54,7 +56,7 @@
             if time.time() - start > self.timeout:
                 exceeded_timeout = True
                 break
-            # TODO should we do exponential backoff?
+            # should we do exponential backoff?
             time.sleep(self.polling_interval)
             response = status_func(**status_func_kwargs)
             state = get_state_func(response)
@@ -68,16 +70,16 @@
         return response


-class DBNotebookPythonJobHelper(BasePythonJobHelper):
-    def __init__(self, parsed_model, credentials):
+class DBNotebookPythonJobHelper(BaseDatabricksHelper):
+    def __init__(self, parsed_model: Dict, credentials: SparkCredentials) -> None:
         super().__init__(parsed_model, credentials)
         self.auth_header = {"Authorization": f"Bearer {self.credentials.token}"}

-    def check_credentials(self, credentials):
+    def check_credentials(self, credentials) -> None:
         if not credentials.user:
             raise ValueError("Databricks user is required for notebook submission method.")

-    def _create_work_dir(self, path):
+    def _create_work_dir(self, path: str) -> None:
         response = requests.post(
             f"https://{self.credentials.host}/api/2.0/workspace/mkdirs",
             headers=self.auth_header,
@@ -90,7 +92,7 @@ def _create_work_dir(self, path):
                 f"Error creating work_dir for python notebooks\n {response.content!r}"
             )

-    def _upload_notebook(self, path, compiled_code):
+    def _upload_notebook(self, path: str, compiled_code: str) -> None:
         b64_encoded_content = base64.b64encode(compiled_code.encode()).decode()
         response = requests.post(
             f"https://{self.credentials.host}/api/2.0/workspace/import",
@@ -108,7 +110,7 @@ def _upload_notebook(self, path, compiled_code):
                 f"Error creating python notebook.\n {response.content!r}"
             )

-    def _submit_notebook(self, path):
+    def _submit_notebook(self, path: str) -> str:
         submit_response = requests.post(
             f"https://{self.credentials.host}/api/2.1/jobs/runs/submit",
             headers=self.auth_header,
@@ -126,7 +128,7 @@ def _submit_notebook(self, path):
             )
         return submit_response.json()["run_id"]

-    def submit(self, compiled_code):
+    def submit(self, compiled_code: str) -> None:
         # it is safe to call mkdirs even if dir already exists and have content inside
         work_dir = f"/Users/{self.credentials.user}/{self.schema}/"
         self._create_work_dir(work_dir)
@@ -167,7 +169,7 @@ def submit(self, compiled_code):


 class DBContext:
-    def __init__(self, credentials):
+    def __init__(self, credentials: SparkCredentials) -> None:
         self.auth_header = {"Authorization": f"Bearer {credentials.token}"}
         self.cluster = credentials.cluster
         self.host = credentials.host
@@ -206,7 +208,7 @@ def destroy(self, context_id: str) -> str:


 class DBCommand:
-    def __init__(self, credentials):
+    def __init__(self, credentials: SparkCredentials) -> None:
         self.auth_header = {"Authorization": f"Bearer {credentials.token}"}
         self.cluster = credentials.cluster
         self.host = credentials.host
@@ -247,12 +249,12 @@ def status(self, context_id: str, command_id: str) -> Dict[str, Any]:
         return response.json()


-class DBCommandsApiPythonJobHelper(BasePythonJobHelper):
-    def check_credentials(self, credentials):
+class DBCommandsApiPythonJobHelper(BaseDatabricksHelper):
+    def check_credentials(self, credentials: SparkCredentials) -> None:
         if not credentials.cluster:
             raise ValueError("Databricks cluster is required for commands submission method.")

-    def submit(self, compiled_code):
+    def submit(self, compiled_code: str) -> None:
         context = DBContext(self.credentials)
         command = DBCommand(self.credentials)
         context_id = context.create()
@@ -276,9 +278,3 @@ def submit(self, compiled_code):
             )
         finally:
             context.destroy(context_id)
-
-
-PYTHON_SUBMISSION_HELPERS = {
-    "notebook": DBNotebookPythonJobHelper,
-    "commands": DBCommandsApiPythonJobHelper,
-}
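
For context on how the configuration consumed by these helpers is set on the user side, a minimal dbt Python model might look like the sketch below. The model body and the "upstream_model" ref are made up for illustration; only the submission_method and timeout config keys, and the "commands"/"notebook" values, come from the code in the patches above.

# models/my_python_model.py -- illustrative sketch only
def model(dbt, session):
    dbt.config(
        materialized="table",
        submission_method="commands",  # or "notebook"; "commands" is the adapter default above
        timeout=3600,                  # seconds, validated by BaseDatabricksHelper.get_timeout()
    )
    # transform an upstream relation and hand the DataFrame back to dbt
    df = dbt.ref("upstream_model")
    return df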
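
The refactor in PATCH 14/14 also makes submission pluggable: the adapter exposes a mapping of method names to PythonJobHelper subclasses through python_submission_helpers, and each helper only needs check_credentials() and submit(). A rough sketch of a custom helper under those assumptions follows; the class name, the "print_only" method name, and the no-op submit body are hypothetical and only illustrate the contract used by BaseDatabricksHelper above.

from typing import Dict

from dbt.adapters.base import PythonJobHelper
from dbt.adapters.spark import SparkCredentials


class PrintOnlyPythonJobHelper(PythonJobHelper):
    """Hypothetical helper: validates credentials, then just echoes the compiled code."""

    def __init__(self, parsed_model: Dict, credentials: SparkCredentials) -> None:
        self.check_credentials(credentials)
        self.parsed_model = parsed_model
        self.credentials = credentials

    def check_credentials(self, credentials: SparkCredentials) -> None:
        if not credentials.host:
            raise ValueError("A host is required for the print_only submission method.")

    def submit(self, compiled_code: str) -> None:
        # a real helper would ship compiled_code to a cluster and poll until it finishes
        print(compiled_code)

A SparkAdapter subclass could then return {"print_only": PrintOnlyPythonJobHelper} from python_submission_helpers, and a model would opt in with dbt.config(submission_method="print_only"), assuming the adapter registers the helper under that key.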