/**
 * Migration: derive each article's `contributors` list from the edit history
 * stored in the `ydocs` index and write it to the `articles` index.
 *
 * Provenance: added as migration/addContributors.ts by commit
 * d3cc9507feda140855928bd55a41fd6a45d882f3 (Nonumpa, Mon, 18 Mar 2024 15:46:02 +0800),
 * "Migrate db, retrieve contributors from ydocs, and insert them into the db".
 *
 * How to use
 * 1. Make sure the ydocs index has been reindexed using `npm run reload -- ydocs` in rumors-db
 * 2. Run `npx ts-node --esm .\migration\addContributors.ts`
 */

import * as Y from 'yjs';
import elasticsearch from '@elastic/elasticsearch';
import 'dotenv/config';

const elasticsearchOpts: elasticsearch.ClientOptions = {
  node: process.env.ELASTICSEARCH_URL,
};

const client = new elasticsearch.Client(elasticsearchOpts);

/** One entry of an article's `contributors` field, as written by this migration. */
interface Contributor {
  userId: string;
  /** `createdAt` of the latest version in which this user has an attributed change. */
  updatedAt: string;
  appId: string;
}

/** A search hit from the `ydocs` index, restricted to the fields requested via `_source`. */
interface YdocHit {
  _id: string;
  _source: {
    /** Base64-encoded Yjs update holding the whole document. */
    ydoc?: string;
    /** Saved versions; each `snapshot` is a base64-encoded Yjs snapshot. */
    versions?: { createdAt: string; snapshot: string }[];
  };
}

/**
 * Collects the users that changed a document, by diffing every saved version
 * against the one before it.
 *
 * NOTE(review): only `Y.XmlText` nodes that are direct children of the
 * top-level nodes of the `prosemirror` fragment are inspected; text nested
 * deeper is not visited.
 *
 * @param articleId - Only used to give context to error logs.
 * @param document - The restored Yjs document. It should have been created
 *   with `gc: false`, otherwise deleted content is no longer available for
 *   attribution.
 * @param versions - Saved versions; assumed to be in chronological order,
 *   since each one is diffed against its predecessor in the array.
 * @returns One entry per user id, carrying the `createdAt` of the latest
 *   version that contains a change attributed to that user.
 */
export const getAllContributors = (
  articleId: string,
  document: Y.Doc,
  versions: { createdAt: string; snapshot: string }[]
): Contributor[] => {
  const contributors = new Map<string, Contributor>();
  const permanentUserData = new Y.PermanentUserData(document);
  const yXmlFragment = document.getXmlFragment('prosemirror');

  // Passed to `toDelta`: resolves the author of an added / removed range.
  // The stored user description is expected to be a JSON string with an `id`.
  const computeYChange = (type: 'removed' | 'added', id: Y.ID) => {
    const user =
      type === 'added'
        ? permanentUserData.getUserByClientId(id.client)
        : permanentUserData.getUserByDeletedId(id);
    let userId: string | undefined;
    try {
      userId = JSON.parse(user).id;
    } catch {
      // Missing or malformed user data: the change is reported without an author.
      console.error('error user format: ', user);
    }
    return {
      userId,
      type,
    };
  };

  // The first version is diffed against an empty snapshot.
  let previousSnapshot = Y.createSnapshot(Y.createDeleteSet(), new Map());

  for (const version of versions) {
    const snapshot = Y.decodeSnapshot(Buffer.from(version.snapshot, 'base64'));
    const prevSnapshot = previousSnapshot;
    const versionCreatedAt = version.createdAt;
    const versionDate = new Date(versionCreatedAt);
    // Children that exist in this version, plus those deleted since the previous one.
    const visibleRange = new Y.Snapshot(prevSnapshot.ds, snapshot.sv);

    yXmlFragment.forEach((xmlElement) => {
      const fragmentContent = Y.typeListToArraySnapshot(xmlElement, visibleRange);
      for (const node of fragmentContent) {
        if (node.constructor !== Y.XmlText) {
          continue;
        }
        const deltas = node.toDelta(snapshot, prevSnapshot, computeYChange);
        for (const delta of deltas) {
          const ychange = delta.attributes?.ychange;
          if (!ychange) {
            continue;
          }
          const { userId } = ychange;
          if (!userId) {
            console.error('unknown user, articleId: ', articleId);
            continue;
          }
          const existing = contributors.get(userId);
          // Compare against the stored `updatedAt` so a later version moves
          // the timestamp forward.
          if (!existing || versionDate > new Date(existing.updatedAt)) {
            contributors.set(userId, {
              userId,
              updatedAt: versionCreatedAt,
              appId: 'WEBSITE',
            });
          }
        }
      }
    });

    previousSnapshot = snapshot;
  }

  return [...contributors.values()];
};

/**
 * Scrolls through every document of the `ydocs` index, 100 hits at a time,
 * awaiting `callback` for each page before fetching the next one.
 *
 * The callback is always invoked for the first page, even when it is empty.
 */
const forEachYdoc = async (
  callback: (hits: YdocHit[]) => Promise<void>
): Promise<void> => {
  const firstResponse = await client.search({
    index: 'ydocs',
    type: 'doc',
    scroll: '30s',
    size: 100,
    body: {
      query: {
        match_all: {},
      },
    },
    _source: ['ydoc', 'versions'],
  });

  let { hits, _scroll_id: scrollId } = firstResponse.body;

  // `hits.total` is a plain number on older Elasticsearch versions and an
  // object with a `value` field on newer ones.
  const total: number =
    typeof hits.total === 'number' ? hits.total : hits.total?.value ?? 0;

  let page: YdocHit[] = hits.hits;
  let processedCount = 0;

  for (;;) {
    await callback(page);
    processedCount += page.length;

    // eslint-disable-next-line no-console
    console.info(`${processedCount} / ${total} Processed`);

    // An empty page means the scroll is exhausted; stopping here prevents an
    // endless loop if the reported total is never reached.
    if (page.length === 0 || processedCount >= total) {
      break;
    }

    const nextResponse = await client.scroll({
      scroll: '30s',
      scroll_id: scrollId,
    });
    ({ hits, _scroll_id: scrollId } = nextResponse.body);
    page = hits.hits;
  }
};

/**
 * Runs the migration: for every ydoc, restores the document, computes its
 * contributors and bulk-updates the article with the same id.
 *
 * Documents that cannot be processed are collected in `errorArticles` and
 * logged after each page; a failing bulk request aborts the migration.
 */
async function main(): Promise<void> {
  const errorArticles: string[] = [];

  // Awaited so that `main` only resolves once every page is processed and
  // any failure reaches the `.catch` below.
  await forEachYdoc(async (hits) => {
    const operations: Record<string, unknown>[] = [];

    for (const hit of hits) {
      const id = hit._id;
      try {
        const { ydoc: data, versions } = hit._source;
        if (!id || !data || !versions) {
          errorArticles.push(id);
          continue;
        }

        // Restore the document. Garbage collection must be off, otherwise
        // applying the update discards deleted content and the snapshot
        // diffs can no longer attribute it.
        const update = Buffer.from(data, 'base64');
        const doc = new Y.Doc({ gc: false });
        Y.applyUpdate(doc, update);

        const contributors = getAllContributors(id, doc, versions);
        if (contributors.length === 0) {
          errorArticles.push(id);
          console.error('no contributors found for articleId: ', id);
          continue;
        }

        // A bulk update is an action line followed by its payload line.
        operations.push(
          {
            update: {
              _index: 'articles',
              _type: 'doc',
              _id: id,
            },
          },
          {
            doc: { contributors },
          }
        );
      } catch (e: unknown) {
        // One corrupt document must not abort the whole migration.
        errorArticles.push(id);
        console.error('failed to process articleId: ', id, e);
      }
    }

    if (operations.length !== 0) {
      const { body: result } = await client.bulk({
        body: operations,
        refresh: 'true',
        _source: 'false',
        timeout: '10m',
      });
      console.error('result: ', result);

      // The bulk API reports per-item failures inside a successful response.
      if (result.errors) {
        for (const item of result.items ?? []) {
          if (item.update?.error) {
            errorArticles.push(item.update._id);
          }
        }
      }
    }

    console.error('errorArticles: ', errorArticles);
  });
}

main().catch((e: unknown) => {
  console.error(e);
  // Signal the failure to the calling shell / CI.
  process.exitCode = 1;
});