Skip to content

Commit

Permalink
fix: hyperswarm-mediator limit msg size (#428)
Browse files Browse the repository at this point in the history
* Export batches in chunks

* Refactored shareDb

* Fixed mergeBatch
  • Loading branch information
macterra authored Nov 21, 2024
1 parent fc69af0 commit 57c7e9b
Showing 1 changed file with 59 additions and 20 deletions.
79 changes: 59 additions & 20 deletions services/mediators/hyperswarm/src/hyperswarm-mediator.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,6 @@ function shortName(name) {
return name.slice(0, 4) + '-' + name.slice(-4);
}

async function createBatch() {
console.time('exportBatch');
const allEvents = await gatekeeper.exportBatch();
console.timeEnd('exportBatch');
console.log(`${allEvents.length} events fetched`);

// hyperswarm distributes only operations
return allEvents.map(event => event.operation);
}

function logBatch(batch, name) {
const debugFolder = 'data/debug';

Expand All @@ -111,19 +101,65 @@ function logBatch(batch, name) {
fs.writeFileSync(batchfile, batchJSON);
}

function sendBatch(conn, batch) {
const limit = 8 * 1024 * 1014; // 8 MB limit

const msg = {
type: 'batch',
data: batch,
relays: [],
node: config.nodeName,
};

const json = JSON.stringify(msg);

if (json.length < limit) {
conn.write(json);
console.log(` * sent ${batch.length} ops in ${json.length} bytes`);
return batch.length;
}
else {
if (batch.length < 2) {
console.error(`Error: Single operation exceeds the limit of ${limit} bytes. Unable to send.`);
return 0;
}

// split batch into 2 halves
const midIndex = Math.floor(batch.length / 2);
const batch1 = batch.slice(0, midIndex);
const batch2 = batch.slice(midIndex);

return sendBatch(conn, batch1) + sendBatch(conn, batch2);
}
}

async function shareDb(conn) {
try {
const batch = await createBatch();
console.time('exportBatch in chunks');
const batchSize = 1000; // export DIDs in batches of 1000 for scalability
const dids = await gatekeeper.getDIDs();
let allEvents = [];

for (let i = 0; i < dids.length; i += batchSize) {
const didBatch = dids.slice(i, i + batchSize);
const exports = await gatekeeper.exportBatch(didBatch);
allEvents = allEvents.concat(exports);
}
console.timeEnd('exportBatch in chunks');

const msg = {
type: 'batch',
data: batch,
relays: [],
node: config.nodeName,
};
// hyperswarm distributes only operations
const batch = allEvents.map(event => event.operation);
console.log(`${batch.length} operations fetched`);

const json = JSON.stringify(msg);
conn.write(json);
if (!batch || batch.length === 0) {
return;
}

const opsCount = batch.length;
console.time('sendBatch');
const opsSent = sendBatch(conn, batch);
console.timeEnd('sendBatch');
console.log(` * sent ${opsSent}/${opsCount} operations`);
}
catch (error) {
console.log(error);
Expand Down Expand Up @@ -205,7 +241,10 @@ async function mergeBatch(batch) {
}
}

await importBatch(chunk);
if (chunk.length > 0) {
await importBatch(chunk);
}

console.time('processEvents');
const response = await gatekeeper.processEvents();
console.timeEnd('processEvents');
Expand Down

0 comments on commit 57c7e9b

Please sign in to comment.