Skip to content

Commit

Permalink
Add support for SQS to our Celery configuration (PP-1397) (#1914)
Browse files Browse the repository at this point in the history
* Add support for SQS to our Celery configuration.

* Add additional test, make sure unknown options get merged into broker options.
  • Loading branch information
jonathangreen authored Jun 21, 2024
1 parent 35b8f8c commit b66ee21
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 5 deletions.
58 changes: 56 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ aws-xray-sdk = "~2.14"
"backports.strenum" = {version = "^1.3.1", python = "<3.11"}
bcrypt = "^4.0.1"
boto3 = "^1.28"
celery = {extras = ["redis", "tblib"], version = "^5.3.6"}
celery = {extras = ["redis", "tblib", 'sqs'], version = "^5.3.6"}
certifi = "*"
click = "^8.1.3"
contextlib2 = "21.6.0"
Expand Down
56 changes: 54 additions & 2 deletions src/palace/manager/service/celery/configuration.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
import os
from typing import Any

from pydantic import RedisDsn
from pydantic import AnyUrl, Extra
from pydantic.env_settings import BaseSettings, SettingsSourceCallable

from palace.manager.service.configuration.service_configuration import (
ServiceConfiguration,
)


class CeleryBrokerUrl(AnyUrl):
host_required: bool = False
allowed_schemes = {"redis", "sqs"}


class CeleryConfiguration(ServiceConfiguration):
# All the settings here are named following the Celery configuration, so we can
# easily pass them into the Celery app. You can find more details about any of
# these settings in the Celery documentation.
# https://docs.celeryq.dev/en/stable/userguide/configuration.html
broker_url: RedisDsn
broker_url: CeleryBrokerUrl
broker_connection_retry_on_startup: bool = True

# Redis broker options
broker_transport_options_global_keyprefix: str = "palace"
broker_transport_options_queue_order_strategy: str = "priority"

# SQS broker options
broker_transport_options_region: str = "us-west-2"
broker_transport_options_queue_name_prefix: str = "palace-"

task_acks_late: bool = True
task_reject_on_worker_lost: bool = True
task_remote_tracebacks: bool = True
Expand All @@ -43,6 +56,27 @@ class CeleryConfiguration(ServiceConfiguration):

class Config:
env_prefix = "PALACE_CELERY_"
extra = Extra.allow

# See `pydantic` documentation on customizing sources.
# https://docs.pydantic.dev/1.10/usage/settings/#adding-sources
@classmethod
def customise_sources(
cls,
init_settings: SettingsSourceCallable,
env_settings: SettingsSourceCallable,
file_secret_settings: SettingsSourceCallable,
) -> tuple[SettingsSourceCallable, ...]:
# We return an additional function that will parse the environment
# variables and extract any that are not part of the settings model,
# so that we can set additional configuration options for Celery at
# deployment time if needed.
return (
init_settings,
env_settings,
file_secret_settings,
additional_fields_from_env,
)

def dict(self, *, merge_options: bool = True, **kwargs: Any) -> dict[str, Any]:
results = super().dict(**kwargs)
Expand All @@ -57,3 +91,21 @@ def dict(self, *, merge_options: bool = True, **kwargs: Any) -> dict[str, Any]:
] = value
results["broker_transport_options"] = broker_transport_options
return results


def additional_fields_from_env(settings: BaseSettings) -> dict[str, Any]:
"""
This function will extract any environment variables that start with
the settings model's env_prefix but are not part of the settings model.
This allows us to set additional configuration options via environment
variables at deployment time.
"""
additional_fields = {}
env_prefix = settings.__config__.env_prefix or ""
for key, value in os.environ.items():
if key.startswith(env_prefix):
field_name = key.replace(env_prefix, "").lower()
if field_name not in settings.__fields__:
additional_fields[field_name] = value
return additional_fields
14 changes: 14 additions & 0 deletions tests/manager/service/celery/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def test_dict_merge(
monkeypatch.setenv(
"PALACE_CELERY_BROKER_TRANSPORT_OPTIONS_QUEUE_ORDER_STRATEGY", "y"
)
monkeypatch.setenv("PALACE_CELERY_BROKER_TRANSPORT_OPTIONS_OTHER_OPTION", "z")

config = celery_configuration()
result = config.dict()
Expand All @@ -48,5 +49,18 @@ def test_dict_merge(
options = result["broker_transport_options"]
assert options.get("global_keyprefix") == "x"
assert options.get("queue_order_strategy") == "y"
assert options.get("other_option") == "z"
assert "broker_transport_options_global_keyprefix" not in result
assert "broker_transport_options_queue_order_strategy" not in result
assert "broker_transport_options_other_option" not in result

def test_additional_options(
self, celery_configuration: CeleryConfFixture, monkeypatch: pytest.MonkeyPatch
):
monkeypatch.setenv("PALACE_CELERY_TEST", "test")

config = celery_configuration()
result = config.dict()
assert "broker_url" in result
assert result.get("test") == "test"
assert "test" not in config.__fields__

0 comments on commit b66ee21

Please sign in to comment.