From c3c77d33cc84c3955af30591606a1aa6fda81b6d Mon Sep 17 00:00:00 2001 From: Matt Hammerly Date: Tue, 18 Jul 2023 19:38:20 -0700 Subject: [PATCH] add timers for subspans of sync_repos to understand slowdowns --- tasks/sync_repos.py | 91 ++++++++++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 39 deletions(-) diff --git a/tasks/sync_repos.py b/tasks/sync_repos.py index e3e1325f9..49fad7777 100644 --- a/tasks/sync_repos.py +++ b/tasks/sync_repos.py @@ -4,6 +4,7 @@ from celery.exceptions import SoftTimeLimitExceeded from redis.exceptions import LockError from shared.celery_config import sync_repos_task_name +from shared.metrics import metrics from shared.torngit.exceptions import TorngitClientError from app import celery_app @@ -13,6 +14,7 @@ from tasks.base import BaseCodecovTask log = logging.getLogger(__name__) +metrics_scope = "worker.SyncReposTask" class SyncReposTask(BaseCodecovTask): @@ -66,18 +68,21 @@ async def run_async( ): git = get_owner_provider_service(owner, using_integration) if using_integration: - await self.sync_repos_using_integration( - db_session, git, ownerid, username - ) + with metrics.timer(f"{metrics_scope}.sync_repos_using_integration"): + await self.sync_repos_using_integration( + db_session, git, ownerid, username + ) else: - await self.sync_repos( - db_session, git, owner, username, using_integration - ) + with metrics.timer(f"{metrics_scope}.sync_repos"): + await self.sync_repos( + db_session, git, owner, username, using_integration + ) except LockError: log.warning("Unable to sync repos because another task is already doing it") async def sync_repos_using_integration(self, db_session, git, ownerid, username): - repo_service_ids = await git.list_repos_using_installation(username) + with metrics.timer(f"{metrics_scope}.sync_repos_using_integration.list_repos"): + repo_service_ids = await git.list_repos_using_installation(username) if repo_service_ids: repo_service_ids = list(map(str, repo_service_ids)) if repo_service_ids: @@ -100,7 +105,8 @@ async def sync_repos(self, db_session, git, owner, username, using_integration): # get my repos (and team repos) try: - repos = await git.list_repos() + with metrics.timer(f"{metrics_scope}.sync_repos.list_repos"): + repos = await git.list_repos() except SoftTimeLimitExceeded: old_permissions = owner.permission or [] log.warning( @@ -127,41 +133,48 @@ async def sync_repos(self, db_session, git, owner, username, using_integration): owners_by_id = {} for repo in repos: - _ownerid = owners_by_id.get( - (service, repo["owner"]["service_id"], repo["owner"]["username"]) - ) - if not _ownerid: - _ownerid = self.upsert_owner( - db_session, - service, - repo["owner"]["service_id"], - repo["owner"]["username"], - ) - owners_by_id[ + # Time how long processing a single repo takes so we can estimate how + # performance degrades. Sampling at 10% will be enough. + with metrics.timer(f"{metrics_scope}.process_each_repo", rate=0.1): + _ownerid = owners_by_id.get( (service, repo["owner"]["service_id"], repo["owner"]["username"]) - ] = _ownerid - - repoid = self.upsert_repo( - db_session, service, _ownerid, repo["repo"], using_integration - ) - - if repo["repo"]["fork"]: - _ownerid = self.upsert_owner( - db_session, - service, - repo["repo"]["fork"]["owner"]["service_id"], - repo["repo"]["fork"]["owner"]["username"], ) - - _repoid = self.upsert_repo( - db_session, service, _ownerid, repo["repo"]["fork"]["repo"] + if not _ownerid: + _ownerid = self.upsert_owner( + db_session, + service, + repo["owner"]["service_id"], + repo["owner"]["username"], + ) + owners_by_id[ + ( + service, + repo["owner"]["service_id"], + repo["owner"]["username"], + ) + ] = _ownerid + + repoid = self.upsert_repo( + db_session, service, _ownerid, repo["repo"], using_integration ) - if repo["repo"]["fork"]["repo"]["private"]: - private_project_ids.append(int(_repoid)) - if repo["repo"]["private"]: - private_project_ids.append(int(repoid)) - db_session.commit() + if repo["repo"]["fork"]: + _ownerid = self.upsert_owner( + db_session, + service, + repo["repo"]["fork"]["owner"]["service_id"], + repo["repo"]["fork"]["owner"]["username"], + ) + + _repoid = self.upsert_repo( + db_session, service, _ownerid, repo["repo"]["fork"]["repo"] + ) + + if repo["repo"]["fork"]["repo"]["private"]: + private_project_ids.append(int(_repoid)) + if repo["repo"]["private"]: + private_project_ids.append(int(repoid)) + db_session.commit() log.info( "Updating permissions",