fix(ta): handle random task retry better (#917)
joseph-sentry authored Dec 3, 2024
Parent: 1d68d39 · Commit: 5676738
Showing 1 changed file with 80 additions and 87 deletions.
tasks/test_results_processor.py: 167 changes (80 additions, 87 deletions)
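At a high level, the fix makes per-upload processing idempotent and self-contained. Previously, `process_individual_arg` committed `upload.state = "processed"` before `_bulk_write_tests_to_db` ran, so a task retry landing between the two skipped the upload even though its test results had never been written; archive cleanup also ran as one batch after the loop. Now each upload is parsed, written, committed, and its raw archive deleted or rewritten in a single per-upload sequence. A simplified sketch of the ordering change (the helpers and dict-based "db" are hypothetical stand-ins, not the task's real methods):

```python
# Simplified sketch of the ordering change; parse/write_results and the
# dict-based "db" are hypothetical stand-ins, not the task's real helpers.

def parse(upload):                  # stand-in for the parsing step
    return [f"testrun-for-{upload['id']}"]

def write_results(db, results):     # stand-in for _bulk_write_tests_to_db
    db.setdefault("testruns", []).extend(results)

def process_upload_old(upload, db):
    # Old flow: state was committed before results were written, so a retry
    # arriving in between saw "processed" and skipped the upload even though
    # its results never reached the database.
    results = parse(upload)
    upload["state"] = "processed"   # the old code committed here
    write_results(db, results)      # lost if the worker died before this line

def process_upload_new(upload, db):
    # New flow: re-delivered work short-circuits on the guard, and results
    # are written before the state change is committed.
    if upload["state"] in ("processed", "has_failed"):
        return
    results = parse(upload)
    write_results(db, results)
    upload["state"] = "processed"
```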
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@
 import zlib
 from dataclasses import dataclass
 from datetime import date, datetime
-from typing import List
 
 import sentry_sdk
 from shared.celery_config import test_results_processor_task_name
@@ -79,36 +78,38 @@ def run_impl(
         repoid = int(repoid)
 
         results = []
-        upload_list = []
 
         repo_flakes = (
             db_session.query(Flake.testid)
             .filter(Flake.repoid == repoid, Flake.end_date.is_(None))
             .all()
         )
         flaky_test_set = {flake.testid for flake in repo_flakes}
+        repository = (
+            db_session.query(Repository)
+            .filter(Repository.repoid == int(repoid))
+            .first()
+        )
 
+        should_delete_archive = self.should_delete_archive(commit_yaml)
+        archive_service = ArchiveService(repository)
+
         # process each report session's test information
         for arguments in arguments_list:
            upload = (
                 db_session.query(Upload).filter_by(id_=arguments["upload_id"]).first()
             )
             result = self.process_individual_upload(
-                db_session, repoid, commitid, upload, flaky_test_set
+                db_session,
+                archive_service,
+                repository,
+                commitid,
+                upload,
+                flaky_test_set,
+                should_delete_archive,
             )
 
             results.append(result)
-            upload_list.append(upload)
 
-        if self.should_delete_archive(commit_yaml):
-            repository = (
-                db_session.query(Repository)
-                .filter(Repository.repoid == int(repoid))
-                .first()
-            )
-            self.delete_archive(
-                commitid, repository, commit_yaml, uploads_to_delete=upload_list
-            )
-
         return results
 
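Because `repository`, `archive_service`, and the `should_delete_archive` decision are hoisted out of the loop and the `upload_list` bookkeeping is gone, a retried task simply re-enters this loop and lets the state guard inside `process_individual_upload` decide what still needs work. A runnable toy of that resume behavior (`FakeUpload` and its initial `"started"` state are assumptions, not the task code):

```python
from dataclasses import dataclass

@dataclass
class FakeUpload:
    id: int
    state: str = "started"  # assumed non-terminal starting state

def process_once(upload: FakeUpload, work_log: list) -> None:
    # mirrors the guard inside process_individual_upload
    if upload.state in ("processed", "has_failed"):
        return
    work_log.append(upload.id)
    upload.state = "processed"

uploads = [FakeUpload(1), FakeUpload(2), FakeUpload(3)]
work_log: list = []

process_once(uploads[0], work_log)  # first attempt dies after one upload

for u in uploads:                   # Celery re-delivers the whole task
    process_once(u, work_log)

assert work_log == [1, 2, 3]        # upload 1 did no duplicate work
```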
@@ -329,60 +330,21 @@ def create_daily_total():
         log.info("Inserted test instances to database", extra=dict(upload_id=upload_id))
 
     def process_individual_upload(
-        self, db_session, repoid, commitid, upload_obj: Upload, flaky_test_set: set[str]
+        self,
+        db_session,
+        archive_service: ArchiveService,
+        repository: Repository,
+        commitid,
+        upload: Upload,
+        flaky_test_set: set[str],
+        should_delete_archive: bool,
     ):
-        upload_id = upload_obj.id
-        log.info("Processing individual upload", extra=dict(upload_id=upload_id))
-        parsing_results = self.process_individual_arg(
-            db_session, upload_obj, upload_obj.report.commit.repository
-        )
-
-        if all(len(result.testruns) == 0 for result in parsing_results):
-            log.error(
-                "No test result files were successfully parsed for this upload",
-                extra=dict(upload_id=upload_id),
-            )
-            return {"successful": False}
-
-        self._bulk_write_tests_to_db(
-            db_session,
-            repoid,
-            commitid,
-            upload_id,
-            upload_obj.report.commit.branch,
-            parsing_results,
-            flaky_test_set,
-            upload_obj.flag_names,
-        )
-        log.info(
-            "Finished processing individual upload", extra=dict(upload_id=upload_id)
-        )
-
-        return {"successful": True}
-
-    def rewrite_readable(
-        self, network: list[str] | None, report_contents: list[ReadableFile]
-    ) -> bytes:
-        buffer = b""
-        if network is not None:
-            for file in network:
-                buffer += f"{file}\n".encode("utf-8")
-            buffer += b"<<<<<< network\n\n"
-        for report_content in report_contents:
-            buffer += f"# path={report_content.path}\n".encode("utf-8")
-            buffer += report_content.contents
-            buffer += b"\n<<<<<< EOF\n\n"
-        return buffer
+        upload_id = upload.id
 
-    @sentry_sdk.trace
-    def process_individual_arg(
-        self, db_session: Session, upload: Upload, repository: Repository
-    ) -> list[ParsingInfo]:
+        log.info("Processing individual upload", extra=dict(upload_id=upload_id))
         if upload.state == "processed" or upload.state == "has_failed":
             return []
 
-        archive_service = ArchiveService(repository)
-
         payload_bytes = archive_service.read_file(upload.storage_path)
         try:
             data = json.loads(payload_bytes)
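`process_individual_arg` is folded into `process_individual_upload` (the `@sentry_sdk.trace` decorator on the old helper disappears with it), and its guard clause becomes the retry check. One subtlety visible in the diff: the guard still returns `[]`, while the normal path in the next hunk returns a `{"successful": ...}` dict, so the `results` list built by `run_impl` can mix both shapes when a retry encounters already-finished uploads. A sketch of a caller that tolerates both shapes (hypothetical, not part of this PR):

```python
# Hypothetical defensive handling, not part of this PR: the early return for
# an already-processed upload yields [], the normal path yields a dict.
def count_successes(results: list) -> int:
    return sum(
        1
        for r in results
        if isinstance(r, dict) and r.get("successful") is True
    )

assert count_successes([{"successful": True}, [], {"successful": False}]) == 1
```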
@@ -423,16 +385,53 @@ def process_individual_arg(
         if upload.state != "has_failed":
             upload.state = "processed"
 
-        db_session.flush()
+        if all(len(result.testruns) == 0 for result in parsing_results):
+            successful = False
+            log.error(
+                "No test result files were successfully parsed for this upload",
+                extra=dict(upload_id=upload_id),
+            )
+        else:
+            successful = True
+
+            self._bulk_write_tests_to_db(
+                db_session,
+                repository.repoid,
+                commitid,
+                upload_id,
+                upload.report.commit.branch,
+                parsing_results,
+                flaky_test_set,
+                upload.flag_names,
+            )
+
         db_session.commit()
-        log.info("Marked upload as processed", extra=dict(upload_id=upload.id))
 
-        # FIXME: we are unconditionally rewriting as readable, even if we delete it later
-        readable_report = self.rewrite_readable(network, report_contents)
-        archive_service.write_file(upload.storage_path, readable_report)
-        log.info("Wrote readable report to archive", extra=dict(upload_id=upload.id))
+        log.info(
+            "Finished processing individual upload", extra=dict(upload_id=upload_id)
+        )
 
-        return parsing_results
+        if should_delete_archive:
+            self.delete_archive(archive_service, upload)
+        else:
+            readable_report = self.rewrite_readable(network, report_contents)
+            archive_service.write_file(upload.storage_path, readable_report)
+
+        return {"successful": successful}
+
+    def rewrite_readable(
+        self, network: list[str] | None, report_contents: list[ReadableFile]
+    ) -> bytes:
+        buffer = b""
+        if network is not None:
+            for file in network:
+                buffer += f"{file}\n".encode("utf-8")
+            buffer += b"<<<<<< network\n\n"
+        for report_content in report_contents:
+            buffer += f"# path={report_content.path}\n".encode("utf-8")
+            buffer += report_content.contents
+            buffer += b"\n<<<<<< EOF\n\n"
+        return buffer
 
     def should_delete_archive(self, commit_yaml):
         if get_config("services", "minio", "expire_raw_after_n_days"):
@@ -441,23 +440,17 @@ def should_delete_archive(self, commit_yaml):
             commit_yaml, ("codecov", "archive", "uploads"), _else=True
         )
 
-    def delete_archive(
-        self, commitid, repository, commit_yaml, uploads_to_delete: List[Upload]
-    ):
-        archive_service = ArchiveService(repository)
-        for upload in uploads_to_delete:
-            archive_url = upload.storage_path
-            if archive_url and not archive_url.startswith("http"):
-                log.info(
-                    "Deleting uploaded file as requested",
-                    extra=dict(
-                        archive_url=archive_url,
-                        commit=commitid,
-                        upload=upload.external_id,
-                        parent_task=self.request.parent_id,
-                    ),
-                )
-                archive_service.delete_file(archive_url)
+    def delete_archive(self, archive_service: ArchiveService, upload: Upload):
+        archive_url = upload.storage_path
+        if archive_url and not archive_url.startswith("http"):
+            log.info(
+                "Deleting uploaded file as requested",
+                extra=dict(
+                    archive_url=archive_url,
+                    upload=upload.external_id,
+                ),
+            )
+            archive_service.delete_file(archive_url)
 
 
 RegisteredTestResultsProcessorTask = celery_app.register_task(
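`delete_archive` now takes the shared `ArchiveService` and a single `Upload` instead of re-querying the repository and looping over a batch; `commit` and `parent_task` drop out of the log context since they are no longer in scope. The `startswith("http")` guard is kept, presumably so that only storage-relative paths are deleted while payloads referenced by absolute URL are left alone. A toy check of that guard:

```python
# Toy check of the deletion guard from the diff.
def deletable(archive_url) -> bool:
    return bool(archive_url) and not archive_url.startswith("http")

assert deletable("v4/raw/2024-12-03/abc/def/upload.txt") is True
assert deletable("https://example.com/upload.txt") is False
assert deletable(None) is False
assert deletable("") is False
```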
