Skip to content

Commit

Permalink
improve(relayer): Execute destination transactions async (#1355)
Browse files Browse the repository at this point in the history
This change updates the relayer to submit transactions as soon as they
are ready for each destination chain, rather than waiting until all
destination transaction queues are ready. This should reduce the overall
execution time for the relayer when it has transactions to submit on
multiple chains, and should improve relayer performance on chains with a
high chainId.
  • Loading branch information
pxrl authored Apr 3, 2024
1 parent a0ce674 commit 2c1d2ae
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 71 deletions.
13 changes: 9 additions & 4 deletions src/clients/MultiCallerClient.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { utils as sdkUtils } from "@across-protocol/sdk-v2";
import { BigNumber } from "ethers";
import { DEFAULT_MULTICALL_CHUNK_SIZE, DEFAULT_CHAIN_MULTICALL_CHUNK_SIZE, Multicall2Call } from "../common";
import {
Expand Down Expand Up @@ -120,13 +121,17 @@ export class MultiCallerClient {
}

// For each chain, collate the enqueued transactions and process them in parallel.
async executeTxnQueues(simulate = false): Promise<Record<number, string[]>> {
const chainIds = [...new Set(Object.keys(this.valueTxns).concat(Object.keys(this.txns)))];
async executeTxnQueues(simulate = false, chainIds: number[] = []): Promise<Record<number, string[]>> {
if (chainIds.length === 0) {
chainIds = sdkUtils.dedupArray([
...Object.keys(this.valueTxns).map(Number),
...Object.keys(this.txns).map(Number),
]);
}

// One promise per chain for parallel execution.
const resultsByChain = await Promise.allSettled(
chainIds.map((_chainId) => {
const chainId = Number(_chainId);
chainIds.map((chainId) => {
const txns: AugmentedTransaction[] | undefined = this.txns[chainId];
const valueTxns: AugmentedTransaction[] | undefined = this.valueTxns[chainId];

Expand Down
22 changes: 19 additions & 3 deletions src/relayer/Relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,25 +345,32 @@ export class Relayer {
}
}

async checkForUnfilledDepositsAndFill(sendSlowRelays = true): Promise<void> {
async checkForUnfilledDepositsAndFill(
sendSlowRelays = true,
simulate = false
): Promise<{ [chainId: number]: string[] }> {
// Fetch all unfilled deposits, order by total earnable fee.
const { profitClient, spokePoolClients, tokenClient, multiCallerClient } = this.clients;

// Flush any pre-existing enqueued transactions that might not have been executed.
multiCallerClient.clearTransactionQueue();
const txnReceipts: { [chainId: number]: string[] } = Object.fromEntries(
Object.values(spokePoolClients).map(({ chainId }) => [chainId, []])
);

// Fetch unfilled deposits and filter out deposits upfront before we compute the minimum deposit confirmation
// per chain, which is based on the deposit volume we could fill.
const unfilledDeposits = await this._getUnfilledDeposits();
const allUnfilledDeposits = Object.values(unfilledDeposits)
.flat()
.map(({ deposit }) => deposit);

this.logger.debug({
at: "Relayer#checkForUnfilledDepositsAndFill",
message: `${allUnfilledDeposits.length} unfilled deposits found.`,
});
if (allUnfilledDeposits.length === 0) {
return;
return txnReceipts;
}

const mdcPerChain = this.computeRequiredDepositConfirmations(allUnfilledDeposits);
Expand All @@ -374,15 +381,22 @@ export class Relayer {
])
);

await sdkUtils.forEachAsync(Object.values(unfilledDeposits), async (unfilledDeposits) => {
await sdkUtils.forEachAsync(Object.entries(unfilledDeposits), async ([chainId, unfilledDeposits]) => {
if (unfilledDeposits.length === 0) {
return;
}

await this.evaluateFills(
unfilledDeposits.map(({ deposit }) => deposit),
maxBlockNumbers,
sendSlowRelays
);

const destinationChainId = Number(chainId);
if (multiCallerClient.getQueuedTransactions(destinationChainId).length > 0) {
const receipts = await multiCallerClient.executeTxnQueues(simulate, [destinationChainId]);
txnReceipts[destinationChainId] = receipts[destinationChainId];
}
});

// If during the execution run we had shortfalls or unprofitable fills then handel it by producing associated logs.
Expand All @@ -392,6 +406,8 @@ export class Relayer {
if (profitClient.anyCapturedUnprofitableFills()) {
this.handleUnprofitableFill();
}

return txnReceipts;
}

requestSlowFill(deposit: V3Deposit): void {
Expand Down
4 changes: 2 additions & 2 deletions src/relayer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P
await updateRelayerClients(relayerClients, config);

if (!config.skipRelays) {
await relayer.checkForUnfilledDepositsAndFill(config.sendingSlowRelaysEnabled);
await relayerClients.multiCallerClient.executeTransactionQueue(!config.sendingRelaysEnabled);
const simulate = !config.sendingRelaysEnabled;
await relayer.checkForUnfilledDepositsAndFill(config.sendingSlowRelaysEnabled, simulate);
}

// Unwrap WETH after filling deposits so we don't mess up slow fill logic, but before rebalancing
Expand Down
116 changes: 64 additions & 52 deletions test/Relayer.BasicFill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
getLastBlockTime,
getRelayDataHash,
lastSpyLogIncludes,
spyLogIncludes,
randomAddress,
setupTokensForWallet,
sinon,
Expand Down Expand Up @@ -152,6 +153,7 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
{
relayerTokens: [],
minDepositConfirmations: defaultMinDepositConfirmations,
sendingRelaysEnabled: true,
} as unknown as RelayerConfig
);

Expand Down Expand Up @@ -196,12 +198,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.

const tx = await multiCallerClient.executeTransactionQueue();
expect(tx.length).to.equal(1); // There should have been exactly one transaction.
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;

await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]);
let fill = spokePoolClient_2.getFillsForOriginChain(deposit.originChainId).at(-1);
Expand All @@ -214,24 +213,23 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {

// Re-run the execution loop and validate that no additional relays are sent.
multiCallerClient.clearTransactionQueue();
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(0); // no Transactions to send.
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;
});

it("Shouldn't double fill a deposit", async function () {
await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.
await multiCallerClient.executeTxnQueues();
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;

// The first fill is still pending but if we rerun the relayer loop, it shouldn't try to fill a second time.
await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]);
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(0); // no new transactions were enqueued.
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
});

it("Queries the latest onchain fill status for all deposits", async function () {
Expand All @@ -248,9 +246,11 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
let unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient);
expect(Object.values(unfilledDeposits).flat().length).to.equal(1);

await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.
// Run the relayer in simulation mode so it doesn't fill the relay.
const simulate = true;
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(false, simulate);
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(spyLogIncludes(spy, -2, "Filled v3 deposit")).is.true;

// Verify that the deposit is still unfilled (relayer didn't execute it).
unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient);
Expand All @@ -262,9 +262,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
expect(Object.values(unfilledDeposits).flat().length).to.equal(0);

// Verify that the relayer now sees that the deposit has been filled.
await relayerInstance.checkForUnfilledDepositsAndFill();
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(0);
});

it("Respects configured relayer routes", async function () {
Expand Down Expand Up @@ -301,15 +301,16 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
// Deposit on originChainId, destined for destinationChainId => expect ignored.
await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount);
await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(
spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping deposit from or to disabled chains"))
).to.not.be.undefined;
});

it("Correctly validates self-relays", async function () {
outputAmount = inputAmount.sub(bnOne);
for (const testDepositor of [depositor, relayer]) {
outputAmount = inputAmount.add(bnOne);
for (const testDepositor of [relayer, depositor]) {
await depositV3(
spokePool_1,
destinationChainId,
Expand All @@ -321,9 +322,13 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
const expectedTransactions = testDepositor.address === relayer.address ? 1 : 0;
expect(multiCallerClient.transactionCount()).to.equal(expectedTransactions);
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(false);
const selfRelay = testDepositor.address === relayer.address;
const [expectedLog, expectedReceipts] = selfRelay
? ["Filled v3 deposit", 1]
: ["Not relaying unprofitable deposit", 0];
expect(txnReceipts[destinationChainId].length).to.equal(expectedReceipts);
expect(lastSpyLogIncludes(spy, expectedLog)).to.be.true;
}
});

Expand All @@ -336,7 +341,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
fillDeadline,
});
await spokePool_2.setCurrentTime(fillDeadline);
await relayerInstance.checkForUnfilledDepositsAndFill();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(multiCallerClient.transactionCount()).to.equal(0);
});

Expand All @@ -360,15 +366,21 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
}

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(1);
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;

txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(0);
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;

await spokePool_2.setCurrentTime(exclusivityDeadline + 1);
await updateAllClients();

// Relayer can unconditionally fill after the exclusivityDeadline.
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(2);
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
});

it("Ignores deposits older than min deposit confirmation threshold", async function () {
Expand Down Expand Up @@ -399,7 +411,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(lastSpyLogIncludes(spy, "due to insufficient deposit confirmations")).to.be.true;
});

Expand All @@ -417,13 +430,15 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
// Override hub pool client timestamp to make deposit look like its in the future
await updateAllClients();
hubPoolClient.currentTime = quoteTimestamp - 1;
await relayerInstance.checkForUnfilledDepositsAndFill();
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;

// If we reset the timestamp, the relayer will fill the deposit:
hubPoolClient.currentTime = quoteTimestamp;
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(1);
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
});

it("Ignores deposit with non-empty message", async function () {
Expand All @@ -446,14 +461,14 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));

// Dynamic fill simulation fails in test, so the deposit will
// appear as unprofitable when message filling is enabled.
expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message")))
.to.not.be.undefined;
expect(profitClient.anyCapturedUnprofitableFills()).to.equal(sendingMessageRelaysEnabled);
expect(multiCallerClient.transactionCount()).to.equal(0);
}
});

Expand All @@ -462,10 +477,10 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {

await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount);
await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();

const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Ignoring deposit"))).to.not.be.undefined;
expect(multiCallerClient.transactionCount()).to.equal(0);
});

it("Uses lowest outputAmount on updated deposits", async function () {
Expand Down Expand Up @@ -501,19 +516,16 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
if (update.ignored) {
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(
spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message"))
).to.not.be.undefined;
expect(multiCallerClient.transactionCount()).to.equal(0);
} else {
// Now speed up deposit again with a higher fee and a message of 0x. This should be filled.
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.

const tx = await multiCallerClient.executeTransactionQueue();
expect(tx.length).to.equal(1); // There should have been exactly one transaction.
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;

await spokePoolClient_2.update();
let fill = spokePoolClient_2.getFillsForRelayer(relayer.address).at(-1);
Expand All @@ -538,8 +550,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
// Re-run the execution loop and validate that no additional relays are sent.
multiCallerClient.clearTransactionQueue();
await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]);
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(0); // no Transactions to send.
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;
});

Expand All @@ -566,10 +578,10 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message")))
.to.not.be.undefined;
expect(multiCallerClient.transactionCount()).to.equal(0);

// Deposit is updated again with a nullified message.
updatedOutputAmount = updatedOutputAmount.sub(bnOne);
Expand All @@ -581,9 +593,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1);
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
});
});
});
Loading

0 comments on commit 2c1d2ae

Please sign in to comment.