-
Notifications
You must be signed in to change notification settings - Fork 510
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make sure checkins from crons work and keep working #3155
Closed
antonpirker
wants to merge
18
commits into
antonpirker/fix-celery-beat-refactoring
from
antonpirker/fix-crons-checkin-on-success
Closed
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
7007984
Make sure checkins from crons work and keep working
antonpirker 6768206
Merge branch 'master' into antonpirker/fix-crons-checkin-on-success
antonpirker cda3942
Added integration tests for Celery Beat
antonpirker 17a1f26
Merge branches 'antonpirker/fix-crons-checkin-on-success' and 'antonp…
antonpirker d0649d8
Removed conftest.py
antonpirker 3e85ea5
Starting redis in celery tests
antonpirker a1fb71a
One queue for each test suite
antonpirker 1b8dfb8
linting
antonpirker 7bf044c
There is of course also a error event in the envelopes
antonpirker 7dcc4bf
linting
antonpirker 07edd9b
Merge branch 'master' into antonpirker/fix-crons-checkin-on-success
antonpirker d51dcc6
Explanations
antonpirker f5a10e8
Merge branch 'antonpirker/fix-crons-checkin-on-success' of github.com…
antonpirker 7815a1b
Merge branch 'master' into antonpirker/fix-crons-checkin-on-success
antonpirker 7f44c36
Do not patch Celery multiple times
antonpirker e5fbc30
Merge branch 'master' into antonpirker/fix-crons-checkin-on-success
antonpirker ec5c3f6
Update sentry_sdk/integrations/celery/beat.py
antonpirker 3c05c65
Merge branch 'master' into antonpirker/fix-crons-checkin-on-success
antonpirker File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,8 @@ jobs: | |
- uses: actions/setup-python@v5 | ||
with: | ||
python-version: ${{ matrix.python-version }} | ||
- name: Start Redis | ||
uses: supercharge/[email protected] | ||
- name: Setup Test Env | ||
run: | | ||
pip install coverage tox | ||
|
@@ -108,6 +110,8 @@ jobs: | |
- uses: actions/setup-python@v5 | ||
with: | ||
python-version: ${{ matrix.python-version }} | ||
- name: Start Redis | ||
uses: supercharge/[email protected] | ||
- name: Setup Test Env | ||
run: | | ||
pip install coverage tox | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,11 @@ | |
- uses: getsentry/action-clickhouse-in-ci@v1 | ||
{% endif %} | ||
|
||
{% if needs_redis %} | ||
- name: Start Redis | ||
uses: supercharge/[email protected] | ||
{% endif %} | ||
|
||
- name: Setup Test Env | ||
run: | | ||
pip install coverage tox | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import os | ||
import signal | ||
import tempfile | ||
import threading | ||
import time | ||
|
||
from celery.beat import Scheduler | ||
|
||
from sentry_sdk.utils import logger | ||
|
||
|
||
class ImmediateScheduler(Scheduler): | ||
""" | ||
A custom scheduler that starts tasks immediately after starting Celery beat. | ||
""" | ||
|
||
def setup_schedule(self): | ||
super().setup_schedule() | ||
for _, entry in self.schedule.items(): | ||
self.apply_entry(entry) | ||
|
||
def tick(self): | ||
# Override tick to prevent the normal schedule cycle | ||
return 1 | ||
|
||
|
||
def kill_beat(beat_pid_file, delay_seconds=1): | ||
""" | ||
Terminates Celery Beat after the given `delay_seconds`. | ||
""" | ||
logger.info("Starting Celery Beat killer...") | ||
time.sleep(delay_seconds) | ||
pid = int(open(beat_pid_file, "r").read()) | ||
logger.info("Terminating Celery Beat...") | ||
os.kill(pid, signal.SIGTERM) | ||
|
||
|
||
def run_beat(celery_app, runtime_seconds=1, loglevel="warning", quiet=True): | ||
""" | ||
Run Celery Beat that immediately starts tasks. | ||
The Celery Beat instance is automatically terminated after `runtime_seconds`. | ||
""" | ||
logger.info("Starting Celery Beat...") | ||
pid_file = os.path.join(tempfile.mkdtemp(), f"celery-beat-{os.getpid()}.pid") | ||
|
||
t = threading.Thread( | ||
target=kill_beat, | ||
args=(pid_file,), | ||
kwargs={"delay_seconds": runtime_seconds}, | ||
) | ||
t.start() | ||
|
||
beat_instance = celery_app.Beat( | ||
loglevel=loglevel, | ||
quiet=quiet, | ||
pidfile=pid_file, | ||
) | ||
beat_instance.run() |
153 changes: 153 additions & 0 deletions
153
tests/integrations/celery/integration_tests/test_celery_beat_cron_monitoring.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
import os | ||
import pytest | ||
|
||
from celery.contrib.testing.worker import start_worker | ||
|
||
from sentry_sdk.utils import logger | ||
|
||
from tests.integrations.celery.integration_tests import run_beat | ||
|
||
|
||
REDIS_SERVER = "redis://127.0.0.1:6379" | ||
REDIS_DB = 15 | ||
|
||
|
||
@pytest.fixture() | ||
def celery_config(): | ||
return { | ||
"worker_concurrency": 1, | ||
"broker_url": f"{REDIS_SERVER}/{REDIS_DB}", | ||
"result_backend": f"{REDIS_SERVER}/{REDIS_DB}", | ||
"beat_scheduler": "tests.integrations.celery.integration_tests:ImmediateScheduler", | ||
"task_always_eager": False, | ||
"task_create_missing_queues": True, | ||
"task_default_queue": f"queue_{os.getpid()}", | ||
} | ||
|
||
|
||
@pytest.fixture | ||
def celery_init(sentry_init, celery_config): | ||
""" | ||
Create a Sentry instrumented Celery app. | ||
""" | ||
from celery import Celery | ||
|
||
from sentry_sdk.integrations.celery import CeleryIntegration | ||
|
||
def inner(propagate_traces=True, monitor_beat_tasks=False, **kwargs): | ||
sentry_init( | ||
integrations=[ | ||
CeleryIntegration( | ||
propagate_traces=propagate_traces, | ||
monitor_beat_tasks=monitor_beat_tasks, | ||
) | ||
], | ||
**kwargs, | ||
) | ||
app = Celery("tasks") | ||
app.conf.update(celery_config) | ||
|
||
return app | ||
|
||
return inner | ||
|
||
|
||
@pytest.mark.forked | ||
def test_explanation(celery_init, capture_envelopes): | ||
""" | ||
This is a dummy test for explaining how to test using Celery Beat | ||
""" | ||
|
||
# First initialize a Celery app. | ||
# You can give the options of CeleryIntegrations | ||
# and the options for `sentry_dks.init` as keyword arguments. | ||
# See the celery_init fixture for details. | ||
app = celery_init( | ||
monitor_beat_tasks=True, | ||
) | ||
|
||
# Capture envelopes. | ||
envelopes = capture_envelopes() | ||
|
||
# Define the task you want to run | ||
@app.task | ||
def test_task(): | ||
logger.info("Running test_task") | ||
|
||
# Add the task to the beat schedule | ||
app.add_periodic_task(60.0, test_task.s(), name="success_from_beat") | ||
|
||
# Start a Celery worker | ||
with start_worker(app, perform_ping_check=False): | ||
# And start a Celery Beat instance | ||
# This Celery Beat will start the task above immediately | ||
# after start for the first time | ||
# By default Celery Beat is terminated after 1 second. | ||
# See `run_beat` function on how to change this. | ||
run_beat(app) | ||
|
||
# After the Celery Beat is terminated, you can check the envelopes | ||
assert len(envelopes) >= 0 | ||
|
||
|
||
@pytest.mark.forked | ||
def test_beat_task_crons_success(celery_init, capture_envelopes): | ||
app = celery_init( | ||
monitor_beat_tasks=True, | ||
) | ||
envelopes = capture_envelopes() | ||
|
||
@app.task | ||
def test_task(): | ||
logger.info("Running test_task") | ||
|
||
app.add_periodic_task(60.0, test_task.s(), name="success_from_beat") | ||
|
||
with start_worker(app, perform_ping_check=False): | ||
run_beat(app) | ||
|
||
assert len(envelopes) == 2 | ||
(envelop_in_progress, envelope_ok) = envelopes | ||
|
||
assert envelop_in_progress.items[0].headers["type"] == "check_in" | ||
check_in = envelop_in_progress.items[0].payload.json | ||
assert check_in["type"] == "check_in" | ||
assert check_in["monitor_slug"] == "success_from_beat" | ||
assert check_in["status"] == "in_progress" | ||
|
||
assert envelope_ok.items[0].headers["type"] == "check_in" | ||
check_in = envelope_ok.items[0].payload.json | ||
assert check_in["type"] == "check_in" | ||
assert check_in["monitor_slug"] == "success_from_beat" | ||
assert check_in["status"] == "ok" | ||
|
||
|
||
@pytest.mark.forked | ||
def test_beat_task_crons_error(celery_init, capture_envelopes): | ||
app = celery_init( | ||
monitor_beat_tasks=True, | ||
) | ||
envelopes = capture_envelopes() | ||
|
||
@app.task | ||
def test_task(): | ||
logger.info("Running test_task") | ||
1 / 0 | ||
|
||
app.add_periodic_task(60.0, test_task.s(), name="failure_from_beat") | ||
|
||
with start_worker(app, perform_ping_check=False): | ||
run_beat(app) | ||
|
||
envelop_in_progress = envelopes[0] | ||
envelope_error = envelopes[-1] | ||
|
||
check_in = envelop_in_progress.items[0].payload.json | ||
assert check_in["type"] == "check_in" | ||
assert check_in["monitor_slug"] == "failure_from_beat" | ||
assert check_in["status"] == "in_progress" | ||
|
||
check_in = envelope_error.items[0].payload.json | ||
assert check_in["type"] == "check_in" | ||
assert check_in["monitor_slug"] == "failure_from_beat" | ||
assert check_in["status"] == "error" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated this
init_celery
to make sure we can also setmonitor_beat_tasks
here.