Skip to content

Commit

Permalink
Migrate db, retrieve contributors from ydocs, and insert them into th…
Browse files Browse the repository at this point in the history
…e db
  • Loading branch information
nonumpa committed Mar 18, 2024
1 parent 070dd8e commit d3cc950
Showing 1 changed file with 189 additions and 0 deletions.
189 changes: 189 additions & 0 deletions migration/addContributors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/**
* How to use
* 1. Make sure reindexed the ydocs using `npm run reload -- ydocs` in rumors-db
* 2. Run `npx ts-node --esm .\migration\addContributors.ts`
*/

import * as Y from 'yjs';
import elasticsearch from '@elastic/elasticsearch';
import 'dotenv/config';

const elasticsearchOpts: elasticsearch.ClientOptions = {
node: process.env.ELASTICSEARCH_URL,
};

const client = new elasticsearch.Client(elasticsearchOpts);

export const getAllContributors = (
articleId: string,
document: Y.Doc,
versions: { createdAt: string; snapshot: string }[]
) => {
const contributors = new Map();
const permanentUserData = new Y.PermanentUserData(document);
const yXmlFragment = document.getXmlFragment('prosemirror');

/**
* @param {'removed'|'added'} type
* @param {Y.ID} id
*/
const computeYChange = (type: string, id: Y.ID) => {
const user =
type === 'added'
? permanentUserData.getUserByClientId(id.client)
: permanentUserData.getUserByDeletedId(id);
let userId;
try {
userId = JSON.parse(user).id;
} catch (e) {
userId = undefined;
console.error('error user format: ', user);
}
return {
userId,
type,
};
};

versions.forEach((v: { createdAt: string; snapshot: string }, i: number) => {
const snapshot = Y.decodeSnapshot(Buffer.from(v.snapshot, 'base64'));
const versionCreatedAt = v.createdAt;
const versionDate = new Date(versionCreatedAt);
const prevSnapshot =
i > 0
? Y.decodeSnapshot(Buffer.from(versions[i - 1].snapshot, 'base64'))
: Y.createSnapshot(Y.createDeleteSet(), new Map());

yXmlFragment.forEach((xmlElement) => {
const fragmentContent = Y.typeListToArraySnapshot(
xmlElement,
new Y.Snapshot(prevSnapshot.ds, snapshot.sv)
);
fragmentContent.forEach((f) => {
const deltas =
f.constructor === Y.XmlText
? f.toDelta(snapshot, prevSnapshot, computeYChange)
: undefined;
deltas?.forEach((d) => {
const { attributes } = d;
if (attributes && attributes.ychange) {
const { userId } = attributes.ychange;
if (!userId) {
console.error('unknown user, articleId: ', articleId);
return;
}
const contributor = contributors.get(userId);
if (!contributor || versionDate > new Date(contributor.createdAt)) {
contributors.set(userId, {
userId,
updatedAt: versionCreatedAt,
appId: 'WEBSITE',
});
}
}
});
});
});
});
return [...contributors.values()];
};

const forEachYdoc = async (callback) => {
let scroll_id,
processedCount = 0,
total = Infinity;

const {
body: { hits, _scroll_id },
} = await client.search({
index: 'ydocs',
type: 'doc',
scroll: '30s',
size: 100,
body: {
query: {
match_all: {},
},
},
_source: ['ydoc', 'versions'],
});

await callback(hits.hits);

processedCount += hits.hits.length;
total = hits.total;
scroll_id = _scroll_id;

// eslint-disable-next-line no-console
console.info(`${processedCount} / ${total} Processed`);
while (processedCount < total) {
const {
body: { hits, _scroll_id },
} = await client.scroll({
scroll: '30s',
scroll_id, // Fix: Change 'scrollId' to 'scroll_id'
});

await callback(hits.hits);

processedCount += hits.hits.length;
scroll_id = _scroll_id;

// eslint-disable-next-line no-console
console.info(`${processedCount} / ${total} Processed`);
}
};

async function main() {
const errorArticles = [];
// list all ydocs
forEachYdoc(async (hits) => {
const operations = [];
hits.map(async ({ _id: id, _source: { ydoc: data, versions } }) => {
if (!id || !data || !versions) {
errorArticles.push(id);
return;
}

// restore the document
const update = Buffer.from(data, 'base64');
const doc = new Y.Doc();
Y.applyUpdate(doc, update);

const contributors = getAllContributors(id, doc, versions);
if (contributors.length === 0) {
errorArticles.push(id);
console.error('no contributors found for articleId: ', id);
return;
}
operations.push({
update: {
_index: 'articles',
_type: 'doc',
_id: id,
},
});
operations.push({
doc: { contributors },
});
});
if (operations.length !== 0) {
try {
const { body: result } = await client.bulk({
body: operations,
refresh: 'true',
_source: 'false',
timeout: '10m',
});
console.error('result: ', result);
} catch (e) {
console.error('error: ', e);
throw e;
}
}

console.error('errorArticles: ', errorArticles);
});
}

main().catch(console.error);

0 comments on commit d3cc950

Please sign in to comment.