Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port local runner utility - resetdb #54

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions images/airflow/2.9.1/docker-compose-resetdb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
version: "3.8"

x-airflow-common: &airflow-common
image: amazon-mwaa-docker-images/airflow:2.9.1
environment:
# AWS credentials
AWS_ACCESS_KEY_ID: "FAKE_AWS_ACCESS_KEY_ID"
AWS_SECRET_ACCESS_KEY: "FAKE_AWS_SECRET_ACCESS_KEY"
AWS_SESSION_TOKEN: "FAKE_AWS_SESSION_TOKEN"
AWS_REGION: "us-west-2"
AWS_DEFAULT_REGION: "us-west-2"

# Core configuration
MWAA__CORE__REQUIREMENTS_PATH: "/usr/local/airflow/requirements/requirements.txt"

# Database configuration
MWAA__DB__CREDENTIALS: '{"username": "airflow", "password": "airflow"}'
MWAA__DB__POSTGRES_DB: "airflow"
MWAA__DB__POSTGRES_HOST: "postgres"
MWAA__DB__POSTGRES_PORT: "5432"
MWAA__DB__POSTGRES_SSLMODE: "prefer"

# SQS configuration
MWAA__SQS__CREATE_QUEUE: True
MWAA__SQS__CUSTOM_ENDPOINT: http://sqs:9324
MWAA__SQS__QUEUE_URL: http://sqs:9324/000000000000/celery-queue
MWAA__SQS__USE_SSL: False

volumes:
- ./dags:/usr/local/airflow/dags
- ./plugins:/usr/local/airflow/plugins
- ./requirements:/usr/local/airflow/requirements
depends_on: &airflow-common-depends-on
postgres:
condition: service_healthy
sqs:
condition: service_healthy

services:
postgres:
image: postgres:13
container_name: mwaa-291-db
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
retries: 5
start_period: 5s
restart: always
ports:
- 5432:5432
expose:
- 5432

sqs:
image: softwaremill/elasticmq:latest
healthcheck:
# https://github.com/softwaremill/elasticmq/issues/776#issuecomment-1582527921
test: ["CMD-SHELL", "wget -q -S -O - 127.0.0.1:9324/?Action=ListQueues"]
interval: 10s
retries: 5
start_period: 5s
ports:
- 9324:9324
- 9325:9325
expose:
- 9324
- 9325

resetdb:
<<: *airflow-common
command: resetdb
container_name: mwaa-291-resetdb
depends_on:
- postgres
restart: no

volumes:
postgres-db-volume:
name: "mwaa-291-db-volume"
20 changes: 20 additions & 0 deletions images/airflow/2.9.1/python/mwaa/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"worker",
"triggerer",
"shell",
"resetdb",
"spy",
]

Expand All @@ -73,6 +74,20 @@ async def airflow_db_init(environ: dict[str, str]):
await run_command("airflow db migrate", env=environ)


@with_db_lock(4321)
async def airflow_db_reset(environ: dict[str, str]):
"""
Reset Airflow metadata database.

This function resets the Airflow metadata database. It is called when the `resetdb`
command is specified.

:param environ: A dictionary containing the environment variables.
"""
logger.info("Resetting Airflow metadata database.")
await run_command("airflow db reset --yes", env=environ)


@with_db_lock(5678)
async def create_airflow_user(environ: dict[str, str]):
"""
Expand Down Expand Up @@ -238,6 +253,11 @@ async def main() -> None:
time.sleep(1)
case "worker":
os.execlpe("airflow", "airflow", "celery", "worker", environ)
case "resetdb":
# Perform the resetdb functionality
await airflow_db_reset(environ)
# After resetting the db, initialize it again
await airflow_db_init(environ)
case _:
os.execlpe("airflow", "airflow", command, environ)

Expand Down
Loading