From 1e13b6d3c1d044f5a933ff27fed5c83984b6df98 Mon Sep 17 00:00:00 2001 From: Nick Zeolla Date: Thu, 29 Jun 2023 22:55:17 +0100 Subject: [PATCH 1/6] swap batch_id declaration to model config --- dbt/adapters/bigquery/python_submissions.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 29e33032d..acb94ffea 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -115,6 +115,11 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient: client_options=self.client_options, credentials=self.GoogleCredentials ) + def _get_batch_id(self) -> str: + return self.parsed_model["config"].get( + "batch_id", self.credential.batch_id + ) + def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job: batch = self._configure_batch() parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}" @@ -122,7 +127,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job: request = dataproc_v1.CreateBatchRequest( parent=parent, batch=batch, - batch_id=self.credential.batch_id, + batch_id=self._get_batch_id(), ) # make the request operation = self.job_client.create_batch(request=request) # type: ignore From ef3d398bc0bdf4be319ff2098df31ed4e04326c1 Mon Sep 17 00:00:00 2001 From: Nick Zeolla Date: Fri, 30 Jun 2023 09:26:38 +0100 Subject: [PATCH 2/6] address changie req, fix python submission --- .changes/unreleased/Fixes-20230630-092618.yaml | 6 ++++++ dbt/adapters/bigquery/python_submissions.py | 4 +--- 2 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 .changes/unreleased/Fixes-20230630-092618.yaml diff --git a/.changes/unreleased/Fixes-20230630-092618.yaml b/.changes/unreleased/Fixes-20230630-092618.yaml new file mode 100644 index 000000000..57457e12d --- /dev/null +++ b/.changes/unreleased/Fixes-20230630-092618.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Change batch_id to model override +time: 2023-06-30T09:26:18.854492+01:00 +custom: + Author: nickozilla + Issue: "671" diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index acb94ffea..3d962248b 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -116,9 +116,7 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient: ) def _get_batch_id(self) -> str: - return self.parsed_model["config"].get( - "batch_id", self.credential.batch_id - ) + return self.parsed_model["config"].get("batch_id", self.credential.batch_id) def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job: batch = self._configure_batch() From 222ace65066b3e0191dc5ecab5b64d623602d0af Mon Sep 17 00:00:00 2001 From: Doug Beatty <44704949+dbeatty10@users.noreply.github.com> Date: Wed, 12 Jul 2023 13:24:07 -0600 Subject: [PATCH 3/6] Update bug that is being resolved --- .changes/unreleased/Fixes-20230630-092618.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changes/unreleased/Fixes-20230630-092618.yaml b/.changes/unreleased/Fixes-20230630-092618.yaml index 57457e12d..83da0aa88 100644 --- a/.changes/unreleased/Fixes-20230630-092618.yaml +++ b/.changes/unreleased/Fixes-20230630-092618.yaml @@ -3,4 +3,4 @@ body: Change batch_id to model override time: 2023-06-30T09:26:18.854492+01:00 custom: Author: nickozilla - Issue: "671" + Issue: "822" From 6233c5245dc2d613b4540741786b20195a05a1ff Mon Sep 17 00:00:00 2001 From: Nick Zeolla Date: Sun, 16 Jul 2023 14:48:56 +0100 Subject: [PATCH 4/6] implement dbeatty's suggestion --- dbt/adapters/bigquery/python_submissions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 3d962248b..6e5a11e52 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -116,7 +116,7 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient: ) def _get_batch_id(self) -> str: - return self.parsed_model["config"].get("batch_id", self.credential.batch_id) + return self.parsed_model["config"].get("batch_id") def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job: batch = self._configure_batch() From 01ff3ca9cd89e275f086cf9fd2b8da2ff6835fe0 Mon Sep 17 00:00:00 2001 From: Nicholas Zeolla Date: Sun, 23 Jul 2023 14:10:53 +0100 Subject: [PATCH 5/6] Update .changes/unreleased/Fixes-20230630-092618.yaml Co-authored-by: Doug Beatty <44704949+dbeatty10@users.noreply.github.com> --- .changes/unreleased/Fixes-20230630-092618.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.changes/unreleased/Fixes-20230630-092618.yaml b/.changes/unreleased/Fixes-20230630-092618.yaml index 83da0aa88..16f34bec0 100644 --- a/.changes/unreleased/Fixes-20230630-092618.yaml +++ b/.changes/unreleased/Fixes-20230630-092618.yaml @@ -1,6 +1,6 @@ -kind: Fixes +kind: Features body: Change batch_id to model override time: 2023-06-30T09:26:18.854492+01:00 custom: Author: nickozilla - Issue: "822" + Issue: "671" From 8b57e2a7200758323751f59dc12df5652dae8b3c Mon Sep 17 00:00:00 2001 From: Nick Zeolla Date: Fri, 4 Aug 2023 18:30:44 +0100 Subject: [PATCH 6/6] Add 2 tests --- tests/functional/adapter/test_python_model.py | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py index 7f17429f1..241082cdb 100644 --- a/tests/functional/adapter/test_python_model.py +++ b/tests/functional/adapter/test_python_model.py @@ -1,5 +1,6 @@ import os import pytest +import time from dbt.tests.util import run_dbt, run_dbt_and_capture, write_file import dbt.tests.adapter.python_model.test_python_model as dbt_tests @@ -64,6 +65,93 @@ def model(dbt, spark): return spark.createDataFrame(data, schema=['test1', 'test3']) """ +models__python_array_batch_id_python = """ +import pandas + +def model(dbt, spark): + random_array = [ + [9001.3985362160208, -157.9871329592354], + [-817.8786101352823, -528.9769041860632], + [-886.6488625065194, 941.0504221837489], + [6.69525238666165, 919.5903586746183], + [754.3718741592056, -121.25678519054622], + [-352.3158889341157, 254.9985130814921], + [563.0633042715097, 833.2963094260072], + ] + + df = pd.DataFrame(random_array, columns=["A", "B"]) + + df["C"] = df["A"] * df["B"] + + final_df = df[["A", "B", "C"]] + + return final_df +""" + +models__python_array_batch_id_yaml = """ +models: + - name: python_array_batch_id + description: A random table with a calculated column defined in python. + config: + batch_id: '{{ run_started_at.strftime("%Y-%m-%d-%H-%M-%S") }}-python-array' + columns: + - name: A + description: Column A + - name: B + description: Column B + - name: C + description: Column C +""" + +custom_ts_id = str("custom-" + str(time.time()).replace(".", "-")) + +models__bad_python_array_batch_id_yaml = f""" +models: + - name: python_array_batch_id + description: A random table with a calculated column defined in python. + config: + batch_id: {custom_ts_id}-python-array + columns: + - name: A + description: Column A + - name: B + description: Column B + - name: C + description: Column C +""" + + +class TestPythonBatchIdModels: + @pytest.fixture(scope="class") + def models(self): + return { + "python_array_batch_id.py": models__python_array_batch_id_python, + "python_array_batch_id.yml": models__python_array_batch_id_yaml, + } + + def test_multiple_named_python_models(self, project): + result, output = run_dbt_and_capture(["run"], expect_pass=True) + time.sleep(5) # In case both runs are submitted simultaneously + result_two, output_two = run_dbt_and_capture(["run"], expect_pass=True) + assert len(result) == 1 + assert len(result_two) == 1 + + +class TestPythonDuplicateBatchIdModels: + @pytest.fixture(scope="class") + def models(self): + return { + "python_array_batch_id.py": models__python_array_batch_id_python, + "python_array_batch_id.yml": models__bad_python_array_batch_id_yaml, + } + + def test_multiple_python_models_fixed_id(self, project): + result, output = run_dbt_and_capture(["run"], expect_pass=True) + result_two, output_two = run_dbt_and_capture(["run"], expect_pass=False) + assert result_two[0].message.startswith("409 Already exists: Failed to create batch:") + assert len(result) == 1 + assert len(result_two) == 1 + @pytest.mark.skip(reason=TEST_SKIP_MESSAGE) class TestChangingSchemaDataproc: