From 15508ed7c21d17ae79d52b480b7a4ead44befd7f Mon Sep 17 00:00:00 2001 From: Matheus Nogueira Date: Fri, 6 Sep 2024 13:41:01 -0300 Subject: [PATCH] feat: allow user to disable cache lookup via request body --- api/src/graphql/import.resolver.ts | 4 ++-- api/src/handlers/import.handler.ts | 3 ++- api/src/handlers/streamSyncronize.handler.ts | 4 ++-- api/src/handlers/syncronize.handler.ts | 4 ++-- api/src/protos/pokeshop.ts | 1 + api/src/services/pokemonRpc.service.ts | 5 +++-- .../services/pokemonSyncronizer.service.ts | 20 ++++++++++++++----- api/src/services/queue.service.ts | 8 ++++---- api/src/validators/importPokemon.ts | 2 ++ 9 files changed, 33 insertions(+), 18 deletions(-) diff --git a/api/src/graphql/import.resolver.ts b/api/src/graphql/import.resolver.ts index c1eea4b..92ac05f 100644 --- a/api/src/graphql/import.resolver.ts +++ b/api/src/graphql/import.resolver.ts @@ -4,8 +4,8 @@ import PokemonSyncronizer from '@pokemon/services/pokemonSyncronizer.service'; const pokeApiService = new PokeAPIService(); const pokemonSyncronizer = PokemonSyncronizer(pokeApiService); -const importPokemon = async ({ id = 0 }) => { - await pokemonSyncronizer.queue({ id }); +const importPokemon = async ({ id = 0, ignoreCache = false }) => { + await pokemonSyncronizer.queue({ id, ignoreCache }); return { id }; }; diff --git a/api/src/handlers/import.handler.ts b/api/src/handlers/import.handler.ts index af38821..90e1e32 100644 --- a/api/src/handlers/import.handler.ts +++ b/api/src/handlers/import.handler.ts @@ -8,10 +8,11 @@ const pokeApiService = new PokeAPIService(); const pokemonSyncronizer = PokemonSyncronizer(pokeApiService); const importPokemon = async (ctx: { body: ImportPokemon }) => { - const { id = 0 } = ctx.body; + const { id = 0, ignoreCache = false } = ctx.body; await pokemonSyncronizer.queue({ id: id, + ignoreCache: ignoreCache, }); return { diff --git a/api/src/handlers/streamSyncronize.handler.ts b/api/src/handlers/streamSyncronize.handler.ts index 9520d79..4bf75e3 100644 --- a/api/src/handlers/streamSyncronize.handler.ts +++ b/api/src/handlers/streamSyncronize.handler.ts @@ -12,8 +12,8 @@ const pokemonSyncronizationHandler = async (message: KafkaMessage) => { return; } - const { id }: TPokemonSyncMessage = JSON.parse(messageString); - await pokemonSyncronizer.sync(id); + const msg: TPokemonSyncMessage = JSON.parse(messageString); + await pokemonSyncronizer.sync(msg); }; export default function setupWorker(streamService: StreamingService) { diff --git a/api/src/handlers/syncronize.handler.ts b/api/src/handlers/syncronize.handler.ts index 70d683e..6f73ce9 100644 --- a/api/src/handlers/syncronize.handler.ts +++ b/api/src/handlers/syncronize.handler.ts @@ -6,9 +6,9 @@ import ampqlib from 'amqplib'; const pokemonSyncronizationHandler = async (message: ampqlib.ConsumeMessage) => { const pokeApiService = new PokeAPIService(); const pokemonSyncronizer = PokemonSyncronizer(pokeApiService); - const { id }: TPokemonSyncMessage = JSON.parse(message.content.toString()); + const pokemonSyncMessage: TPokemonSyncMessage = JSON.parse(message.content.toString()); - await pokemonSyncronizer.sync(id); + await pokemonSyncronizer.sync(pokemonSyncMessage); }; export default function setupWorker(queueService: QueueService) { diff --git a/api/src/protos/pokeshop.ts b/api/src/protos/pokeshop.ts index 7086dd3..a77cd5a 100644 --- a/api/src/protos/pokeshop.ts +++ b/api/src/protos/pokeshop.ts @@ -18,6 +18,7 @@ export const protobufPackage = "pokeshop"; export interface ImportPokemonRequest { id: number; isFixed?: boolean | undefined; + ignoreCache?: boolean | undefined; } export interface GetPokemonRequest { diff --git a/api/src/services/pokemonRpc.service.ts b/api/src/services/pokemonRpc.service.ts index f750595..c57c6bd 100644 --- a/api/src/services/pokemonRpc.service.ts +++ b/api/src/services/pokemonRpc.service.ts @@ -25,9 +25,10 @@ const PokemonRpcService = (): PokeshopServer => ({ callback(null, pokemon); }, - async importPokemon({ request: { id } }, callback) { + async importPokemon({ request: { id, ignoreCache }}, callback) { await pokemonSyncronizer.queue({ - id: id, + id, + ignoreCache: ignoreCache ?? false, }); callback(null, { id }); diff --git a/api/src/services/pokemonSyncronizer.service.ts b/api/src/services/pokemonSyncronizer.service.ts index 0057663..51890e1 100644 --- a/api/src/services/pokemonSyncronizer.service.ts +++ b/api/src/services/pokemonSyncronizer.service.ts @@ -9,27 +9,37 @@ export const MESSAGE_GROUP = 'queue.synchronizePokemon'; export type TPokemonSyncMessage = { id: number; + ignoreCache: boolean; }; const PokemonSyncronizer = pokeApiService => { const queue = createQueueService(MESSAGE_GROUP); const repository = getPokemonRepository(); - const cache = getCacheService() + const cache = getCacheService(); + + async function getFromCache(message: TPokemonSyncMessage) { + if (message.ignoreCache) { + return null + } + + return await cache.get(`pokemon_${message.id}`) + } return { async queue(message: TPokemonSyncMessage) { return queue.send(message); }, - async sync(pokemonId: Number) { + + async sync(message: TPokemonSyncMessage) { const parentSpan = await getParentSpan(); const span = await createSpan('import pokemon', parentSpan, { kind: SpanKind.INTERNAL }); try { return await runWithSpan(span, async () => { - let pokemon = await cache.get(`pokemon_${pokemonId}`) + let pokemon = await getFromCache(message) if (!pokemon) { - pokemon = await pokeApiService.getPokemon(pokemonId); - await cache.set(`pokemon_${pokemonId}`, pokemon!!) + pokemon = await pokeApiService.getPokemon(message.id); + await cache.set(`pokemon_${message.id}`, pokemon!!) } await repository.create(new Pokemon({ ...pokemon })); diff --git a/api/src/services/queue.service.ts b/api/src/services/queue.service.ts index c7ec547..26ae46b 100644 --- a/api/src/services/queue.service.ts +++ b/api/src/services/queue.service.ts @@ -111,11 +111,11 @@ class RabbitQueueService implements QueueService { this.messageGroup = messageGroup; } - private async connect(useCache: boolean = true): Promise { + private async connect(ignoreCache: boolean = true): Promise { let lastError; for (let i = 0; i < 10; i++) { try { - return await this._connect(useCache) + return await this._connect(ignoreCache) } catch (ex) { lastError = ex await new Promise(r => setTimeout(r, 2000)); @@ -125,8 +125,8 @@ class RabbitQueueService implements QueueService { throw new Error(`could not connect after 10 tries: ${lastError?.message}`) } - private async _connect(useCache: boolean = true): Promise { - if (useCache && this.channel) { + private async _connect(ignoreCache: boolean = true): Promise { + if (ignoreCache && this.channel) { return this.channel; } diff --git a/api/src/validators/importPokemon.ts b/api/src/validators/importPokemon.ts index 29911ee..55f9afd 100644 --- a/api/src/validators/importPokemon.ts +++ b/api/src/validators/importPokemon.ts @@ -4,6 +4,8 @@ class ImportPokemon { @IsNumber() @IsPositive() public id: number; + + public ignoreCache: boolean; } export default ImportPokemon;