Skip to content

Commit

Permalink
Feat/performance tracking (#1896)
Browse files Browse the repository at this point in the history
Goal:
Provide a standardised way of logging performance metrics.
The API is similar to Node's native perf_hooks, but slightly easier to use.

The Profiler will ensure { datadog: true } is appended to all logs, so we can ingest these logs by searching for this key.

Since the Profiler class holds some state, each instance should be created within a function scope so that it can be garbage collected once the function returns.

---------

Co-authored-by: Paul <[email protected]>
  • Loading branch information
gsteenkamp89 and pxrl authored Nov 26, 2024
1 parent f4043b7 commit 5bf9721
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 91 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"dependencies": {
"@across-protocol/constants": "^3.1.19",
"@across-protocol/contracts": "^3.0.16",
"@across-protocol/sdk": "^3.3.18",
"@across-protocol/sdk": "^3.3.21",
"@arbitrum/sdk": "^4.0.2",
"@consensys/linea-sdk": "^0.2.1",
"@defi-wonderland/smock": "^2.3.5",
Expand Down
38 changes: 22 additions & 16 deletions src/clients/InventoryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
assert,
compareAddressesSimple,
getUsdcSymbol,
Profiler,
getNativeTokenSymbol,
} from "../utils";
import { HubPoolClient, TokenClient, BundleDataClient } from ".";
Expand Down Expand Up @@ -59,6 +60,7 @@ export class InventoryClient {
private readonly formatWei: ReturnType<typeof createFormatFunction>;
private bundleRefundsPromise: Promise<CombinedRefunds[]> = undefined;
private excessRunningBalancePromises: { [l1Token: string]: Promise<{ [chainId: number]: BigNumber }> } = {};
private profiler: InstanceType<typeof Profiler>;

constructor(
readonly relayer: string,
Expand All @@ -75,6 +77,10 @@ export class InventoryClient {
) {
this.scalar = sdkUtils.fixedPointAdjustment;
this.formatWei = createFormatFunction(2, 4, false, 18);
this.profiler = new Profiler({
logger: this.logger,
at: "InventoryClient",
});
}

/**
Expand Down Expand Up @@ -299,13 +305,15 @@ export class InventoryClient {
async getBundleRefunds(l1Token: string): Promise<{ [chainId: string]: BigNumber }> {
let refundsToConsider: CombinedRefunds[] = [];

let mark: ReturnType<typeof this.profiler.start>;
// Increase virtual balance by pending relayer refunds from the latest valid bundle and the
// upcoming bundle. We can assume that all refunds from the second latest valid bundle have already
// been executed.
let startTimer: number;
if (!isDefined(this.bundleRefundsPromise)) {
startTimer = performance.now();
// @dev Save this as a promise so that other parallel calls to this function don't make the same call.
mark = this.profiler.start("bundleRefunds", {
l1Token,
});
this.bundleRefundsPromise = this.getAllBundleRefunds();
}
refundsToConsider = lodash.cloneDeep(await this.bundleRefundsPromise);
Expand All @@ -327,12 +335,12 @@ export class InventoryClient {
},
{}
);
if (startTimer) {
this.log(`Time taken to get bundle refunds: ${Math.round((performance.now() - startTimer) / 1000)}s`, {
l1Token,
totalRefundsPerChain,
});
}

mark?.stop({
message: "Time to calculate total refunds per chain",
l1Token,
});

return totalRefundsPerChain;
}

Expand Down Expand Up @@ -618,7 +626,8 @@ export class InventoryClient {
): Promise<{ [chainId: number]: BigNumber }> {
const { root: latestPoolRebalanceRoot, blockRanges } = await this.bundleDataClient.getLatestPoolRebalanceRoot();
const chainIds = this.hubPoolClient.configStoreClient.getChainIdIndicesForBlock();
const start = performance.now();

const mark = this.profiler.start("getLatestRunningBalances");
const runningBalances = Object.fromEntries(
await sdkUtils.mapAsync(chainsToEvaluate, async (chainId) => {
const chainIdIndex = chainIds.indexOf(chainId);
Expand Down Expand Up @@ -674,13 +683,10 @@ export class InventoryClient {
];
})
);
this.log(
`Approximated latest (abs. val) running balance for ORU chains for token ${l1Token} in ${
Math.round(performance.now() - start) / 1000
}s`,
{ runningBalances }
);

mark.stop({
message: "Time to get running balances",
runningBalances,
});
return Object.fromEntries(Object.entries(runningBalances).map(([k, v]) => [k, v.absLatestRunningBalance]));
}

Expand Down
15 changes: 7 additions & 8 deletions src/clients/TokenClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import {
MAX_UINT_VAL,
assign,
blockExplorerLink,
getCurrentTime,
getNetworkName,
Profiler,
runTransaction,
toBN,
winston,
Expand All @@ -27,6 +27,7 @@ type TokenShortfallType = {
};

export class TokenClient {
private profiler: InstanceType<typeof Profiler>;
tokenData: TokenDataType = {};
tokenShortfall: TokenShortfallType = {};

Expand All @@ -35,7 +36,9 @@ export class TokenClient {
readonly relayerAddress: string,
readonly spokePoolClients: { [chainId: number]: SpokePoolClient },
readonly hubPoolClient: HubPoolClient
) {}
) {
this.profiler = new Profiler({ at: "TokenClient", logger });
}

getAllTokenData(): TokenDataType {
return this.tokenData;
Expand Down Expand Up @@ -238,7 +241,7 @@ export class TokenClient {
}

async update(): Promise<void> {
const start = getCurrentTime();
const mark = this.profiler.start("update");
this.logger.debug({ at: "TokenBalanceClient", message: "Updating TokenBalance client" });
const { hubPoolClient } = this;

Expand Down Expand Up @@ -272,11 +275,7 @@ export class TokenClient {
})
);

this.logger.debug({
at: "TokenBalanceClient",
message: `Updated TokenBalance client in ${getCurrentTime() - start} seconds.`,
balanceData,
});
mark.stop({ message: "Updated TokenBalance client.", balanceData });
}

async fetchTokenData(
Expand Down
15 changes: 10 additions & 5 deletions src/dataworker/DataworkerUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
getTimestampsForBundleEndBlocks,
isDefined,
MerkleTree,
Profiler,
TOKEN_SYMBOLS_MAP,
winston,
} from "../utils";
Expand Down Expand Up @@ -350,7 +351,13 @@ export async function persistDataToArweave(
Buffer.from(tag).length <= ARWEAVE_TAG_BYTE_LIMIT,
`Arweave tag cannot exceed ${ARWEAVE_TAG_BYTE_LIMIT} bytes`
);
const startTime = performance.now();

const profiler = new Profiler({
logger,
at: "DataworkerUtils#persistDataToArweave",
});
const mark = profiler.start("persistDataToArweave");

// Check if data already exists on Arweave with the given tag.
// If so, we don't need to persist it again.
const [matchingTxns, address, balance] = await Promise.all([
Expand Down Expand Up @@ -397,10 +404,8 @@ export async function persistDataToArweave(
balance: formatWinston(balance),
notificationPath: "across-arweave",
});
const endTime = performance.now();
logger.debug({
at: "Dataworker#index",
message: `Time to persist data to Arweave: ${endTime - startTime}ms`,
mark.stop({
message: "Time to persist to Arweave",
});
}
}
51 changes: 32 additions & 19 deletions src/dataworker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
Signer,
disconnectRedisClients,
isDefined,
Profiler,
} from "../utils";
import { spokePoolClientsToProviders } from "../common";
import { Dataworker } from "./Dataworker";
Expand Down Expand Up @@ -52,21 +53,29 @@ export async function createDataworker(
dataworker,
};
}

export async function runDataworker(_logger: winston.Logger, baseSigner: Signer): Promise<void> {
logger = _logger;
let loopStart = performance.now();
const { clients, config, dataworker } = await createDataworker(logger, baseSigner);
logger.debug({
const profiler = new Profiler({
at: "Dataworker#index",
message: `Time to update non-spoke clients: ${(performance.now() - loopStart) / 1000}s`,
logger: _logger,
});
loopStart = performance.now();
logger = _logger;

const { clients, config, dataworker } = await profiler.measureAsync(
createDataworker(logger, baseSigner),
"createDataworker",
{
message: "Time to update non-spoke clients",
}
);

let proposedBundleData: BundleData | undefined = undefined;
let poolRebalanceLeafExecutionCount = 0;
try {
logger[startupLogLevel(config)]({ at: "Dataworker#index", message: "Dataworker started 👩‍🔬", config });

for (;;) {
profiler.mark("loopStart");
// Determine the spoke client's lookback:
// 1. We initiate the spoke client event search windows based on a start bundle's bundle block end numbers and
// how many bundles we want to look back from the start bundle blocks.
Expand Down Expand Up @@ -108,7 +117,7 @@ export async function runDataworker(_logger: winston.Logger, baseSigner: Signer)
fromBlocks,
toBlocks
);
const dataworkerFunctionLoopTimerStart = performance.now();
profiler.mark("dataworkerFunctionLoopTimerStart");
// Validate and dispute pending proposal before proposing a new one
if (config.disputerEnabled) {
await dataworker.validatePendingRootBundle(
Expand Down Expand Up @@ -191,19 +200,23 @@ export async function runDataworker(_logger: winston.Logger, baseSigner: Signer)
} else {
await clients.multiCallerClient.executeTxnQueues();
}

const dataworkerFunctionLoopTimerEnd = performance.now();
logger.debug({
at: "Dataworker#index",
message: `Time to update spoke pool clients and run dataworker function: ${Math.round(
(dataworkerFunctionLoopTimerEnd - loopStart) / 1000
)}s`,
timeToLoadSpokes: Math.round((dataworkerFunctionLoopTimerStart - loopStart) / 1000),
timeToRunDataworkerFunctions: Math.round(
(dataworkerFunctionLoopTimerEnd - dataworkerFunctionLoopTimerStart) / 1000
),
profiler.mark("dataworkerFunctionLoopTimerEnd");
profiler.measure("timeToLoadSpokes", {
message: "Time to load spokes in data worker loop",
from: "loopStart",
to: "dataworkerFunctionLoopTimerStart",
});
profiler.measure("timeToRunDataworkerFunctions", {
message: "Time to run data worker functions in data worker loop",
from: "dataworkerFunctionLoopTimerStart",
to: "dataworkerFunctionLoopTimerEnd",
});
// do we need to add an additional log for the sum of the previous?
profiler.measure("dataWorkerTotal", {
message: "Total time taken for dataworker loop",
from: "loopStart",
to: "dataworkerFunctionLoopTimerEnd",
});
loopStart = performance.now();

if (await processEndPollingLoop(logger, "Dataworker", config.pollingDelay)) {
break;
Expand Down
35 changes: 27 additions & 8 deletions src/finalizer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
startupLogLevel,
winston,
CHAIN_IDs,
Profiler,
} from "../utils";
import { ChainFinalizer, CrossChainMessage } from "./types";
import {
Expand Down Expand Up @@ -471,17 +472,23 @@ export class FinalizerConfig extends DataworkerConfig {

export async function runFinalizer(_logger: winston.Logger, baseSigner: Signer): Promise<void> {
logger = _logger;

// Same config as Dataworker for now.
const config = new FinalizerConfig(process.env);
const profiler = new Profiler({
logger,
at: "Finalizer#index",
config,
});

logger[startupLogLevel(config)]({ at: "Finalizer#index", message: "Finalizer started 🏋🏿‍♀️", config });
const { commonClients, spokePoolClients } = await constructFinalizerClients(logger, config, baseSigner);

try {
for (;;) {
const loopStart = performance.now();
profiler.mark("loopStart");
await updateSpokePoolClients(spokePoolClients, ["TokensBridged"]);
const loopStartPostSpokePoolUpdates = performance.now();
profiler.mark("loopStartPostSpokePoolUpdates");

if (config.finalizerEnabled) {
const availableChains = commonClients.configStoreClient
Expand All @@ -501,13 +508,25 @@ export async function runFinalizer(_logger: winston.Logger, baseSigner: Signer):
} else {
logger[startupLogLevel(config)]({ at: "Dataworker#index", message: "Finalizer disabled" });
}
const loopEndPostFinalizations = performance.now();

logger.debug({
at: "Finalizer#index",
message: `Time to loop: ${Math.round((loopEndPostFinalizations - loopStart) / 1000)}s`,
timeToUpdateSpokeClients: Math.round((loopStartPostSpokePoolUpdates - loopStart) / 1000),
timeToFinalize: Math.round((loopEndPostFinalizations - loopStartPostSpokePoolUpdates) / 1000),
profiler.mark("loopEndPostFinalizations");

profiler.measure("timeToUpdateSpokeClients", {
from: "loopStart",
to: "loopStartPostSpokePoolUpdates",
strategy: config.finalizationStrategy,
});

profiler.measure("timeToFinalize", {
from: "loopStartPostSpokePoolUpdates",
to: "loopEndPostFinalizations",
strategy: config.finalizationStrategy,
});

profiler.measure("loopTime", {
message: "Time to loop",
from: "loopStart",
to: "loopEndPostFinalizations",
strategy: config.finalizationStrategy,
});

Expand Down
19 changes: 12 additions & 7 deletions src/libexec/util/evm/util.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import assert from "assert";
import { Contract, EventFilter } from "ethers";
import { getNetworkName, isDefined, paginatedEventQuery, winston } from "../../../utils";
import { getNetworkName, isDefined, paginatedEventQuery, Profiler, winston } from "../../../utils";
import { Log, ScraperOpts } from "../../types";

/**
Expand Down Expand Up @@ -45,8 +45,12 @@ export async function scrapeEvents(
spokePool: Contract,
eventName: string,
opts: ScraperOpts & { toBlock: number },
logger: winston.Logger
logger?: winston.Logger
): Promise<Log[]> {
const profiler = new Profiler({
logger,
at: "scrapeEvents",
});
const { lookback, deploymentBlock, filterArgs, maxBlockRange, toBlock } = opts;
const { chainId } = await spokePool.provider.getNetwork();
const chain = getNetworkName(chainId);
Expand All @@ -55,13 +59,14 @@ export async function scrapeEvents(
assert(toBlock > fromBlock, `${toBlock} > ${fromBlock}`);
const searchConfig = { fromBlock, toBlock, maxBlockLookBack: maxBlockRange };

const tStart = performance.now();
const mark = profiler.start("paginatedEventQuery");
const filter = getEventFilter(spokePool, eventName, filterArgs[eventName]);
const events = await paginatedEventQuery(spokePool, filter, searchConfig);
const tStop = performance.now();
logger.debug({
at: "scrapeEvents",
message: `Scraped ${events.length} ${chain} ${eventName} events in ${Math.round((tStop - tStart) / 1000)} seconds`,
mark.stop({
message: `Scraped ${events.length} ${chain} ${eventName} events.`,
numEvents: events.length,
chain,
eventName,
searchConfig,
});

Expand Down
Loading

0 comments on commit 5bf9721

Please sign in to comment.