From 0fb5be61c682f0358cbdecee8c5149482f88e26d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Mon, 18 Oct 2021 09:49:24 +0200
Subject: [PATCH 01/17] Decouple hook executor from the operator, add Cloud
 Build execution and adjust testing

---
 airflow_dbt/hooks/dbt_hook.py         | 223 ++++++++++++++------------
 airflow_dbt/operators/dbt_operator.py | 102 ++++++++----
 setup.py                              |   3 +
 tests/hooks/test_dbt_hook.py          |  65 +++-----
 tests/operators/test_dbt_operator.py  |  39 +++--
 5 files changed, 227 insertions(+), 205 deletions(-)

diff --git a/airflow_dbt/hooks/dbt_hook.py b/airflow_dbt/hooks/dbt_hook.py
index 0b4caf0..bf2c3fc 100644
--- a/airflow_dbt/hooks/dbt_hook.py
+++ b/airflow_dbt/hooks/dbt_hook.py
@@ -1,142 +1,157 @@
 from __future__ import print_function
-import os
-import signal
-import subprocess
+
 import json
-from airflow.exceptions import AirflowException
+from abc import ABC, abstractmethod
+from typing import Any, Dict, List, Union
+
 from airflow.hooks.base_hook import BaseHook
+from airflow.hooks.subprocess import SubprocessHook
 
 
-class DbtCliHook(BaseHook):
+class DbtBaseHook(BaseHook, ABC):
     """
     Simple wrapper around the dbt CLI.
 
-    :param profiles_dir: If set, passed as the `--profiles-dir` argument to the `dbt` command
-    :type profiles_dir: str
-    :param target: If set, passed as the `--target` argument to the `dbt` command
     :type dir: str
     :param dir: The directory to run the CLI in
-    :type vars: str
-    :param vars: If set, passed as the `--vars` argument to the `dbt` command
-    :type vars: dict
-    :param full_refresh: If `True`, will fully-refresh incremental models.
-    :type full_refresh: bool
-    :param models: If set, passed as the `--models` argument to the `dbt` command
-    :type models: str
-    :param warn_error: If `True`, treat warnings as errors.
-    :type warn_error: bool
-    :param exclude: If set, passed as the `--exclude` argument to the `dbt` command
-    :type exclude: str
-    :param select: If set, passed as the `--select` argument to the `dbt` command
-    :type select: str
-    :param dbt_bin: The `dbt` CLI. Defaults to `dbt`, so assumes it's on your `PATH`
+    :type env: dict
+    :param env: If set, passed to the dbt executor
+    :param dbt_bin: The `dbt` CLI. Defaults to `dbt`, so assumes it's on your
+        `PATH`
    :type dbt_bin: str
-    :param output_encoding: Output encoding of bash command. Defaults to utf-8
-    :type output_encoding: str
-    :param verbose: The operator will log verbosely to the Airflow logs
-    :type verbose: bool
     """
 
-    def __init__(self,
-                 profiles_dir=None,
-                 target=None,
-                 dir='.',
-                 vars=None,
-                 full_refresh=False,
-                 data=False,
-                 schema=False,
-                 models=None,
-                 exclude=None,
-                 select=None,
-                 dbt_bin='dbt',
-                 output_encoding='utf-8',
-                 verbose=True,
-                 warn_error=False):
-        self.profiles_dir = profiles_dir
+    def __init__(self, dir: str = '.', env: Dict = None, dbt_bin='dbt'):
+        super().__init__()
         self.dir = dir
-        self.target = target
-        self.vars = vars
-        self.full_refresh = full_refresh
-        self.data = data
-        self.schema = schema
-        self.models = models
-        self.exclude = exclude
-        self.select = select
+        self.env = env if env is not None else {}
         self.dbt_bin = dbt_bin
-        self.verbose = verbose
-        self.warn_error = warn_error
-        self.output_encoding = output_encoding
-
-    def _dump_vars(self):
-        # The dbt `vars` parameter is defined using YAML. Unfortunately the standard YAML library
-        # for Python isn't very good and I couldn't find an easy way to have it formatted
-        # correctly. However, as YAML is a super-set of JSON, this works just fine.
-        return json.dumps(self.vars)
 
-    def run_cli(self, *command):
+    def generate_dbt_cli_command(
+        self,
+        base_command: str,
+        profiles_dir: str = None,
+        target: str = None,
+        vars: Dict[str, str] = None,
+        full_refresh: bool = False,
+        data: bool = False,
+        schema: bool = False,
+        models: str = None,
+        exclude: str = None,
+        select: str = None,
+        warn_error: bool = False,
+    ) -> List[str]:
         """
-        Run the dbt cli
-
-        :param command: The dbt command to run
-        :type command: str
+        Generate the command that will be run based on class properties,
+        presets and dbt commands
+
+        :param base_command: The dbt sub-command to run
+        :type base_command: str
+        :param profiles_dir: If set, passed as the `--profiles-dir` argument to
+            the `dbt` command
+        :type profiles_dir: str
+        :param target: If set, passed as the `--target` argument to the `dbt`
+            command
+        :type target: str
+        :param vars: If set, passed as the `--vars` argument to the `dbt`
+            command
+        :type vars: dict
+        :param full_refresh: If `True`, will fully-refresh incremental models.
+        :type full_refresh: bool
+        :param data: If `True`, passed as the `--data` flag to run data tests
+        :type data: bool
+        :param schema: If `True`, passed as the `--schema` flag to run schema
+            tests
+        :type schema: bool
+        :param models: If set, passed as the `--models` argument to the `dbt`
+            command
+        :type models: str
+        :param warn_error: If `True`, treat warnings as errors.
+        :type warn_error: bool
+        :param exclude: If set, passed as the `--exclude` argument to the `dbt`
+            command
+        :type exclude: str
+        :param select: If set, passed as the `--select` argument to the `dbt`
+            command
+        :type select: str
         """
+        dbt_cmd = [self.dbt_bin, base_command]
 
-        dbt_cmd = [self.dbt_bin, *command]
-
-        if self.profiles_dir is not None:
-            dbt_cmd.extend(['--profiles-dir', self.profiles_dir])
+        if profiles_dir is not None:
+            dbt_cmd.extend(['--profiles-dir', profiles_dir])
 
-        if self.target is not None:
-            dbt_cmd.extend(['--target', self.target])
+        if target is not None:
+            dbt_cmd.extend(['--target', target])
 
-        if self.vars is not None:
-            dbt_cmd.extend(['--vars', self._dump_vars()])
+        if vars is not None:
+            dbt_cmd.extend(['--vars', json.dumps(vars)])
 
-        if self.data:
+        if data:
             dbt_cmd.extend(['--data'])
 
-        if self.schema:
+        if schema:
             dbt_cmd.extend(['--schema'])
 
-        if self.models is not None:
-            dbt_cmd.extend(['--models', self.models])
+        if models is not None:
+            dbt_cmd.extend(['--models', models])
 
-        if self.exclude is not None:
-            dbt_cmd.extend(['--exclude', self.exclude])
+        if exclude is not None:
+            dbt_cmd.extend(['--exclude', self])
 
-        if self.select is not None:
-            dbt_cmd.extend(['--select', self.select])
+        if select is not None:
+            dbt_cmd.extend(['--select', select])
 
-        if self.full_refresh:
+        if full_refresh:
             dbt_cmd.extend(['--full-refresh'])
 
-        if self.warn_error:
+        if warn_error:
             dbt_cmd.insert(1, '--warn-error')
 
-        if self.verbose:
-            self.log.info(" ".join(dbt_cmd))
+        return dbt_cmd
+
+    @abstractmethod
+    def run_dbt(self, dbt_cmd: Union[str, List[str]]):
+        """Run the dbt command"""
+
+
+class DbtCliHook(DbtBaseHook):
+    """
+    Runs the dbt command in the same Airflow worker the task runs on. This
+    requires the `dbt` Python package to be installed on the worker first.
+    Also, the `dbt` binary might not be on the `PATH`, so it could be
+    necessary to set its location in the constructor.
+
+    :type dir: str
+    :param dir: The directory to run the CLI in
+    :type env: dict
+    :param env: If set, passed to the dbt executor
+    :param dbt_bin: The `dbt` CLI. Defaults to `dbt`, so assumes it's on your
+        `PATH`
+    :type dbt_bin: str
+    """
+
+    def __init__(self, dir: str = '.', env: Dict = None, dbt_bin='dbt'):
+        self.sp = SubprocessHook()
+        super().__init__(dir=dir, env=env, dbt_bin=dbt_bin)
 
-        sp = subprocess.Popen(
-            dbt_cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
+    def get_conn(self) -> Any:
+        """
+        Return the underlying SubprocessHook connection; not actually
+        implemented, provided just for interface conformity
+        """
+        return self.sp.get_conn()
+
+    def run_dbt(self, dbt_cmd: Union[str, List[str]]):
+        """
+        Run the dbt cli
+
+        :param dbt_cmd: The full dbt command to run
+        :type dbt_cmd: List[str]
+        """
+        self.sp.run_command(
+            command=dbt_cmd,
+            env=self.env,
             cwd=self.dir,
-            close_fds=True)
-        self.sp = sp
-        self.log.info("Output:")
-        line = ''
-        for line in iter(sp.stdout.readline, b''):
-            line = line.decode(self.output_encoding).rstrip()
-            self.log.info(line)
-        sp.wait()
-        self.log.info(
-            "Command exited with return code %s",
-            sp.returncode
         )
-        if sp.returncode:
-            raise AirflowException("dbt command failed")
 
     def on_kill(self):
-        self.log.info('Sending SIGTERM signal to dbt command')
-        os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
+        """Kill the open subprocess if the task gets killed by Airflow"""
+        self.sp.send_sigterm()
diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py
index 6233d8d..7be49e9 100644
--- a/airflow_dbt/operators/dbt_operator.py
+++ b/airflow_dbt/operators/dbt_operator.py
@@ -1,7 +1,10 @@
-from airflow_dbt.hooks.dbt_hook import DbtCliHook
+from typing import Any
+
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 
+from airflow_dbt.hooks.dbt_hook import DbtCliHook
+
 
 class DbtBaseOperator(BaseOperator):
     """
@@ -30,6 +33,16 @@ class DbtBaseOperator(BaseOperator):
     :type dbt_bin: str
     :param verbose: The operator will log verbosely to the Airflow logs
     :type verbose: bool
+    :param dbt_hook: The dbt hook to use as executor. For now the
+        implemented ones are: DbtCliHook, DbtCloudBuildHook. It should be an
+        instance of one of those, or another that inherits from DbtBaseHook.
+        If not provided, a DbtCliHook will be instantiated by default with
+        the provided params
+    :type dbt_hook: DbtBaseHook
+    :param base_command: The dbt sub-command to run, for example for `dbt
+        run` the base_command will be `run`. Any flag not covered by a
+        dedicated parameter can also be appended to this string
+    :type base_command: str
     """
 
     ui_color = '#d6522a'
@@ -51,6 +64,8 @@ def __init__(self,
                  full_refresh=False,
                  data=False,
                  schema=False,
+                 dbt_hook=None,
+                 base_command=None,
                  *args,
                  **kwargs):
         super(DbtBaseOperator, self).__init__(*args, **kwargs)
@@ -68,13 +83,15 @@ def __init__(self,
         self.dbt_bin = dbt_bin
         self.verbose = verbose
         self.warn_error = warn_error
-        self.create_hook()
+        self.base_command = base_command
+        self.hook = dbt_hook if dbt_hook is not None else DbtCliHook()
 
-    def create_hook(self):
-        self.hook = DbtCliHook(
+    def execute(self, context: Any):
+        """Runs the provided command in the provided execution environment"""
+        dbt_cli_command = self.hook.generate_dbt_cli_command(
+            base_command=self.base_command,
             profiles_dir=self.profiles_dir,
             target=self.target,
-            dir=self.dir,
             vars=self.vars,
             full_refresh=self.full_refresh,
             data=self.data,
@@ -82,63 +99,78 @@ def create_hook(self):
             models=self.models,
             exclude=self.exclude,
             select=self.select,
-            dbt_bin=self.dbt_bin,
-            verbose=self.verbose,
-            warn_error=self.warn_error)
-
-        return self.hook
+            warn_error=self.warn_error,
+        )
+        self.hook.run_dbt(dbt_cli_command)
 
 
 class DbtRunOperator(DbtBaseOperator):
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
-        super(DbtRunOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)
-
-    def execute(self, context):
-        self.create_hook().run_cli('run')
+        super().__init__(
+            profiles_dir=profiles_dir,
+            target=target,
+            base_command='run',
+            *args,
+            **kwargs
+        )
 
 
 class DbtTestOperator(DbtBaseOperator):
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
-        super(DbtTestOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)
-
-    def execute(self, context):
-        self.create_hook().run_cli('test')
+        super().__init__(
+            profiles_dir=profiles_dir,
+            target=target,
+            base_command='test',
+            *args,
+            **kwargs
+        )
 
 
 class DbtDocsGenerateOperator(DbtBaseOperator):
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
-        super(DbtDocsGenerateOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args,
-                                                      **kwargs)
-
-    def execute(self, context):
-        self.create_hook().run_cli('docs', 'generate')
+        super().__init__(
+            profiles_dir=profiles_dir,
+            target=target,
+            base_command='docs generate',
+            *args,
+            **kwargs
+        )
 
 
 class DbtSnapshotOperator(DbtBaseOperator):
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
-        super(DbtSnapshotOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)
-
-    def execute(self, context):
-        self.create_hook().run_cli('snapshot')
+        super().__init__(
+            profiles_dir=profiles_dir,
+            target=target,
+            base_command='snapshot',
+            *args,
+            **kwargs
+        )
 
 
 class DbtSeedOperator(DbtBaseOperator):
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
-        super(DbtSeedOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)
-
-    def execute(self, context):
-        self.create_hook().run_cli('seed')
+        super().__init__(
+            profiles_dir=profiles_dir,
+            target=target,
+            base_command='seed',
+            *args,
+            **kwargs
+        )
 
 
 class DbtDepsOperator(DbtBaseOperator):
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
-        super(DbtDepsOperator, self).__init__(profiles_dir=profiles_dir, target=target, *args, **kwargs)
-
-    def execute(self, context):
-        self.create_hook().run_cli('deps')
+        super().__init__(
+            profiles_dir=profiles_dir,
+            target=target,
+            base_command='deps',
+            *args,
+            **kwargs
+        )
diff --git a/setup.py b/setup.py
index 7d3b2f8..325e51b 100644
--- a/setup.py
+++ b/setup.py
@@ -78,4 +78,7 @@ def run(self):
     cmdclass={
         'upload': UploadCommand,
     },
+    extras_require={
+        'google': 'apache-airflow-providers-google==5.0.0'
+    },
 )
diff --git a/tests/hooks/test_dbt_hook.py b/tests/hooks/test_dbt_hook.py
index 4dd39ed..e1990b8 100644
--- a/tests/hooks/test_dbt_hook.py
+++ b/tests/hooks/test_dbt_hook.py
@@ -1,54 +1,27 @@
-from unittest import TestCase
-from unittest import mock
-import subprocess
-from airflow_dbt.hooks.dbt_hook import DbtCliHook
+from unittest import TestCase, mock
 
+from airflow.hooks.subprocess import SubprocessHook
 
-class TestDbtHook(TestCase):
+from airflow_dbt.hooks.dbt_hook import DbtCliHook
 
-    @mock.patch('subprocess.Popen')
-    def test_sub_commands(self, mock_subproc_popen):
-        mock_subproc_popen.return_value \
-            .communicate.return_value = ('output', 'error')
-        mock_subproc_popen.return_value.returncode = 0
-        mock_subproc_popen.return_value \
-            .stdout.readline.side_effect = [b"placeholder"]
 
+class TestDbtHook(TestCase):
+    @mock.patch.object(SubprocessHook, 'run_command')
+    def test_sub_commands(self, mock_run_command):
         hook = DbtCliHook()
-        hook.run_cli('docs', 'generate')
-
-        mock_subproc_popen.assert_called_once_with(
-            [
-                'dbt',
-                'docs',
-                'generate'
-            ],
-            close_fds=True,
+        hook.run_dbt(['dbt', 'docs', 'generate'])
+        mock_run_command.assert_called_once_with(
+            command=['dbt', 'docs', 'generate'],
+            env={},
             cwd='.',
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT
-        )
-
-    @mock.patch('subprocess.Popen')
-    def test_vars(self, mock_subproc_popen):
-        mock_subproc_popen.return_value \
-            .communicate.return_value = ('output', 'error')
-        mock_subproc_popen.return_value.returncode = 0
-        mock_subproc_popen.return_value \
-            .stdout.readline.side_effect = [b"placeholder"]
+        )
 
-        hook = DbtCliHook(vars={"foo": "bar", "baz": "true"})
-        hook.run_cli('run')
+    def test_vars(self):
+        hook = DbtCliHook()
+        generated_command = hook.generate_dbt_cli_command(
+            'run',
+            vars={"foo": "bar", "baz": "true"}
+        )
 
-        mock_subproc_popen.assert_called_once_with(
-            [
-                'dbt',
-                'run',
-                '--vars',
-                '{"foo": "bar", "baz": "true"}'
-            ],
-            close_fds=True,
-            cwd='.',
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT
-        )
+        assert generated_command == ['dbt', 'run', '--vars',
+                                     '{"foo": "bar", "baz": "true"}']
diff --git a/tests/operators/test_dbt_operator.py b/tests/operators/test_dbt_operator.py
index 8ce2c5f..236396d 100644
--- a/tests/operators/test_dbt_operator.py
+++ b/tests/operators/test_dbt_operator.py
@@ -1,17 +1,16 @@
 import datetime
 from unittest import TestCase, mock
+
 from airflow import DAG, configuration
+
 from airflow_dbt.hooks.dbt_hook import DbtCliHook
 from airflow_dbt.operators.dbt_operator import (
-    DbtSeedOperator,
-    DbtSnapshotOperator,
-    DbtRunOperator,
+    DbtDepsOperator, DbtRunOperator, DbtSeedOperator, DbtSnapshotOperator,
     DbtTestOperator,
-    DbtDepsOperator
 )
 
 
-class TestDbtOperator(TestCase):
+class TestDbtCliOperator(TestCase):
     def setUp(self):
         configuration.conf.load_test_config()
         args = {
@@ -20,47 +19,47 @@ def setUp(self):
         }
         self.dag = DAG('test_dag_id', default_args=args)
 
-    @mock.patch.object(DbtCliHook, 'run_cli')
-    def test_dbt_run(self, mock_run_cli):
+    @mock.patch.object(DbtCliHook, 'run_dbt')
+    def test_dbt_run(self, mock_run_dbt):
         operator = DbtRunOperator(
             task_id='run',
             dag=self.dag
         )
         operator.execute(None)
-        mock_run_cli.assert_called_once_with('run')
+        mock_run_dbt.assert_called_once_with(['dbt', 'run'])
 
-    @mock.patch.object(DbtCliHook, 'run_cli')
-    def test_dbt_test(self, mock_run_cli):
+    @mock.patch.object(DbtCliHook, 'run_dbt')
+    def test_dbt_test(self, mock_run_dbt):
         operator = DbtTestOperator(
             task_id='test',
             dag=self.dag
         )
         operator.execute(None)
-        mock_run_cli.assert_called_once_with('test')
+        mock_run_dbt.assert_called_once_with(['dbt', 'test'])
 
-    @mock.patch.object(DbtCliHook, 'run_cli')
-    def test_dbt_snapshot(self, mock_run_cli):
+    @mock.patch.object(DbtCliHook, 'run_dbt')
+    def test_dbt_snapshot(self, mock_run_dbt):
         operator = DbtSnapshotOperator(
             task_id='snapshot',
             dag=self.dag
         )
         operator.execute(None)
-        mock_run_cli.assert_called_once_with('snapshot')
+        mock_run_dbt.assert_called_once_with(['dbt', 'snapshot'])
 
-    @mock.patch.object(DbtCliHook, 'run_cli')
-    def test_dbt_seed(self, mock_run_cli):
+    @mock.patch.object(DbtCliHook, 'run_dbt')
+    def test_dbt_seed(self, mock_run_dbt):
         operator = DbtSeedOperator(
             task_id='seed',
             dag=self.dag
         )
         operator.execute(None)
-        mock_run_cli.assert_called_once_with('seed')
+        mock_run_dbt.assert_called_once_with(['dbt', 'seed'])
 
-    @mock.patch.object(DbtCliHook, 'run_cli')
-    def test_dbt_deps(self, mock_run_cli):
+    @mock.patch.object(DbtCliHook, 'run_dbt')
+    def test_dbt_deps(self, mock_run_dbt):
         operator = DbtDepsOperator(
             task_id='deps',
             dag=self.dag
         )
         operator.execute(None)
-        mock_run_cli.assert_called_once_with('deps')
+        mock_run_dbt.assert_called_once_with(['dbt', 'deps'])

From e53b7442f1c548b60b569f30067706fa6ca42cd9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Mon, 18 Oct 2021 13:01:38 +0200
Subject: [PATCH 02/17] Add hook to run in google cloud build

---
 airflow_dbt/hooks/dbt_google_hook.py | 161 +++++++++++++++++++++++++++
 1 file changed, 161 insertions(+)
 create mode 100644 airflow_dbt/hooks/dbt_google_hook.py

diff --git a/airflow_dbt/hooks/dbt_google_hook.py b/airflow_dbt/hooks/dbt_google_hook.py
new file mode 100644
index 0000000..ff63527
--- /dev/null
+++ b/airflow_dbt/hooks/dbt_google_hook.py
@@ -0,0 +1,161 @@
+import logging
+import os
+import tarfile
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List
+from uuid import uuid4
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
+from airflow.providers.google.cloud.hooks.gcs import (
+    GCSHook, _parse_gcs_url, gcs_object_is_directory,
+)
+
+from hooks.dbt_hook import DbtBaseHook
+
+
+class DbtCloudBuildHook(DbtBaseHook):
+    """
+    Runs the dbt command in a Cloud Build job in GCP
+
+    :type dir: str
+    :param dir: The directory containing the DBT files. The logic is, if this
+        is a GCS blob compressed in tar.gz then it will be used in the build
+        process without re-uploading. Otherwise we expect "dir" to point to a
+        local path to be uploaded to the GCS prefix set in
+        "gcs_staging_location".
+    :type env: dict
+    :param env: If set, passed to the dbt executor
+    :param dbt_bin: The `dbt` CLI. Defaults to `dbt`, so assumes it's on your
+        `PATH`
+    :type dbt_bin: str
+
+    :param project_id: GCP Project ID as stated in the console
+    :type project_id: str
+    :param timeout: Default is set in Cloud Build itself as ten minutes. A
+        duration in seconds with up to nine fractional digits, terminated by
+        's'. Example: "3.5s"
+    :type timeout: str
+    :param wait: If `True`, waits for the Cloud Build process to finish, that
+        is, for the dbt command to finish running; otherwise it runs
+        asynchronously
+    :type wait: bool
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param gcs_staging_location: Where to store the sources to be fetched
+        later by the cloud build job. It should be the GCS url for a folder,
+        for example: `gs://my-bucket/stored`. A sub-folder will be generated
+        to avoid collisions between concurrent runs.
+    :type gcs_staging_location: str
+    :param dbt_version: the DBT version to be fetched from dockerhub. Defaults
+        to '0.21.0'
+    :type dbt_version: str
+    """
+
+    def __init__(
+        self,
+        project_id: str,
+        dir: str = '.',
+        gcs_staging_location: str = None,
+        timeout: str = None,
+        wait: bool = True,
+        gcp_conn_id: str = "google_cloud_default",
+        dbt_version: str = '0.21.0',
+        env: Dict = None,
+        dbt_bin='dbt',
+    ):
+        if dir is not None and gcs_staging_location is not None:
+            logging.info(f'Files in "{dir}" will be uploaded to GCS with the '
+                         f'prefix "{gcs_staging_location}"')
+            # check the destination is a gcs directory and extract bucket and
+            # folder
+            if not gcs_object_is_directory(gcs_staging_location):
+                raise ValueError(
+                    f'The provided "gcs_sources_location": "'
+                    f'{gcs_staging_location}"'
+                    f' is not a valid gcs folder'
+                )
+            slb, slp = _parse_gcs_url(gcs_staging_location)
+            # we have provided something similar to 'gs://<bucket>/<folder>/'
+            self.gcs_staging_bucket = slb
+            self.gcs_staging_blob = f'{slp}dbt_staging_{uuid4().hex}.tar.gz'
+            self.upload = True
+
+        elif dir is not None and gcs_staging_location is None:
+            logging.info('Files in the "{dir}" blob will be used')
+            staging_bucket, staging_blob = _parse_gcs_url(dir)
+            if not staging_blob.endswith('.tar.gz'):
+                raise AirflowException(
+                    f'The provided blob "{dir}" to a compressed file does not ' +
+                    f'have the right extension ".tar.gz"'
+                )
+            self.gcs_staging_bucket = staging_bucket
+            self.gcs_staging_blob = staging_blob
+            self.upload = False
+
+        self.dbt_version = dbt_version
+        self.cloud_build_hook = CloudBuildHook(gcp_conn_id=gcp_conn_id)
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+        self.timeout = timeout
+        self.wait = wait
+
+        super().__init__(dir=dir, env=env, dbt_bin=dbt_bin)
+
+    def get_conn(self) -> Any:
+        """Returns the cloud build connection, which is a gcp connection"""
+        return self.cloud_build_hook.get_conn()
+
+    def upload_dbt_sources(self) -> None:
+        """Upload sources from local to a staging location"""
+        gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
+        with \
+            NamedTemporaryFile() as compressed_file, \
+            tarfile.open(compressed_file.name, "w:gz") as tar:
+            tar.add(self.dir, arcname=os.path.basename(self.dir))
+            gcs_hook.upload(
+                bucket_name=self.gcs_staging_bucket,
+                object_name=self.gcs_staging_blob,
+                filename=compressed_file.name,
+            )
+
+    def run_dbt(self, dbt_cmd: List[str]):
+        """
+        Run the dbt cli
+
+        :param dbt_cmd: The full dbt command to run
+        :type dbt_cmd: List[str]
+        """
+        """See: https://cloud.google.com/cloud-build/docs/api/reference/rest
+        /v1/projects.builds"""
+
+        if self.upload:
+            self.upload_dbt_sources()
+
+        results = self.cloud_build_hook.create_build(
+            build={
+                'steps': [{
+                    'name': f'fishtownanalytics/dbt:{self.dbt_version}',
+                    'entrypoint': '/bin/sh',
+                    'args': ['-c', *dbt_cmd],
+                    'env': [f'{k}={v}' for k, v in self.env.items()]
+                }],
+                'source': {
+                    'storageSource': {
+                        "bucket": self.gcs_staging_bucket,
+                        "object": self.gcs_staging_blob,
+                    }
+                }
+            },
+            project_id=self.project_id,
+            wait=True,
+            timeout=self.timeout,
+            metadata=self.env,
+        )
+        logging.info(
+            f'Triggered build {results["id"]}\nYou can find the logs at '
+            f'{results["logUrl"]}'
+        )
+
+    def on_kill(self):
+        """Stopping the build is not implemented until google providers v6"""
+        raise NotImplementedError

From b922f21880453e331fdb1de663b282e242569652 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Mon, 18 Oct 2021 13:01:59 +0200
Subject: [PATCH 03/17] Fix base operator

---
 airflow_dbt/operators/dbt_operator.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py
index 7be49e9..f4f2eaa 100644
--- a/airflow_dbt/operators/dbt_operator.py
+++ b/airflow_dbt/operators/dbt_operator.py
@@ -84,7 +84,10 @@ def __init__(self,
         self.verbose = verbose
         self.warn_error = warn_error
         self.base_command = base_command
-        self.hook = dbt_hook if dbt_hook is not None else DbtCliHook()
+        self.hook = dbt_hook if dbt_hook is not None else DbtCliHook(
+            dir=dir,
+            dbt_bin=dbt_bin
+        )
 
     def execute(self, context: Any):
         """Runs the provided command in the provided execution environment"""

From f2fb0188a14559ab49654c353f874e60700193c7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Mon, 18 Oct 2021 13:02:23 +0200
Subject: [PATCH 04/17] Test GCP Cloud Build hook

---
 tests/operators/test_dbt_operator.py | 45 ++++++++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/tests/operators/test_dbt_operator.py b/tests/operators/test_dbt_operator.py
index 236396d..bb4dc31 100644
--- a/tests/operators/test_dbt_operator.py
+++ b/tests/operators/test_dbt_operator.py
@@ -1,8 +1,10 @@
 import datetime
 from unittest import TestCase, mock
+from unittest.mock import patch
 
 from airflow import DAG, configuration
 
+from airflow_dbt.hooks.dbt_google_hook import DbtCloudBuildHook
 from airflow_dbt.hooks.dbt_hook import DbtCliHook
 from airflow_dbt.operators.dbt_operator import (
     DbtDepsOperator, DbtRunOperator, DbtSeedOperator, DbtSnapshotOperator,
@@ -63,3 +65,46 @@ def test_dbt_deps(self, mock_run_dbt):
         )
         operator.execute(None)
         mock_run_dbt.assert_called_once_with(['dbt', 'deps'])
+
+
+class TestDbtRunWithCloudBuild(TestCase):
+    def setUp(self):
+        configuration.conf.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2020, 2, 27)
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    @patch('airflow_dbt.hooks.dbt_google_hook.CloudBuildHook')
+    @patch('airflow_dbt.hooks.dbt_google_hook.GCSHook')
+    def test_dbt_deps(self, MockLocalCloudBuildHook, MockGCSHook):
+        operator = DbtRunOperator(
+            task_id='test_dbt_run_on_cloud_build',
+            dbt_hook=DbtCloudBuildHook(
+                project_id='my-project-id',
+                gcp_conn_id='my_conn_id',
+                gcs_staging_location='gs://my-bucket/certain-folder/'
+            ),
+            dag=self.dag
+        )
+        operator.execute(None)
+        MockLocalCloudBuildHook.assert_called_once_with(gcp_conn_id='my_conn_id')
+        MockGCSHook.assert_called_once_with(gcp_conn_id='my_conn_id')
+        MockGCSHook().upload.assert_called_once()
+
+
+{
+    'steps': [{
+        'name': 'fishtownanalytics/dbt:0.21.0', 'entrypoint': '/bin/sh',
+        'args': ['-c', 'dbt', 'run'], 'env': []
+    }],
+    'source': {
+        'storageSource': {
+            'bucket': 'my-bucket',
+            'object':
+                'certain-folder/dbt_staging_c5013d9965b54386bcf84ab76e42c848'
+                '.tar.gz'
+        }
+    }
+}

From ad837eef8a9c74f043deafe79850636401770a00 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Mon, 18 Oct 2021 14:44:30 +0200
Subject: [PATCH 05/17] Fix cloudbuild logic

---
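For context, a minimal sketch of how this hook is meant to be wired into a
DAG once this patch lands. All names are illustrative placeholders (project
id, connection id, bucket, paths and dates), not part of the patch itself:

    import datetime

    from airflow import DAG
    from airflow_dbt.hooks.dbt_google_hook import DbtCloudBuildHook
    from airflow_dbt.operators.dbt_operator import DbtRunOperator

    with DAG('dbt_on_cloud_build',
             start_date=datetime.datetime(2021, 10, 1),
             schedule_interval=None) as dag:
        dbt_run = DbtRunOperator(
            task_id='dbt_run',
            # local sources in "dir" are compressed and uploaded to the
            # ".tar.gz" blob in "gcs_staging_location" before the build runs
            dbt_hook=DbtCloudBuildHook(
                project_id='my-project-id',
                gcp_conn_id='google_cloud_default',
                dir='/opt/airflow/dags/dbt',
                gcs_staging_location='gs://my-bucket/dbt-staging/sources.tar.gz',
            ),
        )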
 airflow_dbt/hooks/dbt_google_hook.py | 54 ++++++++++------------
 1 file changed, 19 insertions(+), 35 deletions(-)

diff --git a/airflow_dbt/hooks/dbt_google_hook.py b/airflow_dbt/hooks/dbt_google_hook.py
index ff63527..5babc41 100644
--- a/airflow_dbt/hooks/dbt_google_hook.py
+++ b/airflow_dbt/hooks/dbt_google_hook.py
@@ -19,11 +19,8 @@ class DbtCloudBuildHook(DbtBaseHook):
     Runs the dbt command in a Cloud Build job in GCP
 
     :type dir: str
-    :param dir: The directory containing the DBT files. The logic is, if this
-        is a GCS blob compressed in tar.gz then it will be used in the build
-        process without re-uploading. Otherwise we expect "dir" to point to a
-        local path to be uploaded to the GCS prefix set in
-        "gcs_staging_location".
+    :param dir: Optional, if set the process considers that sources must be
+        uploaded prior to running the DBT job
     :type env: dict
     :param env: If set, passed to the dbt executor
     :param dbt_bin: The `dbt` CLI. Defaults to `dbt`, so assumes it's on your
@@ -54,7 +51,7 @@ class DbtCloudBuildHook(DbtBaseHook):
     def __init__(
         self,
         project_id: str,
-        dir: str = '.',
+        dir: str = None,
         gcs_staging_location: str = None,
         timeout: str = None,
         wait: bool = True,
@@ -63,34 +60,18 @@ def __init__(
         env: Dict = None,
         dbt_bin='dbt',
     ):
-        if dir is not None and gcs_staging_location is not None:
-            logging.info(f'Files in "{dir}" will be uploaded to GCS with the '
-                         f'prefix "{gcs_staging_location}"')
-            # check the destination is a gcs directory and extract bucket and
-            # folder
-            if not gcs_object_is_directory(gcs_staging_location):
-                raise ValueError(
-                    f'The provided "gcs_sources_location": "'
-                    f'{gcs_staging_location}"'
-                    f' is not a valid gcs folder'
-                )
-            slb, slp = _parse_gcs_url(gcs_staging_location)
-            # we have provided something similar to 'gs://<bucket>/<folder>/'
-            self.gcs_staging_bucket = slb
-            self.gcs_staging_blob = f'{slp}dbt_staging_{uuid4().hex}.tar.gz'
-            self.upload = True
-
-        elif dir is not None and gcs_staging_location is None:
-            logging.info('Files in the "{dir}" blob will be used')
-            staging_bucket, staging_blob = _parse_gcs_url(dir)
-            if not staging_blob.endswith('.tar.gz'):
-                raise AirflowException(
-                    f'The provided blob "{dir}" to a compressed file does not ' +
-                    f'have the right extension ".tar.gz"'
-                )
-            self.gcs_staging_bucket = staging_bucket
-            self.gcs_staging_blob = staging_blob
-            self.upload = False
+        logging.info(f'Files in "{dir}" will be uploaded to GCS with the '
+                     f'prefix "{gcs_staging_location}"')
+        staging_bucket, staging_blob = _parse_gcs_url(gcs_staging_location)
+        # we have provided something similar to
+        # 'gs://<bucket>/<blob path>.tar.gz'
+        if not staging_blob.endswith('.tar.gz'):
+            raise AirflowException(
+                f'The provided blob "{staging_blob}" to a compressed file does not ' +
+                f'have the right extension ".tar.gz"'
+            )
+        self.gcs_staging_bucket = staging_bucket
+        self.gcs_staging_blob = staging_blob
 
         self.dbt_version = dbt_version
         self.cloud_build_hook = CloudBuildHook(gcp_conn_id=gcp_conn_id)
@@ -128,7 +109,10 @@ def run_dbt(self, dbt_cmd: List[str]):
         """See: https://cloud.google.com/cloud-build/docs/api/reference/rest
         /v1/projects.builds"""
 
-        if self.upload:
+        # if we indicate that the sources are in a local directory by setting
+        # the "dir" pointing to a local path, then those sources will be
+        # uploaded to the expected blob
+        if self.dir is not None:
             self.upload_dbt_sources()
 
         results = self.cloud_build_hook.create_build(
@@ -152,7 +136,7 @@ def run_dbt(self, dbt_cmd: List[str]):
             metadata=self.env,
         )
         logging.info(
-            f'Triggered build {results["id"]}\nYou can find the logs at '
+            f'Triggered build {results["id"]}. You can find the logs at '
             f'{results["logUrl"]}'
         )

From 543d744553e5b219b79cbfd9d034e2db03dbc3f8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Mon, 18 Oct 2021 15:54:42 +0200
Subject: [PATCH 06/17] Clean unused imports and fix path to upload dbt
 sources in GCS

---
 airflow_dbt/hooks/dbt_google_hook.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/airflow_dbt/hooks/dbt_google_hook.py b/airflow_dbt/hooks/dbt_google_hook.py
index 5babc41..6abbe11 100644
--- a/airflow_dbt/hooks/dbt_google_hook.py
+++ b/airflow_dbt/hooks/dbt_google_hook.py
@@ -3,12 +3,11 @@
 import tarfile
 from tempfile import NamedTemporaryFile
 from typing import Any, Dict, List
-from uuid import uuid4
 
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 from airflow.providers.google.cloud.hooks.gcs import (
-    GCSHook, _parse_gcs_url, gcs_object_is_directory,
+    GCSHook, _parse_gcs_url,
 )
 
 from hooks.dbt_hook import DbtBaseHook
@@ -131,7 +130,7 @@ def run_dbt(self, dbt_cmd: List[str]):
                 }
             },
             project_id=self.project_id,
-            wait=True,
+            wait=self.wait,
             timeout=self.timeout,
             metadata=self.env,
         )

From 5ac7f2ac71b8e23c4036777e8d52374a4a96664c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Mon, 18 Oct 2021 15:57:01 +0200
Subject: [PATCH 07/17] Fix the upload sources test

---
 tests/operators/test_dbt_operator.py | 33 +++++++++++-----------------
 1 file changed, 13 insertions(+), 20 deletions(-)

diff --git a/tests/operators/test_dbt_operator.py b/tests/operators/test_dbt_operator.py
index bb4dc31..27544c8 100644
--- a/tests/operators/test_dbt_operator.py
+++ b/tests/operators/test_dbt_operator.py
@@ -76,35 +76,28 @@ def setUp(self):
         }
         self.dag = DAG('test_dag_id', default_args=args)
 
+    @patch('airflow_dbt.hooks.dbt_google_hook.NamedTemporaryFile')
     @patch('airflow_dbt.hooks.dbt_google_hook.CloudBuildHook')
     @patch('airflow_dbt.hooks.dbt_google_hook.GCSHook')
-    def test_dbt_deps(self, MockLocalCloudBuildHook, MockGCSHook):
+    def test_upload_files(self, MockGCSHook, MockCBHook, MockTempFile):
+        # Change the name returned by the NamedTemporaryFile context manager
+        MockTempFile.return_value.__enter__.return_value.name = 'tempfile'
         operator = DbtRunOperator(
             task_id='test_dbt_run_on_cloud_build',
             dbt_hook=DbtCloudBuildHook(
                 project_id='my-project-id',
                 gcp_conn_id='my_conn_id',
-                gcs_staging_location='gs://my-bucket/certain-folder/'
+                dir='.',
+                gcs_staging_location='gs://my-bucket/certain-folder'
+                                     '/stored_dbt_files.tar.gz'
             ),
             dag=self.dag
         )
         operator.execute(None)
-        MockLocalCloudBuildHook.assert_called_once_with(gcp_conn_id='my_conn_id')
+        MockCBHook.assert_called_once_with(gcp_conn_id='my_conn_id')
         MockGCSHook.assert_called_once_with(gcp_conn_id='my_conn_id')
-        MockGCSHook().upload.assert_called_once()
-
-
-{
-    'steps': [{
-        'name': 'fishtownanalytics/dbt:0.21.0', 'entrypoint': '/bin/sh',
-        'args': ['-c', 'dbt', 'run'], 'env': []
-    }],
-    'source': {
-        'storageSource': {
-            'bucket': 'my-bucket',
-            'object':
-                'certain-folder/dbt_staging_c5013d9965b54386bcf84ab76e42c848'
-                '.tar.gz'
-        }
-    }
-}
+        MockGCSHook().upload.assert_called_once_with(
+            bucket_name='my-bucket',
+            object_name='certain-folder/stored_dbt_files.tar.gz',
+            filename='tempfile'
+        )

From 53243c01389ae0637725c8480b916f58070b5e5c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Mon, 18 Oct 2021 16:09:57 +0200
Subject: [PATCH 08/17] Add docstrings to all DbtBaseOperator subclasses

---
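For reference, a minimal usage sketch of the operators these docstrings
document, as the package stands at this point in the series. Profiles dir,
target and dates are placeholder values:

    import datetime

    from airflow import DAG
    from airflow_dbt.operators.dbt_operator import (
        DbtRunOperator, DbtSeedOperator, DbtTestOperator,
    )

    with DAG('dbt_cli_example',
             start_date=datetime.datetime(2021, 10, 1),
             schedule_interval=None) as dag:
        # each operator maps to one dbt sub-command via base_command
        seed = DbtSeedOperator(task_id='dbt_seed', profiles_dir='/opt/dbt', target='dev')
        run = DbtRunOperator(task_id='dbt_run', profiles_dir='/opt/dbt', target='dev')
        test = DbtTestOperator(task_id='dbt_test', profiles_dir='/opt/dbt', target='dev')
        seed >> run >> test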
 airflow_dbt/operators/dbt_operator.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py
index f4f2eaa..90930d5 100644
--- a/airflow_dbt/operators/dbt_operator.py
+++ b/airflow_dbt/operators/dbt_operator.py
@@ -1,4 +1,4 @@
-from typing import Any
+from typing import Any, Dict
 
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -108,6 +108,7 @@ def execute(self, context: Any):
 
 
 class DbtRunOperator(DbtBaseOperator):
+    """Runs a dbt run command"""
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
         super().__init__(
@@ -120,6 +121,7 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
 
 
 class DbtTestOperator(DbtBaseOperator):
+    """Runs a dbt test command"""
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
         super().__init__(
@@ -132,6 +134,7 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
 
 
 class DbtDocsGenerateOperator(DbtBaseOperator):
+    """Runs a dbt docs generate command"""
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
         super().__init__(
@@ -144,6 +147,7 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
 
 
 class DbtSnapshotOperator(DbtBaseOperator):
+    """Runs a dbt snapshot command"""
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
         super().__init__(
@@ -156,6 +160,7 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
 
 
 class DbtSeedOperator(DbtBaseOperator):
+    """Runs a dbt seed command"""
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
         super().__init__(
@@ -168,6 +173,7 @@ def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
 
 
 class DbtDepsOperator(DbtBaseOperator):
+    """Runs a dbt deps command"""
     @apply_defaults
     def __init__(self, profiles_dir=None, target=None, *args, **kwargs):
         super().__init__(

From 644f59470bfe6948033ddfb0a46701da5bec5c56 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Mon, 18 Oct 2021 16:10:24 +0200
Subject: [PATCH 09/17] Add `env` param to DbtBaseOperator to pass env
 variables to runtime

---
 airflow_dbt/operators/dbt_operator.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py
index 90930d5..f45fc9d 100644
--- a/airflow_dbt/operators/dbt_operator.py
+++ b/airflow_dbt/operators/dbt_operator.py
@@ -16,6 +16,8 @@ class DbtBaseOperator(BaseOperator):
     :param target: If set, passed as the `--target` argument to the `dbt` command
     :type dir: str
     :param dir: The directory to run the CLI in
+    :param env: If set, passed to the dbt executor
+    :type env: dict
     :type vars: str
     :param vars: If set, passed as the `--vars` argument to the `dbt` command
     :type vars: dict
@@ -53,7 +55,8 @@ class DbtBaseOperator(BaseOperator):
     def __init__(self,
                  profiles_dir=None,
                  target=None,
-                 dir='.',
+                 dir: str = '.',
+                 env: Dict = None,
                  vars=None,
                  models=None,
                  exclude=None,
@@ -73,6 +76,7 @@ def __init__(self,
         self.profiles_dir = profiles_dir
         self.target = target
         self.dir = dir
+        self.env = {} if env is None else env
         self.vars = vars
         self.models = models
         self.full_refresh = full_refresh
@@ -86,6 +90,7 @@ def __init__(self,
         self.base_command = base_command
         self.hook = dbt_hook if dbt_hook is not None else DbtCliHook(
             dir=dir,
+            env=self.env,
             dbt_bin=dbt_bin
         )

From 5cc650d4e84e03e24ac5ce569d9b37e42fc0d98f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Fri, 22 Oct 2021 12:06:18 +0200
Subject: [PATCH 10/17] Export the DbtCloudBuildHook from the hooks package

---
 airflow_dbt/hooks/__init__.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow_dbt/hooks/__init__.py b/airflow_dbt/hooks/__init__.py
index 8644e88..a5c997a 100644
--- a/airflow_dbt/hooks/__init__.py
+++ b/airflow_dbt/hooks/__init__.py
@@ -1 +1,2 @@
 from .dbt_hook import DbtCliHook
+from .dbt_google_hook import DbtCloudBuildHook

From db5e12132adbab1fc1ea90b429325042f4dc7f01 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Fri, 22 Oct 2021 12:07:01 +0200
Subject: [PATCH 11/17] Check that the google provider API version is the
 right one for the implementation (5)

---
 airflow_dbt/hooks/dbt_google_hook.py | 38 ++++++++++++++++++++--------
 1 file changed, 28 insertions(+), 10 deletions(-)

diff --git a/airflow_dbt/hooks/dbt_google_hook.py b/airflow_dbt/hooks/dbt_google_hook.py
index 6abbe11..dc7e47a 100644
--- a/airflow_dbt/hooks/dbt_google_hook.py
+++ b/airflow_dbt/hooks/dbt_google_hook.py
@@ -9,8 +9,24 @@
 from airflow.providers.google.cloud.hooks.gcs import (
     GCSHook, _parse_gcs_url,
 )
-
-from hooks.dbt_hook import DbtBaseHook
+from airflow.providers.google.get_provider_info import get_provider_info
+from packaging import version
+
+from .dbt_hook import DbtBaseHook
+
+# Check we're using the right google provider version. As Composer is the
+# most broadly used Airflow installation we will default to the latest
+# version Composer is using
+google_providers_version = get_provider_info().get('versions')[0]
+v_min = version.parse('5.0.0')
+v_max = version.parse('6.0.0')
+v_provider = version.parse(google_providers_version)
+if not v_min <= v_provider < v_max:
+    raise Exception(
+        f'The provider "apache-airflow-providers-google" version "'
+        f'{google_providers_version}" is not compatible with the current API. '
+        f'Please install a compatible version in the range [{v_min}, {v_max})'
+    )
 
 
 class DbtCloudBuildHook(DbtBaseHook):
@@ -68,12 +75,6 @@ def __init__(
         env: Dict = None,
         dbt_bin='dbt',
     ):
-        logging.info(f'Files in "{dir}" will be uploaded to GCS with the '
-                     f'prefix "{gcs_staging_location}"')
         staging_bucket, staging_blob = _parse_gcs_url(gcs_staging_location)
         # we have provided something similar to
         # 'gs://<bucket>/<blob path>.tar.gz'
@@ -87,10 +88,14 @@ def get_conn(self) -> Any:
 
     def upload_dbt_sources(self) -> None:
         """Upload sources from local to a staging location"""
+        logging.info(
+            f'Files in "{self.dir}" will be uploaded to GCS with the '
+            f'prefix "gs://{self.gcs_staging_bucket}/{self.gcs_staging_blob}"'
+        )
         gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
         with \
-            NamedTemporaryFile() as compressed_file, \
-            tarfile.open(compressed_file.name, "w:gz") as tar:
+                NamedTemporaryFile() as compressed_file, \
+                tarfile.open(compressed_file.name, "w:gz") as tar:
             tar.add(self.dir, arcname=os.path.basename(self.dir))
@@ -115,7 +120,7 @@ def run_dbt(self, dbt_cmd: List[str]):
             self.upload_dbt_sources()
 
         results = self.cloud_build_hook.create_build(
-            build={
+            body={
                 'steps': [{
                     'name': f'fishtownanalytics/dbt:{self.dbt_version}',
                     'entrypoint': '/bin/sh',
@@ -130,9 +135,9 @@ def run_dbt(self, dbt_cmd: List[str]):
                 }
             },
             project_id=self.project_id,
-            wait=self.wait,
-            timeout=self.timeout,
-            metadata=self.env,
+            # wait=self.wait,
+            # timeout=self.timeout,
+            # metadata=self.env,
         )
         logging.info(
             f'Triggered build {results["id"]}. You can find the logs at '
             f'{results["logUrl"]}'
         )

From f55624ae32500ac87206a8767427a488e28d9467 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Fri, 22 Oct 2021 12:12:46 +0200
Subject: [PATCH 12/17] Add project_dir flag to the command line

The Airflow bash executor will run the command isolated in the worker. To be
able to locate the profiles and code we have to tell dbt where they live
with the newly implemented flags:
`dbt run --project-dir /path/to/my/project_dir --profiles-dir /path/to/profiles`
---
 airflow_dbt/hooks/dbt_hook.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/airflow_dbt/hooks/dbt_hook.py b/airflow_dbt/hooks/dbt_hook.py
index bf2c3fc..8897df0 100644
--- a/airflow_dbt/hooks/dbt_hook.py
+++ b/airflow_dbt/hooks/dbt_hook.py
@@ -31,6 +31,7 @@ def generate_dbt_cli_command(
         self,
         base_command: str,
         profiles_dir: str = None,
+        project_dir: str = None,
         target: str = None,
         vars: Dict[str, str] = None,
         full_refresh: bool = False,
@@ -78,6 +79,9 @@ def generate_dbt_cli_command(
         if profiles_dir is not None:
             dbt_cmd.extend(['--profiles-dir', profiles_dir])
 
+        if project_dir is not None:
+            dbt_cmd.extend(['--project-dir', project_dir])
+
         if target is not None:
             dbt_cmd.extend(['--target', target])

From ef26f88067901b14ee559bb9d108983d4886e7ce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Fri, 22 Oct 2021 12:58:50 +0200
Subject: [PATCH 13/17] Remove cloud build timeout and wait. That's google
 provider api v6, not 5

---
 airflow_dbt/hooks/dbt_google_hook.py | 47 +++++++++++++++------------
 1 file changed, 25 insertions(+), 22 deletions(-)

diff --git a/airflow_dbt/hooks/dbt_google_hook.py b/airflow_dbt/hooks/dbt_google_hook.py
index dc7e47a..36aaebd 100644
--- a/airflow_dbt/hooks/dbt_google_hook.py
+++ b/airflow_dbt/hooks/dbt_google_hook.py
@@ -1,5 +1,6 @@
 import logging
 import os
+import pprint
 import tarfile
 from tempfile import NamedTemporaryFile
 from typing import Any, Dict, List
@@ -68,12 +69,11 @@ def __init__(
         project_id: str,
         dir: str = None,
         gcs_staging_location: str = None,
-        timeout: str = None,
-        wait: bool = True,
         gcp_conn_id: str = "google_cloud_default",
         dbt_version: str = '0.21.0',
         env: Dict = None,
-        dbt_bin='dbt',
+        dbt_bin='',
+        service_account=None,
     ):
         staging_bucket, staging_blob = _parse_gcs_url(gcs_staging_location)
         # we have provided something similar to
@@ -90,8 +90,7 @@ def __init__(
         self.dbt_version = dbt_version
         self.cloud_build_hook = CloudBuildHook(gcp_conn_id=gcp_conn_id)
         self.gcp_conn_id = gcp_conn_id
         self.project_id = project_id
-        self.timeout = timeout
-        self.wait = wait
+        self.service_account = service_account
 
         super().__init__(dir=dir, env=env, dbt_bin=dbt_bin)
@@ -128,29 +127,33 @@ def run_dbt(self, dbt_cmd: List[str]):
         if self.dir is not None:
             self.upload_dbt_sources()
 
-        results = self.cloud_build_hook.create_build(
-            body={
-                'steps': [{
-                    'name': f'fishtownanalytics/dbt:{self.dbt_version}',
-                    'entrypoint': '/bin/sh',
-                    'args': ['-c', *dbt_cmd],
-                    'env': [f'{k}={v}' for k, v in self.env.items()]
-                }],
-                'source': {
-                    'storageSource': {
-                        "bucket": self.gcs_staging_bucket,
-                        "object": self.gcs_staging_blob,
-                    }
+        cloud_build_config = {
+            'steps': [{
+                'name': f'fishtownanalytics/dbt:{self.dbt_version}',
+                'args': dbt_cmd,
+                'env': [f'{k}={v}' for k, v in self.env.items()]
+            }],
+            'source': {
+                'storageSource': {
+                    "bucket": self.gcs_staging_bucket,
+                    "object": self.gcs_staging_blob,
                 }
             }
+        }
+
+        if self.service_account is not None:
+            cloud_build_config['serviceAccount'] = self.service_account
+
+        cloud_build_config_str = pprint.pformat(cloud_build_config)
+        logging.info(f'Running the following cloud build config:\n{cloud_build_config_str}')
+
+        results = self.cloud_build_hook.create_build(
+            body=cloud_build_config,
             project_id=self.project_id,
-            # wait=self.wait,
-            # timeout=self.timeout,
-            # metadata=self.env,
         )
         logging.info(
             f'Triggered build {results["id"]}. You can find the logs at '
             f'{results["logUrl"]}'
         )

From 129aa933071ecec9fe9b91faddc8b780e65286e6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Fri, 22 Oct 2021 12:59:50 +0200
Subject: [PATCH 14/17] If dbt_bin is None or empty, just don't add it to the
 command

That comes in handy when running the command inside Cloud Build, since the
entrypoint for the command is already `dbt`, and this `dbt` is already in
the PATH of the image
---
 airflow_dbt/hooks/dbt_hook.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/airflow_dbt/hooks/dbt_hook.py b/airflow_dbt/hooks/dbt_hook.py
index 8897df0..67cbc75 100644
--- a/airflow_dbt/hooks/dbt_hook.py
+++ b/airflow_dbt/hooks/dbt_hook.py
@@ -74,7 +74,15 @@ def generate_dbt_cli_command(
             command
         :type select: str
         """
-        dbt_cmd = [self.dbt_bin, base_command]
+        # if there's no bin do not append it. Rather generate the command
+        # without the `/path/to/dbt` prefix. That is useful for running it
+        # inside containers
+        if self.dbt_bin == '' or self.dbt_bin is None:
+            dbt_cmd = []
+        else:
+            dbt_cmd = [self.dbt_bin]
+
+        dbt_cmd.append(base_command)
 
         if profiles_dir is not None:
             dbt_cmd.extend(['--profiles-dir', profiles_dir])

From 289d563bf5783a286b0903373ba1370007539608 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Fri, 22 Oct 2021 13:02:39 +0200
Subject: [PATCH 15/17] Add `project_dir` option flag

Local execution might not take place in the same folder. It depends on the
underlying hook implementation. This way we ensure we're pointing to the
folder with an absolute path even if run outside the project folder
---
 airflow_dbt/operators/dbt_operator.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py
index f45fc9d..52aff60 100644
--- a/airflow_dbt/operators/dbt_operator.py
+++ b/airflow_dbt/operators/dbt_operator.py
@@ -1,3 +1,4 @@
+import logging
 from typing import Any, Dict
 
 from airflow.models import BaseOperator
@@ -54,8 +55,9 @@ class DbtBaseOperator(BaseOperator):
     @apply_defaults
     def __init__(self,
                  profiles_dir=None,
-                 target=None,
+                 project_dir=None,
                  dir: str = '.',
+                 target=None,
                  env: Dict = None,
                  vars=None,
                  models=None,
@@ -74,8 +76,8 @@ def __init__(self,
         super(DbtBaseOperator, self).__init__(*args, **kwargs)
 
         self.profiles_dir = profiles_dir
+        self.project_dir = project_dir if project_dir is not None else dir
         self.target = target
-        self.dir = dir
         self.env = {} if env is None else env
         self.vars = vars
         self.models = models
@@ -99,6 +101,7 @@ def execute(self, context: Any):
         dbt_cli_command = self.hook.generate_dbt_cli_command(
             base_command=self.base_command,
             profiles_dir=self.profiles_dir,
+            project_dir=self.project_dir,
             target=self.target,
             vars=self.vars,
             full_refresh=self.full_refresh,
@@ -109,6 +112,7 @@ def execute(self, context: Any):
             select=self.select,
             warn_error=self.warn_error,
         )
+        logging.info(f'Running dbt command "{dbt_cli_command}"')
         self.hook.run_dbt(dbt_cli_command)

From 081222da4e57cd28e49e6dde79f543204fb47e36 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Sat, 23 Oct 2021 00:05:11 +0200
Subject: [PATCH 16/17] Add the `use_color` flag (#43). If set, adds the
 corresponding dbt flag

Also make the command build function more idiomatic by using `append` to
append single commands (flags) and `extend` when we want to append several.
---
 airflow_dbt/hooks/dbt_hook.py | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/airflow_dbt/hooks/dbt_hook.py b/airflow_dbt/hooks/dbt_hook.py
index 67cbc75..3bc1cdd 100644
--- a/airflow_dbt/hooks/dbt_hook.py
+++ b/airflow_dbt/hooks/dbt_hook.py
@@ -21,25 +21,25 @@ class DbtBaseHook(BaseHook, ABC):
     :type dbt_bin: str
     """
 
-    def __init__(self, dir: str = '.', env: Dict = None, dbt_bin='dbt'):
+    def __init__(self, env: Dict = None, dbt_bin='dbt'):
         super().__init__()
-        self.dir = dir
         self.env = env if env is not None else {}
         self.dbt_bin = dbt_bin
 
     def generate_dbt_cli_command(
         self,
         base_command: str,
-        profiles_dir: str = None,
-        project_dir: str = None,
+        profiles_dir: str = '.',
+        project_dir: str = '.',
         target: str = None,
-        vars: Dict[str, str] = None,
+        vars: Dict = None,
         full_refresh: bool = False,
         data: bool = False,
         schema: bool = False,
         models: str = None,
         exclude: str = None,
         select: str = None,
+        use_colors: bool = None,
         warn_error: bool = False,
     ) -> List[str]:
         """
@@ -97,26 +97,30 @@ def generate_dbt_cli_command(
             dbt_cmd.extend(['--vars', json.dumps(vars)])
 
         if data:
-            dbt_cmd.extend(['--data'])
+            dbt_cmd.append('--data')
 
         if schema:
-            dbt_cmd.extend(['--schema'])
+            dbt_cmd.append('--schema')
 
         if models is not None:
             dbt_cmd.extend(['--models', models])
 
         if exclude is not None:
-            dbt_cmd.extend(['--exclude', self])
+            dbt_cmd.extend(['--exclude', exclude])
 
         if select is not None:
             dbt_cmd.extend(['--select', select])
 
         if full_refresh:
-            dbt_cmd.extend(['--full-refresh'])
+            dbt_cmd.append('--full-refresh')
 
         if warn_error:
             dbt_cmd.insert(1, '--warn-error')
 
+        if use_colors is not None:
+            colors_flag = "--use-colors" if use_colors else "--no-use-colors"
+            dbt_cmd.append(colors_flag)
+
         return dbt_cmd
 
     @abstractmethod
@@ -161,7 +165,6 @@ def run_dbt(self, dbt_cmd: Union[str, List[str]]):
         self.sp.run_command(
             command=dbt_cmd,
             env=self.env,
-            cwd=self.dir,
         )
 
     def on_kill(self):

From 4a1fa336a7b70be09f754a3dabf78a32865343e4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20I=C3=B1igo?=
Date: Sat, 23 Oct 2021 00:32:48 +0200
Subject: [PATCH 17/17] Template all the fields from the DbtBaseOperator

---
 airflow_dbt/operators/dbt_operator.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py
index 52aff60..65a5efb 100644
--- a/airflow_dbt/operators/dbt_operator.py
+++ b/airflow_dbt/operators/dbt_operator.py
@@ -50,7 +50,9 @@ class DbtBaseOperator(BaseOperator):
 
     ui_color = '#d6522a'
 
-    template_fields = ['vars']
+    template_fields = ['profiles_dir', 'project_dir', 'target', 'env',
+        'vars', 'models', 'exclude', 'select', 'dbt_bin', 'verbose',
+        'warn_error', 'full_refresh', 'data', 'schema', 'base_command']
 
     @apply_defaults
     def __init__(self,
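
For reference, a small sketch of what the command builder produces at the end
of this series. The subclass and all paths/values below are hypothetical,
used only to satisfy the abstract base and illustrate the `project_dir`,
empty `dbt_bin` and `use_colors` behaviours:

    from typing import List, Union

    from airflow_dbt.hooks.dbt_hook import DbtBaseHook

    class PrintDbtHook(DbtBaseHook):
        # no-op executor, just to make the ABC instantiable for the demo
        def run_dbt(self, dbt_cmd: Union[str, List[str]]):
            print(dbt_cmd)

    hook = PrintDbtHook(dbt_bin='')  # empty dbt_bin: start at the sub-command
    print(hook.generate_dbt_cli_command(
        'run',
        profiles_dir='/opt/dbt',
        project_dir='/opt/dbt/my_project',
        vars={'run_date': '2021-10-23'},
        use_colors=False,
    ))
    # ['run', '--profiles-dir', '/opt/dbt', '--project-dir',
    #  '/opt/dbt/my_project', '--vars', '{"run_date": "2021-10-23"}',
    #  '--no-use-colors']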