Skip to content

Commit

Permalink
Merge branch 'master' into pxrl/fixExternalInventory
Browse files Browse the repository at this point in the history
  • Loading branch information
pxrl authored Mar 25, 2024
2 parents c335447 + c63c1c1 commit 0b63f87
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 29 deletions.
24 changes: 16 additions & 8 deletions src/clients/BundleDataClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import {
LoadDataReturnValue,
} from "../interfaces/BundleData";

type DataCache = Record<string, LoadDataReturnValue>;
type DataCache = Record<string, Promise<LoadDataReturnValue>>;

// V3 dictionary helper functions
function updateExpiredDepositsV3(dict: ExpiredDepositsToRefundV3, deposit: V3DepositWithBlock): void {
Expand Down Expand Up @@ -158,10 +158,10 @@ export class BundleDataClient {
this.loadDataCache = {};
}

loadDataFromCache(key: string): LoadDataReturnValue {
async loadDataFromCache(key: string): Promise<LoadDataReturnValue> {
// Always return a deep cloned copy of object stored in cache. Since JS passes by reference instead of value, we
// want to minimize the risk that the programmer accidentally mutates data in the cache.
return _.cloneDeep(this.loadDataCache[key]);
return _.cloneDeep(await this.loadDataCache[key]);
}

getBundleTimestampsFromCache(key: string): undefined | { [chainId: number]: number[] } {
Expand Down Expand Up @@ -299,10 +299,20 @@ export class BundleDataClient {
): Promise<LoadDataReturnValue> {
const key = JSON.stringify(blockRangesForChains);

if (this.loadDataCache[key]) {
return this.loadDataFromCache(key);
if (!this.loadDataCache[key]) {
this.loadDataCache[key] = this._loadData(blockRangesForChains, spokePoolClients, logData);
}

return this.loadDataFromCache(key);
}

async _loadData(
blockRangesForChains: number[][],
spokePoolClients: SpokePoolClientsByChain,
logData = true
): Promise<LoadDataReturnValue> {
const key = JSON.stringify(blockRangesForChains);

if (!this.clients.configStoreClient.isUpdated) {
throw new Error("ConfigStoreClient not updated");
} else if (!this.clients.hubPoolClient.isUpdated) {
Expand Down Expand Up @@ -1061,7 +1071,7 @@ export class BundleDataClient {
});
}

this.loadDataCache[key] = {
return {
fillsToRefund,
deposits,
unfilledDeposits,
Expand All @@ -1073,8 +1083,6 @@ export class BundleDataClient {
unexecutableSlowFills,
bundleSlowFillsV3,
};

return this.loadDataFromCache(key);
}

async getBundleBlockTimestamps(
Expand Down
31 changes: 18 additions & 13 deletions src/dataworker/Dataworker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export type PoolRebalanceRoot = {
tree: MerkleTree<PoolRebalanceLeaf>;
};

type PoolRebalanceRootCache = Record<string, PoolRebalanceRoot>;
type PoolRebalanceRootCache = Record<string, Promise<PoolRebalanceRoot>>;

// @notice Constructs roots to submit to HubPool on L1. Fetches all data synchronously from SpokePool/HubPool clients
// so this class assumes that those upstream clients are already updated and have fetched on-chain data from RPC's.
Expand Down Expand Up @@ -1554,7 +1554,7 @@ export class Dataworker {
const l1TokensWithPotentiallyOlderUpdate = expectedTrees.poolRebalanceTree.leaves.reduce((l1TokenSet, leaf) => {
const currLeafL1Tokens = leaf.l1Tokens;
currLeafL1Tokens.forEach((l1Token) => {
if (!l1TokenSet[l1Token] && !updatedL1Tokens.has(l1Token)) {
if (!l1TokenSet.includes(l1Token) && !updatedL1Tokens.has(l1Token)) {
l1TokenSet.push(l1Token);
}
});
Expand Down Expand Up @@ -1727,7 +1727,7 @@ export class Dataworker {
if (currentLiquidReserves.gte(netSendAmounts[idx])) {
this.logger.debug({
at: "Dataworker#_updateExchangeRatesBeforeExecutingHubChainLeaves",
message: `Skipping exchange rate update for ${tokenSymbol} because current liquid reserves > netSendAmount`,
message: `Skipping exchange rate update for ${tokenSymbol} because current liquid reserves > netSendAmount for hubChain`,
currentLiquidReserves,
netSendAmount: netSendAmounts[idx],
l1Token,
Expand All @@ -1751,7 +1751,7 @@ export class Dataworker {

this.logger.debug({
at: "Dataworker#_updateExchangeRatesBeforeExecutingHubChainLeaves",
message: `Updating exchange rate update for ${tokenSymbol} because we need to update the liquid reserves of the contract to execute the poolRebalanceLeaf.`,
message: `Updating exchange rate update for ${tokenSymbol} because we need to update the liquid reserves of the contract to execute the hubChain poolRebalanceLeaf.`,
poolRebalanceLeaf,
netSendAmount: netSendAmounts[idx],
currentPooledTokens,
Expand All @@ -1776,7 +1776,7 @@ export class Dataworker {
async _updateExchangeRatesBeforeExecutingNonHubChainLeaves(
latestLiquidReserves: Record<string, BigNumber>,
balanceAllocator: BalanceAllocator,
poolRebalanceLeaves: Pick<PoolRebalanceLeaf, "netSendAmounts" | "l1Tokens">[],
poolRebalanceLeaves: Pick<PoolRebalanceLeaf, "netSendAmounts" | "l1Tokens" | "chainId">[],
submitExecution: boolean
): Promise<Set<string>> {
const updatedL1Tokens = new Set<string>();
Expand Down Expand Up @@ -1813,7 +1813,8 @@ export class Dataworker {
if (currHubPoolLiquidReserves.gte(leaf.netSendAmounts[idx])) {
this.logger.debug({
at: "Dataworker#_updateExchangeRatesBeforeExecutingNonHubChainLeaves",
message: `Skipping exchange rate update for ${tokenSymbol} because current liquid reserves > netSendAmount`,
message: `Skipping exchange rate update for ${tokenSymbol} because current liquid reserves > netSendAmount for chain ${leaf.chainId}`,
l2ChainId: leaf.chainId,
currHubPoolLiquidReserves,
netSendAmount: leaf.netSendAmounts[idx],
l1Token,
Expand Down Expand Up @@ -1876,16 +1877,18 @@ export class Dataworker {
const tokenSymbol = this.clients.hubPoolClient.getTokenInfo(chainId, l1Token)?.symbol;

// Exit early if we recently synced this token.
const lastestFeesCompoundedTime =
const latestFeesCompoundedTime =
this.clients.hubPoolClient.getLpTokenInfoForL1Token(l1Token)?.lastLpFeeUpdate ?? 0;
// Force update every 2 days:
if (
this.clients.hubPoolClient.currentTime === undefined ||
this.clients.hubPoolClient.currentTime - lastestFeesCompoundedTime <= 2 * 24 * 60 * 60 // 2 day
this.clients.hubPoolClient.currentTime - latestFeesCompoundedTime <= 2 * 24 * 60 * 60
) {
const timeToNextUpdate = 2 * 24 * 60 * 60 - (this.clients.hubPoolClient.currentTime - latestFeesCompoundedTime);
this.logger.debug({
at: "Dataworker#_updateOldExchangeRates",
message: `Skipping exchange rate update for ${tokenSymbol} because it was recently updated`,
lastUpdateTime: lastestFeesCompoundedTime,
message: `Skipping exchange rate update for ${tokenSymbol} because it was recently updated. Seconds to next update: ${timeToNextUpdate}s`,
lastUpdateTime: latestFeesCompoundedTime,
});
return;
}
Expand Down Expand Up @@ -1913,7 +1916,9 @@ export class Dataworker {
this.logger.debug({
at: "Dataworker#_updateOldExchangeRates",
message: `Updating exchange rate for ${tokenSymbol}`,
lastUpdateTime: lastestFeesCompoundedTime,
lastUpdateTime: latestFeesCompoundedTime,
currentLiquidReserves,
updatedLiquidReserves,
l1Token,
});
if (submitExecution) {
Expand Down Expand Up @@ -2270,7 +2275,7 @@ export class Dataworker {
// executor running for tonight (2023-08-28) until we can fix the
// root cache rebalancing bug.
if (!this.rootCache[key] || process.env.DATAWORKER_DISABLE_REBALANCE_ROOT_CACHE === "true") {
this.rootCache[key] = await _buildPoolRebalanceRoot(
this.rootCache[key] = _buildPoolRebalanceRoot(
latestMainnetBlock,
mainnetBundleEndBlock,
fillsToRefund,
Expand All @@ -2291,7 +2296,7 @@ export class Dataworker {
);
}

return _.cloneDeep(this.rootCache[key]);
return _.cloneDeep(await this.rootCache[key]);
}

_getRequiredEthForArbitrumPoolRebalanceLeaf(leaf: PoolRebalanceLeaf): BigNumber {
Expand Down
1 change: 1 addition & 0 deletions src/utils/ProviderUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class RateLimitedProvider extends ethers.providers.StaticJsonRpcProvider {
provider: getOriginFromURL(this.connection.url),
method,
params,
chainId: this.network.chainId,
};

// In this path we log an rpc response sample.
Expand Down
16 changes: 8 additions & 8 deletions test/Dataworker.executePoolRebalances.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ describe("Dataworker: Execute pool rebalances", async function () {
const updated = await dataworkerInstance._updateExchangeRatesBeforeExecutingNonHubChainLeaves(
{},
balanceAllocator,
[{ netSendAmounts: [toBNWei(-1)], l1Tokens: [l1Token_1.address] }],
[{ netSendAmounts: [toBNWei(-1)], l1Tokens: [l1Token_1.address], chainId: 1 }],
true
);
expect(updated.size).to.equal(0);
Expand All @@ -259,7 +259,7 @@ describe("Dataworker: Execute pool rebalances", async function () {
const updated = await dataworkerInstance._updateExchangeRatesBeforeExecutingNonHubChainLeaves(
{},
balanceAllocator,
[{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address] }],
[{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address], chainId: 1 }],
true
);
expect(updated.size).to.equal(0);
Expand All @@ -274,7 +274,7 @@ describe("Dataworker: Execute pool rebalances", async function () {
[l1Token_1.address]: liquidReserves,
},
balanceAllocator,
[{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address] }],
[{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address], chainId: 1 }],
true
);
expect(updated.size).to.equal(0);
Expand All @@ -288,7 +288,7 @@ describe("Dataworker: Execute pool rebalances", async function () {
const updated = await dataworkerInstance._updateExchangeRatesBeforeExecutingNonHubChainLeaves(
{},
balanceAllocator,
[{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address] }],
[{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address], chainId: 1 }],
true
);
expect(lastSpyLogLevel(spy)).to.equal("error");
Expand All @@ -307,7 +307,7 @@ describe("Dataworker: Execute pool rebalances", async function () {
const updated = await dataworkerInstance._updateExchangeRatesBeforeExecutingNonHubChainLeaves(
{},
balanceAllocator,
[{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address] }],
[{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address], chainId: 1 }],
true
);
expect(updated.size).to.equal(1);
Expand All @@ -326,7 +326,7 @@ describe("Dataworker: Execute pool rebalances", async function () {
[l1Token_1.address]: liquidReserves,
},
balanceAllocator,
[{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address] }],
[{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address], chainId: 1 }],
true
);
expect(updated.size).to.equal(1);
Expand All @@ -346,8 +346,8 @@ describe("Dataworker: Execute pool rebalances", async function () {
},
balanceAllocator,
[
{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address] },
{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address] },
{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address], chainId: 1 },
{ netSendAmounts: [netSendAmount], l1Tokens: [l1Token_1.address], chainId: 1 },
],
true
);
Expand Down

0 comments on commit 0b63f87

Please sign in to comment.