From 828f971779e733f72d45276c654f747e2dacdc25 Mon Sep 17 00:00:00 2001 From: David McFadzean Date: Mon, 18 Mar 2024 12:21:31 -0400 Subject: [PATCH] Refactored mergeDb --- docker-compose.yml | 2 + gatekeeper.js | 24 +++++-- hyperswarm-mediator.js | 139 +++++++++++++++++++++++------------------ 3 files changed, 99 insertions(+), 66 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index b445b0c3..b443e139 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,8 @@ services: image: macterra/gatekeeper volumes: - ./data:/app/data + ports: + - "3000:3000" hyperswarm: build: diff --git a/gatekeeper.js b/gatekeeper.js index dbafbd0e..c8191ae8 100644 --- a/gatekeeper.js +++ b/gatekeeper.js @@ -462,15 +462,29 @@ export async function importDID(txns) { } export async function mergeBatch(batch) { - let merged = 0; + let verified = 0; + let updated = 0; + let failed = 0; for (const txns of batch) { - const diff = await importDID(txns); + try { + const diff = await importDID(txns); - if (diff > 0) { - merged += 1; + if (diff > 0) { + updated += 1; + } + else { + verified += 1; + } + } + catch { + failed += 1; } } - return merged; + return { + verified: verified, + updated: updated, + failed: failed, + }; } diff --git a/hyperswarm-mediator.js b/hyperswarm-mediator.js index 383d3cc3..b5ec3b17 100644 --- a/hyperswarm-mediator.js +++ b/hyperswarm-mediator.js @@ -12,7 +12,7 @@ import config from './config.js'; import { EventEmitter } from 'events'; EventEmitter.defaultMaxListeners = 100; -const protocol = '/MDIP/v22.03.01'; +const protocol = '/MDIP/v22.03.18'; const swarm = new Hyperswarm(); const peerName = b4a.toString(swarm.keyPair.publicKey, 'hex'); @@ -37,6 +37,10 @@ function shortName(name) { return name.slice(0, 4) + '-' + name.slice(-4); } +function isEmpty(obj) { + return Object.keys(obj).length === 0 && obj.constructor === Object; +} + function loadDb() { const dbName = 'data/mdip.json'; @@ -55,13 +59,18 @@ async function shareDb() { try { const db = loadDb(); - const hash = cipher.hashJSON(db); + + if (isEmpty(db) || !db.hyperswarm || isEmpty(db.hyperswarm)) { + return; + } + + const hash = cipher.hashJSON(db.hyperswarm); messagesSeen[hash] = true; const msg = { hash: hash.toString(), - data: db, + data: db.hyperswarm, relays: [], }; @@ -90,48 +99,36 @@ async function relayDb(msg) { } } +async function mergeBatch(batch) { + try { + console.log(`mergeBatch: merging ${batch.length} DIDs...`); + const { verified, updated, failed } = await gatekeeper.mergeBatch(batch); + console.log(`* ${verified} verified, ${updated} updated, ${failed} failed`); + } + catch (error) { + console.error(`mergeBatch error: ${error}`); + } +} + async function mergeDb(db) { merging = true; - if (db.hyperswarm) { + if (db) { // Import DIDs by creation time order to avoid dependency errors - let dids = Object.keys(db.hyperswarm); - dids.sort((a, b) => db.hyperswarm[a][0].time - db.hyperswarm[b][0].time); + let dids = Object.keys(db); + dids.sort((a, b) => db[a][0].time - db[b][0].time); let batch = []; for (const did of dids) { - console.log(`Adding to batch: ${did} ${db.hyperswarm[did][0].time}`); - batch.push(db.hyperswarm[did]); + //console.log(`Adding to batch: ${did} ${db.hyperswarm[did][0].time}`); + batch.push(db[did]); if (batch.length >= 100) { - try { - const imported = await gatekeeper.mergeBatch(batch); - if (imported > 0) { - console.log(`* imported ${imported} DIDs *`); - } - else { - console.log(`* DID synchronization confirmed *`); - } - } - catch (error) { - console.error(`error importing DID: ${did}: ${error}`); - } - + await mergeBatch(batch); batch = []; } } - try { - const imported = await gatekeeper.mergeBatch(batch); - if (imported > 0) { - console.log(`* imported ${imported} DIDs *`); - } - else { - console.log(`* DID synchronization confirmed *`); - } - } - catch (error) { - console.error(`error importing DID: ${did}: ${error}`); - } + await mergeBatch(batch); } merging = false; } @@ -145,11 +142,21 @@ let queue = asyncLib.queue(async function (task, callback) { if (!seen) { messagesSeen[hash] = true; + + const db = msg.data; + + if (isEmpty(db)) { + return; + } + + // const dbName = `${hash}.json` + // fs.writeFileSync(dbName, JSON.stringify(db, null, 4)); + msg.relays.push(name); logMsg(msg.relays[0], hash); relayDb(msg); - console.log(`* merging db ${hash} *`); - await mergeDb(msg.data); + console.log(`* merging db ${shortName(hash)} *`); + await mergeDb(db); } else { console.log(`received old db: ${shortName(hash)} from: ${shortName(name)}`); @@ -173,32 +180,6 @@ function logMsg(name, hash) { console.log(`--- ${conns.length} nodes connected, ${detected} nodes detected`); } -setInterval(async () => { - try { - const version = gatekeeper.getVersion(); - - if (version) { - shareDb(); - } - } - catch (error) { - console.error(`Error: ${error}`); - } -}, 10000); - -// Join a common topic -const hash = sha256(protocol); -const networkID = Buffer.from(hash).toString('hex'); -const topic = b4a.from(networkID, 'hex'); -const discovery = swarm.join(topic, { client: true, server: true }); - -// The flushed promise will resolve when the topic has been fully announced to the DHT -discovery.flushed().then(() => { - console.log(`connecting to gatekeeper at ${config.gatekeeperURL}`); - console.log(`hyperswarm peer id: ${peerName}`); - console.log('joined topic:', b4a.toString(topic, 'hex')); -}); - process.on('uncaughtException', (error) => { //console.error('Unhandled exception caught'); console.error('Unhandled exception caught', error); @@ -214,3 +195,39 @@ process.stdin.on('data', d => { process.exit(); } }); + +// Join a common topic +const hash = sha256(protocol); +const networkID = Buffer.from(hash).toString('hex'); +const topic = b4a.from(networkID, 'hex'); + +async function start() { + console.log(`hyperswarm peer id: ${peerName}`); + console.log('joined topic:', b4a.toString(topic, 'hex')); + + setInterval(async () => { + try { + const version = gatekeeper.getVersion(); + + if (version) { + shareDb(); + } + } + catch (error) { + console.error(`Error: ${error}`); + } + }, 10000); +} + +function main() { + console.log(`connecting to gatekeeper at ${config.gatekeeperURL}`); + + const discovery = swarm.join(topic, { client: true, server: true }); + + // The flushed promise will resolve when the topic has been fully announced to the DHT + discovery.flushed().then(() => { + start(); + }); +} + +main();