Skip to content

Commit

Permalink
Refactored mergeDb
Browse files Browse the repository at this point in the history
  • Loading branch information
macterra committed Mar 18, 2024
1 parent ef4b0d4 commit 828f971
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 66 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ services:
image: macterra/gatekeeper
volumes:
- ./data:/app/data
ports:
- "3000:3000"

hyperswarm:
build:
Expand Down
24 changes: 19 additions & 5 deletions gatekeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -462,15 +462,29 @@ export async function importDID(txns) {
}

export async function mergeBatch(batch) {
let merged = 0;
let verified = 0;
let updated = 0;
let failed = 0;

for (const txns of batch) {
const diff = await importDID(txns);
try {
const diff = await importDID(txns);

if (diff > 0) {
merged += 1;
if (diff > 0) {
updated += 1;
}
else {
verified += 1;
}
}
catch {
failed += 1;
}
}

return merged;
return {
verified: verified,
updated: updated,
failed: failed,
};
}
139 changes: 78 additions & 61 deletions hyperswarm-mediator.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import config from './config.js';
import { EventEmitter } from 'events';
EventEmitter.defaultMaxListeners = 100;

const protocol = '/MDIP/v22.03.01';
const protocol = '/MDIP/v22.03.18';

const swarm = new Hyperswarm();
const peerName = b4a.toString(swarm.keyPair.publicKey, 'hex');
Expand All @@ -37,6 +37,10 @@ function shortName(name) {
return name.slice(0, 4) + '-' + name.slice(-4);
}

function isEmpty(obj) {
return Object.keys(obj).length === 0 && obj.constructor === Object;
}

function loadDb() {
const dbName = 'data/mdip.json';

Expand All @@ -55,13 +59,18 @@ async function shareDb() {

try {
const db = loadDb();
const hash = cipher.hashJSON(db);

if (isEmpty(db) || !db.hyperswarm || isEmpty(db.hyperswarm)) {
return;
}

const hash = cipher.hashJSON(db.hyperswarm);

messagesSeen[hash] = true;

const msg = {
hash: hash.toString(),
data: db,
data: db.hyperswarm,
relays: [],
};

Expand Down Expand Up @@ -90,48 +99,36 @@ async function relayDb(msg) {
}
}

async function mergeBatch(batch) {
try {
console.log(`mergeBatch: merging ${batch.length} DIDs...`);
const { verified, updated, failed } = await gatekeeper.mergeBatch(batch);
console.log(`* ${verified} verified, ${updated} updated, ${failed} failed`);
}
catch (error) {
console.error(`mergeBatch error: ${error}`);
}
}

async function mergeDb(db) {
merging = true;
if (db.hyperswarm) {
if (db) {
// Import DIDs by creation time order to avoid dependency errors
let dids = Object.keys(db.hyperswarm);
dids.sort((a, b) => db.hyperswarm[a][0].time - db.hyperswarm[b][0].time);
let dids = Object.keys(db);
dids.sort((a, b) => db[a][0].time - db[b][0].time);

let batch = [];
for (const did of dids) {
console.log(`Adding to batch: ${did} ${db.hyperswarm[did][0].time}`);
batch.push(db.hyperswarm[did]);
//console.log(`Adding to batch: ${did} ${db.hyperswarm[did][0].time}`);
batch.push(db[did]);

if (batch.length >= 100) {
try {
const imported = await gatekeeper.mergeBatch(batch);
if (imported > 0) {
console.log(`* imported ${imported} DIDs *`);
}
else {
console.log(`* DID synchronization confirmed *`);
}
}
catch (error) {
console.error(`error importing DID: ${did}: ${error}`);
}

await mergeBatch(batch);
batch = [];
}
}

try {
const imported = await gatekeeper.mergeBatch(batch);
if (imported > 0) {
console.log(`* imported ${imported} DIDs *`);
}
else {
console.log(`* DID synchronization confirmed *`);
}
}
catch (error) {
console.error(`error importing DID: ${did}: ${error}`);
}
await mergeBatch(batch);
}
merging = false;
}
Expand All @@ -145,11 +142,21 @@ let queue = asyncLib.queue(async function (task, callback) {

if (!seen) {
messagesSeen[hash] = true;

const db = msg.data;

if (isEmpty(db)) {
return;
}

// const dbName = `${hash}.json`
// fs.writeFileSync(dbName, JSON.stringify(db, null, 4));

msg.relays.push(name);
logMsg(msg.relays[0], hash);
relayDb(msg);
console.log(`* merging db ${hash} *`);
await mergeDb(msg.data);
console.log(`* merging db ${shortName(hash)} *`);
await mergeDb(db);
}
else {
console.log(`received old db: ${shortName(hash)} from: ${shortName(name)}`);
Expand All @@ -173,32 +180,6 @@ function logMsg(name, hash) {
console.log(`--- ${conns.length} nodes connected, ${detected} nodes detected`);
}

setInterval(async () => {
try {
const version = gatekeeper.getVersion();

if (version) {
shareDb();
}
}
catch (error) {
console.error(`Error: ${error}`);
}
}, 10000);

// Join a common topic
const hash = sha256(protocol);
const networkID = Buffer.from(hash).toString('hex');
const topic = b4a.from(networkID, 'hex');
const discovery = swarm.join(topic, { client: true, server: true });

// The flushed promise will resolve when the topic has been fully announced to the DHT
discovery.flushed().then(() => {
console.log(`connecting to gatekeeper at ${config.gatekeeperURL}`);
console.log(`hyperswarm peer id: ${peerName}`);
console.log('joined topic:', b4a.toString(topic, 'hex'));
});

process.on('uncaughtException', (error) => {
//console.error('Unhandled exception caught');
console.error('Unhandled exception caught', error);
Expand All @@ -214,3 +195,39 @@ process.stdin.on('data', d => {
process.exit();
}
});

// Join a common topic
const hash = sha256(protocol);
const networkID = Buffer.from(hash).toString('hex');
const topic = b4a.from(networkID, 'hex');

async function start() {
console.log(`hyperswarm peer id: ${peerName}`);
console.log('joined topic:', b4a.toString(topic, 'hex'));

setInterval(async () => {
try {
const version = gatekeeper.getVersion();

if (version) {
shareDb();
}
}
catch (error) {
console.error(`Error: ${error}`);
}
}, 10000);
}

function main() {
console.log(`connecting to gatekeeper at ${config.gatekeeperURL}`);

const discovery = swarm.join(topic, { client: true, server: true });

// The flushed promise will resolve when the topic has been fully announced to the DHT
discovery.flushed().then(() => {
start();
});
}

main();

0 comments on commit 828f971

Please sign in to comment.