Skip to content

Commit

Permalink
feat: update user info for all users in github log (#1423)
Browse files Browse the repository at this point in the history
* feat: update user info for all users in github log

Signed-off-by: frank-zsy <[email protected]>

* fix: modify the table schema

Signed-off-by: frank-zsy <[email protected]>

---------

Signed-off-by: frank-zsy <[email protected]>
  • Loading branch information
frank-zsy authored Nov 4, 2023
1 parent 3cbfc02 commit db65eb9
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 63 deletions.
40 changes: 6 additions & 34 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"@octokit/core": "^3.6.0",
"ali-oss": "^6.17.1",
"dateformat": "^4.5.1",
"github-graphql-v4-client": "0.1.4",
"ijavascript-plotly": "0.0.1",
"js-yaml": "^4.1.0",
"lodash": "^4.17.21",
Expand All @@ -57,7 +58,6 @@
"@types/node": "^14.14.37",
"@types/node-cron": "^3.0.1",
"@types/request": "^2.48.5",
"github-data-cat": "^1.1.4",
"mocha": "^10.2.0",
"ts-node": "^10.9.1",
"typescript": "^4.9.3"
Expand Down
104 changes: 76 additions & 28 deletions src/cron/tasks/update_github_users.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { Task } from '..';
import getConfig from '../../config';
import { DataCat } from 'github-data-cat';
import { query, queryStream } from '../../db/clickhouse';
import { query } from '../../db/clickhouse';
import { Readable } from 'stream';
import { waitUntil } from '../../utils';
import { createClient } from '@clickhouse/client';
import { GitHubClient } from 'github-graphql-v4-client';

/**
* This task is used to update GitHub users' basic info
Expand All @@ -14,18 +13,43 @@ const task: Task = {
enable: false,
immediate: false,
callback: async () => {
const updateBatchSize = 2000;
const config = await getConfig();
const tokens = config.github.tokens;
const dataCat = new DataCat({
const graphqlClient = new GitHubClient({
tokens,
maxConcurrentReqNumber: 30,
logger: {
info: () => { },
error: () => { },
warn: () => { }
}
});
await dataCat.init();
})
await graphqlClient.init();

/**
 * Fetch the basic profile of a single GitHub user through the GraphQL client.
 *
 * @param login GitHub login to look up.
 * @returns A copy of the `user` object from the response with `socialAccounts`
 *   flattened to the array of its `nodes` (empty array when absent), or `null`
 *   when the client returns nothing or the response carries no user.
 */
const queryUserInfo = async (login: string) => {
  const result: any = await graphqlClient.query(`query getUser($login: String!){
user(login: $login) {
name
bio
location
email
company
twitterUsername
createdAt
socialAccounts(first: 50) {
nodes {
displayName
provider
}
}
}
}`, { login });
  // The response can be empty, or can be present with `user` set to null
  // (presumably when the login no longer resolves to an account — confirm
  // against the client). Both cases must yield null instead of throwing, so the
  // caller can record the id as not found and continue with the batch.
  const user = result?.user;
  if (!user) return null;
  // Return a copy instead of mutating the client's response object, and
  // tolerate a missing socialAccounts connection.
  return { ...user, socialAccounts: user.socialAccounts?.nodes ?? [] };
};

// create info table
const createTableQuery = `
Expand All @@ -39,6 +63,9 @@ const task: Task = {
\`bio\` String,
\`email\` String,
\`name\` String,
\`twitter_username\` String,
\`social_accounts.name\` Array(String),
\`social_accounts.provider\` Array(LowCardinality(String)),
\`created_at\` DateTime
)
ENGINE = ReplacingMergeTree
Expand All @@ -48,46 +75,67 @@ const task: Task = {

// get users
const now = new Date();
const userQuery = `
SELECT id, actor_login FROM gh_export_user
const date = `${now.getFullYear()}-${(now.getMonth() + 1).toString().padStart(2, '0')}-01 00:00:00`;
const getUsersList = async (totalCount: number): Promise<any[]> => {
// try to get export user first, export users need to be updated every month
let q = `SELECT id, actor_login FROM gh_export_user
WHERE id NOT IN (
SELECT id FROM gh_user_info
WHERE toYYYYMM(updated_at) = ${now.getFullYear()}${(now.getMonth() + 1).toString().padStart(2, '0')})
LIMIT 1500`;
LIMIT ${totalCount}`;
let usersList = await query(q);
if (usersList.length > 0) return usersList;
// then try to get users who have events, every user should be updated at least once a year
q = `SELECT actor_id, argMax(actor_login, created_at) FROM gh_user_openrank
WHERE actor_id NOT IN (SELECT id FROM gh_user_info WHERE toYear(updated_at) = ${now.getFullYear()})
GROUP BY actor_id
LIMIT ${totalCount}`;
usersList = await query(q);
if (usersList.length > 0) return usersList;
// then try to get any user in the log, every user should be updated at least once
q = `SELECT actor_id, argMax(actor_login, created_at) FROM gh_events
WHERE actor_id NOT IN (SELECT id FROM gh_user_info)
GROUP BY actor_id
LIMIT ${totalCount}`;
usersList = await query(q);
return usersList;
};

const date = `${now.getFullYear()}-${(now.getMonth() + 1).toString().padStart(2, '0')}-01 00:00:00`;
let totalCount = 0, processedCount = 0;
const usersList = await getUsersList(updateBatchSize);
if (usersList.length === 0) return;
console.log(`Get ${usersList.length} users to update`);

let processedCount = 0;
const stream = new Readable({
objectMode: true,
read: () => { },
});
const client = createClient(config.db.clickhouse);
queryStream(userQuery, async row => {
totalCount++;
const [id, login] = row;
const info = await dataCat.user.info(login);
const item: any = { id, updated_at: date };
if (!info) {

for (const [id, login] of usersList) {
const user: any = await queryUserInfo(login);
const item: any = { id: parseInt(id), updated_at: date };
if (!user) {
item.status = 'not_found';
} else {
item.status = 'normal';
item.location = info.location ?? '';
item.company = info.company ?? '';
item.bio = info.bio ?? '';
item.email = info.email ?? '';
item.name = info.name ?? '';
item.created_at = info.createdAt.replace('T', ' ').replace('Z', '');
item.location = user.location ?? '';
item.company = user.company ?? '';
item.bio = user.bio ?? '';
item.email = user.email ?? '';
item.name = user.name ?? '';
item.twitter_username = user.twitterUsername ?? '';
item['social_accounts.name'] = (user.socialAccounts ?? []).map(i => i.displayName);
item['social_accounts.provider'] = (user.socialAccounts ?? []).map(i => i.provider);
item.created_at = user.createdAt.replace('T', ' ').replace('Z', '');
}
stream.push(item);
processedCount++;
if (processedCount % 100 === 0) {
console.log(`${processedCount} accounts has been processed.`);
}
}).then(() => {
waitUntil(() => totalCount === processedCount).then(() => {
stream.push(null);
})
});
}
stream.push(null);
await client.insert({
table: 'gh_user_info',
values: stream,
Expand Down

0 comments on commit db65eb9

Please sign in to comment.