Skip to content

Commit

Permalink
Revert "improve(relayer): Execute destination transactions async (#1355
Browse files Browse the repository at this point in the history
…)"

This reverts commit 2c1d2ae.
  • Loading branch information
mrice32 committed Apr 4, 2024
1 parent b4e008c commit da9eb0f
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 97 deletions.
13 changes: 4 additions & 9 deletions src/clients/MultiCallerClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { utils as sdkUtils } from "@across-protocol/sdk-v2";
import { BigNumber } from "ethers";
import { DEFAULT_MULTICALL_CHUNK_SIZE, DEFAULT_CHAIN_MULTICALL_CHUNK_SIZE, Multicall2Call } from "../common";
import {
Expand Down Expand Up @@ -121,17 +120,13 @@ export class MultiCallerClient {
}

// For each chain, collate the enqueued transactions and process them in parallel.
async executeTxnQueues(simulate = false, chainIds: number[] = []): Promise<Record<number, string[]>> {
if (chainIds.length === 0) {
chainIds = sdkUtils.dedupArray([
...Object.keys(this.valueTxns).map(Number),
...Object.keys(this.txns).map(Number),
]);
}
async executeTxnQueues(simulate = false): Promise<Record<number, string[]>> {
const chainIds = [...new Set(Object.keys(this.valueTxns).concat(Object.keys(this.txns)))];

// One promise per chain for parallel execution.
const resultsByChain = await Promise.allSettled(
chainIds.map((chainId) => {
chainIds.map((_chainId) => {
const chainId = Number(_chainId);
const txns: AugmentedTransaction[] | undefined = this.txns[chainId];
const valueTxns: AugmentedTransaction[] | undefined = this.valueTxns[chainId];

Expand Down
22 changes: 3 additions & 19 deletions src/relayer/Relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,32 +348,25 @@ export class Relayer {
}
}

async checkForUnfilledDepositsAndFill(
sendSlowRelays = true,
simulate = false
): Promise<{ [chainId: number]: string[] }> {
async checkForUnfilledDepositsAndFill(sendSlowRelays = true): Promise<void> {
// Fetch all unfilled deposits, order by total earnable fee.
const { profitClient, spokePoolClients, tokenClient, multiCallerClient } = this.clients;

// Flush any pre-existing enqueued transactions that might not have been executed.
multiCallerClient.clearTransactionQueue();
const txnReceipts: { [chainId: number]: string[] } = Object.fromEntries(
Object.values(spokePoolClients).map(({ chainId }) => [chainId, []])
);

// Fetch unfilled deposits and filter out deposits upfront before we compute the minimum deposit confirmation
// per chain, which is based on the deposit volume we could fill.
const unfilledDeposits = await this._getUnfilledDeposits();
const allUnfilledDeposits = Object.values(unfilledDeposits)
.flat()
.map(({ deposit }) => deposit);

this.logger.debug({
at: "Relayer::checkForUnfilledDepositsAndFill",
message: `${allUnfilledDeposits.length} unfilled deposits found.`,
});
if (allUnfilledDeposits.length === 0) {
return txnReceipts;
return;
}

const mdcPerChain = this.computeRequiredDepositConfirmations(allUnfilledDeposits);
Expand All @@ -384,22 +377,15 @@ export class Relayer {
])
);

await sdkUtils.forEachAsync(Object.entries(unfilledDeposits), async ([chainId, unfilledDeposits]) => {
await sdkUtils.forEachAsync(Object.values(unfilledDeposits), async (unfilledDeposits) => {
if (unfilledDeposits.length === 0) {
return;
}

await this.evaluateFills(
unfilledDeposits.map(({ deposit }) => deposit),
maxBlockNumbers,
sendSlowRelays
);

const destinationChainId = Number(chainId);
if (multiCallerClient.getQueuedTransactions(destinationChainId).length > 0) {
const receipts = await multiCallerClient.executeTxnQueues(simulate, [destinationChainId]);
txnReceipts[destinationChainId] = receipts[destinationChainId];
}
});

// If during the execution run we had shortfalls or unprofitable fills then handel it by producing associated logs.
Expand All @@ -409,8 +395,6 @@ export class Relayer {
if (profitClient.anyCapturedUnprofitableFills()) {
this.handleUnprofitableFill();
}

return txnReceipts;
}

requestSlowFill(deposit: V3Deposit): void {
Expand Down
4 changes: 2 additions & 2 deletions src/relayer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ export async function runRelayer(_logger: winston.Logger, baseSigner: Signer): P
await updateRelayerClients(relayerClients, config);

if (!config.skipRelays) {
const simulate = !config.sendingRelaysEnabled;
await relayer.checkForUnfilledDepositsAndFill(config.sendingSlowRelaysEnabled, simulate);
await relayer.checkForUnfilledDepositsAndFill(config.sendingSlowRelaysEnabled);
await relayerClients.multiCallerClient.executeTransactionQueue(!config.sendingRelaysEnabled);
}

// Unwrap WETH after filling deposits so we don't mess up slow fill logic, but before rebalancing
Expand Down
116 changes: 52 additions & 64 deletions test/Relayer.BasicFill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import {
getLastBlockTime,
getRelayDataHash,
lastSpyLogIncludes,
spyLogIncludes,
randomAddress,
setupTokensForWallet,
sinon,
Expand Down Expand Up @@ -153,7 +152,6 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
{
relayerTokens: [],
minDepositConfirmations: defaultMinDepositConfirmations,
sendingRelaysEnabled: true,
} as unknown as RelayerConfig
);

Expand Down Expand Up @@ -198,9 +196,12 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.

const tx = await multiCallerClient.executeTransactionQueue();
expect(tx.length).to.equal(1); // There should have been exactly one transaction.

await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]);
let fill = spokePoolClient_2.getFillsForOriginChain(deposit.originChainId).at(-1);
Expand All @@ -213,23 +214,24 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {

// Re-run the execution loop and validate that no additional relays are sent.
multiCallerClient.clearTransactionQueue();
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(0); // no Transactions to send.
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;
});

it("Shouldn't double fill a deposit", async function () {
await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount);

await updateAllClients();
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.
await multiCallerClient.executeTxnQueues();

// The first fill is still pending but if we rerun the relayer loop, it shouldn't try to fill a second time.
await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]);
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(0); // no new transactions were enqueued.
});

it("Queries the latest onchain fill status for all deposits", async function () {
Expand All @@ -246,11 +248,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
let unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient);
expect(Object.values(unfilledDeposits).flat().length).to.equal(1);

// Run the relayer in simulation mode so it doesn't fill the relay.
const simulate = true;
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(false, simulate);
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(spyLogIncludes(spy, -2, "Filled v3 deposit")).is.true;
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.

// Verify that the deposit is still unfilled (relayer didn't execute it).
unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient);
Expand All @@ -262,9 +262,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
expect(Object.values(unfilledDeposits).flat().length).to.equal(0);

// Verify that the relayer now sees that the deposit has been filled.
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(0);
});

it("Respects configured relayer routes", async function () {
Expand Down Expand Up @@ -301,16 +301,15 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
// Deposit on originChainId, destined for destinationChainId => expect ignored.
await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount);
await updateAllClients();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(
spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping deposit from or to disabled chains"))
).to.not.be.undefined;
});

it("Correctly validates self-relays", async function () {
outputAmount = inputAmount.add(bnOne);
for (const testDepositor of [relayer, depositor]) {
outputAmount = inputAmount.sub(bnOne);
for (const testDepositor of [depositor, relayer]) {
await depositV3(
spokePool_1,
destinationChainId,
Expand All @@ -322,13 +321,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill(false);
const selfRelay = testDepositor.address === relayer.address;
const [expectedLog, expectedReceipts] = selfRelay
? ["Filled v3 deposit", 1]
: ["Not relaying unprofitable deposit", 0];
expect(txnReceipts[destinationChainId].length).to.equal(expectedReceipts);
expect(lastSpyLogIncludes(spy, expectedLog)).to.be.true;
await relayerInstance.checkForUnfilledDepositsAndFill();
const expectedTransactions = testDepositor.address === relayer.address ? 1 : 0;
expect(multiCallerClient.transactionCount()).to.equal(expectedTransactions);
}
});

Expand All @@ -341,8 +336,7 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
fillDeadline,
});
await spokePool_2.setCurrentTime(fillDeadline);
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(0);
});

Expand All @@ -366,21 +360,15 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
}

await updateAllClients();
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;

txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(0);
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(1);

await spokePool_2.setCurrentTime(exclusivityDeadline + 1);
await updateAllClients();

// Relayer can unconditionally fill after the exclusivityDeadline.
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(2);
});

it("Ignores deposits older than min deposit confirmation threshold", async function () {
Expand Down Expand Up @@ -411,8 +399,7 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "due to insufficient deposit confirmations")).to.be.true;
});

Expand All @@ -430,15 +417,13 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
// Override hub pool client timestamp to make deposit look like its in the future
await updateAllClients();
hubPoolClient.currentTime = quoteTimestamp - 1;
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;

// If we reset the timestamp, the relayer will fill the deposit:
hubPoolClient.currentTime = quoteTimestamp;
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(1);
});

it("Ignores deposit with non-empty message", async function () {
Expand All @@ -461,14 +446,14 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
await relayerInstance.checkForUnfilledDepositsAndFill();

// Dynamic fill simulation fails in test, so the deposit will
// appear as unprofitable when message filling is enabled.
expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message")))
.to.not.be.undefined;
expect(profitClient.anyCapturedUnprofitableFills()).to.equal(sendingMessageRelaysEnabled);
expect(multiCallerClient.transactionCount()).to.equal(0);
}
});

Expand All @@ -477,10 +462,10 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {

await depositV3(spokePool_1, destinationChainId, depositor, inputToken, inputAmount, outputToken, outputAmount);
await updateAllClients();
await relayerInstance.checkForUnfilledDepositsAndFill();

const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Ignoring deposit"))).to.not.be.undefined;
expect(multiCallerClient.transactionCount()).to.equal(0);
});

it("Uses lowest outputAmount on updated deposits", async function () {
Expand Down Expand Up @@ -516,16 +501,19 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
await relayerInstance.checkForUnfilledDepositsAndFill();
if (update.ignored) {
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
expect(
spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message"))
).to.not.be.undefined;
expect(multiCallerClient.transactionCount()).to.equal(0);
} else {
// Now speed up deposit again with a higher fee and a message of 0x. This should be filled.
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1); // One transaction, filling the one deposit.

const tx = await multiCallerClient.executeTransactionQueue();
expect(tx.length).to.equal(1); // There should have been exactly one transaction.

await spokePoolClient_2.update();
let fill = spokePoolClient_2.getFillsForRelayer(relayer.address).at(-1);
Expand All @@ -550,8 +538,8 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
// Re-run the execution loop and validate that no additional relays are sent.
multiCallerClient.clearTransactionQueue();
await Promise.all([spokePoolClient_1.update(), spokePoolClient_2.update(), hubPoolClient.update()]);
const txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(multiCallerClient.transactionCount()).to.equal(0); // no Transactions to send.
expect(lastSpyLogIncludes(spy, "0 unfilled deposits")).to.be.true;
});

Expand All @@ -578,10 +566,10 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
let txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
Object.values(txnReceipts).forEach((receipts) => expect(receipts.length).to.equal(0));
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(spy.getCalls().find(({ lastArg }) => lastArg.message.includes("Skipping fill for deposit with message")))
.to.not.be.undefined;
expect(multiCallerClient.transactionCount()).to.equal(0);

// Deposit is updated again with a nullified message.
updatedOutputAmount = updatedOutputAmount.sub(bnOne);
Expand All @@ -593,9 +581,9 @@ describe("Relayer: Check for Unfilled Deposits and Fill", async function () {
);

await updateAllClients();
txnReceipts = await relayerInstance.checkForUnfilledDepositsAndFill();
expect(txnReceipts[destinationChainId].length).to.equal(1);
expect(lastSpyLogIncludes(spy, "Filled v3 deposit")).to.be.true;
await relayerInstance.checkForUnfilledDepositsAndFill();
expect(lastSpyLogIncludes(spy, "Filling v3 deposit")).to.be.true;
expect(multiCallerClient.transactionCount()).to.equal(1);
});
});
});
Loading

0 comments on commit da9eb0f

Please sign in to comment.