From 3cb0c034ebc1dafc4bc253276fd624e69489f276 Mon Sep 17 00:00:00 2001 From: thenaturalist Date: Thu, 22 Aug 2019 19:42:03 +0200 Subject: [PATCH 1/3] update README --- README.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/README.md b/README.md index 5213dbc..e1fb9e9 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,28 @@ # dbt-cloud-plugin DBT Cloud Plugin for Airflow + +## Configuration + +Copy the `dbt_cloud_plugin` directory into Airflow's `plugins` directory. + +Create a new connection with the following dictionary as the `Extra` parameter. Leave connection type blank. +``` +{ + "dbt_cloud_api_token": "123abcdefg456", + "dbt_cloud_account_id": 12345678 +} +``` + +In order to obtain your API token, log into your [dbt Cloud Account](https://cloud.getdbt.com), click on your Avatar in the top right corner, then `My Account` and finally on `API Access` in the left bar. + +Note: API Access is not available on the _Free_ plan. + + +In order to test if the connection is set up correctly, log onto the Airflow shell and run + +`airflow test --dry_run dbt_cloud_hourly_dag run_dbt_cloud_job 2019-01-01` + + + +---- +MIT License \ No newline at end of file From 97b4ff3cb6e5e44f8053793c40e1f457df56bd9e Mon Sep 17 00:00:00 2001 From: thenaturalist Date: Thu, 22 Aug 2019 19:45:05 +0200 Subject: [PATCH 2/3] code refactoring to improve readability, small corrections, harmonizations --- dbt_cloud_plugin/__init__.py | 4 +- dbt_cloud_plugin/dbt_cloud/dbt_cloud.py | 38 ++++++++++++++----- dbt_cloud_plugin/hooks/dbt_cloud_hook.py | 14 +++++-- .../operators/dbt_cloud_run_job_operator.py | 31 ++++++++------- .../sensors/dbt_cloud_job_sensor.py | 20 ++++++---- examples/dbt_cloud_hourly_dag.py | 10 ++++- 6 files changed, 82 insertions(+), 35 deletions(-) diff --git a/dbt_cloud_plugin/__init__.py b/dbt_cloud_plugin/__init__.py index 085d692..cd74bfe 100644 --- a/dbt_cloud_plugin/__init__.py +++ b/dbt_cloud_plugin/__init__.py @@ -1,7 +1,9 @@ from 
airflow.plugins_manager import AirflowPlugin + from dbt_cloud_plugin.hooks.dbt_cloud_hook import DbtCloudHook from dbt_cloud_plugin.operators.dbt_cloud_run_job_operator import DbtCloudRunJobOperator -from dbt_cloud_plugin.sensors.dbt_cloud_run_sensor import DbtCloudRunSensor +from dbt_cloud_plugin.sensors.dbt_cloud_job_sensor import DbtCloudRunSensor + class DbtCloudPlugin(AirflowPlugin): name = "dbt_cloud_plugin" diff --git a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py index 7b0afdb..4123315 100644 --- a/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py +++ b/dbt_cloud_plugin/dbt_cloud/dbt_cloud.py @@ -2,6 +2,8 @@ import json import requests import time +from airflow.exceptions import AirflowException + class DbtCloud(object): """ @@ -20,7 +22,7 @@ def __init__(self, account_id, api_token): def _get(self, url_suffix): url = self.api_base + url_suffix - headers = {'Authorization': 'Token %s' % self.api_token} + headers = {'Authorization': f'Token {self.api_token}'} response = requests.get(url, headers=headers) if response.status_code == 200: return json.loads(response.content) @@ -29,7 +31,7 @@ def _get(self, url_suffix): def _post(self, url_suffix, data=None): url = self.api_base + url_suffix - headers = {'Authorization': 'token %s' % self.api_token} + headers = {'Authorization': f'Token {self.api_token}'} response = requests.post(url, headers=headers, data=data) if response.status_code == 200: return json.loads(response.content) @@ -37,13 +39,20 @@ def _post(self, url_suffix, data=None): raise RuntimeError(response.content) def list_jobs(self): - return self._get('/accounts/%s/jobs/' % self.account_id).get('data') + return self._get( + f'/accounts/{self.account_id}/jobs/' + ).get('data') def get_run(self, run_id): - return self._get('/accounts/%s/runs/%s/' % (self.account_id, run_id)).get('data') + return self._get( + f'/accounts/{self.account_id}/runs/{run_id}/' + ).get('data') def trigger_job_run(self, job_id, data=None): - return 
self._post(url_suffix='/accounts/%s/jobs/%s/run/' % (self.account_id, job_id), data=data).get('data') + return self._post( + url_suffix=f'/accounts/{self.account_id}/jobs/{job_id}/run/', + data=data + ).get('data') def try_get_run(self, run_id, max_tries=3): for i in range(max_tries): @@ -51,10 +60,15 @@ def try_get_run(self, run_id, max_tries=3): run = self.get_run(run_id) return run except RuntimeError as e: - print("Encountered a runtime error while fetching status for {}".format(run_id)) + print( + 'Encountered a runtime error while ' + f'fetching status for {run_id}' + ) time.sleep(10) - raise RuntimeError("Too many failures ({}) while querying for run status".format(run_id)) + raise RuntimeError( + f'Too many failures ({run_id}) while querying for run status' + ) def run_job(self, job_name, data=None): jobs = self.list_jobs() @@ -62,8 +76,14 @@ def run_job(self, job_name, data=None): job_matches = [j for j in jobs if j['name'] == job_name] if len(job_matches) != 1: - raise AirflowException("{} jobs found for {}".format(len(job_matches), job_name)) + raise AirflowException( + f'{len(job_matches)} jobs found for {job_name}' + ) job_def = job_matches[0] - trigger_resp = self.trigger_job_run(job_id=job_def['id'], data=data) + trigger_resp = self.trigger_job_run( + job_id=job_def['id'], + data=data + ) + return trigger_resp diff --git a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py index f2f4980..d4d8c86 100644 --- a/dbt_cloud_plugin/hooks/dbt_cloud_hook.py +++ b/dbt_cloud_plugin/hooks/dbt_cloud_hook.py @@ -1,7 +1,9 @@ -from dbt_cloud_plugin.dbt_cloud.dbt_cloud import DbtCloud from airflow.hooks.base_hook import BaseHook from airflow.exceptions import AirflowException +from dbt_cloud_plugin.dbt_cloud.dbt_cloud import DbtCloud + + class RunStatus: queued = 1 dequeued = 2 @@ -23,6 +25,7 @@ class RunStatus: def lookup(cls, status): return cls.LOOKUP.get(status, 'Unknown') + class DbtCloudHook(BaseHook): """ Interact with dbt 
Cloud. @@ -36,11 +39,16 @@ def get_conn(self): if 'dbt_cloud_api_token' in conn.extra_dejson: dbt_cloud_api_token = conn.extra_dejson['dbt_cloud_api_token'] else: - raise AirflowException('No dbt Cloud API Token was supplied in dbt Cloud connection.') + raise AirflowException( + 'No dbt Cloud API Token was supplied in dbt Cloud connection.' + ) + if 'dbt_cloud_account_id' in conn.extra_dejson: dbt_cloud_account_id = conn.extra_dejson['dbt_cloud_account_id'] else: - raise AirflowException('No dbt Cloud Account ID was supplied in dbt Cloud connection.') + raise AirflowException( + 'No dbt Cloud Account ID was supplied in dbt Cloud connection.' + ) return DbtCloud(dbt_cloud_account_id, dbt_cloud_api_token) diff --git a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py index 17b8faa..b4ee69f 100644 --- a/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py +++ b/dbt_cloud_plugin/operators/dbt_cloud_run_job_operator.py @@ -1,20 +1,16 @@ # -*- coding: utf-8 -*- -import json -import requests -import time - from airflow.models import BaseOperator -from dbt_cloud_plugin.hooks.dbt_cloud_hook import DbtCloudHook from airflow.utils.decorators import apply_defaults from airflow.exceptions import AirflowException +from dbt_cloud_plugin.hooks.dbt_cloud_hook import DbtCloudHook + + class DbtCloudRunJobOperator(BaseOperator): """ Operator to run a dbt cloud job. :param dbt_cloud_conn_id: dbt Cloud connection ID. :type dbt_cloud_conn_id: string - :param project_id: dbt Cloud project ID. - :type project_id: int :param job_name: dbt Cloud job name. 
:type job_name: string """ @@ -27,7 +23,8 @@ def __init__(self, super(DbtCloudRunJobOperator, self).__init__(*args, **kwargs) if dbt_cloud_conn_id is None: - raise AirflowException('No valid dbt cloud connection ID was supplied.') + raise AirflowException('No valid dbt Cloud ' + 'connection ID was supplied.') if job_name is None: raise AirflowException('No job name was supplied.') @@ -37,15 +34,23 @@ def __init__(self, def execute(self, **kwargs): - self.log.info('Attempting to trigger a run of dbt cloud job: {}'.format(self.job_name)) + self.log.info( + f'Attempting to trigger a run of dbt Cloud job: {self.job_name}' + ) try: dbt_cloud_hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id) dbt_cloud = dbt_cloud_hook.get_conn() - data = {'cause':'Kicked off via Airflow'} + + data = {'cause': 'Kicked off via Airflow'} trigger_resp = dbt_cloud.run_job(self.job_name, data=data) - self.log.info('Triggered Run ID {}'.format(trigger_resp['id'])) + triggered_run_id = trigger_resp['id'] + + self.log.info(f'Triggered Run ID {triggered_run_id}') + except RuntimeError as e: - raise AirflowException("Error while triggering job {}: {}".format(self.job_name, e)) + raise AirflowException( + f'Error while triggering job {self.job_name}: {e}' + ) - return trigger_resp['id'] + return triggered_run_id diff --git a/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py b/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py index 4c5c376..06fc17e 100644 --- a/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py +++ b/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py @@ -1,7 +1,9 @@ -from dbt_cloud_plugin.hooks.dbt_cloud_hook import DbtCloudHook -from airflow.sensors.base_sensor_operator import BaseSensorOperator -from airflow.utils.decorators import apply_defaults from airflow.exceptions import AirflowException +from airflow.utils.decorators import apply_defaults +from airflow.sensors.base_sensor_operator import BaseSensorOperator + +from dbt_cloud_plugin.hooks.dbt_cloud_hook import 
DbtCloudHook + class DbtCloudRunSensor(BaseSensorOperator): """ @@ -23,25 +25,27 @@ def __init__(self, super(DbtCloudRunSensor, self).__init__(*args, **kwargs) if dbt_cloud_conn_id is None: - raise AirflowException('No valid dbt cloud connection ID was supplied.') + raise AirflowException('No valid dbt Cloud connection ID was supplied.') if run_id is None: - raise AirflowException('No dbt cloud run ID was supplied.') + raise AirflowException('No dbt Cloud Run ID was supplied.') self.dbt_cloud_conn_id = dbt_cloud_conn_id self.run_id = run_id def poke(self, context): - self.log.info('Sensor checking state of dbt cloud run ID: %s', self.run_id) + self.log.info(f'Sensor checking state of dbt Cloud Run ID: {self.run_id}') dbt_cloud_hook = DbtCloudHook(dbt_cloud_conn_id=self.dbt_cloud_conn_id) run_status = dbt_cloud_hook.get_run_status(run_id=self.run_id) - self.log.info('State of Run ID {}: {}'.format(self.run_id, run_status)) + self.log.info(f'State of Run ID {self.run_id}: {run_status}') TERMINAL_RUN_STATES = ['Success', 'Error', 'Cancelled'] FAILED_RUN_STATES = ['Error'] if run_status in FAILED_RUN_STATES: - return AirflowException('dbt cloud Run ID {} Failed.'.format(self.run_id)) + return AirflowException( + f'dbt Cloud Run ID {self.run_id} failed.' + ) if run_status in TERMINAL_RUN_STATES: return True else: diff --git a/examples/dbt_cloud_hourly_dag.py b/examples/dbt_cloud_hourly_dag.py index a707758..2333a3e 100644 --- a/examples/dbt_cloud_hourly_dag.py +++ b/examples/dbt_cloud_hourly_dag.py @@ -19,7 +19,15 @@ 'provide_context': True } -dag = DAG('dbt_cloud_hourly_dag', concurrency=1, max_active_runs=1, catchup=False, schedule_interval='0 * * * *', default_args=default_args) +dag = DAG( + 'dbt_cloud_hourly_dag', + concurrency=1, + max_active_runs=1, + catchup=False, + schedule_interval='0 * * * *', + default_args=default_args +) + dag.doc_md = __doc__ # Run hourly DAG through dbt cloud. 
From 8e8f360e54bb9ad94d98f1aae12e53d3a9f89e58 Mon Sep 17 00:00:00 2001 From: thenaturalist Date: Fri, 23 Aug 2019 14:44:47 +0200 Subject: [PATCH 3/3] update sensor to fail upon dbt Cloud API Error & correct example watch task --- dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py | 4 ++-- examples/dbt_cloud_hourly_dag.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py b/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py index 06fc17e..346dab6 100644 --- a/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py +++ b/dbt_cloud_plugin/sensors/dbt_cloud_job_sensor.py @@ -39,11 +39,11 @@ def poke(self, context): run_status = dbt_cloud_hook.get_run_status(run_id=self.run_id) self.log.info(f'State of Run ID {self.run_id}: {run_status}') - TERMINAL_RUN_STATES = ['Success', 'Error', 'Cancelled'] + TERMINAL_RUN_STATES = ['Success', 'Cancelled'] FAILED_RUN_STATES = ['Error'] if run_status in FAILED_RUN_STATES: - return AirflowException( + raise AirflowException( f'dbt Cloud Run ID {self.run_id} failed.' ) if run_status in TERMINAL_RUN_STATES: diff --git a/examples/dbt_cloud_hourly_dag.py b/examples/dbt_cloud_hourly_dag.py index 2333a3e..b2f9171 100644 --- a/examples/dbt_cloud_hourly_dag.py +++ b/examples/dbt_cloud_hourly_dag.py @@ -41,7 +41,7 @@ watch_dbt_cloud_job = DbtCloudRunSensor( task_id='watch_dbt_cloud_job', dbt_cloud_conn_id='dbt_cloud', - job_id="{{ task_instance.xcom_pull(task_ids='run_dbt_cloud_job', dag_id='dbt_cloud_hourly_dag', key='return_value') }}", + run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_cloud_job', dag_id='dbt_cloud_hourly_dag', key='return_value') }}", sla=timedelta(minutes=45), dag=dag)