
Commit

Merge pull request #84 from OlympusDAO/develop
Release: in-memory cache
0xJem authored Jan 26, 2024
2 parents a30bf46 + 9795050 commit 44bac50
Showing 25 changed files with 66 additions and 421 deletions.
1 change: 0 additions & 1 deletion .env.sample
@@ -1,3 +1,2 @@
 # Graph Protocol on Arbitrum
 ARBITRUM_SUBGRAPH_API_KEY=CHANGEME
-UPSTASH_REDIS_URL=CHANGEME
1 change: 0 additions & 1 deletion .github/workflows/test.yml
@@ -29,4 +29,3 @@ jobs:
       - run: yarn test:ci # Runs build automatically
         env:
           ARBITRUM_SUBGRAPH_API_KEY: ${{ secrets.ARBITRUM_SUBGRAPH_API_KEY }}
-          UPSTASH_REDIS_URL: ${{ secrets.UPSTASH_REDIS_URL }}
291 changes: 21 additions & 270 deletions apps/server/.wundergraph/cacheHelper.ts
@@ -1,108 +1,12 @@
 import { RequestLogger } from "@wundergraph/sdk/server";
 
-import { RedisClientType, createClient } from "redis";
-
-const TTL = 60 * 60;
-
-const UPSTASH_REQUEST_LIMIT = 1000000;
-const CHUNK_MULTIPLIER = 0.9;
-
-const CHUNK_SIZE = 1500;
+// 1 hour
+const TTL = 60 * 60 * 1000;
 
 /**
- * Source: https://stackoverflow.com/a/76352488
+ * Provides a rudimentary in-memory cache for the server.
  */
-type CachedJsonElement = null | boolean | number | string | Date | CachedJSONArray | CachedJSONObject;
-interface CachedJSONObject {
-  [key: string]: CachedJsonElement;
-  [key: number]: CachedJsonElement;
-}
-type CachedJSONArray = Array<CachedJsonElement>;
-
-/**
- * Determines the chunk size to use when storing an array in Upstash-hosted Redis.
- *
- * This ensures that the request size does not exceed the Upstash limit of 1MB.
- *
- * For example, if the string length of the records array is 1.5 MB, then the chunk size will be 2.
- */
-const getChunkQuantity = (records: CachedJsonElement[]): number => {
-  const size = records.reduce((acc: number, record) => {
-    if (typeof record === "string") {
-      return acc + record.length;
-    }
-
-    if (typeof record === "number") {
-      return acc + 8;
-    }
-
-    if (typeof record === "boolean") {
-      return acc + 4;
-    }
-
-    if (record === null) {
-      return acc + 4;
-    }
-
-    if (record instanceof Date) {
-      return acc + 8;
-    }
-
-    if (Array.isArray(record)) {
-      return acc + getChunkQuantity(record);
-    }
-
-    if (typeof record === "object") {
-      return acc + getChunkQuantity(Object.values(record));
-    }
-
-    return acc;
-  }, 0);
-
-  return Math.ceil(size / UPSTASH_REQUEST_LIMIT);
-}
-
-/**
- * Determines the chunk size to use when getting an array from Upstash-hosted Redis.
- */
-const getChunkSize = async (client: RedisClientType, key: string, log: RequestLogger): Promise<number | null> => {
-  const FUNC = `getChunkSize: ${key}`;
-  // Get the first entry in the list
-  const firstEntry = await client.lIndex(key, 0);
-  if (!firstEntry) {
-    return null;
-  }
-
-  // Get the length of the first entry
-  const firstEntryLength = firstEntry.length;
-  log.info(`${FUNC}: First entry length: ${firstEntryLength}`);
-
-  // Return the number of entries that can be stored in 1MB
-  const entriesPerRequest = Math.floor(UPSTASH_REQUEST_LIMIT * CHUNK_MULTIPLIER / firstEntryLength);
-  log.info(`${FUNC}: Entries per request: ${entriesPerRequest}`);
-
-  return entriesPerRequest;
-}
-
-const chunkArray = <T>(array: T[], chunkSize: number): T[][] => {
-  const chunkedRecords: T[][] = [];
-  for (let i = 0; i < array.length; i += chunkSize) {
-    const chunk = array.slice(i, i + chunkSize);
-    chunkedRecords.push(chunk);
-  }
-
-  return chunkedRecords;
-}
-
-const getClient = (): RedisClientType => {
-  if (!process.env.UPSTASH_REDIS_URL) {
-    throw new Error("UPSTASH_REDIS_URL is not set");
-  }
-
-  return createClient({
-    url: process.env.UPSTASH_REDIS_URL,
-  });
-}
+const cache = new Map<string, [number, unknown]>();
 
 const isCacheEnabled = (): boolean => {
   if (!process.env.CACHE_ENABLED) {
@@ -124,190 +28,37 @@ export async function getCachedRecord<T>(key: string, log: RequestLogger): Promise<T | null> {
     return null;
   }
 
-  const startTime = Date.now();
-  const client = getClient();
-
-  let result: T | null = null;
-  try {
-    await client.connect();
-
-    const initialResult = await client.get(key);
-    if (initialResult) {
-      log.info(`${FUNC}: Cache hit`);
-      result = JSON.parse(initialResult) as T;
-    }
-  }
-  // Catch any errors. Worst-case is that the cache value is not used and a query is performed instead.
-  catch (e) {
-    log.error(`${FUNC}: Failed to get cache`, e);
-    log.error("message" in e ? e.message : "No error message available");
-    log.error("stack" in e ? e.stack : "No error stack available");
-
-    // Ensure the result is empty
-    result = null;
-  }
-  finally {
-    await client.disconnect();
-  }
-
-  const endTime = Date.now();
-  log.info(`${FUNC}: ${endTime - startTime}ms elapsed`);
-
-  return result;
-}
-
-export async function getCachedRecords<T>(key: string, log: RequestLogger): Promise<T[] | null> {
-  const FUNC = `getCachedRecords: ${key}`;
-
-  if (!isCacheEnabled()) {
-    log.info(`${FUNC}: Cache not enabled`);
+  // Attempt to get a cached result
+  const cachedResultWrapper = cache.get(key);
+  if (!cachedResultWrapper) {
+    log.info(`${FUNC}: Cache miss`);
     return null;
   }
 
-  const startTime = Date.now();
-  const client = getClient();
-
-  let result: T[] | null = null;
-  try {
-    await client.connect();
-
-    // Get the length of the list
-    const length = await client.lLen(key);
-    if (length === 0) {
-      log.info(`${FUNC}: Cache miss`);
-      return null;
-    }
-
-    result = [];
-    log.info(`${FUNC}: ${length} records found in cache`);
-
-    const chunkSize = await getChunkSize(client, key, log);
-    if (!chunkSize) {
-      log.warn(`${FUNC}: Unable to determine chunk size. Skipping.`);
-      return null;
-    }
-
-    // Get the list in chunks of chunkSize
-    // It is a known issue that with longer time periods and with nested records, this can exceed the maximum request size... in which case the cache will not be used
-    for (let i = 0; i < length; i += chunkSize) {
-      const chunkStartTime = Date.now();
-      log.info(`${FUNC}: Getting chunk in range ${i} to ${i + chunkSize - 1}`);
-
-      const chunk = await client.lRange(key, i, i + chunkSize - 1);
-      result.push(...(chunk.map(record => JSON.parse(record) as T)));
-
-      log.info(`${FUNC}: Chunk retrieved in ${Date.now() - chunkStartTime}ms`);
-    }
-
-    log.info(`${FUNC}: Cache hit`);
-  }
-  // Catch any errors. Worst-case is that the cache value is not used and a query is performed instead.
-  catch (e) {
-    log.error(`${FUNC}: Failed to get cache`);
-    log.error("message" in e ? e.message : "No error message available");
-    log.error("stack" in e ? e.stack : "No error stack available");
-
-    // Ensure the result is empty
-    result = null;
-  }
-  finally {
-    await client.disconnect();
+  // Check that the result is within the TTL
+  const currentTimestampMs = Date.now();
+  const resultTimestampMs = cachedResultWrapper[0];
+  if (currentTimestampMs - resultTimestampMs > TTL) {
+    log.info(`${FUNC}: Cache expired`);
+    cache.delete(key);
+    return null;
  }
 
-  const endTime = Date.now();
-  log.info(`${FUNC}: ${endTime - startTime}ms elapsed`);
-
-  return result;
+  // Otherwise return the value
+  log.info(`${FUNC}: Cache hit`);
+  return cachedResultWrapper[1] as T;
 }
 
-export async function setCachedRecord(key: string, value: CachedJsonElement, log: RequestLogger): Promise<void> {
+export async function setCachedRecord(key: string, value: unknown, log: RequestLogger): Promise<void> {
   const FUNC = `setCachedRecord: ${key}`;
 
   if (!isCacheEnabled()) {
     log.info(`${FUNC}: Cache not enabled`);
     return;
   }
 
-  const startTime = Date.now();
-  const client = getClient();
-
-  try {
-    await client.connect();
-
-    // Set the value and expiry for 1 hour
-    await client.json.set(key, "$", value);
-    log.info(`${FUNC}: Updated cache`);
-  }
-  // Catch any errors. Worst-case is that the cache is not updated
-  catch (e) {
-    log.error(`${FUNC}: Failed to update cache`);
-    log.error("message" in e ? e.message : "No error message available");
-    log.error("stack" in e ? e.stack : "No error stack available");
-  }
-  finally {
-    await client.disconnect();
-  }
-
-  const endTime = Date.now();
-  log.info(`${FUNC}: ${endTime - startTime}ms elapsed`);
-}
-
-export async function setCachedRecords(key: string, records: CachedJsonElement[], log: RequestLogger): Promise<void> {
-  const FUNC = `setCachedRecords: ${key}`;
-
-  if (!isCacheEnabled()) {
-    log.info(`${FUNC}: Cache not enabled`);
-    return;
-  }
-
-  const startTime = Date.now();
-  const client = getClient();
-
-  try {
-    await client.connect();
-
-    /**
-     * Use an isolated client to ensure that the list is cleared and populated in a single transaction.
-     *
-     * Otherwise there is a risk that records are added to the list before it is cleared, which would result in duplicate records.
-     */
-    await client.executeIsolated(async isolatedClient => {
-      log.info(`${FUNC}: Starting transaction`);
-
-      // Throw an error if the key is modified during the transaction
-      await isolatedClient.watch(key);
-
-      // Clear the list
-      log.info(`${FUNC}: Clearing cache`);
-      await isolatedClient.del(key);
-
-      // Divide the array into smaller chunks, to avoid the maximum request size
-      // It is a known issue that with longer time periods and with nested records, this can exceed the maximum request size... in which case the cache will not be updated
-      const chunkSize = getChunkQuantity(records);
-      const chunkedRecords = chunkArray(records, chunkSize);
-      log.info(`${FUNC}: ${chunkedRecords.length} chunks to insert`);
-      for (const chunk of chunkedRecords) {
-        await isolatedClient.rPush(key, chunk.map(record => JSON.stringify(record)));
-      }
-
-      // Set the value and expiry for 1 hour
-      await isolatedClient.expire(key, TTL);
-    });
-
-    log.info(`${FUNC}: Updated cache`);
-  }
-  // Catch any errors. Worst-case is that the cache is not updated
-  catch (e) {
-    log.error(`${FUNC}: Failed to update cache`);
-    log.error("message" in e ? e.message : "No error message available");
-    log.error("stack" in e ? e.stack : "No error stack available");
-  }
-  finally {
-    await client.disconnect();
-  }
-
-  const endTime = Date.now();
-  log.info(`${FUNC}: ${endTime - startTime}ms elapsed`);
+  cache.set(key, [Date.now(), value]);
+  log.info(`${FUNC}: Updated cache`);
 }
 
 export const getCacheKey = (name: string, input?: Record<string, unknown>): string => {
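The 21 added lines above are the entire caching layer now: a module-level Map from cache key to a [timestamp, value] pair, with expiry checked lazily on read. A minimal self-contained sketch of that pattern (a distillation, not the repo code verbatim: the logging is reduced to comments and the CACHE_ENABLED gate and RequestLogger parameter of the real helpers are omitted):

// Entries pair the insertion timestamp with the value; reads treat
// anything older than TTL as a miss and evict it.
const TTL = 60 * 60 * 1000; // 1 hour, matching the new cacheHelper.ts

const cache = new Map<string, [number, unknown]>();

function getCached<T>(key: string): T | null {
  const entry = cache.get(key);
  if (!entry) return null; // cache miss

  const [storedAtMs, value] = entry;
  if (Date.now() - storedAtMs > TTL) {
    cache.delete(key); // expired: evict lazily, on read
    return null;
  }

  return value as T; // cache hit
}

function setCached(key: string, value: unknown): void {
  cache.set(key, [Date.now(), value]);
}

One consequence of this design: eviction only happens when a key is read again, so a key that is written once and never re-requested stays in memory until the process restarts. For a cache the new comment itself calls rudimentary, that is presumably an accepted trade-off.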
6 changes: 3 additions & 3 deletions apps/server/.wundergraph/operations/earliest/tokenRecords.ts
@@ -1,4 +1,4 @@
-import { getCacheKey, getCachedRecords, setCachedRecords } from '../../cacheHelper';
+import { getCacheKey, getCachedRecord, setCachedRecord } from '../../cacheHelper';
 import { UpstreamSubgraphError } from '../../upstreamSubgraphError';
 import { createOperation, z } from '../../generated/wundergraph.factory';
 import { TokenRecord, flattenRecords } from '../../tokenRecordHelper';
@@ -20,7 +20,7 @@ export default createOperation.query({
     // Return cached data if it exists
     const cacheKey = getCacheKey(FUNC, ctx.input);
     if (!ctx.input.ignoreCache) {
-      const cachedData = await getCachedRecords<TokenRecord>(cacheKey, log);
+      const cachedData = await getCachedRecord<TokenRecord[]>(cacheKey, log);
       if (cachedData) {
         return cachedData;
       }
@@ -39,7 +39,7 @@ export default createOperation.query({
     const flatRecords = flattenRecords(queryResult.data, false, log);
 
     // Update the cache
-    await setCachedRecords(cacheKey, flatRecords, log);
+    await setCachedRecord(cacheKey, flatRecords, log);
 
     log.info(`${FUNC}: Returning ${flatRecords.length} records.`);
     return flatRecords;
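The collapsed context around these hunks follows a read-through pattern: check the cache, fall back to the subgraph query, then repopulate. A condensed sketch, reusing getCached/setCached from the sketch after the cacheHelper.ts diff; queryTokenRecords is a hypothetical stand-in for the real WunderGraph query, and the record type is simplified:

type TokenRecord = Record<string, unknown>; // simplified stand-in
declare function queryTokenRecords(): Promise<TokenRecord[]>; // hypothetical upstream query

async function tokenRecordsHandler(cacheKey: string, ignoreCache: boolean): Promise<TokenRecord[]> {
  // Return cached data if it exists
  if (!ignoreCache) {
    const cachedData = getCached<TokenRecord[]>(cacheKey);
    if (cachedData) {
      return cachedData;
    }
  }

  // Otherwise query upstream and populate the cache for the next request
  const records = await queryTokenRecords();
  setCached(cacheKey, records);
  return records;
}

The type change in the hunk above is the visible API shift: the whole array is now a single cached value (getCachedRecord<TokenRecord[]>) rather than a Redis list of individually serialized records (getCachedRecords<TokenRecord>).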
6 changes: 3 additions & 3 deletions apps/server/.wundergraph/operations/earliest/tokenSupplies.ts
@@ -1,4 +1,4 @@
-import { getCacheKey, getCachedRecords, setCachedRecords } from '../../cacheHelper';
+import { getCacheKey, getCachedRecord, setCachedRecord } from '../../cacheHelper';
 import { UpstreamSubgraphError } from '../../upstreamSubgraphError';
 import { createOperation, z } from '../../generated/wundergraph.factory';
 import { TokenSupply, flattenRecords } from '../../tokenSupplyHelper';
@@ -20,7 +20,7 @@ export default createOperation.query({
     // Return cached data if it exists
     const cacheKey = getCacheKey(FUNC, ctx.input);
     if (!ctx.input.ignoreCache) {
-      const cachedData = await getCachedRecords<TokenSupply>(cacheKey, log);
+      const cachedData = await getCachedRecord<TokenSupply[]>(cacheKey, log);
       if (cachedData) {
         return cachedData;
       }
@@ -39,7 +39,7 @@ export default createOperation.query({
     const flatRecords = flattenRecords(queryResult.data, true, false, log);
 
     // Update the cache
-    await setCachedRecords(cacheKey, flatRecords, log);
+    await setCachedRecord(cacheKey, flatRecords, log);
 
     log.info(`${FUNC}: Returning ${flatRecords.length} records.`);
     return flatRecords;
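tokenSupplies.ts gets the same mechanical rename. What the rename buys is clearest in the deleted helper code: with Upstash-hosted Redis, every stored array had to be sized against the roughly 1 MB request limit and split into list chunks. As a sketch of just that arithmetic (constants and the worked example taken from the removed code and its doc comment):

const UPSTASH_REQUEST_LIMIT = 1000000; // ~1 MB, per the removed constant

// Number of chunks needed so that no single request exceeds the limit.
const getChunkQuantity = (serializedLengthBytes: number): number =>
  Math.ceil(serializedLengthBytes / UPSTASH_REQUEST_LIMIT);

console.log(getChunkQuantity(1500000)); // 2, matching the removed comment's example

An in-process Map stores a reference, so serialization, request-size limits, and the chunking and transaction code built around them all drop out; the 421 deleted lines across the PR are largely that machinery.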