Skip to content

Commit

Permalink
Merge branch 'X-lab2017:master' into pycjs
Browse files Browse the repository at this point in the history
  • Loading branch information
birdflyi authored Jan 15, 2024
2 parents 8e037f5 + 48092a9 commit ac2b4bc
Showing 1 changed file with 43 additions and 3 deletions.
46 changes: 43 additions & 3 deletions src/cron/tasks/community_openrank.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const task: Task = {
const lastMonth = new Date(now.setMonth(now.getMonth() - 1));

const cor = new Map<string, Map<string, Map<string, Map<string, number>>>>();
const actorNameMap = new Map<string, string>();
const repoNameMap = new Map<string, string>();
const repoOrgMap = new Map<string, { id: string, login: string }>();
const workerPool: StaticPool<any, any> = new StaticPool({
size: localWorkerNumber,
task: localCalcTask,
Expand Down Expand Up @@ -82,18 +85,18 @@ const task: Task = {
}

const prepareCor = (data: any[], ctx: string[]) => {
const cor: any = {};
const _cor: any = {};
for (const [platform, repoId] of data) {
if (!cor.has(platform)) continue;
if (!cor.get(platform)!.has(repoId)) continue;
for (const c of ctx) {
if (!cor.get(platform)!.get(repoId)!.has(c)) continue;
for (const [id, openrank] of cor.get(platform)!.get(repoId)!.get(c)!.entries()) {
cor[`${repoId}_${id}_${c}`] = openrank;
_cor[`${repoId}_${id}_${c}`] = openrank;
}
}
}
return cor;
return _cor;
}

const calcByNeo4j = async (p: any) => {
Expand Down Expand Up @@ -131,6 +134,24 @@ const task: Task = {
.map(d => `${d.getFullYear()}${(d.getMonth() + 1).toString().padStart(2, '0')}`);
}

const loadNames = async (y: number, m: number) => {
repoNameMap.clear();
repoOrgMap.clear();
actorNameMap.clear();
const yyyymm = `${y}${m.toString().padStart(2, '0')}`;
const repoResult = await queryClickhouse<string[]>(`SELECT DISTINCT platform, repo_id, argMax(repo_name, created_at) FROM events WHERE toYYYYMM(created_at) = ${yyyymm} AND repo_id IN (SELECT id FROM export_repo) GROUP BY repo_id, platform`, { format: 'JSONCompactEachRow' });
repoResult.forEach(row => {
const [platform, repoId, repoName, orgId, orgLogin] = row;
repoNameMap.set(`${platform}_${repoId}`, repoName);
repoOrgMap.set(`${platform}_${repoId}`, { id: orgId, login: orgLogin });
});
const actorResult = await queryClickhouse<string[]>(`SELECT DISTINCT platform, actor_id, argMax(actor_login, created_at) FROM events WHERE toYYYYMM(created_at) = ${yyyymm} AND repo_id IN (SELECT id FROM export_repo) GROUP BY actor_id, platform`, { format: 'JSONCompactEachRow' });
actorResult.forEach(row => {
const [platform, actorId, actorLogin] = row;
actorNameMap.set(`${platform}_${actorId}`, actorLogin);
});
};

const loadCalculateRepos = async (y: number, m: number) => {
const yyyymm = `${y}${m.toString().padStart(2, '0')}`;
const q = `SELECT platform, repo_id, groupArray((actor_id, issue_number, activity, merged)) AS rels FROM
Expand Down Expand Up @@ -188,10 +209,14 @@ GROUP BY repo_id, platform`;
const ctx = prepareContext(y, m);

const createdAt = `${y}-${m.toString().padStart(2, '0')}-01 00:00:00`;

const lists = await loadCalculateRepos(y, m);
logger.info(`Got ${lists.length} repos to calculate, context is ${ctx}`);
if (lists.length === 0) return;

await loadNames(y, m);
logger.info(`Loaded ${actorNameMap.size} actors, ${repoNameMap.size} repos.`);

await loadOpenrankHistory(ctx);
const elpsMap = new Map<number, { count: number; elps: number; }>();
const processLists: any[] = splitArrayIntoChunks(lists, localCalcBatch);
Expand All @@ -206,8 +231,17 @@ GROUP BY repo_id, platform`;
const saveRecord = (platform: string, repoId: string, idStr: string, openrank: number) => {
const type = idStr.substring(0, 1);
const id = parseInt(idStr.substring(1));
const repoName = repoNameMap.get(`${platform}_${repoId}`);
const orgInfo = repoOrgMap.get(`${platform}_${repoId}`);
if (!repoName || !orgInfo) {
logger.error(`Can not find repo name or org info for ${platform}_${repoId}`);
return;
}
const record: any = {
repo_id: parseInt(repoId),
repo_name: repoName,
org_id: parseInt(orgInfo.id),
org_login: orgInfo.login,
platform,
openrank,
created_at: createdAt,
Expand All @@ -216,6 +250,12 @@ GROUP BY repo_id, platform`;
record.issue_number = id;
} else if (type === 'u') {
record.actor_id = id;
const actorName = actorNameMap.get(`${platform}_${id}`);
if (!actorName) {
logger.error(`Can not find actor name for ${platform}_${id}`);
return;
}
record.actor_login = actorName;
}
stream.push(record);
updateCor(platform, repoId, `${y}${m.toString().padStart(2, '0')}`, idStr === 'bg' ? repoId : idStr, openrank);
Expand Down

0 comments on commit ac2b4bc

Please sign in to comment.