From 9afe111f78f0616a3398d48c3785d687ce57f81e Mon Sep 17 00:00:00 2001 From: chiaberry Date: Thu, 18 Apr 2024 17:12:13 -0500 Subject: [PATCH] the knack signals but full replace and once a month --- dags/atd_knack_signals_full_replace.py | 112 +++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 dags/atd_knack_signals_full_replace.py diff --git a/dags/atd_knack_signals_full_replace.py b/dags/atd_knack_signals_full_replace.py new file mode 100644 index 0000000..bc3cd69 --- /dev/null +++ b/dags/atd_knack_signals_full_replace.py @@ -0,0 +1,112 @@ +import os + +from airflow.models import DAG +from airflow.operators.docker_operator import DockerOperator +from pendulum import datetime, duration, now + +from utils.onepassword import get_env_vars_task +from utils.slack_operator import task_fail_slack_alert +from utils.knack import get_date_filter_arg + +DEPLOYMENT_ENVIRONMENT = os.getenv("ENVIRONMENT", "development") + +DEFAULT_ARGS = { + "owner": "airflow", + "depends_on_past": False, + "start_date": datetime(2015, 1, 1, tz="America/Chicago"), + "email_on_failure": False, + "email_on_retry": False, + "retries": 0, + "execution_timeout": duration(minutes=15), + "on_failure_callback": task_fail_slack_alert, +} + +REQUIRED_SECRETS = { + "KNACK_APP_ID": { + "opitem": "Knack AMD Data Tracker", + "opfield": f"production.appId", + }, + "KNACK_API_KEY": { + "opitem": "Knack AMD Data Tracker", + "opfield": f"production.apiKey", + }, + "SOCRATA_API_KEY_ID": { + "opitem": "Socrata Key ID, Secret, and Token", + "opfield": "socrata.apiKeyId", + }, + "SOCRATA_API_KEY_SECRET": { + "opitem": "Socrata Key ID, Secret, and Token", + "opfield": "socrata.apiKeySecret", + }, + "SOCRATA_APP_TOKEN": { + "opitem": "Socrata Key ID, Secret, and Token", + "opfield": "socrata.appToken", + }, + "PGREST_ENDPOINT": { + "opitem": "atd-knack-services PostgREST", + "opfield": "production.endpoint", + }, + "PGREST_JWT": { + "opitem": "atd-knack-services PostgREST", + "opfield": "production.jwt", + }, + "AGOL_USERNAME": { + "opitem": "ArcGIS Online (AGOL) Scripts Publisher", + "opfield": "production.username", + }, + "AGOL_PASSWORD": { + "opitem": "ArcGIS Online (AGOL) Scripts Publisher", + "opfield": "production.password", + }, +} + + +with DAG( + dag_id=f"atd_knack_signals_full_replace", + description="Load a full replace of signals (view_197) records from Knack to Postgrest to AGOL and Socrata", + default_args=DEFAULT_ARGS, + schedule_interval="28 2 1 * *" if DEPLOYMENT_ENVIRONMENT == "production" else None, + tags=["repo:atd-knack-services", "knack", "socrata", "agol", "data-tracker"], + catchup=False, +) as dag: + docker_image = "atddocker/atd-knack-services:production" + app_name = "data-tracker" + container = "view_197" + + env_vars = get_env_vars_task(REQUIRED_SECRETS) + + t1 = DockerOperator( + task_id="atd_knack_signals_to_postgrest", + image=docker_image, + docker_conn_id="docker_default", + auto_remove=True, + command=f"./atd-knack-services/services/records_to_postgrest.py -a {app_name} -c {container}", + environment=env_vars, + tty=True, + force_pull=True, + mount_tmp_dir=False, + ) + + t2 = DockerOperator( + task_id="atd_knack_signals_to_socrata", + image=docker_image, + docker_conn_id="docker_default", + auto_remove=True, + command=f"./atd-knack-services/services/records_to_socrata.py -a {app_name} -c {container}", + environment=env_vars, + tty=True, + mount_tmp_dir=False, + ) + + t3 = DockerOperator( + task_id="atd_knack_signals_to_agol", + image=docker_image, + docker_conn_id="docker_default", + auto_remove=True, + command=f"./atd-knack-services/services/records_to_agol.py -a {app_name} -c {container}", + environment=env_vars, + tty=True, + mount_tmp_dir=False, + ) + + t1 >> t2 >> t3