From 734546337da3067676db0a0f045c0b65687c48a9 Mon Sep 17 00:00:00 2001 From: Paul <108695806+pxrl@users.noreply.github.com> Date: Fri, 22 Mar 2024 17:14:21 +0100 Subject: [PATCH 1/4] improve(relayer): Parallel processing for destination chain fills Relayer fills are currently processed as a flat array. When messages are involved this requires simulation of the fill on the destination chain, thus incurring delay. In these cases, take the opportunity to process other fills whilst awaiting the outcome of the simulation. This is also another step towards containing RPC misbehaviour to the affected chain and preventing contagion to other destinations. --- src/relayer/Relayer.ts | 286 +++++++++++++++++++++++------------------ 1 file changed, 158 insertions(+), 128 deletions(-) diff --git a/src/relayer/Relayer.ts b/src/relayer/Relayer.ts index b84981f0d..17b709809 100644 --- a/src/relayer/Relayer.ts +++ b/src/relayer/Relayer.ts @@ -1,3 +1,4 @@ +import assert from "assert"; import { utils as sdkUtils } from "@across-protocol/sdk-v2"; import { utils as ethersUtils } from "ethers"; import { L1Token, V3Deposit, V3DepositWithBlock } from "../interfaces"; @@ -40,141 +41,154 @@ export class Relayer { } /** - * @description Retrieve the complete array of unfilled deposits and filter out deposits we can't or choose - * not to support. - * @returns An array of filtered RelayerUnfilledDeposit objects. + * @description For a given deposit, apply relayer-specific filtering to determine whether it should be filled. + * @param deposit Deposit object. + * @param version Version identified for this deposit. + * @param invalidFills An array of any invalid fills detected for this deposit. + * @returns A boolean indicator determining whether the relayer configuration permits the deposit to be filled. */ - private async _getUnfilledDeposits(): Promise { - const { configStoreClient, hubPoolClient, spokePoolClients, acrossApiClient } = this.clients; - const { relayerTokens, ignoredAddresses, acceptInvalidFills } = this.config; - - // Flatten unfilledDeposits for now. @todo: Process deposits in parallel by destination chain. - const unfilledDeposits = Object.values( - await getUnfilledDeposits(spokePoolClients, hubPoolClient, this.config.maxRelayerLookBack) - ).flat(); - - const maxVersion = configStoreClient.configStoreVersion; - return sdkUtils.filterAsync(unfilledDeposits, async ({ deposit, version, invalidFills }) => { - const { depositId, depositor, recipient, originChainId, destinationChainId, inputToken, outputToken } = deposit; - const destinationChain = getNetworkName(destinationChainId); - - // If we don't have the latest code to support this deposit, skip it. - if (version > maxVersion) { - this.logger.warn({ - at: "Relayer::getUnfilledDeposits", - message: "Skipping deposit that is not supported by this relayer version.", - latestVersionSupported: maxVersion, - latestInConfigStore: configStoreClient.getConfigStoreVersionForTimestamp(), - deposit, - }); - return false; - } + filterDeposit({ deposit, version, invalidFills }: RelayerUnfilledDeposit): boolean { + const { depositId, originChainId, destinationChainId, depositor, recipient, inputToken, outputToken } = deposit; + const { acrossApiClient, configStoreClient, hubPoolClient } = this.clients; + const { ignoredAddresses, relayerTokens, acceptInvalidFills } = this.config; - if (!this.routeEnabled(originChainId, destinationChainId)) { - this.logger.debug({ - at: "Relayer::getUnfilledDeposits", - message: "Skipping deposit from or to disabled chains.", - deposit, - enabledOriginChains: this.config.relayerOriginChains, - enabledDestinationChains: this.config.relayerDestinationChains, - }); - return false; - } + // If we don't have the latest code to support this deposit, skip it. + if (version > configStoreClient.configStoreVersion) { + this.logger.warn({ + at: "Relayer::getUnfilledDeposits", + message: "Skipping deposit that is not supported by this relayer version.", + latestVersionSupported: configStoreClient.configStoreVersion, + latestInConfigStore: configStoreClient.getConfigStoreVersionForTimestamp(), + deposit, + }); + return false; + } - // Skip deposits with quoteTimestamp in the future (impossible to know HubPool utilization => LP fee cannot be computed). - if (deposit.quoteTimestamp > hubPoolClient.currentTime) { - return false; - } + if (!this.routeEnabled(originChainId, destinationChainId)) { + this.logger.debug({ + at: "Relayer::getUnfilledDeposits", + message: "Skipping deposit from or to disabled chains.", + deposit, + enabledOriginChains: this.config.relayerOriginChains, + enabledDestinationChains: this.config.relayerDestinationChains, + }); + return false; + } - if (ignoredAddresses?.includes(getAddress(depositor)) || ignoredAddresses?.includes(getAddress(recipient))) { - this.logger.debug({ - at: "Relayer::getUnfilledDeposits", - message: "Ignoring deposit", - depositor, - recipient, - }); - return false; - } + // Skip deposits with quoteTimestamp in the future (impossible to know HubPool utilization => LP fee cannot be computed). + if (deposit.quoteTimestamp > hubPoolClient.currentTime) { + return false; + } - // Skip any L1 tokens that are not specified in the config. - // If relayerTokens is an empty list, we'll assume that all tokens are supported. - const l1Token = hubPoolClient.getL1TokenInfoForL2Token(inputToken, originChainId); - if (relayerTokens.length > 0 && !relayerTokens.includes(l1Token.address)) { - this.logger.debug({ - at: "Relayer::getUnfilledDeposits", - message: "Skipping deposit for unwhitelisted token", - deposit, - l1Token, - }); - return false; - } + if (ignoredAddresses?.includes(getAddress(depositor)) || ignoredAddresses?.includes(getAddress(recipient))) { + this.logger.debug({ + at: "Relayer::getUnfilledDeposits", + message: "Ignoring deposit", + depositor, + recipient, + }); + return false; + } - // It would be preferable to use host time since it's more reliably up-to-date, but this creates issues in test. - const currentTime = this.clients.spokePoolClients[destinationChainId].getCurrentTime(); - if (deposit.fillDeadline <= currentTime) { - return false; - } + // Skip any L1 tokens that are not specified in the config. + // If relayerTokens is an empty list, we'll assume that all tokens are supported. + const l1Token = hubPoolClient.getL1TokenInfoForL2Token(inputToken, originChainId); + if (relayerTokens.length > 0 && !relayerTokens.includes(l1Token.address)) { + this.logger.debug({ + at: "Relayer::getUnfilledDeposits", + message: "Skipping deposit for unwhitelisted token", + deposit, + l1Token, + }); + return false; + } - if (deposit.exclusivityDeadline > currentTime && getAddress(deposit.exclusiveRelayer) !== this.relayerAddress) { - return false; - } + // It would be preferable to use host time since it's more reliably up-to-date, but this creates issues in test. + const currentTime = this.clients.spokePoolClients[destinationChainId].getCurrentTime(); + if (deposit.fillDeadline <= currentTime) { + return false; + } - if (!hubPoolClient.areTokensEquivalent(inputToken, originChainId, outputToken, destinationChainId)) { - this.logger.warn({ - at: "Relayer::getUnfilledDeposits", - message: "Skipping deposit including in-protocol token swap.", - deposit, - }); - return false; - } + if (deposit.exclusivityDeadline > currentTime && getAddress(deposit.exclusiveRelayer) !== this.relayerAddress) { + return false; + } - // Skip deposit with message if sending fills with messages is not supported. - if (!this.config.sendingMessageRelaysEnabled && !isMessageEmpty(resolveDepositMessage(deposit))) { - this.logger.warn({ - at: "Relayer::getUnfilledDeposits", - message: "Skipping fill for deposit with message", - depositUpdated: isDepositSpedUp(deposit), - deposit, - }); - return false; - } + if (!hubPoolClient.areTokensEquivalent(inputToken, originChainId, outputToken, destinationChainId)) { + this.logger.warn({ + at: "Relayer::getUnfilledDeposits", + message: "Skipping deposit including in-protocol token swap.", + deposit, + }); + return false; + } - // Skip deposits that contain invalid fills from the same relayer. This prevents potential corrupted data from - // making the same relayer fill a deposit multiple times. - if (!acceptInvalidFills && invalidFills.some((fill) => fill.relayer === this.relayerAddress)) { - this.logger.error({ - at: "Relayer::getUnfilledDeposits", - message: "๐Ÿ‘จโ€๐Ÿ‘งโ€๐Ÿ‘ฆ Skipping deposit with invalid fills from the same relayer", - deposit, - invalidFills, - destinationChain, - }); - return false; - } + // Skip deposit with message if sending fills with messages is not supported. + if (!this.config.sendingMessageRelaysEnabled && !isMessageEmpty(resolveDepositMessage(deposit))) { + this.logger.warn({ + at: "Relayer::getUnfilledDeposits", + message: "Skipping fill for deposit with message", + depositUpdated: isDepositSpedUp(deposit), + deposit, + }); + return false; + } - // We query the relayer API to get the deposit limits for different token and destination combinations. - // The relayer should *not* be filling deposits that the HubPool doesn't have liquidity for otherwise the relayer's - // refund will be stuck for potentially 7 days. Note: Filter for supported tokens first, since the relayer only - // queries for limits on supported tokens. - const { inputAmount } = deposit; - if (acrossApiClient.updatedLimits && inputAmount.gt(acrossApiClient.getLimit(l1Token.address))) { - this.logger.warn({ - at: "Relayer::getUnfilledDeposits", - message: "๐Ÿ˜ฑ Skipping deposit with greater unfilled amount than API suggested limit", - limit: acrossApiClient.getLimit(l1Token.address), - l1Token: l1Token.address, - depositId, - inputToken, - inputAmount, - originChainId, - transactionHash: deposit.transactionHash, - }); - return false; - } + // Skip deposits that contain invalid fills from the same relayer. This prevents potential corrupted data from + // making the same relayer fill a deposit multiple times. + if (!acceptInvalidFills && invalidFills.some((fill) => fill.relayer === this.relayerAddress)) { + this.logger.error({ + at: "Relayer::getUnfilledDeposits", + message: "๐Ÿ‘จโ€๐Ÿ‘งโ€๐Ÿ‘ฆ Skipping deposit with invalid fills from the same relayer", + deposit, + invalidFills, + destinationChainId, + }); + return false; + } - // The deposit passed all checks, so we can include it in the list of unfilled deposits. - return true; + // We query the relayer API to get the deposit limits for different token and destination combinations. + // The relayer should *not* be filling deposits that the HubPool doesn't have liquidity for otherwise the relayer's + // refund will be stuck for potentially 7 days. Note: Filter for supported tokens first, since the relayer only + // queries for limits on supported tokens. + const { inputAmount } = deposit; + if (acrossApiClient.updatedLimits && inputAmount.gt(acrossApiClient.getLimit(l1Token.address))) { + this.logger.warn({ + at: "Relayer::getUnfilledDeposits", + message: "๐Ÿ˜ฑ Skipping deposit with greater unfilled amount than API suggested limit", + limit: acrossApiClient.getLimit(l1Token.address), + l1Token: l1Token.address, + depositId, + inputToken, + inputAmount, + originChainId, + transactionHash: deposit.transactionHash, + }); + return false; + } + + // The deposit passed all checks, so we can include it in the list of unfilled deposits. + return true; + } + + /** + * @description Retrieve the complete array of unfilled deposits and filter out deposits we can't or choose + * not to support. + * @returns An array of filtered RelayerUnfilledDeposit objects. + */ + private async _getUnfilledDeposits(): Promise> { + const { hubPoolClient, spokePoolClients } = this.clients; + + const unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient, this.config.maxRelayerLookBack); + + // Filter the resulting unfilled deposits according to relayer configuration. + Object.keys(unfilledDeposits).forEach((_destinationChainId) => { + const destinationChainId = Number(_destinationChainId); + unfilledDeposits[destinationChainId] = unfilledDeposits[destinationChainId].filter((deposit) => + this.filterDeposit(deposit) + ); }); + + return unfilledDeposits; } /** @@ -323,7 +337,9 @@ export class Relayer { // Fetch unfilled deposits and filter out deposits upfront before we compute the minimum deposit confirmation // per chain, which is based on the deposit volume we could fill. const unfilledDeposits = await this._getUnfilledDeposits(); - const allUnfilledDeposits = unfilledDeposits.map(({ deposit }) => deposit); + const allUnfilledDeposits = Object.values(unfilledDeposits) + .flat() + .map(({ deposit }) => deposit); this.logger.debug({ at: "Relayer#checkForUnfilledDepositsAndFill", message: `${allUnfilledDeposits.length} unfilled deposits found.`, @@ -333,11 +349,25 @@ export class Relayer { } const mdcPerChain = this.computeRequiredDepositConfirmations(allUnfilledDeposits); - for (const deposit of allUnfilledDeposits) { - const { originChainId } = deposit; - const maxBlockNumber = spokePoolClients[originChainId].latestBlockSearched - mdcPerChain[originChainId]; - await this.evaluateFill(deposit, maxBlockNumber, sendSlowRelays); - } + const maxBlockNumbers = Object.fromEntries( + Object.values(spokePoolClients).map(({ chainId, latestBlockSearched }) => [ + chainId, + latestBlockSearched - mdcPerChain[chainId], + ]) + ); + + await sdkUtils.forEachAsync(Object.values(unfilledDeposits), async (unfilledDeposits) => { + if (unfilledDeposits.length === 0) { + return; + } + + const _destinationChainId = unfilledDeposits[0].deposit.destinationChainId; + for (const { deposit } of unfilledDeposits) { + const { originChainId, destinationChainId } = deposit; + assert(destinationChainId === _destinationChainId); + await this.evaluateFill(deposit, maxBlockNumbers[originChainId], sendSlowRelays); + } + }); // If during the execution run we had shortfalls or unprofitable fills then handel it by producing associated logs. if (tokenClient.anyCapturedShortFallFills()) { From bc364a92d7c179c1d941c9dce3419303b84da44d Mon Sep 17 00:00:00 2001 From: Paul <108695806+pxrl@users.noreply.github.com> Date: Mon, 25 Mar 2024 20:12:22 +0100 Subject: [PATCH 2/4] Enforce sequential fills per destination chain --- src/relayer/Relayer.ts | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/relayer/Relayer.ts b/src/relayer/Relayer.ts index bd7b81583..92c561552 100644 --- a/src/relayer/Relayer.ts +++ b/src/relayer/Relayer.ts @@ -323,6 +323,24 @@ export class Relayer { } } + /** + * For a given destination chain, evaluate and optionally fill each unfilled deposit. Note that each fill should be + * evaluated sequentially in order to ensure atomic balance updates. + * @param deposits An array of deposits destined for the same destination chain. + * @param maxBlockNumbers A map of the highest block number per origin chain to fill. + * @returns void + */ + async evaluateFills( + deposits: V3DepositWithBlock[], + maxBlockNumbers: { [chainId: number]: number }, + sendSlowRelays: boolean + ): Promise { + for (let i = 0; i < deposits.length; ++i) { + const deposit = deposits[i]; + await this.evaluateFill(deposit, maxBlockNumbers[deposit.originChainId], sendSlowRelays); + } + } + async checkForUnfilledDepositsAndFill(sendSlowRelays = true): Promise { // Fetch all unfilled deposits, order by total earnable fee. const { profitClient, spokePoolClients, tokenClient, multiCallerClient } = this.clients; @@ -356,13 +374,7 @@ export class Relayer { if (unfilledDeposits.length === 0) { return; } - - const _destinationChainId = unfilledDeposits[0].deposit.destinationChainId; - for (const { deposit } of unfilledDeposits) { - const { originChainId, destinationChainId } = deposit; - assert(destinationChainId === _destinationChainId); - await this.evaluateFill(deposit, maxBlockNumbers[originChainId], sendSlowRelays); - } + await this.evaluateFills(unfilledDeposits.map(({ deposit }) => deposit), maxBlockNumbers, sendSlowRelays); }); // If during the execution run we had shortfalls or unprofitable fills then handel it by producing associated logs. From 82317f1e7103eba0a85477085b46e379002a8d23 Mon Sep 17 00:00:00 2001 From: Paul <108695806+pxrl@users.noreply.github.com> Date: Mon, 25 Mar 2024 20:33:36 +0100 Subject: [PATCH 3/4] lint --- src/relayer/Relayer.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/relayer/Relayer.ts b/src/relayer/Relayer.ts index 92c561552..bf7051293 100644 --- a/src/relayer/Relayer.ts +++ b/src/relayer/Relayer.ts @@ -1,4 +1,3 @@ -import assert from "assert"; import { utils as sdkUtils } from "@across-protocol/sdk-v2"; import { utils as ethersUtils } from "ethers"; import { L1Token, V3Deposit, V3DepositWithBlock } from "../interfaces"; @@ -374,7 +373,11 @@ export class Relayer { if (unfilledDeposits.length === 0) { return; } - await this.evaluateFills(unfilledDeposits.map(({ deposit }) => deposit), maxBlockNumbers, sendSlowRelays); + await this.evaluateFills( + unfilledDeposits.map(({ deposit }) => deposit), + maxBlockNumbers, + sendSlowRelays + ); }); // If during the execution run we had shortfalls or unprofitable fills then handel it by producing associated logs. From 451b67f15b4982c0f54afdf42d5525d65e9eeed4 Mon Sep 17 00:00:00 2001 From: Paul <108695806+pxrl@users.noreply.github.com> Date: Mon, 25 Mar 2024 20:40:43 +0100 Subject: [PATCH 4/4] Tweak naming --- src/relayer/Relayer.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/relayer/Relayer.ts b/src/relayer/Relayer.ts index bf7051293..596b4b05c 100644 --- a/src/relayer/Relayer.ts +++ b/src/relayer/Relayer.ts @@ -42,13 +42,13 @@ export class Relayer { * @param invalidFills An array of any invalid fills detected for this deposit. * @returns A boolean indicator determining whether the relayer configuration permits the deposit to be filled. */ - filterDeposit({ deposit, version, invalidFills }: RelayerUnfilledDeposit): boolean { + filterDeposit({ deposit, version: depositVersion, invalidFills }: RelayerUnfilledDeposit): boolean { const { depositId, originChainId, destinationChainId, depositor, recipient, inputToken, outputToken } = deposit; const { acrossApiClient, configStoreClient, hubPoolClient } = this.clients; const { ignoredAddresses, relayerTokens, acceptInvalidFills } = this.config; // If we don't have the latest code to support this deposit, skip it. - if (version > configStoreClient.configStoreVersion) { + if (depositVersion > configStoreClient.configStoreVersion) { this.logger.warn({ at: "Relayer::getUnfilledDeposits", message: "Skipping deposit that is not supported by this relayer version.",