Skip to content

Commit

Permalink
Added msg queue to deal with concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
macterra committed Mar 5, 2024
1 parent 15bedea commit 580d4d9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 24 deletions.
46 changes: 22 additions & 24 deletions hyperswarm-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import goodbye from 'graceful-goodbye';
import b4a from 'b4a';
import { sha256 } from '@noble/hashes/sha256';
import fs from 'fs';
import asyncLib from 'async';

import * as gatekeeper from './gatekeeper-sdk.js';
import * as cipher from './cipher.js';

Expand All @@ -12,6 +14,8 @@ EventEmitter.defaultMaxListeners = 100;
const protocol = '/MDIP/v22.03.01';

const swarm = new Hyperswarm();
const peerName = b4a.toString(swarm.keyPair.publicKey, 'hex');

goodbye(() => swarm.destroy())

const nodes = {};
Expand All @@ -21,12 +25,16 @@ const messagesSeen = {};
const conns = [];
swarm.on('connection', conn => {
const name = b4a.toString(conn.remotePublicKey, 'hex');
console.log('* got a connection from:', name, '*');
console.log('* got a connection from:', shortName(name), '*');
conns.push(conn);
conn.once('close', () => conns.splice(conns.indexOf(conn), 1));
conn.on('data', data => receiveMsg(name, data));
});

function shortName(name) {
return name.slice(0, 4) + '-' + name.slice(-4);
}

function loadDb() {
const dbName = 'data/mdip.json';

Expand Down Expand Up @@ -61,37 +69,22 @@ async function shareDb() {
async function relayDb(msg) {
const json = JSON.stringify(msg);

console.log(`* publishing db: ${msg.hash} *`);
console.log(`publishing my db: ${shortName(msg.hash)} from: ${shortName(peerName)}`);

for (const conn of conns) {
const name = b4a.toString(conn.remotePublicKey, 'hex');

if (!msg.relays.includes(name)) {
conn.write(json);
console.log(`* relaying to: ${name} *`);
console.log(`* relaying to: ${shortName(name)} *`);
}
else {
console.log(`* skipping relay to: ${name} *`);
console.log(`* skipping relay to: ${shortName(name)} *`);
}
}
}

async function mergeDb(db) {
// if (db.anchors) {
// for (const did of Object.keys(db.anchors)) {
// try {
// const imported = await gatekeeper.createDID(db.anchors[did]);
// if (imported === 1) {
// console.log(JSON.stringify(db.anchors[did], null, 4));
// console.log(`* imported anchor ${did} *`);
// }
// }
// catch (error) {
// console.error(`error importing anchor: ${did}: ${error}`);
// }
// }
// }

if (db.hyperswarm) {
// Import DIDs by creation time order to avoid dependency errors
let dids = Object.keys(db.hyperswarm);
Expand All @@ -105,7 +98,6 @@ async function mergeDb(db) {
try {
const imported = await gatekeeper.importDID(db.hyperswarm[did]);
if (imported > 0) {
//console.log(JSON.stringify(db.hyperswarm[did], null, 4));
console.log(`* imported DID ${did} *`);
}
}
Expand All @@ -116,7 +108,8 @@ async function mergeDb(db) {
}
}

async function receiveMsg(name, json) {
let queue = asyncLib.queue(async function (task, callback) {
const { name, json } = task;
try {
const msg = JSON.parse(json);
const hash = cipher.hashJSON(msg.data);
Expand All @@ -131,20 +124,24 @@ async function receiveMsg(name, json) {
await mergeDb(msg.data);
}
else {
console.log(`* received already seen ${hash} *`);
console.log(`received old db: ${shortName(hash)} from: ${shortName(name)}`);
}
}
catch (error) {
console.log('receiveMsg error:', error);
}
callback();
}, 1); // concurrency is 1

async function receiveMsg(name, json) {
queue.push({ name, json });
}

function logMsg(name, hash) {
nodes[name] = (nodes[name] || 0) + 1;
const detected = Object.keys(nodes).length;

console.log(`from: ${name}`);
console.log(`received: ${hash}`);
console.log(`received new db: ${shortName(hash)} from: ${shortName(name)}`);
console.log(`--- ${conns.length} nodes connected, ${detected} nodes detected`);
}

Expand All @@ -166,6 +163,7 @@ const discovery = swarm.join(topic, { client: true, server: true });
// The flushed promise will resolve when the topic has been fully announced to the DHT
discovery.flushed().then(() => {
console.log(`connecting to gatekeeper at ${gatekeeper.URL}`);
console.log(`hyperswarm peer id: ${peerName}`);
console.log('joined topic:', b4a.toString(topic, 'hex'));
});

Expand Down
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"@noble/ciphers": "^0.4.1",
"@noble/hashes": "^1.3.3",
"@noble/secp256k1": "^2.0.0",
"async": "^3.2.5",
"axios": "^1.6.2",
"b4a": "^1.6.6",
"bip39": "^3.1.0",
Expand Down

0 comments on commit 580d4d9

Please sign in to comment.