diff --git a/images/airflow/2.8.0/Dockerfiles/Dockerfile b/images/airflow/2.8.0/Dockerfiles/Dockerfile index 0581364..ee70bfa 100644 --- a/images/airflow/2.8.0/Dockerfiles/Dockerfile +++ b/images/airflow/2.8.0/Dockerfiles/Dockerfile @@ -3,7 +3,7 @@ # the Jinja2-templated Dockerfile.j2 file, so you need to change that file # instead. # -# This file was generated on 2024-02-12 01:56:33.029839 +# This file was generated on 2024-02-22 19:42:44.935774 # FROM amazon-mwaa/airflow:2.8.0-base diff --git a/images/airflow/2.8.0/Dockerfiles/Dockerfile-dev b/images/airflow/2.8.0/Dockerfiles/Dockerfile-dev index f4c0c5f..0ae295d 100644 --- a/images/airflow/2.8.0/Dockerfiles/Dockerfile-dev +++ b/images/airflow/2.8.0/Dockerfiles/Dockerfile-dev @@ -3,7 +3,7 @@ # the Jinja2-templated Dockerfile.j2 file, so you need to change that file # instead. # -# This file was generated on 2024-02-12 01:56:33.021778 +# This file was generated on 2024-02-22 19:42:44.927521 # FROM amazon-mwaa/airflow:2.8.0-base diff --git a/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer b/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer index 96c6a1f..df2a928 100644 --- a/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer +++ b/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer @@ -3,7 +3,7 @@ # the Jinja2-templated Dockerfile.j2 file, so you need to change that file # instead. # -# This file was generated on 2024-02-12 01:56:33.032499 +# This file was generated on 2024-02-22 19:42:44.938417 # FROM amazon-mwaa/airflow:2.8.0-base diff --git a/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-dev b/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-dev index 9d285d8..a229408 100644 --- a/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-dev +++ b/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-dev @@ -3,7 +3,7 @@ # the Jinja2-templated Dockerfile.j2 file, so you need to change that file # instead. 
# -# This file was generated on 2024-02-12 01:56:33.024518 +# This file was generated on 2024-02-22 19:42:44.930305 # FROM amazon-mwaa/airflow:2.8.0-base diff --git a/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-privileged b/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-privileged index 6ef5f4d..dfb4196 100644 --- a/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-privileged +++ b/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-privileged @@ -3,7 +3,7 @@ # the Jinja2-templated Dockerfile.j2 file, so you need to change that file # instead. # -# This file was generated on 2024-02-12 01:56:33.035092 +# This file was generated on 2024-02-22 19:42:44.941098 # FROM amazon-mwaa/airflow:2.8.0-base diff --git a/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-privileged-dev b/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-privileged-dev index 1ce50d9..c4676f4 100644 --- a/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-privileged-dev +++ b/images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-privileged-dev @@ -3,7 +3,7 @@ # the Jinja2-templated Dockerfile.j2 file, so you need to change that file # instead. # -# This file was generated on 2024-02-12 01:56:33.027225 +# This file was generated on 2024-02-22 19:42:44.933100 # FROM amazon-mwaa/airflow:2.8.0-base diff --git a/images/airflow/2.8.0/Dockerfiles/Dockerfile.base b/images/airflow/2.8.0/Dockerfiles/Dockerfile.base index 228a8d5..095bd05 100644 --- a/images/airflow/2.8.0/Dockerfiles/Dockerfile.base +++ b/images/airflow/2.8.0/Dockerfiles/Dockerfile.base @@ -3,14 +3,15 @@ # the Jinja2-templated Dockerfile.j2 file, so you need to change that file # instead. # -# This file was generated on 2024-02-12 01:56:33.018473 +# This file was generated on 2024-02-22 19:42:44.924226 # FROM public.ecr.aws/amazonlinux/amazonlinux:2023 # Environment variables -# Temporarily downgrading to 2.7.2 to make it easier to test using it internally. 
+# Temporarily downgrading to 2.7.2 to make it easier to test the Docker image +# within Amazon MWAA since 2.7.2 is a version we support. ENV AIRFLOW_VERSION=2.7.2 ENV AIRFLOW_AMAZON_PROVIDERS_VERSION=8.7.1 diff --git a/images/airflow/2.8.0/docker-compose.yaml b/images/airflow/2.8.0/docker-compose.yaml index 00115e1..4f0b912 100644 --- a/images/airflow/2.8.0/docker-compose.yaml +++ b/images/airflow/2.8.0/docker-compose.yaml @@ -2,7 +2,6 @@ version: "3.8" x-airflow-common: &airflow-common image: amazon-mwaa/airflow:2.8.0 - container_name: mwaa-280-db restart: always environment: # AWS credentials @@ -23,7 +22,10 @@ x-airflow-common: &airflow-common MWAA__DB__POSTGRES_DB: "airflow" # SQS configuration - MWAA__SQS__QUEUE_URL: ${MWAA__SQS__QUEUE_URL} + MWAA__SQS__CREATE_QUEUE: True + MWAA__SQS__CUSTOM_ENDPOINT: http://sqs:9324 + MWAA__SQS__QUEUE_URL: http://sqs:9324/000000000000/celery-queue + MWAA__SQS__USE_SSL: False volumes: - ./dags:/usr/local/airflow/dags @@ -32,6 +34,8 @@ x-airflow-common: &airflow-common depends_on: &airflow-common-depends-on postgres: condition: service_healthy + sqs: + condition: service_healthy services: postgres: @@ -54,22 +58,20 @@ services: expose: - 5432 - # TODO Support a local SQS server to allow the user to use this Docker Compose file without a real AWS account. 
sqs:
  # Local SQS-compatible server (ElasticMQ image), so the stack can run against
  # http://sqs:9324 instead of a real AWS SQS endpoint.
  image: softwaremill/elasticmq:latest
  healthcheck:
    # The container is reported healthy once a ListQueues request succeeds.
    # https://github.com/softwaremill/elasticmq/issues/776#issuecomment-1582527921
    test:
      - "CMD-SHELL"
      - "wget -q -S -O - 127.0.0.1:9324/?Action=ListQueues"
    interval: 10s
    retries: 5
    start_period: 5s
  ports:
    # 9324 is the port the SQS endpoint/queue URL settings point at.
    - "9324:9324"
    # NOTE(review): 9325 is presumably ElasticMQ's secondary (UI/stats) port -- confirm.
    - "9325:9325"
  expose:
    - "9324"
    - "9325"
def should_create_queue() -> bool:
    """
    Tell whether the SQS queue should be created.

    The answer is read from the MWAA__SQS__CREATE_QUEUE environment variable.

    :return: True when the variable is set to "true" (compared
      case-insensitively); False for any other value or when it is unset.
    """
    flag = os.getenv("MWAA__SQS__CREATE_QUEUE")
    if flag is None:
        # Unset means "do not create".
        return False
    return flag.lower() == "true"
@db_lock(1357)
def create_queue() -> None:
    """
    Create the SQS queue used by Celery, unless it already exists.

    Does nothing when queue creation is not enabled (see
    should_create_queue()). Otherwise the queue name comes from
    get_sqs_queue_name() and the SQS client is pointed at the endpoint given in
    MWAA__SQS__CUSTOM_ENDPOINT; when that variable is unset, None is passed as
    the endpoint URL.

    :raises ClientError: If looking up the queue fails for any reason other
      than the queue not existing, or if creating the queue fails.
    """
    # NOTE(review): db_lock(1357) presumably serializes this step across
    # containers starting at the same time -- confirm against db_lock.
    if not should_create_queue():
        return

    queue_name = get_sqs_queue_name()
    endpoint = os.environ.get("MWAA__SQS__CUSTOM_ENDPOINT")
    sqs = boto3.client("sqs", endpoint_url=endpoint)  # type: ignore

    # A missing queue is reported under the legacy code
    # "AWS.SimpleQueueService.NonExistentQueue" or under the modeled error name
    # "QueueDoesNotExist", depending on the wire protocol and on the
    # SQS-compatible server answering the request. Accept both so the queue is
    # created in either case.
    missing_queue_codes = {
        "AWS.SimpleQueueService.NonExistentQueue",
        "QueueDoesNotExist",
    }

    try:
        # Only used as an existence check; the returned URL is not needed.
        sqs.get_queue_url(QueueName=queue_name)
    except ClientError as e:
        error_code = e.response.get("Error", {}).get("Code")
        if error_code not in missing_queue_codes:
            # Any other failure is unexpected: re-raise with the original
            # traceback intact.
            raise
    else:
        print(f"Queue {queue_name} already exists.")
        return

    # The lookup reported the queue as missing, so create it.
    response = sqs.create_queue(QueueName=queue_name)
    queue_url = response["QueueUrl"]
    print(f"Queue created: {queue_url}")
install_user_requirements(environ) # Export the environment variables to .bashrc and .bash_profile to enable diff --git a/images/airflow/2.8.0/requirements.txt b/images/airflow/2.8.0/requirements.txt index 1572cd7..5274e67 100644 --- a/images/airflow/2.8.0/requirements.txt +++ b/images/airflow/2.8.0/requirements.txt @@ -1,5 +1,13 @@ -apache-airflow-providers-amazon[aiobotocore]==8.13.0 -apache-airflow[celery,statsd]==2.8.0 +# This requirements file is used when creating a virtual environment for +# building and developing the Airflow image. It is not to be confused with +# the requirements of Airflow within the image. Still, they are largely similar +# apart from some additional requirements for type checking, e.g. boto3-stubs +# or similar stuff for aiding build and development. +--constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.11.txt +apache-airflow-providers-amazon[aiobotocore]==8.7.1 +apache-airflow[celery,statsd]==2.7.2 +boto3 +boto3-stubs[essential] celery[sqs] psycopg2 pycurl