Skip to content

Commit

Permalink
Merge pull request #77 from OlympusDAO/develop
Browse files Browse the repository at this point in the history
Release: Data Consistency
  • Loading branch information
0xJem authored Oct 12, 2023
2 parents e5faf6a + 67b835e commit 1e6e2b6
Show file tree
Hide file tree
Showing 13 changed files with 391 additions and 103 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- run: yarn

## Fetch values from Subgraph Studio
- run: yarn test # Runs build automatically
- run: yarn test:ci # Runs build automatically
env:
ARBITRUM_SUBGRAPH_API_KEY: ${{ secrets.ARBITRUM_SUBGRAPH_API_KEY }}
UPSTASH_REDIS_URL: ${{ secrets.UPSTASH_REDIS_URL }}
10 changes: 10 additions & 0 deletions apps/server/.wundergraph/dateHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ export const getOffsetDays = (dateOffset?: number): number => {
return dateOffset;
}

/**
* Subtracts `offsetDays` from the `currentDate` and returns the new start date.
*
* This function also ensures that the new start date is not before the `finalStartDate`.
*
* @param offsetDays
* @param finalStartDate
* @param currentDate
* @returns
*/
export const getNextStartDate = (offsetDays: number, finalStartDate: Date, currentDate: Date | null): Date => {
const newEndDate: Date = getNextEndDate(currentDate);

Expand Down
1 change: 1 addition & 0 deletions apps/server/.wundergraph/metricHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ export const getMetricObject = (log: RequestLogger, tokenRecords: TokenRecord[],
*/
if (!tokenRecords.length || !tokenSupplies.length || !protocolMetrics.length) {
log.warn(`${FUNC}: Not all parameters have non-zero length: tokenRecords: ${tokenRecords.length}, tokenSupplies: ${tokenSupplies.length}, protocolMetrics: ${protocolMetrics.length}`);
log.warn(`${FUNC}: Fallback date: ${options && options.dateFallback ? options.dateFallback : "None"}`);

const date = options && options.dateFallback ? options.dateFallback : "";
const timestamp = options && options.dateFallback ? new Date(options.dateFallback).getTime() / 1000 : 0;
Expand Down
21 changes: 21 additions & 0 deletions apps/server/.wundergraph/operations/paginated/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ export default createOperation.query({
});

// Group by date
let latestTokenSupplyDate: string | null = null;
if (tokenSuppliesQueryResult.data) {
tokenSuppliesQueryResult.data.forEach((record) => {
const date = record.date;
Expand All @@ -124,11 +125,31 @@ export default createOperation.query({

recordContainer.tokenSupplies.push(record);
byDateRecords.set(date, recordContainer);

// Record the latest date
if (!latestTokenSupplyDate || new Date(date) > new Date(latestTokenSupplyDate)) {
log.info(`${FUNC}: Found latest date: ${date}`);
latestTokenSupplyDate = date;
}
});

log.info(`${FUNC}: Processed ${tokenSuppliesQueryResult.data.length} TokenSupply records.`);
}

// If crossChainDataComplete is true, filter out incomplete records
if (ctx.input.crossChainDataComplete == true && latestTokenSupplyDate) {
const latestDate: Date = new Date(latestTokenSupplyDate);
// TokenRecord and TokenSupply records would have already been filtered to the latest complete date,
// but ProtocolMetric records would not be, as they are not cross-chain
// Remove by byDateRecords any keys that are later than latestTokenSupplyDate
byDateRecords.forEach((recordContainer, date) => {
if (new Date(date) > latestDate) {
log.info(`${FUNC}: Removing incomplete records for date ${date}.`);
byDateRecords.delete(date);
}
});
}

// Convert into new Metric objects
byDateRecords.forEach((recordContainer, date) => {
const metricRecord: Metric = getMetricObject(log, recordContainer.tokenRecords, recordContainer.tokenSupplies, recordContainer.protocolMetrics, { includeRecords: ctx.input.includeRecords, dateFallback: date });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export default createOperation.query({
input: z.object({
startDate: z.string({ description: "The start date in the YYYY-MM-DD format." }),
dateOffset: z.number({ description: "The number of days to paginate by. Reduce the value if data is missing." }).optional(),
// No need for crossChainDataComplete, as ProtocolMetrics are only available on the Ethereum subgraph
ignoreCache: z.boolean({ description: "If true, ignores the cache and queries the subgraphs directly." }).optional(),
}),
handler: async (ctx) => {
Expand Down
49 changes: 11 additions & 38 deletions apps/server/.wundergraph/operations/paginated/tokenRecords.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,11 @@
import { RequestLogger } from '@wundergraph/sdk/server';
import { getCacheKey, getCachedRecords, setCachedRecords } from '../../cacheHelper';
import { getOffsetDays, getNextStartDate, getNextEndDate, getISO8601DateString } from '../../dateHelper';
import { TokenRecordsResponseData } from '../../generated/models';
import { createOperation, z } from '../../generated/wundergraph.factory';
import { TokenRecord, flattenRecords, isCrossChainRecordDataComplete, sortRecordsDescending } from '../../tokenRecordHelper';
import { TokenRecord, filterCompleteRecords, flattenRecords, isCrossChainRecordDataComplete, sortRecordsDescending } from '../../tokenRecordHelper';
import { BadRequestError } from '../../badRequestError';
import { UpstreamSubgraphError } from '../../upstreamSubgraphError';

/**
* Determines whether the provided records should be processed further.
*
* This is used to skip processing records if the cross-chain data is incomplete (when the flag is provided).
*
* @param records
* @param hasProcessedFirstDate
* @param crossChainDataComplete
* @returns
*/
const shouldProcessRecords = (records: TokenRecordsResponseData, hasProcessedFirstDate: boolean, log: RequestLogger, crossChainDataComplete: boolean | undefined): boolean => {
if (hasProcessedFirstDate === true) {
return true;
}

if (!crossChainDataComplete) {
return true;
}

const arbitrumTokenRecords = records.treasuryArbitrum_tokenRecords;
const ethereumTokenRecords = records.treasuryEthereum_tokenRecords;

if (isCrossChainRecordDataComplete(arbitrumTokenRecords, ethereumTokenRecords)) {
log.info(`Cross-chain data is complete.`);
return true;
}

log.info(`Cross-chain data is incomplete.`);
return false;
}

/**
* This custom query will return a flat array containing TokenRecord objects from
* across all endpoints.
Expand Down Expand Up @@ -99,14 +67,19 @@ export default createOperation.query({
throw new UpstreamSubgraphError({ message: `${FUNC}: No data returned for date range ${getISO8601DateString(currentStartDate)} to ${getISO8601DateString(currentEndDate)}` });
}

if (shouldProcessRecords(queryResult.data, hasProcessedFirstDate, log, ctx.input.crossChainDataComplete)) {
// Collapse the data into a single array
combinedTokenRecords.push(...flattenRecords(queryResult.data, true, log));
let data: TokenRecordsResponseData = queryResult.data;

// This prevents checking for consistent cross-chain data a second time
hasProcessedFirstDate = true;
// If the first set of data has not been processed (in which case there may be lagging indexing) and cross-chain data should be complete, filter the records
if (!hasProcessedFirstDate && ctx.input.crossChainDataComplete == true) {
data = filterCompleteRecords(data, log);
}

// This prevents checking for consistent cross-chain data a second time
hasProcessedFirstDate = true;

// Flatten the data and add it to the combined array
combinedTokenRecords.push(...flattenRecords(data, true, log));

// Ensures that a finalStartDate close to the current date (within the first page) is handled correctly
// There is probably a cleaner way to do this, but this works for now
if (currentStartDate == finalStartDate) {
Expand Down
49 changes: 11 additions & 38 deletions apps/server/.wundergraph/operations/paginated/tokenSupplies.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,11 @@
import { RequestLogger } from '@wundergraph/sdk/server';
import { getCacheKey, getCachedRecords, setCachedRecords } from '../../cacheHelper';
import { getOffsetDays, getNextStartDate, getNextEndDate, getISO8601DateString } from '../../dateHelper';
import { TokenSuppliesResponseData } from '../../generated/models';
import { createOperation, z } from '../../generated/wundergraph.factory';
import { TokenSupply, flattenRecords, isCrossChainSupplyDataComplete, sortRecordsDescending } from '../../tokenSupplyHelper';
import { TokenSupply, filterCompleteRecords, flattenRecords, sortRecordsDescending } from '../../tokenSupplyHelper';
import { UpstreamSubgraphError } from '../../upstreamSubgraphError';
import { BadRequestError } from '../../badRequestError';

/**
* Determines whether the provided records should be processed further.
*
* This is used to skip processing records if the cross-chain data is incomplete (when the flag is provided).
*
* @param records
* @param hasProcessedFirstDate
* @param crossChainDataComplete
* @returns
*/
const shouldProcessRecords = (records: TokenSuppliesResponseData, hasProcessedFirstDate: boolean, log: RequestLogger, crossChainDataComplete: boolean | undefined): boolean => {
if (hasProcessedFirstDate === true) {
return true;
}

if (!crossChainDataComplete) {
return true;
}

const arbitrumTokenRecords = records.treasuryArbitrum_tokenSupplies;
const ethereumTokenRecords = records.treasuryEthereum_tokenSupplies;

if (isCrossChainSupplyDataComplete(arbitrumTokenRecords, ethereumTokenRecords)) {
log.info(`Cross-chain data is complete.`);
return true;
}

log.info(`Cross-chain data is incomplete.`);
return false;
}

/**
* This custom query will return a flat array containing TokenSupply objects from
* across all endpoints.
Expand Down Expand Up @@ -103,14 +71,19 @@ export default createOperation.query({
throw new UpstreamSubgraphError({ message: `${FUNC}: No data returned for date range ${getISO8601DateString(currentStartDate)} to ${getISO8601DateString(currentEndDate)}` });
}

if (shouldProcessRecords(queryResult.data, hasProcessedFirstDate, log, ctx.input.crossChainDataComplete)) {
// Collapse the data into a single array, and add a missing property
combinedTokenSupplies.push(...flattenRecords(queryResult.data, true, true, log));
let data: TokenSuppliesResponseData = queryResult.data;

// This prevents checking for consistent cross-chain data a second time
hasProcessedFirstDate = true;
// If the first set of data has not been processed (in which case there may be lagging indexing) and cross-chain data should be complete, filter the records
if (!hasProcessedFirstDate && ctx.input.crossChainDataComplete == true) {
data = filterCompleteRecords(data, log);
}

// This prevents checking for consistent cross-chain data a second time
hasProcessedFirstDate = true;

// Flatten the data and add it to the combined array
combinedTokenSupplies.push(...flattenRecords(data, true, true, log));

// Ensures that a finalStartDate close to the current date (within the first page) is handled correctly
// There is probably a cleaner way to do this, but this works for now
if (currentStartDate == finalStartDate) {
Expand Down
36 changes: 36 additions & 0 deletions apps/server/.wundergraph/tokenRecordHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,42 @@ export const sortRecordsDescending = (records: TokenRecord[]): TokenRecord[] =>
});
};

/**
* Filters `records` to only include records with a complete set of cross-chain data.
*
* @param records
*/
export const filterCompleteRecords = (records: TokenRecordsLatestResponseData, log: RequestLogger): TokenRecordsLatestResponseData => {
const FUNC = `tokenRecord/filterCompleteRecords`;

// Check for empty values
if (!records.treasuryArbitrum_tokenRecords.length || !records.treasuryEthereum_tokenRecords.length) {
log.warn(`${FUNC}: Arbitrum or Ethereum records are empty.`)
return {
treasuryArbitrum_tokenRecords: [],
treasuryEthereum_tokenRecords: [],
treasuryFantom_tokenRecords: [],
treasuryPolygon_tokenRecords: [],
};
}

// Get the earliest date across the Ethereum and Arbitrum records
const arbitrumDate = records.treasuryArbitrum_tokenRecords[0].date;
const ethereumDate = records.treasuryEthereum_tokenRecords[0].date;
const earliestDate = new Date(arbitrumDate) < new Date(ethereumDate) ? new Date(arbitrumDate) : new Date(ethereumDate);

// Filter the records to only include records up to the earliest date
const filteredRecords = {
treasuryArbitrum_tokenRecords: records.treasuryArbitrum_tokenRecords.filter((record) => new Date(record.date) <= earliestDate),
treasuryEthereum_tokenRecords: records.treasuryEthereum_tokenRecords.filter((record) => new Date(record.date) <= earliestDate),
treasuryFantom_tokenRecords: records.treasuryFantom_tokenRecords.filter((record) => new Date(record.date) <= earliestDate),
treasuryPolygon_tokenRecords: records.treasuryPolygon_tokenRecords.filter((record) => new Date(record.date) <= earliestDate),
};
log.info(`${FUNC}: Filtered records up to latest consistent date: ${earliestDate.toISOString()}`);

return filteredRecords;
}

export const flattenRecords = (records: TokenRecordsLatestResponseData, latestBlock: boolean, log: RequestLogger): TokenRecord[] => {
const FUNC = "tokenRecord/flattenRecords";
const combinedRecords: TokenRecord[] = [];
Expand Down
60 changes: 36 additions & 24 deletions apps/server/.wundergraph/tokenSupplyHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,42 @@ export const setBlockchainProperty = (records: TokenSupply[], blockchain: string
});
}

/**
* Filters `records` to only include records with a complete set of cross-chain data.
*
* @param records
*/
export const filterCompleteRecords = (records: TokenSuppliesLatestResponseData, log: RequestLogger): TokenSuppliesLatestResponseData => {
const FUNC = `tokenSupply/filterCompleteRecords`;

// Check for empty values
if (!records.treasuryArbitrum_tokenSupplies.length || !records.treasuryEthereum_tokenSupplies.length) {
log.warn(`${FUNC}: Arbitrum or Ethereum records are empty.`)
return {
treasuryArbitrum_tokenSupplies: [],
treasuryEthereum_tokenSupplies: [],
treasuryFantom_tokenSupplies: [],
treasuryPolygon_tokenSupplies: [],
};
}

// Get the earliest date across the Ethereum and Arbitrum records
const arbitrumDate = records.treasuryArbitrum_tokenSupplies[0].date;
const ethereumDate = records.treasuryEthereum_tokenSupplies[0].date;
const earliestDate = new Date(arbitrumDate) < new Date(ethereumDate) ? new Date(arbitrumDate) : new Date(ethereumDate);

// Filter the records to only include records up to the earliest date
const filteredRecords = {
treasuryArbitrum_tokenSupplies: records.treasuryArbitrum_tokenSupplies.filter((record) => new Date(record.date) <= earliestDate),
treasuryEthereum_tokenSupplies: records.treasuryEthereum_tokenSupplies.filter((record) => new Date(record.date) <= earliestDate),
treasuryFantom_tokenSupplies: records.treasuryFantom_tokenSupplies.filter((record) => new Date(record.date) <= earliestDate),
treasuryPolygon_tokenSupplies: records.treasuryPolygon_tokenSupplies.filter((record) => new Date(record.date) <= earliestDate),
};
log.info(`${FUNC}: Filtered records up to latest consistent date: ${earliestDate.toISOString()}`);

return filteredRecords;
}

export const flattenRecords = (records: TokenSuppliesLatestResponseData, blockchain: boolean, latestBlock: boolean, log: RequestLogger): TokenSupply[] => {
const FUNC = "tokenSupply/flattenRecords";
const combinedRecords: TokenSupply[] = [];
Expand Down Expand Up @@ -90,27 +126,3 @@ export const flattenRecords = (records: TokenSuppliesLatestResponseData, blockch

return combinedRecords;
};

/**
* Determines whether the data across chains is complete.
*
* It determines this by checking if the date of the records across chains is the same.
*
* Assumptions:
* - The data is sorted in descending order and for the same day
* - Ethereum and Arbitrum have OHM supply, so we only check those two chains
*
* @param arbitrumRecords
* @param ethereumRecords
* @returns
*/
export const isCrossChainSupplyDataComplete = (arbitrumRecords: TokenSupply[], ethereumRecords: TokenSupply[]): boolean => {
if (!arbitrumRecords.length || !ethereumRecords.length) {
return false;
}

const arbitrumDate = arbitrumRecords[0].date;
const ethereumDate = ethereumRecords[0].date;

return arbitrumDate === ethereumDate;
}
4 changes: 3 additions & 1 deletion apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"@types/node": "^18.16.7",
"date-fns": "^2.30.0",
"jest": "^29.5.0",
"jest-mock-extended": "^3.0.5",
"ts-jest": "^29.1.0",
"ts-node": "^10.9.1",
"typescript": "^5.0.4"
Expand All @@ -31,7 +32,8 @@
"build:release": "WG_PUBLIC_NODE_URL=https://olympus-treasury-subgraph-prod.web.app yarn build",
"build:local": "WG_PUBLIC_NODE_URL=http://localhost:9991 yarn build",
"pretest": "yarn build:local",
"test": "jest --runInBand",
"pretest:ci": "yarn build:local",
"test": "jest",
"test:ci": "WG_LOG_LEVEL=error jest --runInBand --ci",
"test:local": "dotenv -e ../../.env jest"
}
Expand Down
Loading

0 comments on commit 1e6e2b6

Please sign in to comment.