Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaces global activity cache with generic query caching #9554

Merged
merged 3 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions libs/adapters/src/trpc/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import {
CacheNamespaces,
Events,
INVALID_ACTOR_ERROR,
INVALID_INPUT_ERROR,
INVALID_STATE_ERROR,
cache,
command as coreCommand,
query as coreQuery,
handleEvent,
logger,
type EventSchemas,
type EventsHandlerMetadata,
type Metadata,
Expand All @@ -14,6 +17,8 @@ import { TRPCError } from '@trpc/server';
import { ZodSchema, ZodUndefined, z } from 'zod';
import { Commit, Tag, Track, buildproc, procedure } from './middleware';

const log = logger(import.meta);

const trpcerror = (error: unknown): TRPCError => {
if (error instanceof Error) {
const { name, message, ...other } = error;
Expand Down Expand Up @@ -89,6 +94,7 @@ export const command = <
* @param factory query factory
* @param tag query tag used for OpenAPI spec grouping
* @param forceSecure whether to force secure requests for rate-limited external-router
* @param ttlSecs cache response ttl in seconds
* @returns tRPC query procedure
*/
export const query = <
Expand All @@ -98,25 +104,52 @@ export const query = <
>(
factory: () => Metadata<Input, Output, AuthContext>,
tag: Tag,
forceSecure?: boolean,
options?: {
forceSecure?: boolean;
ttlSecs?: number;
},
) => {
const md = factory();
return buildproc({
method: 'GET',
name: factory.name,
md,
tag,
forceSecure,
forceSecure: options?.forceSecure,
}).query(async ({ ctx, input }) => {
try {
return await coreQuery(
const cacheKey = options?.ttlSecs
? `${factory.name}_${JSON.stringify(input)}`
: undefined;
if (cacheKey) {
const cachedReponse = await cache().getKey(
CacheNamespaces.Query_Response,
cacheKey,
);
if (cachedReponse) {
log.info(`Returning cached response for ${cacheKey}`);
return JSON.parse(cachedReponse);
}
}
const response = await coreQuery(
md,
{
actor: ctx.actor,
payload: input!,
},
false,
);
if (cacheKey) {
void cache()
.setKey(
CacheNamespaces.Query_Response,
cacheKey,
JSON.stringify(response),
options?.ttlSecs,
)
.then(() => log.info(`Cached response for ${cacheKey}`));
}
return response;
} catch (error) {
throw trpcerror(error);
}
Expand Down
1 change: 1 addition & 0 deletions libs/core/src/ports/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export enum CacheNamespaces {
Activity_Cache = 'activity_cache',
Rate_Limiter = 'rate_limiter',
Api_key_auth = 'api_key_auth',
Query_Response = 'query_response',
}

/**
Expand Down
5 changes: 2 additions & 3 deletions libs/model/src/feed/GetGlobalActivity.query.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { Query } from '@hicommonwealth/core';
import * as schemas from '@hicommonwealth/schemas';
import { GlobalActivityCache } from '../globalActivityCache';
import { getUserActivityFeed } from '../getUserActivityFeed';

export function GetGlobalActivity(): Query<typeof schemas.ActivityFeed> {
return {
...schemas.ActivityFeed,
auth: [],
secure: false,
body: async () =>
await GlobalActivityCache.getInstance().getGlobalActivity(),
body: async () => await getUserActivityFeed({}),
};
}
2 changes: 1 addition & 1 deletion libs/model/src/feed/GetUserActivity.query.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Query } from '@hicommonwealth/core';
import * as schemas from '@hicommonwealth/schemas';
import { getUserActivityFeed } from '../globalActivityCache';
import { getUserActivityFeed } from '../getUserActivityFeed';

export function GetUserActivity(): Query<typeof schemas.ActivityFeed> {
return {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { CacheNamespaces, cache, logger } from '@hicommonwealth/core';
import { ActivityFeed, ActivityThread } from '@hicommonwealth/schemas';
import { QueryTypes } from 'sequelize';
import { v4 as uuidv4 } from 'uuid';
import { z } from 'zod';
import { models } from './database';

Expand All @@ -15,9 +13,7 @@ export async function getUserActivityFeed({
user_id = 0,
thread_limit = 50,
comment_limit = 3,
}: Omit<z.infer<typeof ActivityFeed.input>, 'is_global'> & {
user_id?: number;
}) {
}: z.infer<typeof ActivityFeed.input> & { user_id?: number }) {
const query = `
WITH
user_communities AS (
Expand Down Expand Up @@ -114,124 +110,3 @@ ORDER BY

return threads.map((t) => t.thread);
}

const log = logger(import.meta);

export class GlobalActivityCache {
private _cacheKey = 'global_activity';
private _lockName = 'global_activity_cache_locker';
private static _instance: GlobalActivityCache;

constructor(
private _cacheTTL: number = 60 * 5, // cache TTL in seconds
) {}

static getInstance(cacheTTL?: number): GlobalActivityCache {
if (!GlobalActivityCache._instance) {
GlobalActivityCache._instance = new GlobalActivityCache(cacheTTL);
}
return GlobalActivityCache._instance;
}

public async start() {
await this.refreshGlobalActivity();
// eslint-disable-next-line @typescript-eslint/no-misused-promises
setInterval(this.refreshGlobalActivity.bind(this), this._cacheTTL * 1000);
}

public async getGlobalActivity() {
const activity = await cache().getKey(
CacheNamespaces.Activity_Cache,
this._cacheKey,
);
if (!activity) {
if (GlobalActivityCache._instance) {
const msg = 'Failed to fetch global activity from Redis';
log.error(msg);
}
return await getUserActivityFeed({});
}
return JSON.parse(activity);
}

public async deleteActivityFromCache(threadId: number): Promise<void> {
const errorMsg = 'Failed to update global activity in Redis';

try {
const res = await cache().getKey(
CacheNamespaces.Activity_Cache,
this._cacheKey,
);

if (!res) {
log.info('Global Activity Cache is empty');
return;
}

let activity = JSON.parse(res);
let updated = false;
activity = activity.filter((a: any) => {
let shouldKeep = true;
if (a.thread_id === threadId) {
updated = true;
shouldKeep = false;
}
return shouldKeep;
});

if (!updated) return;

const result = await cache().setKey(
CacheNamespaces.Activity_Cache,
this._cacheKey,
JSON.stringify(activity),
);
if (!result) {
log.error(errorMsg);
}
} catch (e: any) {
log.error(errorMsg, e);
}
}

private async refreshGlobalActivity(): Promise<void> {
try {
const lockAcquired = await this.acquireLock();

if (lockAcquired === false) {
log.info('Unable to acquire lock. Skipping refresh...');
return;
}

const activity = await getUserActivityFeed({});
const result = await cache().setKey(
CacheNamespaces.Activity_Cache,
this._cacheKey,
JSON.stringify(activity),
);

if (!result) {
const msg = 'Failed to save global activity in Redis';
log.error(msg);
return;
}

log.info('Activity cache successfully refreshed');
} catch (e: any) {
const msg = 'Failed to refresh the global cache';
log.error(msg, e);
}
}

private async acquireLock() {
return await cache().setKey(
CacheNamespaces.Activity_Cache,
this._lockName,
uuidv4(),
// shorten by 5 seconds to eliminate any discrepancies
// between setInterval delay and Redis TTL
this._cacheTTL - 5,
true,
);
}
}
1 change: 0 additions & 1 deletion libs/model/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,5 @@ export type { E2E_TestEntities } from './tester';
export * from './chainEventSignatures';
export * from './config';
export * from './database';
export * from './globalActivityCache';
export * from './models';
export * from './utils';
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ const GLOBAL_ACTIVITY_STALE_TIME = 5 * 60 * 1_000; // 5 minutes (backend caches

export const useFetchGlobalActivityQuery = () => {
return trpc.feed.getGlobalActivity.useQuery(
{
thread_limit: 50,
comment_limit: 3,
},
{},
{
staleTime: GLOBAL_ACTIVITY_STALE_TIME,
cacheTime: USER_ACTIVITY_CACHE_TIME,
Expand Down
9 changes: 0 additions & 9 deletions packages/commonwealth/main.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { CacheDecorator, setupErrorHandlers } from '@hicommonwealth/adapters';
import { logger } from '@hicommonwealth/core';
import type { DB } from '@hicommonwealth/model';
import { GlobalActivityCache } from '@hicommonwealth/model';
import sgMail from '@sendgrid/mail';
import compression from 'compression';
import SessionSequelizeStore from 'connect-session-sequelize';
Expand Down Expand Up @@ -41,12 +40,10 @@ export async function main(
db: DB,
{
port,
noGlobalActivityCache = true,
withLoggingMiddleware = false,
withPrerender = false,
}: {
port: number;
noGlobalActivityCache?: boolean;
withLoggingMiddleware?: boolean;
withPrerender?: boolean;
},
Expand Down Expand Up @@ -153,11 +150,6 @@ export async function main(
setupMiddleware();
setupPassport(db);

// TODO: decouple as global singleton
const globalActivityCache = GlobalActivityCache.getInstance();
// initialize async to avoid blocking startup
if (!noGlobalActivityCache) globalActivityCache.start();

// Declare Validation Middleware Service
// middleware to use for all requests
const dbValidationService: DatabaseValidationService =
Expand All @@ -168,7 +160,6 @@ export async function main(
app,
db,
viewCountCache,
globalActivityCache,
dbValidationService,
cacheDecorator,
);
Expand Down
1 change: 0 additions & 1 deletion packages/commonwealth/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ const start = async () => {

main(app, models, {
port: config.PORT,
noGlobalActivityCache: config.NO_GLOBAL_ACTIVITY_CACHE,
withLoggingMiddleware: true,
withPrerender: config.APP_ENV === 'production' && !config.NO_PRERENDER,
})
Expand Down
29 changes: 19 additions & 10 deletions packages/commonwealth/server/api/external-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,26 @@ const { createComment, updateComment, deleteComment, createCommentReaction } =
const { getNewContent } = user.trpcRouter;

const api = {
getGlobalActivity: trpc.query(Feed.GetGlobalActivity, trpc.Tag.User, true),
getUserActivity: trpc.query(Feed.GetUserActivity, trpc.Tag.User, true),
getGlobalActivity: trpc.query(Feed.GetGlobalActivity, trpc.Tag.User, {
forceSecure: true,
ttlSecs: config.NO_GLOBAL_ACTIVITY_CACHE ? undefined : 60 * 5,
}),
getUserActivity: trpc.query(Feed.GetUserActivity, trpc.Tag.User, {
forceSecure: true,
}),
getNewContent,
getCommunities: trpc.query(
Community.GetCommunities,
trpc.Tag.Community,
true,
),
getCommunity: trpc.query(Community.GetCommunity, trpc.Tag.Community, true),
getMembers: trpc.query(Community.GetMembers, trpc.Tag.Community, true),
getComments: trpc.query(Comment.GetComments, trpc.Tag.Comment, true),
getCommunities: trpc.query(Community.GetCommunities, trpc.Tag.Community, {
forceSecure: true,
}),
getCommunity: trpc.query(Community.GetCommunity, trpc.Tag.Community, {
forceSecure: true,
}),
getMembers: trpc.query(Community.GetMembers, trpc.Tag.Community, {
forceSecure: true,
}),
getComments: trpc.query(Comment.GetComments, trpc.Tag.Comment, {
forceSecure: true,
}),
createCommunity,
updateCommunity,
createTopic,
Expand Down
5 changes: 4 additions & 1 deletion packages/commonwealth/server/api/feed.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { trpc } from '@hicommonwealth/adapters';
import { Feed } from '@hicommonwealth/model';
import { config } from '../../server/config';

export const trpcRouter = trpc.router({
getGlobalActivity: trpc.query(Feed.GetGlobalActivity, trpc.Tag.User),
getGlobalActivity: trpc.query(Feed.GetGlobalActivity, trpc.Tag.User, {
ttlSecs: config.NO_GLOBAL_ACTIVITY_CACHE ? undefined : 60 * 5,
}),
getUserActivity: trpc.query(Feed.GetUserActivity, trpc.Tag.User),
});
Loading
Loading