From 57c7e9b2f360e8d38e9c5ad26d57db9888ce3e01 Mon Sep 17 00:00:00 2001
From: David McFadzean
Date: Thu, 21 Nov 2024 12:53:24 -0500
Subject: [PATCH] fix: hyperswarm-mediator limit msg size (#428)

* Export batches in chunks

* Refactored shareDb

* Fixed mergeBatch
---
 .../hyperswarm/src/hyperswarm-mediator.js | 79 ++++++++++++++-----
 1 file changed, 59 insertions(+), 20 deletions(-)

diff --git a/services/mediators/hyperswarm/src/hyperswarm-mediator.js b/services/mediators/hyperswarm/src/hyperswarm-mediator.js
index 50e75862..3c90963d 100644
--- a/services/mediators/hyperswarm/src/hyperswarm-mediator.js
+++ b/services/mediators/hyperswarm/src/hyperswarm-mediator.js
@@ -84,16 +84,6 @@ function shortName(name) {
     return name.slice(0, 4) + '-' + name.slice(-4);
 }
 
-async function createBatch() {
-    console.time('exportBatch');
-    const allEvents = await gatekeeper.exportBatch();
-    console.timeEnd('exportBatch');
-    console.log(`${allEvents.length} events fetched`);
-
-    // hyperswarm distributes only operations
-    return allEvents.map(event => event.operation);
-}
-
 function logBatch(batch, name) {
     const debugFolder = 'data/debug';
 
@@ -111,19 +101,65 @@
     fs.writeFileSync(batchfile, batchJSON);
 }
 
+function sendBatch(conn, batch) {
+    const limit = 8 * 1024 * 1024; // 8 MB limit
+
+    const msg = {
+        type: 'batch',
+        data: batch,
+        relays: [],
+        node: config.nodeName,
+    };
+
+    const json = JSON.stringify(msg);
+
+    if (json.length < limit) {
+        conn.write(json);
+        console.log(` * sent ${batch.length} ops in ${json.length} bytes`);
+        return batch.length;
+    }
+    else {
+        if (batch.length < 2) {
+            console.error(`Error: Single operation exceeds the limit of ${limit} bytes. Unable to send.`);
+            return 0;
+        }
+
+        // split batch into 2 halves
+        const midIndex = Math.floor(batch.length / 2);
+        const batch1 = batch.slice(0, midIndex);
+        const batch2 = batch.slice(midIndex);
+
+        return sendBatch(conn, batch1) + sendBatch(conn, batch2);
+    }
+}
+
 async function shareDb(conn) {
     try {
-        const batch = await createBatch();
+        console.time('exportBatch in chunks');
+        const batchSize = 1000; // export DIDs in batches of 1000 for scalability
+        const dids = await gatekeeper.getDIDs();
+        let allEvents = [];
+
+        for (let i = 0; i < dids.length; i += batchSize) {
+            const didBatch = dids.slice(i, i + batchSize);
+            const exports = await gatekeeper.exportBatch(didBatch);
+            allEvents = allEvents.concat(exports);
+        }
+        console.timeEnd('exportBatch in chunks');
 
-        const msg = {
-            type: 'batch',
-            data: batch,
-            relays: [],
-            node: config.nodeName,
-        };
+        // hyperswarm distributes only operations
+        const batch = allEvents.map(event => event.operation);
+        console.log(`${batch.length} operations fetched`);
 
-        const json = JSON.stringify(msg);
-        conn.write(json);
+        if (!batch || batch.length === 0) {
+            return;
+        }
+
+        const opsCount = batch.length;
+        console.time('sendBatch');
+        const opsSent = sendBatch(conn, batch);
+        console.timeEnd('sendBatch');
+        console.log(` * sent ${opsSent}/${opsCount} operations`);
     }
     catch (error) {
         console.log(error);
@@ -205,7 +241,10 @@ async function mergeBatch(batch) {
         }
     }
 
-    await importBatch(chunk);
+    if (chunk.length > 0) {
+        await importBatch(chunk);
+    }
+
     console.time('processEvents');
     const response = await gatekeeper.processEvents();
     console.timeEnd('processEvents');