diff --git a/CHANGELOG.md b/CHANGELOG.md index cdb88fb..66d7053 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [0.0.1-alpha.42](https://github.com/DIG-Network/dig-incentive-server/compare/v0.0.1-alpha.41...v0.0.1-alpha.42) (2024-09-30) + + +### Features + +* more parrallel payout processing ([5bee0c2](https://github.com/DIG-Network/dig-incentive-server/commit/5bee0c2a78e13a62ce88e6708199e10f34958da0)) + ### [0.0.1-alpha.41](https://github.com/DIG-Network/dig-incentive-server/compare/v0.0.1-alpha.40...v0.0.1-alpha.41) (2024-09-30) ### [0.0.1-alpha.40](https://github.com/DIG-Network/dig-incentive-server/compare/v0.0.1-alpha.39...v0.0.1-alpha.40) (2024-09-30) diff --git a/package-lock.json b/package-lock.json index 6443dfe..634ebce 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,18 +1,16 @@ { "name": "dig-incentive-server", - "version": "0.0.1-alpha.41", + "version": "0.0.1-alpha.42", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "dig-incentive-server", - "version": "0.0.1-alpha.41", + "version": "0.0.1-alpha.42", "license": "ISC", "dependencies": { - "@dignetwork/datalayer-driver": "^0.1.28", - "@dignetwork/dig-sdk": "^0.0.1-alpha.103", + "@dignetwork/dig-sdk": "^0.0.1-alpha.104", "async-mutex": "^0.5.0", - "chia-server-coin": "^0.0.5", "datalayer-driver": "^0.1.21", "express": "^4.19.2", "lodash": "^4.17.21", @@ -243,9 +241,9 @@ } }, "node_modules/@dignetwork/dig-sdk": { - "version": "0.0.1-alpha.103", - "resolved": "https://registry.npmjs.org/@dignetwork/dig-sdk/-/dig-sdk-0.0.1-alpha.103.tgz", - "integrity": "sha512-nFi+tg3H+T6Vt8dSbqA1blHNmwJZdjYZD4EpxvCRnjw246XYenaFcpZbe7sJhkHGkFyfqVB3tNiaYVS633PXOw==", + "version": "0.0.1-alpha.104", + "resolved": "https://registry.npmjs.org/@dignetwork/dig-sdk/-/dig-sdk-0.0.1-alpha.104.tgz", + "integrity": "sha512-BcHaLmggt2D6DhOjYAZNRfvf61j7mWDgwq+rL9aRHgTx3C/zxCfre5anhm1JY4YLFXMQZYNQnlOCUeized7Hrg==", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.28", "archiver": "^7.0.1", @@ -1420,41 +1418,6 @@ "resolved": "https://registry.npmjs.org/chia-root-resolver/-/chia-root-resolver-1.0.0.tgz", "integrity": "sha512-oZOYbZxzK0688hU9S2EYWZ8ks5NI9zOlABFDsqBtvZwvYZbXEUYaLob+AqMhxLG3p0Dr0Ry+Rm6VPKOl9fQvAA==" }, - "node_modules/chia-server-coin": { - "version": "0.0.5", - "resolved": "https://registry.npmjs.org/chia-server-coin/-/chia-server-coin-0.0.5.tgz", - "integrity": "sha512-LJBRv1oYVnj4sio94ZRg16Pwhe8Z8JIcneLip1X0LgjkU7Bm0peGaFI3/Evu3DWwcToC3txN7nx0I5k7DxPSMQ==", - "engines": { - "node": ">= 10" - }, - "optionalDependencies": { - "chia-server-coin-android-arm64": "0.0.5", - "chia-server-coin-darwin-arm64": "0.0.5", - "chia-server-coin-darwin-universal": "0.0.5", - "chia-server-coin-darwin-x64": "0.0.5", - "chia-server-coin-linux-arm64-gnu": "0.0.5", - "chia-server-coin-linux-arm64-musl": "0.0.5", - "chia-server-coin-linux-x64-gnu": "0.0.5", - "chia-server-coin-linux-x64-musl": "0.0.5", - "chia-server-coin-win32-arm64-msvc": "0.0.5", - "chia-server-coin-win32-x64-msvc": "0.0.5" - } - }, - "node_modules/chia-server-coin-win32-x64-msvc": { - "version": "0.0.5", - "resolved": "https://registry.npmjs.org/chia-server-coin-win32-x64-msvc/-/chia-server-coin-win32-x64-msvc-0.0.5.tgz", - "integrity": "sha512-DgcCQ039zNHlxeYPg0JDKbzjB4W+C3xopIvh2XG8vQH8V1hGwFo/xqC3ROXT5kcU2PsVmzoYLxLcDUAc+NeqTw==", - "cpu": [ - "x64" - ], - "optional": true, - "os": [ - "win32" - ], - "engines": { - "node": ">= 10" - } - }, "node_modules/chia-wallet": { "version": "1.0.18", "resolved": "https://registry.npmjs.org/chia-wallet/-/chia-wallet-1.0.18.tgz", diff --git a/package.json b/package.json index 5c85bde..5fd27f5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "dig-incentive-server", - "version": "0.0.1-alpha.41", + "version": "0.0.1-alpha.42", "description": "", "type": "commonjs", "main": "./dist/index.js", @@ -24,7 +24,7 @@ "LICENSE" ], "dependencies": { - "@dignetwork/dig-sdk": "^0.0.1-alpha.103", + "@dignetwork/dig-sdk": "^0.0.1-alpha.104", "async-mutex": "^0.5.0", "datalayer-driver": "^0.1.21", "express": "^4.19.2", diff --git a/src/tasks/payouts.ts b/src/tasks/payouts.ts index ab4c549..84cee1e 100644 --- a/src/tasks/payouts.ts +++ b/src/tasks/payouts.ts @@ -7,16 +7,64 @@ import { DataStore, getStoresList, Environment, + asyncPool, } from "@dignetwork/dig-sdk"; import { Mutex } from "async-mutex"; import { IncentiveProgram } from "../utils/IncentiveProgram"; import { hexToUtf8 } from "../utils/hexUtils"; +import crypto from "crypto"; + +function md5(input: string) { + return crypto.createHash('md5').update(input).digest('hex'); +} const mutex = new Mutex(); const roundsPerEpoch = 1008; // 1 round every 10 mins starting on the first hour of the epoch const mojosPerXch = BigInt(1000000000000); +/** + * Request queue that holds all the network requests (getKey, headStore) + */ +type RequestTask = () => Promise; +type RequestCallback = (result: any) => void; + +interface RequestQueueItem { + task: RequestTask; + callback: RequestCallback; +} + +const requestQueue: RequestQueueItem[] = []; + +/** + * Adds a request task to the global queue. + * @param task The request task to be added. + * @param callback The callback to be invoked with the result. + */ +const addToRequestQueue = (task: RequestTask, callback: RequestCallback) => { + requestQueue.push({ task, callback }); +}; + +/** + * Continuously processes the request queue in controlled batches. + * This runs in the background, without blocking. + * @param limit The number of requests to process simultaneously. + */ +const processRequestQueue = async (limit: number) => { + while (true) { + if (requestQueue.length > 0) { + const batch = requestQueue.splice(0, limit); + await asyncPool(limit, batch, async (item) => { + const result = await item.task(); + item.callback(result); + }); + } else { + // If no requests are in the queue, delay a bit before checking again + await new Promise((resolve) => setTimeout(resolve, 100)); + } + } +}; + /** * Helper function to add a timeout to a promise. * @param promise The original promise. @@ -46,6 +94,48 @@ const calculateSampleSize = (totalKeys: number): number => { return Math.ceil(totalKeys * 0.05); }; +/** + * Cache to store the expected challenges for each peer for the current round. + */ +const challengeCache: Map = new Map(); + +/** + * Generates and caches the expected challenge response. + * @param storeId The store ID. + * @param hexKey The key. + * @param rootHash The root hash. + * @returns The expected challenge response. + */ +const getExpectedChallengeResponse = async ( + storeId: string, + hexKey: string, + rootHash: string +): Promise => { + const cacheKey = `${storeId}-${hexKey}-${rootHash}`; + + if (challengeCache.has(cacheKey)) { + return challengeCache.get(cacheKey) as string; + } + + const digChallenge = new DigChallenge(storeId, hexKey, rootHash); + const seed = DigChallenge.generateSeed(); + const challenge = await digChallenge.generateChallenge(seed); + const serializedChallenge = DigChallenge.serializeChallenge(challenge); + + const expectedChallengeResponse = await digChallenge.createChallengeResponse( + challenge + ); + + challengeCache.set(cacheKey, expectedChallengeResponse); + + return serializedChallenge; +}; + +/** + * Runs the incentive program. + * @param program The incentive program. + * @param currentEpoch The current epoch. + */ const runIncentiveProgram = async ( program: IncentiveProgram, currentEpoch: number @@ -65,8 +155,7 @@ const runIncentiveProgram = async ( console.log(`Root hash for current epoch: ${rootHash}`); const rewardThisRound = - (BigInt(program.xchRewardPerEpoch) * mojosPerXch) / - BigInt(roundsPerEpoch); + (BigInt(program.xchRewardPerEpoch) * mojosPerXch) / BigInt(roundsPerEpoch); console.log(`Reward for this round: ${rewardThisRound} mojos`); @@ -91,13 +180,13 @@ const runIncentiveProgram = async ( let validPeers: DigPeer[] = []; let payoutMade = false; + // Track requests we care about for this run + const pendingRequests: Promise[] = []; + while (!payoutMade) { console.log("Sampling up to 50 peers from the current epoch..."); - const serverCoins = await serverCoin.sampleCurrentEpoch( - 50, - peerBlackList - ); + const serverCoins = await serverCoin.sampleCurrentEpoch(50, peerBlackList); if (serverCoins.length === 0) { console.log(`No more peers available for storeId ${program.storeId}`); @@ -106,79 +195,75 @@ const runIncentiveProgram = async ( console.log(`Sampled ${serverCoins.length} peers for challenge.`); + // Add network requests (headStore and getKey) to the shared request queue for (const peerIp of serverCoins) { console.log(`Initiating challenge for peer: ${peerIp}`); const digPeer = new DigPeer(peerIp, program.storeId); - try { - // Timeout of 5 seconds for headStore request - const response = await withTimeout( - digPeer.contentServer.headStore(), - 10000, - `headStore timed out for peer ${peerIp}` - ); - console.log(`Peer ${peerIp} responded to headStore request`); - - const peerGenerationHash = response.headers?.["x-generation-hash"]; - if (peerGenerationHash === rootHash) { - console.log(`Peer ${peerIp} has correct generation hash.`); - - const challengePromises = randomKeysHex.map(async (hexKey) => { - try { - const digChallenge = new DigChallenge( - program.storeId, - hexKey, - rootHash + // Track request results in the current run + pendingRequests.push(new Promise((resolve) => { + addToRequestQueue(async () => { + try { + // Add headStore request to the queue + const response = await withTimeout( + digPeer.contentServer.headStore(), + 10000, + `headStore timed out for peer ${peerIp}` + ); + console.log(`Peer ${peerIp} responded to headStore request`); + + const peerGenerationHash = response.headers?.["x-generation-hash"]; + if (peerGenerationHash === rootHash) { + console.log(`Peer ${peerIp} has correct generation hash.`); + + // Use Promise.all so that any failure immediately marks the peer as invalid + await Promise.all( + randomKeysHex.map(async (hexKey) => { + const expectedChallengeResponse = await getExpectedChallengeResponse( + program.storeId, + hexKey, + rootHash + ); + + const peerChallengeResponse = await withTimeout( + digPeer.contentServer.getKey( + hexToUtf8(hexKey), + rootHash, + expectedChallengeResponse + ), + 10000, + `getKey timed out for peer ${peerIp}` + ); + + if (peerChallengeResponse !== expectedChallengeResponse) { + throw new Error(`Challenge response does not match for peer ${peerIp}`); + } + }) ); - const seed = DigChallenge.generateSeed(); - const challenge = await digChallenge.generateChallenge(seed); - const serializedChallenge = - DigChallenge.serializeChallenge(challenge); - - // Timeout of 5 seconds for getKey request - const peerChallengeResponse = await withTimeout( - digPeer.contentServer.getKey( - hexToUtf8(hexKey), - rootHash, - serializedChallenge - ), - 10000, - `getKey timed out for peer ${peerIp}` - ); - - const expectedChallengeResponse = - await digChallenge.createChallengeResponse(challenge); - console.log(`${peerIp} - ${hexToUtf8(hexKey)} - ${peerChallengeResponse} - ${expectedChallengeResponse}`); - - return peerChallengeResponse === expectedChallengeResponse; - } catch (error: any) { - console.error( - `Error during challenge for peer ${peerIp}: ${error.message}` - ); - return false; + validPeers.push(digPeer); + console.log(`Peer ${peerIp} passed all challenges and is valid.`); + resolve(true); + } else { + console.log(`Peer ${peerIp} has an incorrect generation hash.`); + resolve(false); } - }); - - const challengeResults = await Promise.all(challengePromises); - const valid = challengeResults.every((result) => result); - - if (valid) { - validPeers.push(digPeer); - console.log(`Peer ${peerIp} passed all challenges and is valid.`); - } else { - console.log(`Peer ${peerIp} failed one or more challenges.`); + } catch (error: any) { + console.error(`Error with peer ${peerIp}: ${error.message}`); + resolve(false); // Skip this peer and continue to the next } - } else { - console.log(`Peer ${peerIp} has an incorrect generation hash.`); - } - } catch (error: any) { - console.error(`Error with peer ${peerIp}: ${error.message}`); - // Skip this peer and continue to the next - } + }, (result) => { + // The callback processes the result + console.log(`Callback for peer ${peerIp}, result: ${result}`); + }); + })); } - if (validPeers.length > 0) { + // Wait for all pending requests for this program to complete + const results = await Promise.all(pendingRequests); + const validCount = results.filter((result) => result === true).length; + + if (validCount > 0) { const paymentAddresses = Array.from( new Set( ( @@ -193,20 +278,11 @@ const runIncentiveProgram = async ( const { epoch: currentEpoch, round: currentRound } = ServerCoin.getCurrentEpoch(); - const paymentHint = DigPeer.createPaymentHint( - Buffer.from(program.storeId, "hex") - ); const message = Buffer.from( `DIG Network payout: Store Id ${program.storeId}, Epoch ${currentEpoch}, Round ${currentRound}`, "utf-8" ); - console.log( - `Payment hint: ${paymentHint.toString("hex")} - ${message.toString( - "utf-8" - )}` - ); - // For the alpha program we are going to forgo the hint and just use the message so people can see it in their chia wallet - //const memo = [paymentHint, message]; + const memos = [message]; console.log(`Sending equal bulk payments to ${paymentAddresses.length} valid peers...`); @@ -238,7 +314,7 @@ const runIncentiveProgram = async ( }; /** - * Function to run payouts for all stores + * Function to run payouts for all stores concurrently. */ const runPayouts = async (): Promise => { const { epoch: currentEpoch } = ServerCoin.getCurrentEpoch(); @@ -247,7 +323,8 @@ const runPayouts = async (): Promise => { console.log(`Running payouts for epoch: ${currentEpoch}`); console.log(`Store list: ${storeList.join(", ")}`); - for (const storeId of storeList) { + // Use asyncPool to process stores concurrently (with a limit of 5 stores at a time) + await asyncPool(5, storeList, async (storeId) => { console.log(`Starting payouts for storeId: ${storeId}`); const program = await IncentiveProgram.from(storeId); if (program?.active) { @@ -256,10 +333,10 @@ const runPayouts = async (): Promise => { } else { console.log(`Program inactive for storeId: ${storeId}`); } - } + }); }; -// Task that runs at a regular interval to handle payouts +// Task that runs at a regular interval to handle payouts. const task = new Task("payouts", async () => { if (!mutex.isLocked()) { const releaseMutex = await mutex.acquire(); @@ -277,6 +354,7 @@ const task = new Task("payouts", async () => { } }); +// Schedule the payouts job to run every 10 minutes const job = new SimpleIntervalJob( { minutes: 10, @@ -286,4 +364,7 @@ const job = new SimpleIntervalJob( { id: "payouts", preventOverrun: true } ); +// Start processing the global request queue +processRequestQueue(10); // Fire and forget + export default job;