From 5676738985deb26ac1b2360ba5faf17fa92e69a1 Mon Sep 17 00:00:00 2001
From: joseph-sentry <136376984+joseph-sentry@users.noreply.github.com>
Date: Tue, 3 Dec 2024 11:48:38 -0500
Subject: [PATCH] fix(ta): handle random task retry better (#917)

---
 tasks/test_results_processor.py | 167 +++++++++++++++-----------------
 1 file changed, 80 insertions(+), 87 deletions(-)

diff --git a/tasks/test_results_processor.py b/tasks/test_results_processor.py
index aba50b955..95f98b8fc 100644
--- a/tasks/test_results_processor.py
+++ b/tasks/test_results_processor.py
@@ -4,7 +4,6 @@
 import zlib
 from dataclasses import dataclass
 from datetime import date, datetime
-from typing import List
 
 import sentry_sdk
 from shared.celery_config import test_results_processor_task_name
@@ -79,7 +78,6 @@ def run_impl(
         repoid = int(repoid)
 
         results = []
-        upload_list = []
 
         repo_flakes = (
             db_session.query(Flake.testid)
@@ -87,6 +85,14 @@ def run_impl(
             .all()
         )
         flaky_test_set = {flake.testid for flake in repo_flakes}
+        repository = (
+            db_session.query(Repository)
+            .filter(Repository.repoid == int(repoid))
+            .first()
+        )
+
+        should_delete_archive = self.should_delete_archive(commit_yaml)
+        archive_service = ArchiveService(repository)
 
         # process each report session's test information
         for arguments in arguments_list:
@@ -94,21 +100,16 @@ def run_impl(
                 db_session.query(Upload).filter_by(id_=arguments["upload_id"]).first()
             )
             result = self.process_individual_upload(
-                db_session, repoid, commitid, upload, flaky_test_set
+                db_session,
+                archive_service,
+                repository,
+                commitid,
+                upload,
+                flaky_test_set,
+                should_delete_archive,
             )
             results.append(result)
-            upload_list.append(upload)
-
-        if self.should_delete_archive(commit_yaml):
-            repository = (
-                db_session.query(Repository)
-                .filter(Repository.repoid == int(repoid))
-                .first()
-            )
-            self.delete_archive(
-                commitid, repository, commit_yaml, uploads_to_delete=upload_list
-            )
 
         return results
 
@@ -329,60 +330,21 @@ def create_daily_total():
         log.info("Inserted test instances to database", extra=dict(upload_id=upload_id))
 
     def process_individual_upload(
-        self, db_session, repoid, commitid, upload_obj: Upload, flaky_test_set: set[str]
+        self,
+        db_session,
+        archive_service: ArchiveService,
+        repository: Repository,
+        commitid,
+        upload: Upload,
+        flaky_test_set: set[str],
+        should_delete_archive: bool,
     ):
-        upload_id = upload_obj.id
-        log.info("Processing individual upload", extra=dict(upload_id=upload_id))
-        parsing_results = self.process_individual_arg(
-            db_session, upload_obj, upload_obj.report.commit.repository
-        )
-
-        if all(len(result.testruns) == 0 for result in parsing_results):
-            log.error(
-                "No test result files were successfully parsed for this upload",
-                extra=dict(upload_id=upload_id),
-            )
-            return {"successful": False}
-
-        self._bulk_write_tests_to_db(
-            db_session,
-            repoid,
-            commitid,
-            upload_id,
-            upload_obj.report.commit.branch,
-            parsing_results,
-            flaky_test_set,
-            upload_obj.flag_names,
-        )
-        log.info(
-            "Finished processing individual upload", extra=dict(upload_id=upload_id)
-        )
-
-        return {"successful": True}
-
-    def rewrite_readable(
-        self, network: list[str] | None, report_contents: list[ReadableFile]
-    ) -> bytes:
-        buffer = b""
-        if network is not None:
-            for file in network:
-                buffer += f"{file}\n".encode("utf-8")
-            buffer += b"<<<<<< network\n\n"
-        for report_content in report_contents:
-            buffer += f"# path={report_content.path}\n".encode("utf-8")
-            buffer += report_content.contents
-            buffer += b"\n<<<<<< EOF\n\n"
-        return buffer
+        upload_id = upload.id
+
-    @sentry_sdk.trace
-    def process_individual_arg(
-        self, db_session: Session, upload: Upload, repository: Repository
-    ) -> list[ParsingInfo]:
+        log.info("Processing individual upload", extra=dict(upload_id=upload_id))
         if upload.state == "processed" or upload.state == "has_failed":
             return []
 
-        archive_service = ArchiveService(repository)
-
         payload_bytes = archive_service.read_file(upload.storage_path)
         try:
             data = json.loads(payload_bytes)
@@ -423,16 +385,53 @@ def process_individual_arg(
 
         if upload.state != "has_failed":
             upload.state = "processed"
-            db_session.flush()
 
+        if all(len(result.testruns) == 0 for result in parsing_results):
+            successful = False
+            log.error(
+                "No test result files were successfully parsed for this upload",
+                extra=dict(upload_id=upload_id),
+            )
+        else:
+            successful = True
+
+            self._bulk_write_tests_to_db(
+                db_session,
+                repository.repoid,
+                commitid,
+                upload_id,
+                upload.report.commit.branch,
+                parsing_results,
+                flaky_test_set,
+                upload.flag_names,
+            )
+
+        db_session.commit()
-        log.info("Marked upload as processed", extra=dict(upload_id=upload.id))
-
-        # FIXME: we are unconditionally rewriting as readable, even if we delete it later
-        readable_report = self.rewrite_readable(network, report_contents)
-        archive_service.write_file(upload.storage_path, readable_report)
-        log.info("Wrote readable report to archive", extra=dict(upload_id=upload.id))
+        log.info(
+            "Finished processing individual upload", extra=dict(upload_id=upload_id)
+        )
 
-        return parsing_results
+        if should_delete_archive:
+            self.delete_archive(archive_service, upload)
+        else:
+            readable_report = self.rewrite_readable(network, report_contents)
+            archive_service.write_file(upload.storage_path, readable_report)
+
+        return {"successful": successful}
+
+    def rewrite_readable(
+        self, network: list[str] | None, report_contents: list[ReadableFile]
+    ) -> bytes:
+        buffer = b""
+        if network is not None:
+            for file in network:
+                buffer += f"{file}\n".encode("utf-8")
+            buffer += b"<<<<<< network\n\n"
+        for report_content in report_contents:
+            buffer += f"# path={report_content.path}\n".encode("utf-8")
+            buffer += report_content.contents
+            buffer += b"\n<<<<<< EOF\n\n"
+        return buffer
 
     def should_delete_archive(self, commit_yaml):
         if get_config("services", "minio", "expire_raw_after_n_days"):
@@ -441,23 +440,17 @@ def should_delete_archive(self, commit_yaml):
             commit_yaml, ("codecov", "archive", "uploads"), _else=True
         )
 
-    def delete_archive(
-        self, commitid, repository, commit_yaml, uploads_to_delete: List[Upload]
-    ):
-        archive_service = ArchiveService(repository)
-        for upload in uploads_to_delete:
-            archive_url = upload.storage_path
-            if archive_url and not archive_url.startswith("http"):
-                log.info(
-                    "Deleting uploaded file as requested",
-                    extra=dict(
-                        archive_url=archive_url,
-                        commit=commitid,
-                        upload=upload.external_id,
-                        parent_task=self.request.parent_id,
-                    ),
-                )
-                archive_service.delete_file(archive_url)
+    def delete_archive(self, archive_service: ArchiveService, upload: Upload):
+        archive_url = upload.storage_path
+        if archive_url and not archive_url.startswith("http"):
+            log.info(
+                "Deleting uploaded file as requested",
+                extra=dict(
+                    archive_url=archive_url,
+                    upload=upload.external_id,
+                ),
+            )
+            archive_service.delete_file(archive_url)
 
 
 RegisteredTestResultsProcessorTask = celery_app.register_task(
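
Note, for review context rather than application: the retry-safety above comes from
two pieces visible in the diff itself, the early return on upload.state at the top of
process_individual_upload and the db_session.commit() that now runs before the raw
archive is deleted or rewritten. The toy below (FakeUpload, FakeArchive, and
process_upload are illustrative stand-ins, not names from this codebase) shows why
that ordering makes a retried task harmless:

    from dataclasses import dataclass, field

    @dataclass
    class FakeUpload:
        storage_path: str
        state: str = ""

    @dataclass
    class FakeArchive:
        files: dict = field(default_factory=dict)

        def read_file(self, path):
            return self.files[path]

        def write_file(self, path, data):
            self.files[path] = data

        def delete_file(self, path):
            self.files.pop(path, None)

    def process_upload(db_rows, archive, upload, should_delete_archive):
        # A retried task sees the state committed by an earlier attempt and
        # stops here, so test rows are never inserted twice for one upload.
        if upload.state in ("processed", "has_failed"):
            return {"successful": True, "skipped": True}

        payload = archive.read_file(upload.storage_path)
        db_rows.append(payload)        # stands in for _bulk_write_tests_to_db()
        upload.state = "processed"     # committed together with the rows in the real task

        # Only after the DB work is durable does the archive get deleted or
        # rewritten, so a crash here can at worst redo this block on retry.
        if should_delete_archive:
            archive.delete_file(upload.storage_path)
        else:
            archive.write_file(upload.storage_path, b"# readable\n" + payload)
        return {"successful": True}

    archive = FakeArchive(files={"v4/raw/u1": b"<testsuite/>"})
    upload = FakeUpload(storage_path="v4/raw/u1")
    rows = []
    print(process_upload(rows, archive, upload, True))   # processes and deletes
    print(process_upload(rows, archive, upload, True))   # retry: short-circuits
    print(len(rows))                                     # 1, no duplicate rows

The second call never touches the (already deleted) archive file because the state
check fires first; the only step that can ever repeat is the delete-or-rewrite of
the raw upload, which is idempotent.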