From b66ee21d8229c69a098e7d46fe0d6efcd4ea9e9b Mon Sep 17 00:00:00 2001 From: Jonathan Green Date: Fri, 21 Jun 2024 10:16:41 -0300 Subject: [PATCH] Add support for SQS to our Celery configuration (PP-1397) (#1914) * Add support for SQS to our Celery configuration. * Add additional test, make sure unknown options get merged into broker options. --- poetry.lock | 58 ++++++++++++++++++- pyproject.toml | 2 +- .../manager/service/celery/configuration.py | 56 +++++++++++++++++- .../service/celery/test_configuration.py | 14 +++++ 4 files changed, 125 insertions(+), 5 deletions(-) diff --git a/poetry.lock b/poetry.lock index d13d8dda67..9db031a801 100644 --- a/poetry.lock +++ b/poetry.lock @@ -700,15 +700,21 @@ files = [ [package.dependencies] billiard = ">=4.2.0,<5.0" +boto3 = {version = ">=1.26.143", optional = true, markers = "extra == \"sqs\""} click = ">=8.1.2,<9.0" click-didyoumean = ">=0.3.0" click-plugins = ">=1.1.1" click-repl = ">=0.2.0" -kombu = ">=5.3.4,<6.0" +kombu = [ + {version = ">=5.3.4,<6.0"}, + {version = ">=5.3.4", extras = ["sqs"], optional = true, markers = "extra == \"sqs\""}, +] +pycurl = {version = ">=7.43.0.5", optional = true, markers = "sys_platform != \"win32\" and platform_python_implementation == \"CPython\" and extra == \"sqs\""} python-dateutil = ">=2.8.2" redis = {version = ">=4.5.2,<4.5.5 || >4.5.5,<6.0.0", optional = true, markers = "extra == \"redis\""} tblib = {version = ">=1.5.0", optional = true, markers = "python_version >= \"3.8.0\" and extra == \"tblib\""} tzdata = ">=2022.7" +urllib3 = {version = ">=1.26.16", optional = true, markers = "extra == \"sqs\""} vine = ">=5.1.0,<6.0" [package.extras] @@ -2172,6 +2178,9 @@ files = [ [package.dependencies] amqp = ">=5.1.1,<6.0.0" +boto3 = {version = ">=1.26.143", optional = true, markers = "extra == \"sqs\""} +pycurl = {version = ">=7.43.0.5", optional = true, markers = "sys_platform != \"win32\" and platform_python_implementation == \"CPython\" and extra == \"sqs\""} +urllib3 = {version = ">=1.26.16", optional = true, markers = "extra == \"sqs\""} vine = "*" [package.extras] @@ -3314,6 +3323,51 @@ files = [ {file = "pycryptodome-3.20.0.tar.gz", hash = "sha256:09609209ed7de61c2b560cc5c8c4fbf892f8b15b1faf7e4cbffac97db1fffda7"}, ] +[[package]] +name = "pycurl" +version = "7.45.3" +description = "PycURL -- A Python Interface To The cURL library" +optional = false +python-versions = ">=3.5" +files = [ + {file = "pycurl-7.45.3-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:86f66d334deaaab20a576fb785587566081407adc703318203fe26e43277ef12"}, + {file = "pycurl-7.45.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:205983e87d6aa0b6e93ec7320060de44efaa905ecc5d13f70cbe38c65684c5c4"}, + {file = "pycurl-7.45.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:fbd4a6b8654b779089c5a44af1c65c1419c2cd60718780df6d8f354eb35d6d55"}, + {file = "pycurl-7.45.3-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:5ebc6a0ac60c371a9efaf7d55dec5820f76fdafb43a3be1e390011339dc329ae"}, + {file = "pycurl-7.45.3-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:2facab1c35600088cb82b5b093bd700bfbd1e3191deab24f7d1803d9dc5b76fc"}, + {file = "pycurl-7.45.3-cp310-cp310-win32.whl", hash = "sha256:7cfca02d70579853041063e53ca713d31161b8831b98d4f68c3554dc0448beec"}, + {file = "pycurl-7.45.3-cp310-cp310-win_amd64.whl", hash = "sha256:8451e8475051f16eb4776380384699cb8ddd10ea8410bcbfaee5a6fc4c046de6"}, + {file = "pycurl-7.45.3-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:1610cc45b5bc8b39bc18b981d0473e59ef41226ee467eaa8fbfc7276603ef5af"}, + {file = "pycurl-7.45.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:c854885398410fa6e88fc29f7a420a3c13b88bae9b4e10a804437b582e24f58b"}, + {file = "pycurl-7.45.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:921c9db0c3128481954f625b3b1bc10c730100aa944d54643528f716676439ee"}, + {file = "pycurl-7.45.3-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:483f3aa5d1bc8cff5657ad96f68e1d89281f971a7b6aa93408a31e3199981ea9"}, + {file = "pycurl-7.45.3-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:1e0d32d6ed3a7ba13dbbd3a6fb50ca76c40c70e6bc6fe347f90677478d3422c7"}, + {file = "pycurl-7.45.3-cp311-cp311-win32.whl", hash = "sha256:beaaa4450e23d41dd0c2f2f47a4f8a171210271543550c2c556090c7eeea88f5"}, + {file = "pycurl-7.45.3-cp311-cp311-win_amd64.whl", hash = "sha256:dd33fd9de8907a6275c70113124aeb7eea672c1324f5d5423f203738b341697d"}, + {file = "pycurl-7.45.3-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:0c41a172d5e8a5cdd8328cc8134f47b2a57960ac677f7cda8520eaa9fbe7d990"}, + {file = "pycurl-7.45.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:13006b62c157bb4483c58e1abdced6df723c9399255a4f5f6bb7f8e425106679"}, + {file = "pycurl-7.45.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:27f4c5c20c86a9a823677316724306fb1ce3b25ec568efd52026dc6c563e5b29"}, + {file = "pycurl-7.45.3-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:c2c246bc29e8762ff4c8a833ac5b4da4c797d16ab138286e8aec9b0c0a0da2d4"}, + {file = "pycurl-7.45.3-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:3d07c5daef2d0d85949e32ec254ee44232bb57febb0634194379dd14d1ff4f87"}, + {file = "pycurl-7.45.3-cp312-cp312-win32.whl", hash = "sha256:9f7afe5ef0e4750ac4515baebc251ee94aaefe5de6e2e8a24668473128d69904"}, + {file = "pycurl-7.45.3-cp312-cp312-win_amd64.whl", hash = "sha256:3648ed9a57a6b704673faeab3dc64d1469cc69f2bc1ed8227ffa0f84e147c500"}, + {file = "pycurl-7.45.3-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:c0915ea139f66a289edc4f9de10cb45078af1bb950491c5612969864236a2e7e"}, + {file = "pycurl-7.45.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:43c5e61a58783ddf78ef84949f6bb6e52e092a13ec67678e9a9e21071ecf5b80"}, + {file = "pycurl-7.45.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:bf613844a1647fe3d2bba1f5c9c96a62a85280123a57a8a0c8d2f37d518bc10a"}, + {file = "pycurl-7.45.3-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:936afd9c5ff7fe7457065e878a279811787778f472f9a4e8c5df79e7728358e2"}, + {file = "pycurl-7.45.3-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:dbf816a6d0cb71e7fd06609246bbea4eaf100649d9decf49e4eb329594f70be7"}, + {file = "pycurl-7.45.3-cp38-cp38-win32.whl", hash = "sha256:2c8a2ce568193f9f84763717d8961cec0db4ec1aa08c6bcf4d90da5eb72bec86"}, + {file = "pycurl-7.45.3-cp38-cp38-win_amd64.whl", hash = "sha256:80ac7c17e69ca6b76ccccb4255f7c29a2a36e5b69eb10c2adba82135d43afe8c"}, + {file = "pycurl-7.45.3-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:fa7751b614d9aa82d7a0f49ca90924c29c6cedf85a2f8687fb6a772dbfe48711"}, + {file = "pycurl-7.45.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:b129e9ee07f80b4af957607917af46ab517b0c4e746692f6d9e50e973edba8d8"}, + {file = "pycurl-7.45.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a0f920582b8713ca87d5a288a7532607bc4454275d733fc880650d602dbe3c67"}, + {file = "pycurl-7.45.3-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:c7c13e4268550cde14a6f4743cc8bd8c035d4cd36514d58eff70276d68954b6f"}, + {file = "pycurl-7.45.3-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:0f0e1251a608ffd75fc502f4014442e554c67d3d7a1b0a839c35efb6ad2f8bf8"}, + {file = "pycurl-7.45.3-cp39-cp39-win32.whl", hash = "sha256:51a40a56c58e63dac6145829f9e9bd66e5867a9f0741bcb9ffefab619851d44f"}, + {file = "pycurl-7.45.3-cp39-cp39-win_amd64.whl", hash = "sha256:e08a06802c8c8a9d04cf3319f9230ec09062c55d2550bd48f8ada1df1431adcf"}, + {file = "pycurl-7.45.3.tar.gz", hash = "sha256:8c2471af9079ad798e1645ec0b0d3d4223db687379d17dd36a70637449f81d6b"}, +] + [[package]] name = "pydantic" version = "1.10.17" @@ -5060,4 +5114,4 @@ lxml = ">=3.8" [metadata] lock-version = "2.0" python-versions = ">=3.10,<4" -content-hash = "21a985a3a04a73e7c5d2597e314d4c714540f2b74be9fb29366ebf64ce7f89ef" +content-hash = "006150fca1ace2a66cff9b7235ded77eeb0b230799d72bbe0b3995274b74e14e" diff --git a/pyproject.toml b/pyproject.toml index 63b0770a64..6839819f51 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -222,7 +222,7 @@ aws-xray-sdk = "~2.14" "backports.strenum" = {version = "^1.3.1", python = "<3.11"} bcrypt = "^4.0.1" boto3 = "^1.28" -celery = {extras = ["redis", "tblib"], version = "^5.3.6"} +celery = {extras = ["redis", "tblib", 'sqs'], version = "^5.3.6"} certifi = "*" click = "^8.1.3" contextlib2 = "21.6.0" diff --git a/src/palace/manager/service/celery/configuration.py b/src/palace/manager/service/celery/configuration.py index fe686d5bd8..ce3f05283c 100644 --- a/src/palace/manager/service/celery/configuration.py +++ b/src/palace/manager/service/celery/configuration.py @@ -1,22 +1,35 @@ +import os from typing import Any -from pydantic import RedisDsn +from pydantic import AnyUrl, Extra +from pydantic.env_settings import BaseSettings, SettingsSourceCallable from palace.manager.service.configuration.service_configuration import ( ServiceConfiguration, ) +class CeleryBrokerUrl(AnyUrl): + host_required: bool = False + allowed_schemes = {"redis", "sqs"} + + class CeleryConfiguration(ServiceConfiguration): # All the settings here are named following the Celery configuration, so we can # easily pass them into the Celery app. You can find more details about any of # these settings in the Celery documentation. # https://docs.celeryq.dev/en/stable/userguide/configuration.html - broker_url: RedisDsn + broker_url: CeleryBrokerUrl broker_connection_retry_on_startup: bool = True + + # Redis broker options broker_transport_options_global_keyprefix: str = "palace" broker_transport_options_queue_order_strategy: str = "priority" + # SQS broker options + broker_transport_options_region: str = "us-west-2" + broker_transport_options_queue_name_prefix: str = "palace-" + task_acks_late: bool = True task_reject_on_worker_lost: bool = True task_remote_tracebacks: bool = True @@ -43,6 +56,27 @@ class CeleryConfiguration(ServiceConfiguration): class Config: env_prefix = "PALACE_CELERY_" + extra = Extra.allow + + # See `pydantic` documentation on customizing sources. + # https://docs.pydantic.dev/1.10/usage/settings/#adding-sources + @classmethod + def customise_sources( + cls, + init_settings: SettingsSourceCallable, + env_settings: SettingsSourceCallable, + file_secret_settings: SettingsSourceCallable, + ) -> tuple[SettingsSourceCallable, ...]: + # We return an additional function that will parse the environment + # variables and extract any that are not part of the settings model, + # so that we can set additional configuration options for Celery at + # deployment time if needed. + return ( + init_settings, + env_settings, + file_secret_settings, + additional_fields_from_env, + ) def dict(self, *, merge_options: bool = True, **kwargs: Any) -> dict[str, Any]: results = super().dict(**kwargs) @@ -57,3 +91,21 @@ def dict(self, *, merge_options: bool = True, **kwargs: Any) -> dict[str, Any]: ] = value results["broker_transport_options"] = broker_transport_options return results + + +def additional_fields_from_env(settings: BaseSettings) -> dict[str, Any]: + """ + This function will extract any environment variables that start with + the settings model's env_prefix but are not part of the settings model. + + This allows us to set additional configuration options via environment + variables at deployment time. + """ + additional_fields = {} + env_prefix = settings.__config__.env_prefix or "" + for key, value in os.environ.items(): + if key.startswith(env_prefix): + field_name = key.replace(env_prefix, "").lower() + if field_name not in settings.__fields__: + additional_fields[field_name] = value + return additional_fields diff --git a/tests/manager/service/celery/test_configuration.py b/tests/manager/service/celery/test_configuration.py index f0015c3e20..7f4168c552 100644 --- a/tests/manager/service/celery/test_configuration.py +++ b/tests/manager/service/celery/test_configuration.py @@ -40,6 +40,7 @@ def test_dict_merge( monkeypatch.setenv( "PALACE_CELERY_BROKER_TRANSPORT_OPTIONS_QUEUE_ORDER_STRATEGY", "y" ) + monkeypatch.setenv("PALACE_CELERY_BROKER_TRANSPORT_OPTIONS_OTHER_OPTION", "z") config = celery_configuration() result = config.dict() @@ -48,5 +49,18 @@ def test_dict_merge( options = result["broker_transport_options"] assert options.get("global_keyprefix") == "x" assert options.get("queue_order_strategy") == "y" + assert options.get("other_option") == "z" assert "broker_transport_options_global_keyprefix" not in result assert "broker_transport_options_queue_order_strategy" not in result + assert "broker_transport_options_other_option" not in result + + def test_additional_options( + self, celery_configuration: CeleryConfFixture, monkeypatch: pytest.MonkeyPatch + ): + monkeypatch.setenv("PALACE_CELERY_TEST", "test") + + config = celery_configuration() + result = config.dict() + assert "broker_url" in result + assert result.get("test") == "test" + assert "test" not in config.__fields__