Once a month DAG to fully replace signals #221

Open · wants to merge 1 commit into production
112 changes: 112 additions & 0 deletions dags/atd_knack_signals_full_replace.py
@@ -0,0 +1,112 @@
import os

from airflow.models import DAG
from airflow.operators.docker_operator import DockerOperator
from pendulum import datetime, duration

from utils.onepassword import get_env_vars_task
from utils.slack_operator import task_fail_slack_alert

DEPLOYMENT_ENVIRONMENT = os.getenv("ENVIRONMENT", "development")

DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2015, 1, 1, tz="America/Chicago"),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "execution_timeout": duration(minutes=15),
    "on_failure_callback": task_fail_slack_alert,
}

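# Each entry maps a container environment variable to the 1Password item and
# field it is read from, resolved at runtime by get_env_vars_task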
REQUIRED_SECRETS = {
    "KNACK_APP_ID": {
        "opitem": "Knack AMD Data Tracker",
        "opfield": "production.appId",
    },
    "KNACK_API_KEY": {
        "opitem": "Knack AMD Data Tracker",
        "opfield": "production.apiKey",
    },
    "SOCRATA_API_KEY_ID": {
        "opitem": "Socrata Key ID, Secret, and Token",
        "opfield": "socrata.apiKeyId",
    },
    "SOCRATA_API_KEY_SECRET": {
        "opitem": "Socrata Key ID, Secret, and Token",
        "opfield": "socrata.apiKeySecret",
    },
    "SOCRATA_APP_TOKEN": {
        "opitem": "Socrata Key ID, Secret, and Token",
        "opfield": "socrata.appToken",
    },
    "PGREST_ENDPOINT": {
        "opitem": "atd-knack-services PostgREST",
        "opfield": "production.endpoint",
    },
    "PGREST_JWT": {
        "opitem": "atd-knack-services PostgREST",
        "opfield": "production.jwt",
    },
    "AGOL_USERNAME": {
        "opitem": "ArcGIS Online (AGOL) Scripts Publisher",
        "opfield": "production.username",
    },
    "AGOL_PASSWORD": {
        "opitem": "ArcGIS Online (AGOL) Scripts Publisher",
        "opfield": "production.password",
    },
}


with DAG(
    dag_id="atd_knack_signals_full_replace",
    description="Load a full replace of signals (view_197) records from Knack to PostgREST, Socrata, and AGOL",
    default_args=DEFAULT_ARGS,
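    # "28 2 1 * *" = 02:28 on the first day of every month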
    schedule_interval="28 2 1 * *" if DEPLOYMENT_ENVIRONMENT == "production" else None,
mddilley (Collaborator) commented on Apr 22, 2024:

@chiaberry One thing that crossed my mind when you mentioned figuring out how to pause the other DAG is the max_active_runs setting for a DAG. I'm a little wary of having multiple DAGs for different versions of the same process, and I'm wondering if there is some combination of a schedule (the complexity here might be a stretch for cron expressions, but idk), branching logic or something similar to toggle the full replace arg, and the max runs setting to make sure that no more than one DAG run occurs at a time, letting the full replace play out before the next run starts.

I know that you've put time into this already, so maybe it would be a time sink, and we can always think about it more while we test this out.
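A minimal sketch of that idea, assuming Airflow 2.x; the combined dag_id, the daily schedule, and the full_replace/incremental task names are all made up for illustration, not part of this PR:

# Sketch only: one DAG that runs daily and branches to a full replace on the
# first of the month; max_active_runs=1 keeps a slow full replace from
# overlapping the next incremental run.
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from pendulum import datetime


def _choose_mode(ds=None, **_):
    # ds is the templated run date string, e.g. "2024-05-01"
    return "full_replace" if ds.endswith("-01") else "incremental"


with DAG(
    dag_id="atd_knack_signals_combined",
    start_date=datetime(2015, 1, 1, tz="America/Chicago"),
    schedule_interval="28 2 * * *",
    max_active_runs=1,  # never start a new run while the previous one is active
    catchup=False,
) as dag:
    choose_mode = BranchPythonOperator(task_id="choose_mode", python_callable=_choose_mode)
    full_replace = EmptyOperator(task_id="full_replace")  # stand-in for the full replace tasks
    incremental = EmptyOperator(task_id="incremental")  # stand-in for the existing incremental tasks

    choose_mode >> [full_replace, incremental]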

johnclary (Member) commented on Apr 29, 2024:
I am also wary of running multiple instances of this ETL at the same time, and agree with Mike that it would be a good idea to handle this with DAG code.

One other wrinkle, iirc, is that a full replace updates the modified date of every record in the Postgres DB to the current date, so every time the DAG runs for the rest of the day it is going to process all records, because the Postgres > Socrata ETL queries by date, not timestamp.

I don't know, you might want to abandon running a full replace on this dataset altogether. The current ETL is not optimized to deal with this situation.
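To illustrate the wrinkle (the modified_date column name and PostgREST filter shape here are assumptions for illustration, not the actual atd-knack-services query):

from pendulum import now

# date granularity: after a 02:28 full replace bumps every record's modified
# date to today, this filter matches all records for the rest of the day
date_filter = f"modified_date=gte.{now('America/Chicago').to_date_string()}"  # e.g. gte.2024-04-29

# timestamp granularity: only records modified since the last run would match
timestamp_filter = f"modified_date=gte.{now('America/Chicago').subtract(minutes=15).isoformat()}"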

    tags=["repo:atd-knack-services", "knack", "socrata", "agol", "data-tracker"],
    catchup=False,
) as dag:
    docker_image = "atddocker/atd-knack-services:production"
    app_name = "data-tracker"
    container = "view_197"

    env_vars = get_env_vars_task(REQUIRED_SECRETS)

    t1 = DockerOperator(
        task_id="atd_knack_signals_to_postgrest",
        image=docker_image,
        docker_conn_id="docker_default",
        auto_remove=True,
        command=f"./atd-knack-services/services/records_to_postgrest.py -a {app_name} -c {container}",
        environment=env_vars,
        tty=True,
        force_pull=True,
        mount_tmp_dir=False,
    )

    t2 = DockerOperator(
        task_id="atd_knack_signals_to_socrata",
        image=docker_image,
        docker_conn_id="docker_default",
        auto_remove=True,
        command=f"./atd-knack-services/services/records_to_socrata.py -a {app_name} -c {container}",
        environment=env_vars,
        tty=True,
        mount_tmp_dir=False,
    )

    t3 = DockerOperator(
        task_id="atd_knack_signals_to_agol",
        image=docker_image,
        docker_conn_id="docker_default",
        auto_remove=True,
        command=f"./atd-knack-services/services/records_to_agol.py -a {app_name} -c {container}",
        environment=env_vars,
        tty=True,
        mount_tmp_dir=False,
    )

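    # load Knack records to PostgREST first, then publish to Socrata and AGOL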
    t1 >> t2 >> t3