Skip to content

Commit

Permalink
Make the Docker Compose setup work with elasticmq
Browse files Browse the repository at this point in the history
To make the setup work without needing an actual SQS account, I
made the necessary changes to use a local SQS queue server served by
elasticmq.
  • Loading branch information
rafidka committed Feb 22, 2024
1 parent 85438ed commit 752b4bf
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 32 deletions.
2 changes: 1 addition & 1 deletion images/airflow/2.8.0/Dockerfiles/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# the Jinja2-templated Dockerfile.j2 file, so you need to change that file
# instead.
#
# This file was generated on 2024-02-12 01:56:33.029839
# This file was generated on 2024-02-22 19:42:44.935774
#

FROM amazon-mwaa/airflow:2.8.0-base
Expand Down
2 changes: 1 addition & 1 deletion images/airflow/2.8.0/Dockerfiles/Dockerfile-dev
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# the Jinja2-templated Dockerfile.j2 file, so you need to change that file
# instead.
#
# This file was generated on 2024-02-12 01:56:33.021778
# This file was generated on 2024-02-22 19:42:44.927521
#

FROM amazon-mwaa/airflow:2.8.0-base
Expand Down
2 changes: 1 addition & 1 deletion images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# the Jinja2-templated Dockerfile.j2 file, so you need to change that file
# instead.
#
# This file was generated on 2024-02-12 01:56:33.032499
# This file was generated on 2024-02-22 19:42:44.938417
#

FROM amazon-mwaa/airflow:2.8.0-base
Expand Down
2 changes: 1 addition & 1 deletion images/airflow/2.8.0/Dockerfiles/Dockerfile-explorer-dev
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# the Jinja2-templated Dockerfile.j2 file, so you need to change that file
# instead.
#
# This file was generated on 2024-02-12 01:56:33.024518
# This file was generated on 2024-02-22 19:42:44.930305
#

FROM amazon-mwaa/airflow:2.8.0-base
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# the Jinja2-templated Dockerfile.j2 file, so you need to change that file
# instead.
#
# This file was generated on 2024-02-12 01:56:33.035092
# This file was generated on 2024-02-22 19:42:44.941098
#

FROM amazon-mwaa/airflow:2.8.0-base
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# the Jinja2-templated Dockerfile.j2 file, so you need to change that file
# instead.
#
# This file was generated on 2024-02-12 01:56:33.027225
# This file was generated on 2024-02-22 19:42:44.933100
#

FROM amazon-mwaa/airflow:2.8.0-base
Expand Down
5 changes: 3 additions & 2 deletions images/airflow/2.8.0/Dockerfiles/Dockerfile.base
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
# the Jinja2-templated Dockerfile.j2 file, so you need to change that file
# instead.
#
# This file was generated on 2024-02-12 01:56:33.018473
# This file was generated on 2024-02-22 19:42:44.924226
#

FROM public.ecr.aws/amazonlinux/amazonlinux:2023

# Environment variables

# Temporarily downgrading to 2.7.2 to make it easier to test using it internally.
# Temporarily downgrading to 2.7.2 to make it easier to test the Docker image
# within Amazon MWAA since 2.7.2 is a version we support.
ENV AIRFLOW_VERSION=2.7.2
ENV AIRFLOW_AMAZON_PROVIDERS_VERSION=8.7.1

Expand Down
38 changes: 20 additions & 18 deletions images/airflow/2.8.0/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ version: "3.8"

x-airflow-common: &airflow-common
image: amazon-mwaa/airflow:2.8.0
container_name: mwaa-280-db
restart: always
environment:
# AWS credentials
Expand All @@ -23,7 +22,10 @@ x-airflow-common: &airflow-common
MWAA__DB__POSTGRES_DB: "airflow"

# SQS configuration
MWAA__SQS__QUEUE_URL: ${MWAA__SQS__QUEUE_URL}
MWAA__SQS__CREATE_QUEUE: True
MWAA__SQS__CUSTOM_ENDPOINT: http://sqs:9324
MWAA__SQS__QUEUE_URL: http://sqs:9324/000000000000/celery-queue
MWAA__SQS__USE_SSL: False

volumes:
- ./dags:/usr/local/airflow/dags
Expand All @@ -32,6 +34,8 @@ x-airflow-common: &airflow-common
depends_on: &airflow-common-depends-on
postgres:
condition: service_healthy
sqs:
condition: service_healthy

services:
postgres:
Expand All @@ -54,22 +58,20 @@ services:
expose:
- 5432

# TODO Support a local SQS server to allow the user to use this Docker Compose file without a real AWS account.
#
# sqs:
# image: softwaremill/elasticmq:latest
# healthcheck:
# # https://github.com/softwaremill/elasticmq/issues/776#issuecomment-1582527921
# test: ["CMD-SHELL", "wget -q -S -O - 127.0.0.1:9324/?Action=ListQueues"]
# interval: 10s
# retries: 5
# start_period: 5s
# ports:
# - 9324:9324
# - 9325:9325
# expose:
# - 9324
# - 9325
sqs:
image: softwaremill/elasticmq:latest
healthcheck:
# https://github.com/softwaremill/elasticmq/issues/776#issuecomment-1582527921
test: ["CMD-SHELL", "wget -q -S -O - 127.0.0.1:9324/?Action=ListQueues"]
interval: 10s
retries: 5
start_period: 5s
ports:
- 9324:9324
- 9325:9325
expose:
- 9324
- 9325

# TODO Create a local CloudWatch endpoint to allow the customer to use this Docker Compose file without a real AWS account.
# TODO Create a local CloudWatch Metrics endpoint to allow the customer to use this Docker Compose file without a real AWS account.
Expand Down
4 changes: 2 additions & 2 deletions images/airflow/2.8.0/python/mwaa/config/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

# Our import
from mwaa.config.aws import get_aws_region
from mwaa.config.sqs import get_sqs_queue_name, get_sqs_queue_url
from mwaa.config.sqs import get_sqs_queue_name, get_sqs_queue_url, should_use_ssl


def create_celery_config() -> dict[str, Any]:
Expand All @@ -23,7 +23,7 @@ def create_celery_config() -> dict[str, Any]:
"broker_transport_options": {
**celery_config["broker_transport_options"],
"predefined_queues": {get_sqs_queue_name(): {"url": get_sqs_queue_url()}},
"is_secure": True,
"is_secure": should_use_ssl(),
"region": get_aws_region(),
},
}
Expand Down
24 changes: 22 additions & 2 deletions images/airflow/2.8.0/python/mwaa/config/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def _change_protocol_to_sqs(url: str) -> str:
)


def _get_sqs_default_endpoint() -> str:
def get_sqs_default_endpoint() -> str:
"""
Retrieves the default SQS endpoint for the current AWS region.
"""
Expand All @@ -61,7 +61,7 @@ def get_sqs_endpoint() -> str:
used.
"""
return _change_protocol_to_sqs(
os.environ.get("MWAA__SQS__CUSTOM_ENDPOINT") or _get_sqs_default_endpoint()
os.environ.get("MWAA__SQS__CUSTOM_ENDPOINT") or get_sqs_default_endpoint()
)


Expand Down Expand Up @@ -108,3 +108,23 @@ def get_sqs_queue_name() -> str:
Retrieves the name of the SQS queue specified for use with Celery.
"""
return _get_queue_name_from_url(get_sqs_queue_url())


def should_create_queue() -> bool:
    """
    Check the MWAA__SQS__CREATE_QUEUE environment variable to decide whether
    the entrypoint should create the Celery SQS queue itself.

    :return: True if the queue should be created, False otherwise.
    """
    flag = os.environ.get("MWAA__SQS__CREATE_QUEUE", "false")
    return flag.lower() == "true"


def should_use_ssl() -> bool:
    """
    Decide whether SSL should be used when communicating with SQS, based on
    the MWAA__SQS__USE_SSL environment variable. This configuration is
    expected to be true when connecting to AWS SQS, and false when connecting
    to elasticmq.

    :return: True if SSL should be used, False otherwise.
    """
    value = os.environ.get("MWAA__SQS__USE_SSL", "true")
    return value.lower() == "true"
32 changes: 32 additions & 0 deletions images/airflow/2.8.0/python/mwaa/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@


# 3rd party imports
import boto3
from botocore.exceptions import ClientError
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine

# Our imports
from mwaa.config.airflow import get_airflow_config
from mwaa.config.database import get_db_connection_string
from mwaa.config.sqs import (
get_sqs_queue_name,
should_create_queue,
)


def abort(err_msg: str, exit_code: int = 1) -> None:
Expand Down Expand Up @@ -129,6 +135,31 @@ def create_www_user(environ: dict[str, str]) -> None:
raise RuntimeError(f"Failed to create user. Error: {response.stderr}")


@db_lock(1357)
def create_queue() -> None:
    """
    Create the Celery SQS queue if queue creation is enabled.

    Creation only happens when should_create_queue() returns True, i.e. when
    the MWAA__SQS__CREATE_QUEUE environment variable is set to "true" — the
    case when running against a local SQS server such as elasticmq rather
    than actual AWS SQS. The db_lock decorator serializes this step across
    containers sharing the same database.
    """
    if not should_create_queue():
        return
    queue_name = get_sqs_queue_name()
    # When MWAA__SQS__CUSTOM_ENDPOINT is unset, endpoint is None and boto3
    # falls back to the default SQS endpoint for the configured region.
    endpoint = os.environ.get("MWAA__SQS__CUSTOM_ENDPOINT")
    sqs = boto3.client("sqs", endpoint_url=endpoint)  # type: ignore
    try:
        # Try to get the queue URL to check if it exists; the returned URL
        # itself is intentionally discarded — only existence matters here.
        sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]
        print(f"Queue {queue_name} already exists.")
    except ClientError as e:
        # If the queue does not exist, create it.
        # NOTE(review): assumes elasticmq reports the same
        # "AWS.SimpleQueueService.NonExistentQueue" error code as AWS SQS —
        # TODO confirm against the elasticmq version in docker-compose.
        if (
            e.response.get("Error", {}).get("Code")
            == "AWS.SimpleQueueService.NonExistentQueue"
        ):
            response = sqs.create_queue(QueueName=queue_name)
            queue_url = response["QueueUrl"]
            print(f"Queue created: {queue_url}")
        else:
            # If there is a different error, raise it
            raise e


def install_user_requirements(environ: dict[str, str]) -> None:
requirements_file = environ.get("MWAA__CORE__REQUIREMENTS_PATH")
print(f"MWAA__CORE__REQUIREMENTS_PATH = {requirements_file}")
Expand Down Expand Up @@ -195,6 +226,7 @@ def main() -> None:

airflow_db_init(environ)
create_www_user(environ)
create_queue()
install_user_requirements(environ)

# Export the environment variables to .bashrc and .bash_profile to enable
Expand Down
12 changes: 10 additions & 2 deletions images/airflow/2.8.0/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
apache-airflow-providers-amazon[aiobotocore]==8.13.0
apache-airflow[celery,statsd]==2.8.0
# This requirements file is used when creating a virtual environment for
# building and developing the Airflow image. It is not to be confused with
# the requirements of Airflow within the image. Still, they are largely similar
# apart from some additional requirements for type checking, e.g. boto3-stubs
# or similar stuff for aiding build and development.
--constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.11.txt
apache-airflow-providers-amazon[aiobotocore]==8.7.1
apache-airflow[celery,statsd]==2.7.2
boto3
boto3-stubs[essential]
celery[sqs]
psycopg2
pycurl
Expand Down

0 comments on commit 752b4bf

Please sign in to comment.