Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow user to disable cache lookup via request body #44

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/src/graphql/import.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import PokemonSyncronizer from '@pokemon/services/pokemonSyncronizer.service';
const pokeApiService = new PokeAPIService();
const pokemonSyncronizer = PokemonSyncronizer(pokeApiService);

const importPokemon = async ({ id = 0 }) => {
await pokemonSyncronizer.queue({ id });
const importPokemon = async ({ id = 0, ignoreCache = false }) => {
await pokemonSyncronizer.queue({ id, ignoreCache });

return { id };
};
Expand Down
3 changes: 2 additions & 1 deletion api/src/handlers/import.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ const pokeApiService = new PokeAPIService();
const pokemonSyncronizer = PokemonSyncronizer(pokeApiService);

const importPokemon = async (ctx: { body: ImportPokemon }) => {
const { id = 0 } = ctx.body;
const { id = 0, ignoreCache = false } = ctx.body;

await pokemonSyncronizer.queue({
id: id,
ignoreCache: ignoreCache,
});

return {
Expand Down
4 changes: 2 additions & 2 deletions api/src/handlers/streamSyncronize.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ const pokemonSyncronizationHandler = async (message: KafkaMessage) => {
return;
}

const { id }: TPokemonSyncMessage = JSON.parse(messageString);
await pokemonSyncronizer.sync(id);
const msg: TPokemonSyncMessage = JSON.parse(messageString);
await pokemonSyncronizer.sync(msg);
};

export default function setupWorker(streamService: StreamingService<TPokemonSyncMessage>) {
Expand Down
4 changes: 2 additions & 2 deletions api/src/handlers/syncronize.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import ampqlib from 'amqplib';
const pokemonSyncronizationHandler = async (message: ampqlib.ConsumeMessage) => {
const pokeApiService = new PokeAPIService();
const pokemonSyncronizer = PokemonSyncronizer(pokeApiService);
const { id }: TPokemonSyncMessage = JSON.parse(message.content.toString());
const pokemonSyncMessage: TPokemonSyncMessage = JSON.parse(message.content.toString());

await pokemonSyncronizer.sync(id);
await pokemonSyncronizer.sync(pokemonSyncMessage);
};

export default function setupWorker(queueService: QueueService<TPokemonSyncMessage>) {
Expand Down
1 change: 1 addition & 0 deletions api/src/protos/pokeshop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export const protobufPackage = "pokeshop";
export interface ImportPokemonRequest {
id: number;
isFixed?: boolean | undefined;
ignoreCache?: boolean | undefined;
}

export interface GetPokemonRequest {
Expand Down
5 changes: 3 additions & 2 deletions api/src/services/pokemonRpc.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ const PokemonRpcService = (): PokeshopServer => ({

callback(null, pokemon);
},
async importPokemon({ request: { id } }, callback) {
async importPokemon({ request: { id, ignoreCache }}, callback) {
await pokemonSyncronizer.queue({
id: id,
id,
ignoreCache: ignoreCache ?? false,
});

callback(null, { id });
Expand Down
20 changes: 15 additions & 5 deletions api/src/services/pokemonSyncronizer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,37 @@ export const MESSAGE_GROUP = 'queue.synchronizePokemon';

export type TPokemonSyncMessage = {
id: number;
ignoreCache: boolean;
};

const PokemonSyncronizer = pokeApiService => {
const queue = createQueueService<TPokemonSyncMessage>(MESSAGE_GROUP);
const repository = getPokemonRepository();
const cache = getCacheService<TPokemon>()
const cache = getCacheService<TPokemon>();

async function getFromCache(message: TPokemonSyncMessage) {
if (message.ignoreCache) {
return null
}

return await cache.get(`pokemon_${message.id}`)
}

return {
async queue(message: TPokemonSyncMessage) {
return queue.send(message);
},
async sync(pokemonId: Number) {

async sync(message: TPokemonSyncMessage) {
const parentSpan = await getParentSpan();
const span = await createSpan('import pokemon', parentSpan, { kind: SpanKind.INTERNAL });

try {
return await runWithSpan(span, async () => {
let pokemon = await cache.get(`pokemon_${pokemonId}`)
let pokemon = await getFromCache(message)
if (!pokemon) {
pokemon = await pokeApiService.getPokemon(pokemonId);
await cache.set(`pokemon_${pokemonId}`, pokemon!!)
pokemon = await pokeApiService.getPokemon(message.id);
await cache.set(`pokemon_${message.id}`, pokemon!!)
}

await repository.create(new Pokemon({ ...pokemon }));
Expand Down
8 changes: 4 additions & 4 deletions api/src/services/queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ class RabbitQueueService<T> implements QueueService<T> {
this.messageGroup = messageGroup;
}

private async connect(useCache: boolean = true): Promise<ampqlib.Channel> {
private async connect(ignoreCache: boolean = true): Promise<ampqlib.Channel> {
let lastError;
for (let i = 0; i < 10; i++) {
try {
return await this._connect(useCache)
return await this._connect(ignoreCache)
} catch (ex) {
lastError = ex
await new Promise(r => setTimeout(r, 2000));
Expand All @@ -125,8 +125,8 @@ class RabbitQueueService<T> implements QueueService<T> {
throw new Error(`could not connect after 10 tries: ${lastError?.message}`)
}

private async _connect(useCache: boolean = true): Promise<ampqlib.Channel> {
if (useCache && this.channel) {
private async _connect(ignoreCache: boolean = true): Promise<ampqlib.Channel> {
if (ignoreCache && this.channel) {
return this.channel;
}

Expand Down
2 changes: 2 additions & 0 deletions api/src/validators/importPokemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ class ImportPokemon {
@IsNumber()
@IsPositive()
public id: number;

public ignoreCache: boolean;
}

export default ImportPokemon;
Loading