Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve(relayer): Execute destination transactions async #1355

Merged
merged 11 commits into from
Apr 3, 2024
13 changes: 9 additions & 4 deletions src/clients/MultiCallerClient.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { utils as sdkUtils } from "@across-protocol/sdk-v2";
import { BigNumber } from "ethers";
import { DEFAULT_MULTICALL_CHUNK_SIZE, DEFAULT_CHAIN_MULTICALL_CHUNK_SIZE, Multicall2Call } from "../common";
import {
Expand Down Expand Up @@ -120,13 +121,17 @@ export class MultiCallerClient {
}

// For each chain, collate the enqueued transactions and process them in parallel.
async executeTxnQueues(simulate = false): Promise<Record<number, string[]>> {
const chainIds = [...new Set(Object.keys(this.valueTxns).concat(Object.keys(this.txns)))];
async executeTxnQueues(simulate = false, chainIds: number[] = []): Promise<Record<number, string[]>> {
if (chainIds.length === 0) {
chainIds = sdkUtils.dedupArray([
pxrl marked this conversation as resolved.
Show resolved Hide resolved
...Object.keys(this.valueTxns).map(Number),
...Object.keys(this.txns).map(Number),
]);
}

// One promise per chain for parallel execution.
const resultsByChain = await Promise.allSettled(
chainIds.map((_chainId) => {
const chainId = Number(_chainId);
chainIds.map((chainId) => {
const txns: AugmentedTransaction[] | undefined = this.txns[chainId];
const valueTxns: AugmentedTransaction[] | undefined = this.valueTxns[chainId];

Expand Down
22 changes: 19 additions & 3 deletions src/relayer/Relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,25 +345,32 @@ export class Relayer {
}
}

async checkForUnfilledDepositsAndFill(sendSlowRelays = true): Promise<void> {
async checkForUnfilledDepositsAndFill(
sendSlowRelays = true,
simulate = false
): Promise<{ [chainId: number]: string[] }> {
// Fetch all unfilled deposits, order by total earnable fee.
const { profitClient, spokePoolClients, tokenClient, multiCallerClient } = this.clients;

// Flush any pre-existing enqueued transactions that might not have been executed.
multiCallerClient.clearTransactionQueue();
const txnReceipts: { [chainId: number]: string[] } = Object.fromEntries(
Object.values(spokePoolClients).map(({ chainId }) => [chainId, []])
);

// Fetch unfilled deposits and filter out deposits upfront before we compute the minimum deposit confirmation
// per chain, which is based on the deposit volume we could fill.
const unfilledDeposits = await this._getUnfilledDeposits();
const allUnfilledDeposits = Object.values(unfilledDeposits)
.flat()
.map(({ deposit }) => deposit);

this.logger.debug({
at: "Relayer#checkForUnfilledDepositsAndFill",
message: `${allUnfilledDeposits.length} unfilled deposits found.`,
});
if (allUnfilledDeposits.length === 0) {
return;
return txnReceipts;
}

const mdcPerChain = this.computeRequiredDepositConfirmations(allUnfilledDeposits);
Expand All @@ -374,15 +381,22 @@ export class Relayer {
])
);

await sdkUtils.forEachAsync(Object.values(unfilledDeposits), async (unfilledDeposits) => {
await sdkUtils.forEachAsync(Object.entries(unfilledDeposits), async ([chainId, unfilledDeposits]) => {
if (unfilledDeposits.length === 0) {
return;
}

await this.evaluateFills(
unfilledDeposits.map(({ deposit }) => deposit),
maxBlockNumbers,
sendSlowRelays
);

const destinationChainId = Number(chainId);
if (multiCallerClient.getQueuedTransactions(destinationChainId).length > 0) {
const receipts = await multiCallerClient.executeTxnQueues(simulate, [destinationChainId]);
txnReceipts[destinationChainId] = receipts[destinationChainId];
}
});

// If during the execution run we had shortfalls or unprofitable fills then handel it by producing associated logs.
Expand All @@ -392,6 +406,8 @@ export class Relayer {
if (profitClient.anyCapturedUnprofitableFills()) {
this.handleUnprofitableFill();
}

return txnReceipts;
}

requestSlowFill(deposit: V3Deposit): void {
Expand Down
4 changes: 2 additions & 2 deletions src/relayer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P
await updateRelayerClients(relayerClients, config);

if (!config.skipRelays) {
await relayer.checkForUnfilledDepositsAndFill(config.sendingSlowRelaysEnabled);
await relayerClients.multiCallerClient.executeTransactionQueue(!config.sendingRelaysEnabled);
const simulate = !config.sendingRelaysEnabled;
await relayer.checkForUnfilledDepositsAndFill(config.sendingSlowRelaysEnabled, simulate);
}

// Unwrap WETH after filling deposits so we don't mess up slow fill logic, but before rebalancing
Expand Down
116 changes: 64 additions & 52 deletions test/Relayer.BasicFill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
getLastBlockTime,
getRelayDataHash,
lastSpyLogIncludes,
spyLogIncludes,
randomAddress,
setupTokensForWallet,
sinon,
Expand Down Expand Up @@ -152,6 +153,7 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
{
relayerTokens: [],
minDepositConfirmations: defaultMinDepositConfirmations,
sendingRelaysEnabled: true,
} as unknown as RelayerConfig
);

Expand Down Expand Up @@ -196,12 +198,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.

const tx = await multiCallerClient.executeTransactionQueue();
expect(tx.length).to.equal(1); // There should have been exactly one transaction.
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
pxrl marked this conversation as resolved.
Show resolved Hide resolved

await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]);
let fill = spokePoolClient_2.getFillsForOriginChain(deposit.originChainId).at(-1);
Expand All @@ -214,24 +213,23 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {

// Re-run the execution loop and validate that no additional relays are sent.
multiCallerClient.clearTransactionQueue();
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(0); // no Transactions to send.
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;
});

it("Shouldn't double fill a deposit", async function () {
await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.
await multiCallerClient.executeTxnQueues();
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;

// The first fill is still pending but if we rerun the relayer loop, it shouldn't try to fill a second time.
await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]);
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(0); // no new transactions were enqueued.
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
});

it("Queries the latest onchain fill status for all deposits", async function () {
Expand All @@ -248,9 +246,11 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
let unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient);
expect(Object.values(unfilledDeposits).flat().length).to.equal(1);

await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.
// Run the relayer in simulation mode so it doesn't fill the relay.
const simulate = true;
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(false, simulate);
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(spyLogIncludes(spy, -2, "Filled v3 deposit")).is.true;

// Verify that the deposit is still unfilled (relayer didn't execute it).
unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient);
Expand All @@ -262,9 +262,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
expect(Object.values(unfilledDeposits).flat().length).to.equal(0);

// Verify that the relayer now sees that the deposit has been filled.
await relayerInstance.checkForUnfilledDepositsAndFill();
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(0);
});

it("Respects configured relayer routes", async function () {
Expand Down Expand Up @@ -301,15 +301,16 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
// Deposit on originChainId, destined for destinationChainId => expect ignored.
await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount);
await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(
spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping deposit from or to disabled chains"))
).to.not.be.undefined;
});

it("Correctly validates self-relays", async function () {
outputAmount = inputAmount.sub(bnOne);
for (const testDepositor of [depositor, relayer]) {
outputAmount = inputAmount.add(bnOne);
for (const testDepositor of [relayer, depositor]) {
await depositV3(
spokePool_1,
destinationChainId,
Expand All @@ -321,9 +322,13 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
const expectedTransactions = testDepositor.address === relayer.address ? 1 : 0;
expect(multiCallerClient.transactionCount()).to.equal(expectedTransactions);
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(false);
const selfRelay = testDepositor.address === relayer.address;
const [expectedLog, expectedReceipts] = selfRelay
? ["Filled v3 deposit", 1]
: ["Not relaying unprofitable deposit", 0];
expect(txnReceipts[destinationChainId].length).to.equal(expectedReceipts);
expect(lastSpyLogIncludes(spy, expectedLog)).to.be.true;
}
});

Expand All @@ -336,7 +341,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
fillDeadline,
});
await spokePool_2.setCurrentTime(fillDeadline);
await relayerInstance.checkForUnfilledDepositsAndFill();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(multiCallerClient.transactionCount()).to.equal(0);
});

Expand All @@ -360,15 +366,21 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
}

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(1);
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;

txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(0);
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;

await spokePool_2.setCurrentTime(exclusivityDeadline + 1);
await updateAllClients();

// Relayer can unconditionally fill after the exclusivityDeadline.
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(2);
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
});

it("Ignores deposits older than min deposit confirmation threshold", async function () {
Expand Down Expand Up @@ -399,7 +411,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(lastSpyLogIncludes(spy, "due to insufficient deposit confirmations")).to.be.true;
});

Expand All @@ -417,13 +430,15 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
// Override hub pool client timestamp to make deposit look like its in the future
await updateAllClients();
hubPoolClient.currentTime = quoteTimestamp - 1;
await relayerInstance.checkForUnfilledDepositsAndFill();
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;

// If we reset the timestamp, the relayer will fill the deposit:
hubPoolClient.currentTime = quoteTimestamp;
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(1);
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
});

it("Ignores deposit with non-empty message", async function () {
Expand All @@ -446,14 +461,14 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));

// Dynamic fill simulation fails in test, so the deposit will
// appear as unprofitable when message filling is enabled.
expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message")))
.to.not.be.undefined;
expect(profitClient.anyCapturedUnprofitableFills()).to.equal(sendingMessageRelaysEnabled);
expect(multiCallerClient.transactionCount()).to.equal(0);
}
});

Expand All @@ -462,10 +477,10 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {

await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount);
await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();

const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Ignoring deposit"))).to.not.be.undefined;
expect(multiCallerClient.transactionCount()).to.equal(0);
});

it("Uses lowest outputAmount on updated deposits", async function () {
Expand Down Expand Up @@ -501,19 +516,16 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
if (update.ignored) {
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(
spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message"))
).to.not.be.undefined;
expect(multiCallerClient.transactionCount()).to.equal(0);
} else {
// Now speed up deposit again with a higher fee and a message of 0x. This should be filled.
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.

const tx = await multiCallerClient.executeTransactionQueue();
expect(tx.length).to.equal(1); // There should have been exactly one transaction.
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;

await spokePoolClient_2.update();
let fill = spokePoolClient_2.getFillsForRelayer(relayer.address).at(-1);
Expand All @@ -538,8 +550,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
// Re-run the execution loop and validate that no additional relays are sent.
multiCallerClient.clearTransactionQueue();
await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]);
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(0); // no Transactions to send.
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;
});

Expand All @@ -566,10 +578,10 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message")))
.to.not.be.undefined;
expect(multiCallerClient.transactionCount()).to.equal(0);

// Deposit is updated again with a nullified message.
updatedOutputAmount = updatedOutputAmount.sub(bnOne);
Expand All @@ -581,9 +593,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1);
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
});
});
});
Loading
Loading