From 76eaeace18ef00aedec8c2adc16d97f1804b77b1 Mon Sep 17 00:00:00 2001 From: rjawesome Date: Wed, 26 Jun 2024 15:25:37 -0700 Subject: [PATCH] new threading messaging type --- src/batch_edge_query.ts | 4 ++-- src/cache_handler.ts | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/batch_edge_query.ts b/src/batch_edge_query.ts index d5504b21..2cdfe3d9 100644 --- a/src/batch_edge_query.ts +++ b/src/batch_edge_query.ts @@ -8,7 +8,7 @@ import CacheHandler from './cache_handler'; import { threadId } from 'worker_threads'; import MetaKG from '@biothings-explorer/smartapi-kg'; import { StampedLog } from '@biothings-explorer/utils'; -import { QueryHandlerOptions } from '@biothings-explorer/types'; +import { QueryHandlerOptions, ThreadMessage } from '@biothings-explorer/types'; import QEdge from './query_edge'; import { UnavailableAPITracker } from './types'; import { Record } from '@biothings-explorer/api-response-transform'; @@ -141,7 +141,7 @@ export default class BatchEdgeQueryHandler { if (nonCachedQEdges.length === 0) { queryRecords = []; if (global.parentPort) { - global.parentPort.postMessage({ threadId, cacheDone: true }); + global.parentPort.postMessage({ threadId, type: 'cacheDone', value: true } satisfies ThreadMessage); } } else { debug('Start to convert qEdges into APIEdges....'); diff --git a/src/cache_handler.ts b/src/cache_handler.ts index a52c6d1f..ea0fc10a 100644 --- a/src/cache_handler.ts +++ b/src/cache_handler.ts @@ -11,7 +11,7 @@ import { Record, RecordPackage } from '@biothings-explorer/api-response-transfor import { threadId } from 'worker_threads'; import MetaKG from '@biothings-explorer/smartapi-kg'; import QEdge from './query_edge'; -import { QueryHandlerOptions } from '@biothings-explorer/types'; +import { QueryHandlerOptions, ThreadMessage } from '@biothings-explorer/types'; export interface RecordPacksByQedgeMetaKGHash { [QEdgeHash: string]: RecordPackage; @@ -212,12 +212,12 @@ export default class CacheHandler { async cacheEdges(queryRecords: Record[]): Promise { if (this.cacheEnabled === false || process.env.INTERNAL_DISABLE_REDIS === 'true') { if (global.parentPort) { - global.parentPort.postMessage({ threadId, cacheDone: true }); + global.parentPort.postMessage({ threadId, type: 'cacheDone', value: true } satisfies ThreadMessage); } return; } if (global.parentPort) { - global.parentPort.postMessage({ threadId, cacheInProgress: 1 }); + global.parentPort.postMessage({ threadId, type: 'cacheInProgress', value: 1 } satisfies ThreadMessage); } debug('Start to cache query records.'); try { @@ -229,7 +229,7 @@ export default class CacheHandler { // lock to prevent caching to/reading from actively caching edge const redisID = 'bte:edgeCache:' + hash; if (global.parentPort) { - global.parentPort.postMessage({ threadId, addCacheKey: redisID }); + global.parentPort.postMessage({ threadId, type: 'addCacheKey', value: redisID } satisfies ThreadMessage); } await redisClient.client.usingLock([`redisLock:${redisID}`, 'redisLock:EdgeCaching'], 600000, async () => { try { @@ -267,7 +267,7 @@ export default class CacheHandler { ); } finally { if (global.parentPort) { - global.parentPort.postMessage({ threadId, completeCacheKey: redisID }); + global.parentPort.postMessage({ threadId, type: 'completeCacheKey', value: redisID } satisfies ThreadMessage); } } }); @@ -284,7 +284,7 @@ export default class CacheHandler { debug(`Caching failed due to ${error}. This does not terminate the query.`); } finally { if (global.parentPort) { - global.parentPort.postMessage({ threadId, cacheDone: 1 }); + global.parentPort.postMessage({ threadId, type: 'cacheDone', value: 1 } satisfies ThreadMessage); } } }