Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve(relayer): Parallel processing for destination chain fills #1346

Merged
merged 7 commits into from
Mar 25, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 158 additions & 128 deletions src/relayer/Relayer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assert from "assert";
import { utils as sdkUtils } from "@across-protocol/sdk-v2";
import { utils as ethersUtils } from "ethers";
import { L1Token, V3Deposit, V3DepositWithBlock } from "../interfaces";
Expand Down Expand Up @@ -36,141 +37,154 @@ export class Relayer {
}

/**
* @description Retrieve the complete array of unfilled deposits and filter out deposits we can't or choose
* not to support.
* @returns An array of filtered RelayerUnfilledDeposit objects.
* @description For a given deposit, apply relayer-specific filtering to determine whether it should be filled.
* @param deposit Deposit object.
* @param version Version identified for this deposit.
* @param invalidFills An array of any invalid fills detected for this deposit.
* @returns A boolean indicator determining whether the relayer configuration permits the deposit to be filled.
*/
private async _getUnfilledDeposits(): Promise<RelayerUnfilledDeposit[]> {
const { configStoreClient, hubPoolClient, spokePoolClients, acrossApiClient } = this.clients;
const { relayerTokens, ignoredAddresses, acceptInvalidFills } = this.config;

// Flatten unfilledDeposits for now. @todo: Process deposits in parallel by destination chain.
const unfilledDeposits = Object.values(
await getUnfilledDeposits(spokePoolClients, hubPoolClient, this.config.maxRelayerLookBack)
).flat();

const maxVersion = configStoreClient.configStoreVersion;
return sdkUtils.filterAsync(unfilledDeposits, async ({ deposit, version, invalidFills }) => {
const { depositId, depositor, recipient, originChainId, destinationChainId, inputToken, outputToken } = deposit;
const destinationChain = getNetworkName(destinationChainId);

// If we don't have the latest code to support this deposit, skip it.
if (version > maxVersion) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit that is not supported by this relayer version.",
latestVersionSupported: maxVersion,
latestInConfigStore: configStoreClient.getConfigStoreVersionForTimestamp(),
deposit,
});
return false;
}
filterDeposit({ deposit, version, invalidFills }: RelayerUnfilledDeposit): boolean {
const { depositId, originChainId, destinationChainId, depositor, recipient, inputToken, outputToken } = deposit;
const { acrossApiClient, configStoreClient, hubPoolClient } = this.clients;
const { ignoredAddresses, relayerTokens, acceptInvalidFills } = this.config;

if (!this.routeEnabled(originChainId, destinationChainId)) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit from or to disabled chains.",
deposit,
enabledOriginChains: this.config.relayerOriginChains,
enabledDestinationChains: this.config.relayerDestinationChains,
});
return false;
}
// If we don't have the latest code to support this deposit, skip it.
if (version > configStoreClient.configStoreVersion) {
pxrl marked this conversation as resolved.
Show resolved Hide resolved
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit that is not supported by this relayer version.",
latestVersionSupported: configStoreClient.configStoreVersion,
latestInConfigStore: configStoreClient.getConfigStoreVersionForTimestamp(),
deposit,
});
return false;
}

// Skip deposits with quoteTimestamp in the future (impossible to know HubPool utilization => LP fee cannot be computed).
if (deposit.quoteTimestamp > hubPoolClient.currentTime) {
return false;
}
if (!this.routeEnabled(originChainId, destinationChainId)) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit from or to disabled chains.",
deposit,
enabledOriginChains: this.config.relayerOriginChains,
enabledDestinationChains: this.config.relayerDestinationChains,
});
return false;
}

if (ignoredAddresses?.includes(getAddress(depositor)) || ignoredAddresses?.includes(getAddress(recipient))) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Ignoring deposit",
depositor,
recipient,
});
return false;
}
// Skip deposits with quoteTimestamp in the future (impossible to know HubPool utilization => LP fee cannot be computed).
if (deposit.quoteTimestamp > hubPoolClient.currentTime) {
return false;
}

// Skip any L1 tokens that are not specified in the config.
// If relayerTokens is an empty list, we'll assume that all tokens are supported.
const l1Token = hubPoolClient.getL1TokenInfoForL2Token(inputToken, originChainId);
if (relayerTokens.length > 0 && !relayerTokens.includes(l1Token.address)) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit for unwhitelisted token",
deposit,
l1Token,
});
return false;
}
if (ignoredAddresses?.includes(getAddress(depositor)) || ignoredAddresses?.includes(getAddress(recipient))) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Ignoring deposit",
depositor,
recipient,
});
return false;
}

// It would be preferable to use host time since it's more reliably up-to-date, but this creates issues in test.
const currentTime = this.clients.spokePoolClients[destinationChainId].getCurrentTime();
if (deposit.fillDeadline <= currentTime) {
return false;
}
// Skip any L1 tokens that are not specified in the config.
// If relayerTokens is an empty list, we'll assume that all tokens are supported.
const l1Token = hubPoolClient.getL1TokenInfoForL2Token(inputToken, originChainId);
james-a-morris marked this conversation as resolved.
Show resolved Hide resolved
if (relayerTokens.length > 0 && !relayerTokens.includes(l1Token.address)) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit for unwhitelisted token",
deposit,
l1Token,
});
return false;
}

if (deposit.exclusivityDeadline > currentTime && getAddress(deposit.exclusiveRelayer) !== this.relayerAddress) {
return false;
}
// It would be preferable to use host time since it's more reliably up-to-date, but this creates issues in test.
const currentTime = this.clients.spokePoolClients[destinationChainId].getCurrentTime();
if (deposit.fillDeadline <= currentTime) {
return false;
}

if (!hubPoolClient.areTokensEquivalent(inputToken, originChainId, outputToken, destinationChainId)) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit including in-protocol token swap.",
deposit,
});
return false;
}
if (deposit.exclusivityDeadline > currentTime && getAddress(deposit.exclusiveRelayer) !== this.relayerAddress) {
return false;
}

// Skip deposit with message if sending fills with messages is not supported.
if (!this.config.sendingMessageRelaysEnabled && !isMessageEmpty(resolveDepositMessage(deposit))) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping fill for deposit with message",
depositUpdated: isDepositSpedUp(deposit),
deposit,
});
return false;
}
if (!hubPoolClient.areTokensEquivalent(inputToken, originChainId, outputToken, destinationChainId)) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit including in-protocol token swap.",
deposit,
});
return false;
}

// Skip deposits that contain invalid fills from the same relayer. This prevents potential corrupted data from
// making the same relayer fill a deposit multiple times.
if (!acceptInvalidFills && invalidFills.some((fill) => fill.relayer === this.relayerAddress)) {
this.logger.error({
at: "Relayer::getUnfilledDeposits",
message: "👨‍👧‍👦 Skipping deposit with invalid fills from the same relayer",
deposit,
invalidFills,
destinationChain,
});
return false;
}
// Skip deposit with message if sending fills with messages is not supported.
if (!this.config.sendingMessageRelaysEnabled && !isMessageEmpty(resolveDepositMessage(deposit))) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping fill for deposit with message",
depositUpdated: isDepositSpedUp(deposit),
deposit,
});
return false;
}

// We query the relayer API to get the deposit limits for different token and destination combinations.
// The relayer should *not* be filling deposits that the HubPool doesn't have liquidity for otherwise the relayer's
// refund will be stuck for potentially 7 days. Note: Filter for supported tokens first, since the relayer only
// queries for limits on supported tokens.
const { inputAmount } = deposit;
if (acrossApiClient.updatedLimits && inputAmount.gt(acrossApiClient.getLimit(l1Token.address))) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "😱 Skipping deposit with greater unfilled amount than API suggested limit",
limit: acrossApiClient.getLimit(l1Token.address),
l1Token: l1Token.address,
depositId,
inputToken,
inputAmount,
originChainId,
transactionHash: deposit.transactionHash,
});
return false;
}
// Skip deposits that contain invalid fills from the same relayer. This prevents potential corrupted data from
// making the same relayer fill a deposit multiple times.
if (!acceptInvalidFills && invalidFills.some((fill) => fill.relayer === this.relayerAddress)) {
this.logger.error({
at: "Relayer::getUnfilledDeposits",
message: "👨‍👧‍👦 Skipping deposit with invalid fills from the same relayer",
deposit,
invalidFills,
destinationChainId,
});
return false;
}

// The deposit passed all checks, so we can include it in the list of unfilled deposits.
return true;
// We query the relayer API to get the deposit limits for different token and destination combinations.
// The relayer should *not* be filling deposits that the HubPool doesn't have liquidity for otherwise the relayer's
// refund will be stuck for potentially 7 days. Note: Filter for supported tokens first, since the relayer only
// queries for limits on supported tokens.
const { inputAmount } = deposit;
if (acrossApiClient.updatedLimits && inputAmount.gt(acrossApiClient.getLimit(l1Token.address))) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "😱 Skipping deposit with greater unfilled amount than API suggested limit",
limit: acrossApiClient.getLimit(l1Token.address),
l1Token: l1Token.address,
depositId,
inputToken,
inputAmount,
originChainId,
transactionHash: deposit.transactionHash,
});
return false;
}

// The deposit passed all checks, so we can include it in the list of unfilled deposits.
return true;
}

/**
* @description Retrieve the complete array of unfilled deposits and filter out deposits we can't or choose
* not to support.
* @returns An array of filtered RelayerUnfilledDeposit objects.
*/
private async _getUnfilledDeposits(): Promise<Record<number, RelayerUnfilledDeposit[]>> {
const { hubPoolClient, spokePoolClients } = this.clients;

const unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient, this.config.maxRelayerLookBack);

// Filter the resulting unfilled deposits according to relayer configuration.
Object.keys(unfilledDeposits).forEach((_destinationChainId) => {
const destinationChainId = Number(_destinationChainId);
unfilledDeposits[destinationChainId] = unfilledDeposits[destinationChainId].filter((deposit) =>
this.filterDeposit(deposit)
);
});

return unfilledDeposits;
}

/**
Expand Down Expand Up @@ -319,7 +333,9 @@ export class Relayer {
// Fetch unfilled deposits and filter out deposits upfront before we compute the minimum deposit confirmation
// per chain, which is based on the deposit volume we could fill.
const unfilledDeposits = await this._getUnfilledDeposits();
const allUnfilledDeposits = unfilledDeposits.map(({ deposit }) => deposit);
const allUnfilledDeposits = Object.values(unfilledDeposits)
.flat()
.map(({ deposit }) => deposit);
this.logger.debug({
at: "Relayer#checkForUnfilledDepositsAndFill",
message: `${allUnfilledDeposits.length} unfilled deposits found.`,
Expand All @@ -329,11 +345,25 @@ export class Relayer {
}

const mdcPerChain = this.computeRequiredDepositConfirmations(allUnfilledDeposits);
for (const deposit of allUnfilledDeposits) {
const { originChainId } = deposit;
const maxBlockNumber = spokePoolClients[originChainId].latestBlockSearched - mdcPerChain[originChainId];
await this.evaluateFill(deposit, maxBlockNumber, sendSlowRelays);
}
const maxBlockNumbers = Object.fromEntries(
Object.values(spokePoolClients).map(({ chainId, latestBlockSearched }) => [
chainId,
latestBlockSearched - mdcPerChain[chainId],
])
);

await sdkUtils.forEachAsync(Object.values(unfilledDeposits), async (unfilledDeposits) => {
if (unfilledDeposits.length === 0) {
return;
}

const _destinationChainId = unfilledDeposits[0].deposit.destinationChainId;
for (const { deposit } of unfilledDeposits) {
const { originChainId, destinationChainId } = deposit;
assert(destinationChainId === _destinationChainId);
await this.evaluateFill(deposit, maxBlockNumbers[originChainId], sendSlowRelays);
}
});

// If during the execution run we had shortfalls or unprofitable fills then handel it by producing associated logs.
if (tokenClient.anyCapturedShortFallFills()) {
Expand Down
Loading