Skip to content

Commit

Permalink
feat(tempest): Add cron job and tasks (#82454)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-wilfert authored and andrewshie-sentry committed Jan 22, 2025
1 parent 671bc36 commit af30cd1
Show file tree
Hide file tree
Showing 5 changed files with 450 additions and 1 deletion.
11 changes: 10 additions & 1 deletion src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.tasks.unmerge",
"sentry.tasks.update_user_reports",
"sentry.tasks.user_report",
"sentry.tempest.tasks",
"sentry.profiles.task",
"sentry.release_health.tasks",
"sentry.rules.processing.delayed_processing",
Expand Down Expand Up @@ -951,7 +952,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"dynamicsampling",
routing_key="dynamicsampling",
),
Queue("tempest", routing_key="tempest"),
Queue("incidents", routing_key="incidents"),
Queue("incident_snapshots", routing_key="incident_snapshots"),
Queue("incidents", routing_key="incidents"),
Expand All @@ -972,6 +972,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
Queue("sleep", routing_key="sleep"),
Queue("stats", routing_key="stats"),
Queue("subscriptions", routing_key="subscriptions"),
Queue("tempest", routing_key="tempest"),
Queue("unmerge", routing_key="unmerge"),
Queue("update", routing_key="update"),
Queue("uptime", routing_key="uptime"),
Expand Down Expand Up @@ -1188,6 +1189,12 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"schedule": crontab(minute="*/10"),
"options": {"expires": 10 * 60},
},
"poll_tempest": {
"task": "sentry.tempest.tasks.poll_tempest",
        # Run every 5 minutes
"schedule": crontab(minute="*/5"),
"options": {"expires": 5 * 60},
},
"transaction-name-clusterer": {
"task": "sentry.ingest.transaction_clusterer.tasks.spawn_clusterers",
# Run every 1 hour at minute 17
Expand Down Expand Up @@ -3162,6 +3169,8 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
# This is the URL to the profiling service
SENTRY_VROOM = os.getenv("VROOM", "http://127.0.0.1:8085")

SENTRY_TEMPEST_URL = os.getenv("TEMPEST", "http://127.0.0.1:9130")

SENTRY_REPLAYS_SERVICE_URL = "http://localhost:8090"


Expand Down
2 changes: 2 additions & 0 deletions src/sentry/models/projectkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class UseCase(enum.Enum):
PROFILING = "profiling"
""" An internal project key for submitting escalating issues metrics."""
ESCALATING_ISSUES = "escalating_issues"
""" An internal project key for submitting events from tempest."""
TEMPEST = "tempest"


@region_silo_model
Expand Down
3 changes: 3 additions & 0 deletions src/sentry/tempest/endpoints/tempest_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sentry.tempest.models import TempestCredentials
from sentry.tempest.permissions import TempestCredentialsPermission
from sentry.tempest.serializers import DRFTempestCredentialsSerializer, TempestCredentialsSerializer
from sentry.tempest.tasks import fetch_latest_item_id
from sentry.tempest.utils import has_tempest_access


Expand Down Expand Up @@ -47,6 +48,8 @@ def post(self, request: Request, project: Project) -> Response:
serializer.is_valid(raise_exception=True)
try:
credentials = serializer.save(created_by_id=request.user.id, project=project)
# Make initial call to determine the latest item ID
fetch_latest_item_id.delay(credentials.id)
except IntegrityError:
return Response(
{"detail": "A credential with this client ID already exists."}, status=400
Expand Down
229 changes: 229 additions & 0 deletions src/sentry/tempest/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import logging

from django.conf import settings
from requests import Response

from sentry import http
from sentry.models.projectkey import ProjectKey, UseCase
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.tasks.relay import schedule_invalidate_project_config
from sentry.tempest.models import MessageType, TempestCredentials

POLL_LIMIT = 348 # 348 every 5 min ~ 100k a day


logger = logging.getLogger(__name__)


@instrumented_task(
    name="sentry.tempest.tasks.poll_tempest",
    queue="tempest",
    silo_mode=SiloMode.REGION,
    soft_time_limit=4 * 60,
    time_limit=4 * 60 + 5,
)
def poll_tempest(**kwargs):
    """Fan out one follow-up task per stored Tempest credential.

    Credentials that have never produced an item id get a bootstrap task
    (``fetch_latest_item_id``); all others get a crash-polling task.
    """
    # FIXME: Once we have more traffic this needs to be done smarter.
    for creds in TempestCredentials.objects.all():
        follow_up = (
            fetch_latest_item_id
            if creds.latest_fetched_item_id is None
            else poll_tempest_crashes
        )
        follow_up.delay(creds.id)


# Known error types returned by the Tempest service, mapped to the
# user-facing message stored on the credentials row. The log message for
# each case is the error type itself (matches the original branches).
TEMPEST_CREDENTIAL_ERROR_MESSAGES = {
    "Invalid credentials": "Seems like the provided credentials are invalid",
    "IP address not allow-listed": "Seems like our IP is not allow-listed",
}


@instrumented_task(
    name="sentry.tempest.tasks.fetch_latest_item_id",
    queue="tempest",
    silo_mode=SiloMode.REGION,
    soft_time_limit=1 * 60,
    time_limit=1 * 60 + 5,
)
def fetch_latest_item_id(credentials_id: int) -> None:
    """Ask the Tempest service for the latest item id for one credential set.

    On success, stores the id on the credentials row and clears any previous
    message. On a known error response, stores a user-facing error message.
    Any other outcome (unknown error type, malformed payload, transport
    failure) is logged best-effort and the task exits without raising.
    """
    # FIXME: Try catch this later
    credentials = TempestCredentials.objects.select_related("project").get(id=credentials_id)
    project_id = credentials.project.id
    org_id = credentials.project.organization_id
    client_id = credentials.client_id

    # Shared logging context for every outcome below.
    log_extra = {
        "org_id": org_id,
        "project_id": project_id,
        "client_id": client_id,
    }

    try:
        response = fetch_latest_id_from_tempest(
            org_id=org_id,
            project_id=project_id,
            client_id=client_id,
            client_secret=credentials.client_secret,
        )
        result = response.json()

        if "latest_id" in result:
            credentials.latest_fetched_item_id = result["latest_id"]
            credentials.message = ""
            credentials.save(update_fields=["message", "latest_fetched_item_id"])
            return

        if "error" in result:
            error_type = result["error"]["type"]
            user_message = TEMPEST_CREDENTIAL_ERROR_MESSAGES.get(error_type)
            if user_message is not None:
                credentials.message = user_message
                credentials.message_type = MessageType.ERROR
                credentials.save(update_fields=["message", "message_type"])

                logger.info(
                    error_type,
                    extra={
                        **log_extra,
                        "status_code": response.status_code,
                        "response_text": result,
                    },
                )
                return
            # Unknown error type: fall through to the default log below.

        # Default in case things go wrong
        logger.info(
            "Fetching the latest item id failed.",
            extra={
                **log_extra,
                "status_code": response.status_code,
                "response_text": result,
            },
        )

    except Exception as e:
        # Best-effort task: swallow and log so the cron fan-out keeps going.
        logger.info(
            "Fetching the latest item id failed.",
            extra={**log_extra, "error": str(e)},
        )


@instrumented_task(
    name="sentry.tempest.tasks.poll_tempest_crashes",
    queue="tempest",
    silo_mode=SiloMode.REGION,
    soft_time_limit=4 * 60,
    time_limit=4 * 60 + 5,
)
def poll_tempest_crashes(credentials_id: int) -> None:
    """Fetch the next batch of crashes for one credential set.

    Ensures a Tempest-specific project key (DSN) exists, requests items
    starting at the stored offset, and advances the offset on success.
    All failures are logged best-effort without raising.
    """
    credentials = TempestCredentials.objects.select_related("project").get(id=credentials_id)
    project = credentials.project
    project_id = project.id
    org_id = project.organization_id
    client_id = credentials.client_id

    try:
        # Guard clause: this task is only scheduled once an initial item id
        # exists, so a missing value indicates a scheduling bug. The raise
        # stays inside the try so it is logged like any other failure.
        if credentials.latest_fetched_item_id is None:
            raise ValueError(
                f"Unexpected None latest_fetched_item_id for credentials {credentials_id}. "
                "This should never happen as poll_tempest_crashes should only be called "
                "when latest_fetched_item_id is set."
            )

        # This should generate/fetch a dsn explicitly for using with Tempest.
        project_key, created = ProjectKey.objects.get_or_create(
            use_case=UseCase.TEMPEST, project=project
        )
        dsn = project_key.get_dsn()
        if created:
            schedule_invalidate_project_config(
                project_id=project_id, trigger="tempest:poll_tempest_crashes"
            )

        # Check if we should attach screenshots (opt-in feature)
        attach_screenshot = project.get_option("sentry:tempest_fetch_screenshots")

        response = fetch_items_from_tempest(
            org_id=org_id,
            project_id=project_id,
            client_id=client_id,
            client_secret=credentials.client_secret,
            dsn=dsn,
            offset=int(credentials.latest_fetched_item_id),
            attach_screenshot=attach_screenshot,
        )

        credentials.latest_fetched_item_id = response.json()["latest_id"]
        credentials.save(update_fields=["latest_fetched_item_id"])

    except Exception as e:
        logger.info(
            "Fetching the crashes failed.",
            extra={
                "org_id": org_id,
                "project_id": project_id,
                "client_id": client_id,
                "latest_id": credentials.latest_fetched_item_id,
                "error": str(e),
            },
        )


def fetch_latest_id_from_tempest(
    org_id: int,
    project_id: int,
    client_id: str,
    client_secret: str,
    time_out: int = 30,
) -> Response:
    """POST to the Tempest ``/latest-id`` endpoint and return the raw response.

    Args:
        org_id: Organization the credentials belong to.
        project_id: Project the credentials belong to.
        client_id: Tempest client id.
        client_secret: Tempest client secret.
        time_out: Request timeout in seconds. Defaults to 30 so a hung
            request cannot exceed the calling task's 1-minute limit
            (mirrors the ``time_out`` parameter of
            ``fetch_items_from_tempest``).
    """
    payload = {
        "org_id": org_id,
        "project_id": project_id,
        "client_id": client_id,
        "client_secret": client_secret,
    }

    response = http.safe_urlopen(
        url=settings.SENTRY_TEMPEST_URL + "/latest-id",
        method="POST",
        headers={"Content-Type": "application/json"},
        json=payload,
        timeout=time_out,
    )
    return response


def fetch_items_from_tempest(
    org_id: int,
    project_id: int,
    client_id: str,
    client_secret: str,
    dsn: str,
    offset: int,
    limit: int = POLL_LIMIT,
    attach_screenshot: bool = False,
    time_out: int = 120,
) -> Response:
    """POST to the Tempest ``/crashes`` endpoint and return the raw response.

    Args:
        org_id: Organization the credentials belong to.
        project_id: Project the credentials belong to.
        client_id: Tempest client id.
        client_secret: Tempest client secret.
        dsn: DSN Tempest should submit the fetched events to.
        offset: Item id to resume fetching from.
        limit: Maximum number of items per call (defaults to ``POLL_LIMIT``).
        attach_screenshot: Whether Tempest should include screenshots (opt-in).
        time_out: Request timeout in seconds.
    """
    return http.safe_urlopen(
        url=settings.SENTRY_TEMPEST_URL + "/crashes",
        method="POST",
        headers={"Content-Type": "application/json"},
        json={
            "org_id": org_id,
            "project_id": project_id,
            "client_id": client_id,
            "client_secret": client_secret,
            "dsn": dsn,
            "offset": offset,
            "limit": limit,
            "attach_screenshot": attach_screenshot,
        },
        timeout=time_out,
    )
Loading

0 comments on commit af30cd1

Please sign in to comment.