From 0705c36b4ec7f90a085f106f88d126e1290bc875 Mon Sep 17 00:00:00 2001
From: joseph-sentry <136376984+joseph-sentry@users.noreply.github.com>
Date: Wed, 19 Jun 2024 11:48:56 -0400
Subject: [PATCH] feat: add TestResultsFlow for measuring time to notification
 (#439)

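This introduces TestResultsFlow, a checkpoint flow for the test-results
pipeline, so we can measure how long it takes from the moment the upload
task picks up a test-results report (TEST_RESULTS_BEGIN) until the
finisher starts (TEST_RESULTS_FINISHER_BEGIN) and until it posts the PR
comment (TEST_RESULTS_NOTIFY, or FLAKE_DETECTION_NOTIFY on the flaky
test detection path). The @subflows declarations on the class turn those
checkpoint pairs into the test_results_notification_latency,
flake_notification_latency and test_results_processing_time metrics.

Because the upload task and the finisher run as separate Celery tasks,
the upload task has to serialize the flow's checkpoint data into the
finisher's kwargs. Roughly, and only as an illustration of the wiring
this patch adds (the helper names below come from the diff; the wrapper
functions themselves are hypothetical and not part of the change):

    from helpers.checkpoint_logger import _kwargs_key
    from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
    from helpers.checkpoint_logger.flows import TestResultsFlow

    def build_finisher_kwargs(upload_kwargs, commit, commit_yaml):
        # Upload task side: start the flow and pass its serialized state
        # along to the finisher task scheduled in the chord.
        checkpoints = checkpoints_from_kwargs(TestResultsFlow, upload_kwargs).log(
            TestResultsFlow.TEST_RESULTS_BEGIN, kwargs=upload_kwargs, ignore_repeat=True
        )
        return {
            "repoid": commit.repoid,
            "commitid": commit.commitid,
            "commit_yaml": commit_yaml,
            _kwargs_key(TestResultsFlow): checkpoints.data,
        }

    def on_finisher_start(finisher_kwargs):
        # Finisher side: rebuild the logger from the incoming kwargs and
        # keep logging checkpoints against the same flow.
        checkpoints = checkpoints_from_kwargs(TestResultsFlow, finisher_kwargs)
        checkpoints.log(TestResultsFlow.TEST_RESULTS_FINISHER_BEGIN)
        return checkpoints
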
Signed-off-by: joseph-sentry <joseph.sawaya@sentry.io>
---
 helpers/checkpoint_logger/flows.py   | 20 +++++++++++++++++++
 tasks/test_results_finisher.py       | 12 +++++++++++-
 tasks/tests/unit/test_upload_task.py |  1 +
 tasks/upload.py                      | 29 ++++++++++++++++------------
 4 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/helpers/checkpoint_logger/flows.py b/helpers/checkpoint_logger/flows.py
index de352f5e2..5e156e9c7 100644
--- a/helpers/checkpoint_logger/flows.py
+++ b/helpers/checkpoint_logger/flows.py
@@ -55,3 +55,23 @@ class UploadFlow(BaseFlow):
     NOTIF_TOO_MANY_RETRIES = auto()
     NOTIF_STALE_HEAD = auto()
     NOTIF_ERROR_NO_REPORT = auto()
+
+
+@failure_events("TEST_RESULTS_ERROR")
+@success_events("TEST_RESULTS_BEGIN")
+@subflows(
+    ("test_results_notification_latency", "TEST_RESULTS_BEGIN", "TEST_RESULTS_NOTIFY"),
+    ("flake_notification_latency", "TEST_RESULTS_BEGIN", "FLAKE_DETECTION_NOTIFY"),
+    (
+        "test_results_processing_time",
+        "TEST_RESULTS_BEGIN",
+        "TEST_RESULTS_FINISHER_BEGIN",
+    ),
+)
+@reliability_counters
+class TestResultsFlow(BaseFlow):
+    TEST_RESULTS_BEGIN = auto()
+    TEST_RESULTS_NOTIFY = auto()
+    FLAKE_DETECTION_NOTIFY = auto()
+    TEST_RESULTS_ERROR = auto()
+    TEST_RESULTS_FINISHER_BEGIN = auto()
diff --git a/tasks/test_results_finisher.py b/tasks/test_results_finisher.py
index 3b10b338f..438962cd6 100644
--- a/tasks/test_results_finisher.py
+++ b/tasks/test_results_finisher.py
@@ -10,6 +10,8 @@
 from app import celery_app
 from database.enums import FlakeSymptomType, ReportType, TestResultsProcessingError
 from database.models import Commit, TestResultReportTotals
+from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
+from helpers.checkpoint_logger.flows import TestResultsFlow
 from helpers.string import EscapeEnum, Replacement, StringEscaper, shorten_file_paths
 from rollouts import FLAKY_TEST_DETECTION
 from services.failure_normalizer import FailureNormalizer
@@ -135,6 +137,10 @@ def process_impl_within_lock(
             ),
         )
 
+        checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs)
+
+        checkpoints.log(TestResultsFlow.TEST_RESULTS_FINISHER_BEGIN)
+
         commit: Commit = (
             db_session.query(Commit).filter_by(repoid=repoid, commitid=commitid).first()
         )
@@ -270,6 +276,7 @@ def process_impl_within_lock(
         )
 
         with metrics.timing("test_results.finisher.notification"):
+            checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY)
             success, reason = async_to_sync(notifier.notify)(payload)
 
         log.info(
@@ -301,7 +308,7 @@ def process_impl_within_lock(
             )
             with metrics.timing("test_results.finisher.run_flaky_test_detection"):
                 success, reason = self.run_flaky_test_detection(
-                    db_session, repoid, notifier, payload
+                    db_session, repoid, notifier, payload, checkpoints=checkpoints
                 )
 
             metrics.incr(
@@ -321,6 +328,7 @@ def run_flaky_test_detection(
         repoid,
         notifier: TestResultsNotifier,
         payload: TestResultsNotificationPayload,
+        checkpoints=None,
     ):
         ignore_predefined = read_yaml_field(
             "test_analytics", "ignore_predefined", _else=False
@@ -375,6 +383,8 @@ def run_flaky_test_detection(
             )
             db_session.flush()
 
+        if checkpoints:
+            checkpoints.log(TestResultsFlow.FLAKE_DETECTION_NOTIFY)
         success, reason = async_to_sync(notifier.notify)(payload)
         log.info(
             "Added flaky test information to the PR comment",
diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py
index 3edd24708..b363fd3eb 100644
--- a/tasks/tests/unit/test_upload_task.py
+++ b/tasks/tests/unit/test_upload_task.py
@@ -335,6 +335,7 @@ def test_upload_task_call_test_results(
                 repoid=commit.repoid,
                 commitid=commit.commitid,
                 commit_yaml={"codecov": {"max_report_age": "1y ago"}},
+                checkpoints_TestResultsFlow=None,
             )
         )
 
diff --git a/tasks/upload.py b/tasks/upload.py
index e45664468..eab16ae7f 100644
--- a/tasks/upload.py
+++ b/tasks/upload.py
@@ -26,7 +26,7 @@
 from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME
 from helpers.checkpoint_logger import _kwargs_key
 from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
-from helpers.checkpoint_logger.flows import UploadFlow
+from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
 from helpers.exceptions import RepositoryWithoutValidBotError
 from helpers.github_installation import get_installation_name_for_owner_for_task
 from helpers.parallel_upload_processing import get_parallel_session_ids
@@ -258,6 +258,10 @@ def run_impl(
             checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs).log(
                 UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True
             )
+        elif report_type == ReportType.TEST_RESULTS.value:
+            checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs).log(
+                TestResultsFlow.TEST_RESULTS_BEGIN, kwargs=kwargs, ignore_repeat=True
+            )
 
         repoid = int(repoid)
         log.info(
@@ -628,7 +632,7 @@ def schedule_task(
             )
         elif commit_report.report_type == ReportType.TEST_RESULTS.value:
             res = self._schedule_test_results_processing_task(
-                commit, commit_yaml, argument_list, commit_report
+                commit, commit_yaml, argument_list, commit_report, checkpoints
             )
 
         if res:
@@ -850,11 +854,7 @@ def _schedule_bundle_analysis_processing_task(
         return res
 
     def _schedule_test_results_processing_task(
-        self,
-        commit,
-        commit_yaml,
-        argument_list,
-        commit_report,
+        self, commit, commit_yaml, argument_list, commit_report, checkpoints=None
     ):
         processor_task_group = []
         for i in range(0, len(argument_list), CHUNK_SIZE):
@@ -872,15 +872,20 @@ def _schedule_test_results_processing_task(
                 )
                 processor_task_group.append(sig)
         if processor_task_group:
+            checkpoint_data = None
+            if checkpoints:
+                checkpoint_data = checkpoints.data
+            kwargs = {
+                "repoid": commit.repoid,
+                "commitid": commit.commitid,
+                "commit_yaml": commit_yaml,
+                _kwargs_key(TestResultsFlow): checkpoint_data,
+            }
             res = chord(
                 processor_task_group,
                 test_results_finisher_task.signature(
                     args=(),
-                    kwargs=dict(
-                        repoid=commit.repoid,
-                        commitid=commit.commitid,
-                        commit_yaml=commit_yaml,
-                    ),
+                    kwargs=kwargs,
                 ),
             ).apply_async()