Skip to content

Commit

Permalink
improve(relayer): Parallel processing for destination chain fills (#1346
Browse files Browse the repository at this point in the history
)

Relayer fills are currently processed as a flat array. When messages are
involved this requires simulation of the fill on the destination chain,
thus incurring delay. In these cases, take the opportunity to process
other fills whilst awaiting the outcome of the simulation. This is also
another step towards containing RPC misbehaviour to the affected chain
and preventing contagion to other destinations.
  • Loading branch information
pxrl authored Mar 25, 2024
1 parent 3213842 commit 80fe329
Showing 1 changed file with 173 additions and 128 deletions.
301 changes: 173 additions & 128 deletions src/relayer/Relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,141 +36,154 @@ export class Relayer {
}

/**
* @description Retrieve the complete array of unfilled deposits and filter out deposits we can't or choose
* not to support.
* @returns An array of filtered RelayerUnfilledDeposit objects.
* @description For a given deposit, apply relayer-specific filtering to determine whether it should be filled.
* @param deposit Deposit object.
* @param version Version identified for this deposit.
* @param invalidFills An array of any invalid fills detected for this deposit.
* @returns A boolean indicator determining whether the relayer configuration permits the deposit to be filled.
*/
private async _getUnfilledDeposits(): Promise<RelayerUnfilledDeposit[]> {
const { configStoreClient, hubPoolClient, spokePoolClients, acrossApiClient } = this.clients;
const { relayerTokens, ignoredAddresses, acceptInvalidFills } = this.config;

// Flatten unfilledDeposits for now. @todo: Process deposits in parallel by destination chain.
const unfilledDeposits = Object.values(
await getUnfilledDeposits(spokePoolClients, hubPoolClient, this.config.maxRelayerLookBack)
).flat();

const maxVersion = configStoreClient.configStoreVersion;
return sdkUtils.filterAsync(unfilledDeposits, async ({ deposit, version, invalidFills }) => {
const { depositId, depositor, recipient, originChainId, destinationChainId, inputToken, outputToken } = deposit;
const destinationChain = getNetworkName(destinationChainId);

// If we don't have the latest code to support this deposit, skip it.
if (version > maxVersion) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit that is not supported by this relayer version.",
latestVersionSupported: maxVersion,
latestInConfigStore: configStoreClient.getConfigStoreVersionForTimestamp(),
deposit,
});
return false;
}
filterDeposit({ deposit, version: depositVersion, invalidFills }: RelayerUnfilledDeposit): boolean {
const { depositId, originChainId, destinationChainId, depositor, recipient, inputToken, outputToken } = deposit;
const { acrossApiClient, configStoreClient, hubPoolClient } = this.clients;
const { ignoredAddresses, relayerTokens, acceptInvalidFills } = this.config;

if (!this.routeEnabled(originChainId, destinationChainId)) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit from or to disabled chains.",
deposit,
enabledOriginChains: this.config.relayerOriginChains,
enabledDestinationChains: this.config.relayerDestinationChains,
});
return false;
}
// If we don't have the latest code to support this deposit, skip it.
if (depositVersion > configStoreClient.configStoreVersion) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit that is not supported by this relayer version.",
latestVersionSupported: configStoreClient.configStoreVersion,
latestInConfigStore: configStoreClient.getConfigStoreVersionForTimestamp(),
deposit,
});
return false;
}

// Skip deposits with quoteTimestamp in the future (impossible to know HubPool utilization => LP fee cannot be computed).
if (deposit.quoteTimestamp > hubPoolClient.currentTime) {
return false;
}
if (!this.routeEnabled(originChainId, destinationChainId)) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit from or to disabled chains.",
deposit,
enabledOriginChains: this.config.relayerOriginChains,
enabledDestinationChains: this.config.relayerDestinationChains,
});
return false;
}

if (ignoredAddresses?.includes(getAddress(depositor)) || ignoredAddresses?.includes(getAddress(recipient))) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Ignoring deposit",
depositor,
recipient,
});
return false;
}
// Skip deposits with quoteTimestamp in the future (impossible to know HubPool utilization => LP fee cannot be computed).
if (deposit.quoteTimestamp > hubPoolClient.currentTime) {
return false;
}

// Skip any L1 tokens that are not specified in the config.
// If relayerTokens is an empty list, we'll assume that all tokens are supported.
const l1Token = hubPoolClient.getL1TokenInfoForL2Token(inputToken, originChainId);
if (relayerTokens.length > 0 && !relayerTokens.includes(l1Token.address)) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit for unwhitelisted token",
deposit,
l1Token,
});
return false;
}
if (ignoredAddresses?.includes(getAddress(depositor)) || ignoredAddresses?.includes(getAddress(recipient))) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Ignoring deposit",
depositor,
recipient,
});
return false;
}

// It would be preferable to use host time since it's more reliably up-to-date, but this creates issues in test.
const currentTime = this.clients.spokePoolClients[destinationChainId].getCurrentTime();
if (deposit.fillDeadline <= currentTime) {
return false;
}
// Skip any L1 tokens that are not specified in the config.
// If relayerTokens is an empty list, we'll assume that all tokens are supported.
const l1Token = hubPoolClient.getL1TokenInfoForL2Token(inputToken, originChainId);
if (relayerTokens.length > 0 && !relayerTokens.includes(l1Token.address)) {
this.logger.debug({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit for unwhitelisted token",
deposit,
l1Token,
});
return false;
}

if (deposit.exclusivityDeadline > currentTime && getAddress(deposit.exclusiveRelayer) !== this.relayerAddress) {
return false;
}
// It would be preferable to use host time since it's more reliably up-to-date, but this creates issues in test.
const currentTime = this.clients.spokePoolClients[destinationChainId].getCurrentTime();
if (deposit.fillDeadline <= currentTime) {
return false;
}

if (!hubPoolClient.areTokensEquivalent(inputToken, originChainId, outputToken, destinationChainId)) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit including in-protocol token swap.",
deposit,
});
return false;
}
if (deposit.exclusivityDeadline > currentTime && getAddress(deposit.exclusiveRelayer) !== this.relayerAddress) {
return false;
}

// Skip deposit with message if sending fills with messages is not supported.
if (!this.config.sendingMessageRelaysEnabled && !isMessageEmpty(resolveDepositMessage(deposit))) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping fill for deposit with message",
depositUpdated: isDepositSpedUp(deposit),
deposit,
});
return false;
}
if (!hubPoolClient.areTokensEquivalent(inputToken, originChainId, outputToken, destinationChainId)) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping deposit including in-protocol token swap.",
deposit,
});
return false;
}

// Skip deposits that contain invalid fills from the same relayer. This prevents potential corrupted data from
// making the same relayer fill a deposit multiple times.
if (!acceptInvalidFills && invalidFills.some((fill) => fill.relayer === this.relayerAddress)) {
this.logger.error({
at: "Relayer::getUnfilledDeposits",
message: "👨‍👧‍👦 Skipping deposit with invalid fills from the same relayer",
deposit,
invalidFills,
destinationChain,
});
return false;
}
// Skip deposit with message if sending fills with messages is not supported.
if (!this.config.sendingMessageRelaysEnabled && !isMessageEmpty(resolveDepositMessage(deposit))) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "Skipping fill for deposit with message",
depositUpdated: isDepositSpedUp(deposit),
deposit,
});
return false;
}

// We query the relayer API to get the deposit limits for different token and destination combinations.
// The relayer should *not* be filling deposits that the HubPool doesn't have liquidity for otherwise the relayer's
// refund will be stuck for potentially 7 days. Note: Filter for supported tokens first, since the relayer only
// queries for limits on supported tokens.
const { inputAmount } = deposit;
if (acrossApiClient.updatedLimits && inputAmount.gt(acrossApiClient.getLimit(l1Token.address))) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "😱 Skipping deposit with greater unfilled amount than API suggested limit",
limit: acrossApiClient.getLimit(l1Token.address),
l1Token: l1Token.address,
depositId,
inputToken,
inputAmount,
originChainId,
transactionHash: deposit.transactionHash,
});
return false;
}
// Skip deposits that contain invalid fills from the same relayer. This prevents potential corrupted data from
// making the same relayer fill a deposit multiple times.
if (!acceptInvalidFills && invalidFills.some((fill) => fill.relayer === this.relayerAddress)) {
this.logger.error({
at: "Relayer::getUnfilledDeposits",
message: "👨‍👧‍👦 Skipping deposit with invalid fills from the same relayer",
deposit,
invalidFills,
destinationChainId,
});
return false;
}

// The deposit passed all checks, so we can include it in the list of unfilled deposits.
return true;
// We query the relayer API to get the deposit limits for different token and destination combinations.
// The relayer should *not* be filling deposits that the HubPool doesn't have liquidity for otherwise the relayer's
// refund will be stuck for potentially 7 days. Note: Filter for supported tokens first, since the relayer only
// queries for limits on supported tokens.
const { inputAmount } = deposit;
if (acrossApiClient.updatedLimits && inputAmount.gt(acrossApiClient.getLimit(l1Token.address))) {
this.logger.warn({
at: "Relayer::getUnfilledDeposits",
message: "😱 Skipping deposit with greater unfilled amount than API suggested limit",
limit: acrossApiClient.getLimit(l1Token.address),
l1Token: l1Token.address,
depositId,
inputToken,
inputAmount,
originChainId,
transactionHash: deposit.transactionHash,
});
return false;
}

// The deposit passed all checks, so we can include it in the list of unfilled deposits.
return true;
}

/**
* @description Retrieve the complete array of unfilled deposits and filter out deposits we can't or choose
* not to support.
* @returns An array of filtered RelayerUnfilledDeposit objects.
*/
private async _getUnfilledDeposits(): Promise<Record<number, RelayerUnfilledDeposit[]>> {
const { hubPoolClient, spokePoolClients } = this.clients;

const unfilledDeposits = await getUnfilledDeposits(spokePoolClients, hubPoolClient, this.config.maxRelayerLookBack);

// Filter the resulting unfilled deposits according to relayer configuration.
Object.keys(unfilledDeposits).forEach((_destinationChainId) => {
const destinationChainId = Number(_destinationChainId);
unfilledDeposits[destinationChainId] = unfilledDeposits[destinationChainId].filter((deposit) =>
this.filterDeposit(deposit)
);
});

return unfilledDeposits;
}

/**
Expand Down Expand Up @@ -309,6 +322,24 @@ export class Relayer {
}
}

/**
* For a given destination chain, evaluate and optionally fill each unfilled deposit. Note that each fill should be
* evaluated sequentially in order to ensure atomic balance updates.
* @param deposits An array of deposits destined for the same destination chain.
* @param maxBlockNumbers A map of the highest block number per origin chain to fill.
* @returns void
*/
async evaluateFills(
deposits: V3DepositWithBlock[],
maxBlockNumbers: { [chainId: number]: number },
sendSlowRelays: boolean
): Promise<void> {
for (let i = 0; i < deposits.length; ++i) {
const deposit = deposits[i];
await this.evaluateFill(deposit, maxBlockNumbers[deposit.originChainId], sendSlowRelays);
}
}

async checkForUnfilledDepositsAndFill(sendSlowRelays = true): Promise<void> {
// Fetch all unfilled deposits, order by total earnable fee.
const { profitClient, spokePoolClients, tokenClient, multiCallerClient } = this.clients;
Expand All @@ -319,7 +350,9 @@ export class Relayer {
// Fetch unfilled deposits and filter out deposits upfront before we compute the minimum deposit confirmation
// per chain, which is based on the deposit volume we could fill.
const unfilledDeposits = await this._getUnfilledDeposits();
const allUnfilledDeposits = unfilledDeposits.map(({ deposit }) => deposit);
const allUnfilledDeposits = Object.values(unfilledDeposits)
.flat()
.map(({ deposit }) => deposit);
this.logger.debug({
at: "Relayer#checkForUnfilledDepositsAndFill",
message: `${allUnfilledDeposits.length} unfilled deposits found.`,
Expand All @@ -329,11 +362,23 @@ export class Relayer {
}

const mdcPerChain = this.computeRequiredDepositConfirmations(allUnfilledDeposits);
for (const deposit of allUnfilledDeposits) {
const { originChainId } = deposit;
const maxBlockNumber = spokePoolClients[originChainId].latestBlockSearched - mdcPerChain[originChainId];
await this.evaluateFill(deposit, maxBlockNumber, sendSlowRelays);
}
const maxBlockNumbers = Object.fromEntries(
Object.values(spokePoolClients).map(({ chainId, latestBlockSearched }) => [
chainId,
latestBlockSearched - mdcPerChain[chainId],
])
);

await sdkUtils.forEachAsync(Object.values(unfilledDeposits), async (unfilledDeposits) => {
if (unfilledDeposits.length === 0) {
return;
}
await this.evaluateFills(
unfilledDeposits.map(({ deposit }) => deposit),
maxBlockNumbers,
sendSlowRelays
);
});

// If during the execution run we had shortfalls or unprofitable fills then handel it by producing associated logs.
if (tokenClient.anyCapturedShortFallFills()) {
Expand Down

0 comments on commit 80fe329

Please sign in to comment.