From 48092a9ac9e01596718ee03cfeb23c5a2f1d3cb5 Mon Sep 17 00:00:00 2001
From: Frank Zhao
Date: Mon, 15 Jan 2024 14:55:36 +0800
Subject: [PATCH] feat: add columns into community openrank table (#1502)

Signed-off-by: frank-zsy
---
Review note: the repo-name query in loadNames selected only three columns
(platform, repo_id, repo name) while its row handler destructures five
(..., orgId, orgLogin). repoOrgMap therefore always held
{ id: undefined, login: undefined }, the `!orgInfo` guard in saveRecord never
fired, and every record was written with org_id = NaN and org_login =
undefined. The query now also selects argMax(org_id, created_at) and
argMax(org_login, created_at). This assumes the events table exposes org_id
and org_login columns - confirm against the ClickHouse schema.

 src/cron/tasks/community_openrank.ts | 46 ++++++++++++++++++++++++++--
 1 file changed, 43 insertions(+), 3 deletions(-)

diff --git a/src/cron/tasks/community_openrank.ts b/src/cron/tasks/community_openrank.ts
index a8c0cfdb4..cadf241d2 100644
--- a/src/cron/tasks/community_openrank.ts
+++ b/src/cron/tasks/community_openrank.ts
@@ -31,6 +31,9 @@ const task: Task = {
     const lastMonth = new Date(now.setMonth(now.getMonth() - 1));
 
     const cor = new Map>>>();
+    const actorNameMap = new Map();
+    const repoNameMap = new Map();
+    const repoOrgMap = new Map();
     const workerPool: StaticPool = new StaticPool({
       size: localWorkerNumber,
       task: localCalcTask,
@@ -82,18 +85,18 @@ const task: Task = {
     }
 
     const prepareCor = (data: any[], ctx: string[]) => {
-      const cor: any = {};
+      const _cor: any = {};
       for (const [platform, repoId] of data) {
         if (!cor.has(platform)) continue;
         if (!cor.get(platform)!.has(repoId)) continue;
         for (const c of ctx) {
           if (!cor.get(platform)!.get(repoId)!.has(c)) continue;
           for (const [id, openrank] of cor.get(platform)!.get(repoId)!.get(c)!.entries()) {
-            cor[`${repoId}_${id}_${c}`] = openrank;
+            _cor[`${repoId}_${id}_${c}`] = openrank;
           }
         }
       }
-      return cor;
+      return _cor;
     }
 
     const calcByNeo4j = async (p: any) => {
@@ -131,6 +134,24 @@ const task: Task = {
         .map(d => `${d.getFullYear()}${(d.getMonth() + 1).toString().padStart(2, '0')}`);
     }
 
+    const loadNames = async (y: number, m: number) => {
+      repoNameMap.clear();
+      repoOrgMap.clear();
+      actorNameMap.clear();
+      const yyyymm = `${y}${m.toString().padStart(2, '0')}`;
+      const repoResult = await queryClickhouse(`SELECT DISTINCT platform, repo_id, argMax(repo_name, created_at), argMax(org_id, created_at), argMax(org_login, created_at) FROM events WHERE toYYYYMM(created_at) = ${yyyymm} AND repo_id IN (SELECT id FROM export_repo) GROUP BY repo_id, platform`, { format: 'JSONCompactEachRow' });
+      repoResult.forEach(row => {
+        const [platform, repoId, repoName, orgId, orgLogin] = row;
+        repoNameMap.set(`${platform}_${repoId}`, repoName);
+        repoOrgMap.set(`${platform}_${repoId}`, { id: orgId, login: orgLogin });
+      });
+      const actorResult = await queryClickhouse(`SELECT DISTINCT platform, actor_id, argMax(actor_login, created_at) FROM events WHERE toYYYYMM(created_at) = ${yyyymm} AND repo_id IN (SELECT id FROM export_repo) GROUP BY actor_id, platform`, { format: 'JSONCompactEachRow' });
+      actorResult.forEach(row => {
+        const [platform, actorId, actorLogin] = row;
+        actorNameMap.set(`${platform}_${actorId}`, actorLogin);
+      });
+    };
+
     const loadCalculateRepos = async (y: number, m: number) => {
       const yyyymm = `${y}${m.toString().padStart(2, '0')}`;
       const q = `SELECT platform, repo_id, groupArray((actor_id, issue_number, activity, merged)) AS rels FROM
@@ -188,10 +209,14 @@ GROUP BY repo_id, platform`;
       const ctx = prepareContext(y, m);
       const createdAt = `${y}-${m.toString().padStart(2, '0')}-01 00:00:00`;
 
+
       const lists = await loadCalculateRepos(y, m);
       logger.info(`Got ${lists.length} repos to calculate, context is ${ctx}`);
       if (lists.length === 0) return;
 
+      await loadNames(y, m);
+      logger.info(`Loaded ${actorNameMap.size} actors, ${repoNameMap.size} repos.`);
+
       await loadOpenrankHistory(ctx);
       const elpsMap = new Map();
       const processLists: any[] = splitArrayIntoChunks(lists, localCalcBatch);
@@ -206,8 +231,17 @@ GROUP BY repo_id, platform`;
       const saveRecord = (platform: string, repoId: string, idStr: string, openrank: number) => {
         const type = idStr.substring(0, 1);
         const id = parseInt(idStr.substring(1));
+        const repoName = repoNameMap.get(`${platform}_${repoId}`);
+        const orgInfo = repoOrgMap.get(`${platform}_${repoId}`);
+        if (!repoName || !orgInfo) {
+          logger.error(`Can not find repo name or org info for ${platform}_${repoId}`);
+          return;
+        }
         const record: any = {
           repo_id: parseInt(repoId),
+          repo_name: repoName,
+          org_id: parseInt(orgInfo.id),
+          org_login: orgInfo.login,
           platform,
           openrank,
           created_at: createdAt,
@@ -216,6 +250,12 @@ GROUP BY repo_id, platform`;
           record.issue_number = id;
         } else if (type === 'u') {
           record.actor_id = id;
+          const actorName = actorNameMap.get(`${platform}_${id}`);
+          if (!actorName) {
+            logger.error(`Can not find actor name for ${platform}_${id}`);
+            return;
+          }
+          record.actor_login = actorName;
         }
         stream.push(record);
         updateCor(platform, repoId, `${y}${m.toString().padStart(2, '0')}`, idStr === 'bg' ? repoId : idStr, openrank);