Skip to content

Commit

Permalink
Merge pull request #27 from KeychainMDIP/17-nodes-not-syncing
Browse files Browse the repository at this point in the history
Improved hyperswarm syncing
  • Loading branch information
macterra authored Mar 18, 2024
2 parents ef4b0d4 + 98e491e commit 4abdba0
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 90 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ services:
image: macterra/gatekeeper
volumes:
- ./data:/app/data
ports:
- "3000:3000"

hyperswarm:
build:
Expand Down
58 changes: 52 additions & 6 deletions gatekeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,27 @@ export function writeDb(db) {
fs.writeFileSync(dbName, JSON.stringify(db, null, 4));
}

export async function verifyDb() {
const db = loadDb();
const dids = Object.keys(db.anchors);
let n = 0;
let invalid = 0;

for (const did of dids) {
n += 1;
try {
const doc = await resolveDID(did, null, true);
console.log(`${n} ${did} OK`);
}
catch (error) {
console.log(`${n} ${did} ${error}`);
invalid += 1;
}
}

return invalid;
}

let helia = null;
let ipfs = null;

Expand Down Expand Up @@ -317,7 +338,7 @@ export function fetchUpdates(registry, did) {
return [];
}

export async function resolveDID(did, asOfTime = null) {
export async function resolveDID(did, asOfTime = null, verify = false) {
let doc = await generateDoc(did);
let mdip = doc?.didDocumentMetadata?.mdip;

Expand Down Expand Up @@ -345,13 +366,20 @@ export async function resolveDID(did, asOfTime = null) {

if (hash !== txn.prev) {
// hash mismatch
// if (verify) {
// throw "Invalid hash";
// }
// !!! This fails on key rotation #3 (!?), disabling for now
// continue;
}

const valid = await verifyUpdate(txn, doc);

if (!valid) {
if (verify) {
throw "Invalid update";
}

continue;
}

Expand All @@ -372,6 +400,10 @@ export async function resolveDID(did, asOfTime = null) {
doc.didDocumentMetadata.updated = time;
}
else {
if (verify) {
throw "Invalid operation";
}

console.error(`unknown op ${txn.op}`);
}
}
Expand Down Expand Up @@ -462,15 +494,29 @@ export async function importDID(txns) {
}

export async function mergeBatch(batch) {
let merged = 0;
let verified = 0;
let updated = 0;
let failed = 0;

for (const txns of batch) {
const diff = await importDID(txns);
try {
const diff = await importDID(txns);

if (diff > 0) {
merged += 1;
if (diff > 0) {
updated += 1;
}
else {
verified += 1;
}
}
catch {
failed += 1;
}
}

return merged;
return {
verified: verified,
updated: updated,
failed: failed,
};
}
139 changes: 78 additions & 61 deletions hyperswarm-mediator.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import config from './config.js';
import { EventEmitter } from 'events';
EventEmitter.defaultMaxListeners = 100;

const protocol = '/MDIP/v22.03.01';
const protocol = '/MDIP/v22.03.18';

const swarm = new Hyperswarm();
const peerName = b4a.toString(swarm.keyPair.publicKey, 'hex');
Expand All @@ -37,6 +37,10 @@ function shortName(name) {
return name.slice(0, 4) + '-' + name.slice(-4);
}

function isEmpty(obj) {
return Object.keys(obj).length === 0 && obj.constructor === Object;
}

function loadDb() {
const dbName = 'data/mdip.json';

Expand All @@ -55,13 +59,18 @@ async function shareDb() {

try {
const db = loadDb();
const hash = cipher.hashJSON(db);

if (isEmpty(db) || !db.hyperswarm || isEmpty(db.hyperswarm)) {
return;
}

const hash = cipher.hashJSON(db.hyperswarm);

messagesSeen[hash] = true;

const msg = {
hash: hash.toString(),
data: db,
data: db.hyperswarm,
relays: [],
};

Expand Down Expand Up @@ -90,48 +99,36 @@ async function relayDb(msg) {
}
}

async function mergeBatch(batch) {
try {
console.log(`mergeBatch: merging ${batch.length} DIDs...`);
const { verified, updated, failed } = await gatekeeper.mergeBatch(batch);
console.log(`* ${verified} verified, ${updated} updated, ${failed} failed`);
}
catch (error) {
console.error(`mergeBatch error: ${error}`);
}
}

async function mergeDb(db) {
merging = true;
if (db.hyperswarm) {
if (db) {
// Import DIDs by creation time order to avoid dependency errors
let dids = Object.keys(db.hyperswarm);
dids.sort((a, b) => db.hyperswarm[a][0].time - db.hyperswarm[b][0].time);
let dids = Object.keys(db);
dids.sort((a, b) => db[a][0].time - db[b][0].time);

let batch = [];
for (const did of dids) {
console.log(`Adding to batch: ${did} ${db.hyperswarm[did][0].time}`);
batch.push(db.hyperswarm[did]);
//console.log(`Adding to batch: ${did} ${db.hyperswarm[did][0].time}`);
batch.push(db[did]);

if (batch.length >= 100) {
try {
const imported = await gatekeeper.mergeBatch(batch);
if (imported > 0) {
console.log(`* imported ${imported} DIDs *`);
}
else {
console.log(`* DID synchronization confirmed *`);
}
}
catch (error) {
console.error(`error importing DID: ${did}: ${error}`);
}

await mergeBatch(batch);
batch = [];
}
}

try {
const imported = await gatekeeper.mergeBatch(batch);
if (imported > 0) {
console.log(`* imported ${imported} DIDs *`);
}
else {
console.log(`* DID synchronization confirmed *`);
}
}
catch (error) {
console.error(`error importing DID: ${did}: ${error}`);
}
await mergeBatch(batch);
}
merging = false;
}
Expand All @@ -145,11 +142,21 @@ let queue = asyncLib.queue(async function (task, callback) {

if (!seen) {
messagesSeen[hash] = true;

const db = msg.data;

if (isEmpty(db)) {
return;
}

// const dbName = `${hash}.json`
// fs.writeFileSync(dbName, JSON.stringify(db, null, 4));

msg.relays.push(name);
logMsg(msg.relays[0], hash);
relayDb(msg);
console.log(`* merging db ${hash} *`);
await mergeDb(msg.data);
console.log(`* merging db ${shortName(hash)} *`);
await mergeDb(db);
}
else {
console.log(`received old db: ${shortName(hash)} from: ${shortName(name)}`);
Expand All @@ -173,32 +180,6 @@ function logMsg(name, hash) {
console.log(`--- ${conns.length} nodes connected, ${detected} nodes detected`);
}

setInterval(async () => {
try {
const version = gatekeeper.getVersion();

if (version) {
shareDb();
}
}
catch (error) {
console.error(`Error: ${error}`);
}
}, 10000);

// Join a common topic
const hash = sha256(protocol);
const networkID = Buffer.from(hash).toString('hex');
const topic = b4a.from(networkID, 'hex');
const discovery = swarm.join(topic, { client: true, server: true });

// The flushed promise will resolve when the topic has been fully announced to the DHT
discovery.flushed().then(() => {
console.log(`connecting to gatekeeper at ${config.gatekeeperURL}`);
console.log(`hyperswarm peer id: ${peerName}`);
console.log('joined topic:', b4a.toString(topic, 'hex'));
});

process.on('uncaughtException', (error) => {
//console.error('Unhandled exception caught');
console.error('Unhandled exception caught', error);
Expand All @@ -214,3 +195,39 @@ process.stdin.on('data', d => {
process.exit();
}
});

// Join a common topic
const hash = sha256(protocol);
const networkID = Buffer.from(hash).toString('hex');
const topic = b4a.from(networkID, 'hex');

async function start() {
console.log(`hyperswarm peer id: ${peerName}`);
console.log('joined topic:', b4a.toString(topic, 'hex'));

setInterval(async () => {
try {
const version = gatekeeper.getVersion();

if (version) {
shareDb();
}
}
catch (error) {
console.error(`Error: ${error}`);
}
}, 10000);
}

function main() {
console.log(`connecting to gatekeeper at ${config.gatekeeperURL}`);

const discovery = swarm.join(topic, { client: true, server: true });

// The flushed promise will resolve when the topic has been fully announced to the DHT
discovery.flushed().then(() => {
start();
});
}

main();
54 changes: 31 additions & 23 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,27 @@ app.get('/did/:did', async (req, res) => {

app.get('/explore/:did', async (req, res) => {
try {
const doc = await gatekeeper.resolveDID(req.params.did, req.query.asof);
var hthead = '<html><body>';
hthead = hthead + '<h1>MDIP Network Explorer</h1>';
hthead = hthead + '<table><tr><td><h3>' + req.params.did + '</h3></td>';
var htdoc = JSON.stringify(doc,null,5);
htdoc = htdoc.replace(/"didDocument"/g, '"<b>didDocument</b>"');
htdoc = htdoc.replace(/"didDocumentMetadata"/g, '"<b>didDocumentMetadata</b>"');
htdoc = htdoc.replace(/"manifest"/g, '"<b>manifest</b>"');
htdoc = htdoc.replace(/"issuer"/g, '"<b>issuer</b>"');
htdoc = htdoc.replace(/"signer"/g, '"<b>signer</b>"');
htdoc = htdoc.replace(/"id"/g, '"<b>id</b>"');
htdoc = htdoc.replace(/"credential"/g, '"<b>credential</b>"');
htdoc = htdoc.replace(/"vault"/g, '"<b>vault</b>"');
htdoc = htdoc.replace(/"(did:mdip:.*)"/g, '"<a href="/explore/$1">$1</a>"');
htdoc = hthead + '<tr><td><hr><pre>' + htdoc + '</pre><hr></td></tr>';
var now = new Date();
htdoc = htdoc + '</table>' + now + '</body></html>';
res.send(htdoc);
} catch (error ) {
console.error(error);
res.status(500).send(error.toString());
const doc = await gatekeeper.resolveDID(req.params.did, req.query.asof);
var hthead = '<html><body>';
hthead = hthead + '<h1>MDIP Network Explorer</h1>';
hthead = hthead + '<table><tr><td><h3>' + req.params.did + '</h3></td>';
var htdoc = JSON.stringify(doc, null, 5);
htdoc = htdoc.replace(/"didDocument"/g, '"<b>didDocument</b>"');
htdoc = htdoc.replace(/"didDocumentMetadata"/g, '"<b>didDocumentMetadata</b>"');
htdoc = htdoc.replace(/"manifest"/g, '"<b>manifest</b>"');
htdoc = htdoc.replace(/"issuer"/g, '"<b>issuer</b>"');
htdoc = htdoc.replace(/"signer"/g, '"<b>signer</b>"');
htdoc = htdoc.replace(/"id"/g, '"<b>id</b>"');
htdoc = htdoc.replace(/"credential"/g, '"<b>credential</b>"');
htdoc = htdoc.replace(/"vault"/g, '"<b>vault</b>"');
htdoc = htdoc.replace(/"(did:mdip:.*)"/g, '"<a href="/explore/$1">$1</a>"');
htdoc = hthead + '<tr><td><hr><pre>' + htdoc + '</pre><hr></td></tr>';
var now = new Date();
htdoc = htdoc + '</table>' + now + '</body></html>';
res.send(htdoc);
} catch (error) {
console.error(error);
res.status(500).send(error.toString());
}
});

Expand Down Expand Up @@ -123,8 +123,16 @@ app.post('/merge', async (req, res) => {

const port = 3000;

app.listen(port, () => {
console.log(`Server is running on port ${port}`);
gatekeeper.verifyDb().then((invalid) => {
if (invalid === 0) {
app.listen(port, () => {
console.log(`Server is running on port ${port}`);
});
}
else {
console.log(`${invalid} invalid DIDs in MDIP db`);
process.exit();
}
});

process.on('uncaughtException', (error) => {
Expand Down

0 comments on commit 4abdba0

Please sign in to comment.