From 2c1d2aed73346fc20403adb8ff036117b3ddba00 Mon Sep 17 00:00:00 2001 From: Paul <108695806+pxrl@users.noreply.github.com> Date: Wed, 3 Apr 2024 22:15:16 +1100 Subject: [PATCH] improve(relayer): Execute destination transactions async (#1355) This change updates the relayer to submit transactions as soon as they are ready for each destination chain, rather than waiting until all destination transaction queues are ready. This should reduce the overall execution time for the relayer when it has transactions to submit on multiple chains, and should improve relayer performance on chains with a high chainId. --- src/clients/MultiCallerClient.ts | 13 ++-- src/relayer/Relayer.ts | 22 +++++- src/relayer/index.ts | 4 +- test/Relayer.BasicFill.ts | 116 +++++++++++++++++-------------- test/Relayer.SlowFill.ts | 7 +- test/Relayer.TokenShortfall.ts | 6 +- 6 files changed, 97 insertions(+), 71 deletions(-) diff --git a/src/clients/MultiCallerClient.ts b/src/clients/MultiCallerClient.ts index f62628f70..342692f11 100644 --- a/src/clients/MultiCallerClient.ts +++ b/src/clients/MultiCallerClient.ts @@ -1,3 +1,4 @@ +import { utils as sdkUtils } from "@across-protocol/sdk-v2"; import { BigNumber } from "ethers"; import { DEFAULT_MULTICALL_CHUNK_SIZE, DEFAULT_CHAIN_MULTICALL_CHUNK_SIZE, Multicall2Call } from "../common"; import { @@ -120,13 +121,17 @@ export class MultiCallerClient { } // For each chain, collate the enqueued transactions and process them in parallel. - async executeTxnQueues(simulate = false): Promise> { - const chainIds = [...new Set(Object.keys(this.valueTxns).concat(Object.keys(this.txns)))]; + async executeTxnQueues(simulate = false, chainIds: number[] = []): Promise> { + if (chainIds.length === 0) { + chainIds = sdkUtils.dedupArray([ + ...Object.keys(this.valueTxns).map(Number), + ...Object.keys(this.txns).map(Number), + ]); + } // One promise per chain for parallel execution. const resultsByChain = await Promise.allSettled( - chainIds.map((_chainId) => { - const chainId = Number(_chainId); + chainIds.map((chainId) => { const txns: AugmentedTransaction[] | undefined = this.txns[chainId]; const valueTxns: AugmentedTransaction[] | undefined = this.valueTxns[chainId]; diff --git a/src/relayer/Relayer.ts b/src/relayer/Relayer.ts index 5dab289dd..5203fa4b4 100644 --- a/src/relayer/Relayer.ts +++ b/src/relayer/Relayer.ts @@ -345,12 +345,18 @@ export class Relayer { } } - async checkForUnfilledDepositsAndFill(sendSlowRelays = true): Promise { + async checkForUnfilledDepositsAndFill( + sendSlowRelays = true, + simulate = false + ): Promise<{ [chainId: number]: string[] }> { // Fetch all unfilled deposits, order by total earnable fee. const { profitClient, spokePoolClients, tokenClient, multiCallerClient } = this.clients; // Flush any pre-existing enqueued transactions that might not have been executed. multiCallerClient.clearTransactionQueue(); + const txnReceipts: { [chainId: number]: string[] } = Object.fromEntries( + Object.values(spokePoolClients).map(({ chainId }) => [chainId, []]) + ); // Fetch unfilled deposits and filter out deposits upfront before we compute the minimum deposit confirmation // per chain, which is based on the deposit volume we could fill. @@ -358,12 +364,13 @@ export class Relayer { const allUnfilledDeposits = Object.values(unfilledDeposits) .flat() .map(({ deposit }) => deposit); + this.logger.debug({ at: "Relayer#checkForUnfilledDepositsAndFill", message: `${allUnfilledDeposits.length} unfilled deposits found.`, }); if (allUnfilledDeposits.length === 0) { - return; + return txnReceipts; } const mdcPerChain = this.computeRequiredDepositConfirmations(allUnfilledDeposits); @@ -374,15 +381,22 @@ export class Relayer { ]) ); - await sdkUtils.forEachAsync(Object.values(unfilledDeposits), async (unfilledDeposits) => { + await sdkUtils.forEachAsync(Object.entries(unfilledDeposits), async ([chainId, unfilledDeposits]) => { if (unfilledDeposits.length === 0) { return; } + await this.evaluateFills( unfilledDeposits.map(({ deposit }) => deposit), maxBlockNumbers, sendSlowRelays ); + + const destinationChainId = Number(chainId); + if (multiCallerClient.getQueuedTransactions(destinationChainId).length > 0) { + const receipts = await multiCallerClient.executeTxnQueues(simulate, [destinationChainId]); + txnReceipts[destinationChainId] = receipts[destinationChainId]; + } }); // If during the execution run we had shortfalls or unprofitable fills then handel it by producing associated logs. @@ -392,6 +406,8 @@ export class Relayer { if (profitClient.anyCapturedUnprofitableFills()) { this.handleUnprofitableFill(); } + + return txnReceipts; } requestSlowFill(deposit: V3Deposit): void { diff --git a/src/relayer/index.ts b/src/relayer/index.ts index 402085cc2..72d130e2f 100644 --- a/src/relayer/index.ts +++ b/src/relayer/index.ts @@ -34,8 +34,8 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P await updateRelayerClients(relayerClients, config); if (!config.skipRelays) { - await relayer.checkForUnfilledDepositsAndFill(config.sendingSlowRelaysEnabled); - await relayerClients.multiCallerClient.executeTransactionQueue(!config.sendingRelaysEnabled); + const simulate = !config.sendingRelaysEnabled; + await relayer.checkForUnfilledDepositsAndFill(config.sendingSlowRelaysEnabled, simulate); } // Unwrap WETH after filling deposits so we don't mess up slow fill logic, but before rebalancing diff --git a/test/Relayer.BasicFill.ts b/test/Relayer.BasicFill.ts index 4d71dfc46..1fd60930c 100644 --- a/test/Relayer.BasicFill.ts +++ b/test/Relayer.BasicFill.ts @@ -31,6 +31,7 @@ import { getLastBlockTime, getRelayDataHash, lastSpyLogIncludes, + spyLogIncludes, randomAddress, setupTokensForWallet, sinon, @@ -152,6 +153,7 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { { relayerTokens: [], minDepositConfirmations: defaultMinDepositConfirmations, + sendingRelaysEnabled: true, } as unknown as RelayerConfig ); @@ -196,12 +198,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { ); await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true; - expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit. - - const tx = await multiCallerClient.executeTransactionQueue(); - expect(tx.length).to.equal(1); // There should have been exactly one transaction. + let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + expect(txnReceipts[destinationChainId].length).to.equal(1); + expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true; await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]); let fill = spokePoolClient_2.getFillsForOriginChain(deposit.originChainId).at(-1); @@ -214,8 +213,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { // Re-run the execution loop and validate that no additional relays are sent. multiCallerClient.clearTransactionQueue(); - await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(multiCallerClient.transactionCount()).to.equal(0); // no Transactions to send. + txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true; }); @@ -223,15 +222,14 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount); await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true; - expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit. - await multiCallerClient.executeTxnQueues(); + let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + expect(txnReceipts[destinationChainId].length).to.equal(1); + expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true; // The first fill is still pending but if we rerun the relayer loop, it shouldn't try to fill a second time. await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]); - await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(multiCallerClient.transactionCount()).to.equal(0); // no new transactions were enqueued. + txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); }); it("Queries the latest onchain fill status for all deposits", async function () { @@ -248,9 +246,11 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { let unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient); expect(Object.values(unfilledDeposits).flat().length).to.equal(1); - await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true; - expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit. + // Run the relayer in simulation mode so it doesn't fill the relay. + const simulate = true; + let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(false, simulate); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); + expect(spyLogIncludes(spy, -2, "Filled v3 deposit")).is.true; // Verify that the deposit is still unfilled (relayer didn't execute it). unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient); @@ -262,9 +262,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { expect(Object.values(unfilledDeposits).flat().length).to.equal(0); // Verify that the relayer now sees that the deposit has been filled. - await relayerInstance.checkForUnfilledDepositsAndFill(); + txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true; - expect(multiCallerClient.transactionCount()).to.equal(0); }); it("Respects configured relayer routes", async function () { @@ -301,15 +301,16 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { // Deposit on originChainId, destined for destinationChainId => expect ignored. await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount); await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); + const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); expect( spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping deposit from or to disabled chains")) ).to.not.be.undefined; }); it("Correctly validates self-relays", async function () { - outputAmount = inputAmount.sub(bnOne); - for (const testDepositor of [depositor, relayer]) { + outputAmount = inputAmount.add(bnOne); + for (const testDepositor of [relayer, depositor]) { await depositV3( spokePool_1, destinationChainId, @@ -321,9 +322,13 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { ); await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); - const expectedTransactions = testDepositor.address === relayer.address ? 1 : 0; - expect(multiCallerClient.transactionCount()).to.equal(expectedTransactions); + const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(false); + const selfRelay = testDepositor.address === relayer.address; + const [expectedLog, expectedReceipts] = selfRelay + ? ["Filled v3 deposit", 1] + : ["Not relaying unprofitable deposit", 0]; + expect(txnReceipts[destinationChainId].length).to.equal(expectedReceipts); + expect(lastSpyLogIncludes(spy, expectedLog)).to.be.true; } }); @@ -336,7 +341,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { fillDeadline, }); await spokePool_2.setCurrentTime(fillDeadline); - await relayerInstance.checkForUnfilledDepositsAndFill(); + const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); expect(multiCallerClient.transactionCount()).to.equal(0); }); @@ -360,15 +366,21 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { } await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(multiCallerClient.transactionCount()).to.equal(1); + let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + expect(txnReceipts[destinationChainId].length).to.equal(1); + expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true; + + txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + expect(txnReceipts[destinationChainId].length).to.equal(0); + expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true; await spokePool_2.setCurrentTime(exclusivityDeadline + 1); await updateAllClients(); // Relayer can unconditionally fill after the exclusivityDeadline. - await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(multiCallerClient.transactionCount()).to.equal(2); + txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + expect(txnReceipts[destinationChainId].length).to.equal(1); + expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true; }); it("Ignores deposits older than min deposit confirmation threshold", async function () { @@ -399,7 +411,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { ); await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); + const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); expect(lastSpyLogIncludes(spy, "due to insufficient deposit confirmations")).to.be.true; }); @@ -417,13 +430,15 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { // Override hub pool client timestamp to make deposit look like its in the future await updateAllClients(); hubPoolClient.currentTime = quoteTimestamp - 1; - await relayerInstance.checkForUnfilledDepositsAndFill(); + let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true; // If we reset the timestamp, the relayer will fill the deposit: hubPoolClient.currentTime = quoteTimestamp; - await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(multiCallerClient.transactionCount()).to.equal(1); + txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + expect(txnReceipts[destinationChainId].length).to.equal(1); + expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true; }); it("Ignores deposit with non-empty message", async function () { @@ -446,14 +461,14 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { ); await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); + const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); // Dynamic fill simulation fails in test, so the deposit will // appear as unprofitable when message filling is enabled. expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message"))) .to.not.be.undefined; expect(profitClient.anyCapturedUnprofitableFills()).to.equal(sendingMessageRelaysEnabled); - expect(multiCallerClient.transactionCount()).to.equal(0); } }); @@ -462,10 +477,10 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount); await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); + const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Ignoring deposit"))).to.not.be.undefined; - expect(multiCallerClient.transactionCount()).to.equal(0); }); it("Uses lowest outputAmount on updated deposits", async function () { @@ -501,19 +516,16 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { ); await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); + const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); if (update.ignored) { + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); expect( spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message")) ).to.not.be.undefined; - expect(multiCallerClient.transactionCount()).to.equal(0); } else { // Now speed up deposit again with a higher fee and a message of 0x. This should be filled. - expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true; - expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit. - - const tx = await multiCallerClient.executeTransactionQueue(); - expect(tx.length).to.equal(1); // There should have been exactly one transaction. + expect(txnReceipts[destinationChainId].length).to.equal(1); + expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true; await spokePoolClient_2.update(); let fill = spokePoolClient_2.getFillsForRelayer(relayer.address).at(-1); @@ -538,8 +550,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { // Re-run the execution loop and validate that no additional relays are sent. multiCallerClient.clearTransactionQueue(); await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]); - await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(multiCallerClient.transactionCount()).to.equal(0); // no Transactions to send. + const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true; }); @@ -566,10 +578,10 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { ); await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); + let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0)); expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message"))) .to.not.be.undefined; - expect(multiCallerClient.transactionCount()).to.equal(0); // Deposit is updated again with a nullified message. updatedOutputAmount = updatedOutputAmount.sub(bnOne); @@ -581,9 +593,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () { ); await updateAllClients(); - await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true; - expect(multiCallerClient.transactionCount()).to.equal(1); + txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(); + expect(txnReceipts[destinationChainId].length).to.equal(1); + expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true; }); }); }); diff --git a/test/Relayer.SlowFill.ts b/test/Relayer.SlowFill.ts index 9e8b5f32c..07c741d21 100644 --- a/test/Relayer.SlowFill.ts +++ b/test/Relayer.SlowFill.ts @@ -193,13 +193,9 @@ describe("Relayer: Initiates slow fill requests", async function () { await updateAllClients(); await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(multiCallerClient.transactionCount()).to.equal(1); // Should be requestV3SlowFill() - expect(spyLogIncludes(spy, -2, "Enqueuing slow fill request.")).to.be.true; + expect(spyLogIncludes(spy, -2, "Requested slow fill for deposit.")).to.be.true; expect(lastSpyLogIncludes(spy, "Insufficient balance to fill all deposits")).to.be.true; - const tx = await multiCallerClient.executeTransactionQueue(); - expect(tx.length).to.equal(1); - // Verify that the slowFill request was received by the destination SpokePoolClient. await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]); let slowFillRequest = spokePoolClient_2.getSlowFillRequest(deposit); @@ -211,7 +207,6 @@ describe("Relayer: Initiates slow fill requests", async function () { ); await relayerInstance.checkForUnfilledDepositsAndFill(); - expect(multiCallerClient.transactionCount()).to.equal(0); // no Transactions to send. expect(lastSpyLogIncludes(spy, "Insufficient balance to fill all deposits")).to.be.true; }); }); diff --git a/test/Relayer.TokenShortfall.ts b/test/Relayer.TokenShortfall.ts index 783df2209..d75a93b36 100644 --- a/test/Relayer.TokenShortfall.ts +++ b/test/Relayer.TokenShortfall.ts @@ -35,6 +35,7 @@ import { expect, getLastBlockTime, lastSpyLogIncludes, + spyLogIncludes, setupTokensForWallet, sinon, toBN, @@ -207,12 +208,9 @@ describe("Relayer: Token balance shortfall", async function () { await erc20_2.mint(relayer.address, toBN(60).mul(bn10.pow(inputTokenDecimals))); await updateAllClients(); await relayerInstance.checkForUnfilledDepositsAndFill(noSlowRelays); + expect(spyLogIncludes(spy, -2, "Relayed depositId 0")).to.be.true; expect(lastSpyLogIncludes(spy, `${await l1Token.symbol()} cumulative shortfall of 190.00`)).to.be.true; expect(lastSpyLogIncludes(spy, "blocking deposits: 1,2")).to.be.true; - - const tx = await multiCallerClient.executeTransactionQueue(); - expect(lastSpyLogIncludes(spy, "Relayed depositId 0")).to.be.true; - expect(tx.length).to.equal(1); // There should have been exactly one transaction. }); it("Produces expected logs based on insufficient multiple token balance", async function () {