From edba6ece4827e7552fb0fb2b91f965aa6f6e989b Mon Sep 17 00:00:00 2001 From: WRadoslaw Date: Mon, 22 Apr 2024 13:30:41 +0200 Subject: [PATCH] Persisted queue idea --- docker-compose.yml | 8 ++ .../handlers/videoConsumed.ts | 2 +- src/utils/RecommendationServiceManager.ts | 70 +++++++++++---- src/utils/persistingQueue.ts | 88 +++++++++++++++++++ 4 files changed, 148 insertions(+), 20 deletions(-) create mode 100644 src/utils/persistingQueue.ts diff --git a/docker-compose.yml b/docker-compose.yml index d53984a58..7a086e2b1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -48,6 +48,7 @@ services: depends_on: - orion_db volumes: + - processor_queue:/app/persistedQueue - type: bind source: . target: /orion @@ -68,6 +69,7 @@ services: depends_on: - orion_db volumes: + - graphql_queue:/app/persistedQueue - type: bind source: . target: /orion @@ -90,6 +92,7 @@ services: depends_on: - orion_db volumes: + - auth_api_queue:/app/persistedQueue - type: bind source: . target: /orion @@ -112,6 +115,7 @@ services: depends_on: - orion_db volumes: + - interactions_api_queue:/app/persistedQueue - type: bind source: . target: /orion @@ -123,6 +127,10 @@ services: volumes: orion_db_data: interactions_db_data: + interactions_api_queue: + auth_api_queue: + graphql_queue: + processor_queue: networks: default: diff --git a/src/interactions-server/handlers/videoConsumed.ts b/src/interactions-server/handlers/videoConsumed.ts index e69384f7d..b4cfb1407 100644 --- a/src/interactions-server/handlers/videoConsumed.ts +++ b/src/interactions-server/handlers/videoConsumed.ts @@ -4,7 +4,7 @@ import { components } from '../generated/api-types' import { AuthContext } from '../../utils/auth' import { recommendationServiceManager } from '../../utils/RecommendationServiceManager' import { singlePurchaseLimiter } from '../interactionsLimiter' -import { PurchaseModel, RatingModel } from '../interactionsEm' +import { PurchaseModel } from '../interactionsEm' type ReqParams = Record type ResBody = diff --git a/src/utils/RecommendationServiceManager.ts b/src/utils/RecommendationServiceManager.ts index 4de43c37b..0b8a1323b 100644 --- a/src/utils/RecommendationServiceManager.ts +++ b/src/utils/RecommendationServiceManager.ts @@ -8,6 +8,8 @@ import { import { createLogger } from '@subsquid/logger' import { randomUUID } from 'crypto' import { stringToHex } from '@polkadot/util' +import { PersistentQueue } from './persistingQueue' +import path from 'path' export type RecommendationItemId = `${string}-${'video' | 'channel'}` @@ -41,14 +43,14 @@ const recommendationServiceLogger = createLogger('recommendationsService') const isDevEnv = process.env.ORION_ENV === 'development' export class RecommendationServiceManager { - private _queue: ClientRequests.Request[] = [] + private _queue: PersistentQueue private client: ApiClient | null = null // if orion is launched in dev mode, always sync videos private _enabled = false private _loopInitialized = false - constructor() { + constructor(queueFilePath: string) { if ( !process.env.RECOMMENDATION_SERVICE_PRIVATE_KEY || !process.env.RECOMMENDATION_SERVICE_DATABASE || @@ -59,6 +61,7 @@ export class RecommendationServiceManager { ) return } + this._queue = new PersistentQueue(queueFilePath) this.client = new ApiClient( process.env.RECOMMENDATION_SERVICE_DATABASE, process.env.RECOMMENDATION_SERVICE_PRIVATE_KEY, @@ -94,12 +97,15 @@ export class RecommendationServiceManager { const request = new ClientRequests.SetItemValues(`${video.id}-video`, actionObject, { cascadeCreate: true, }) - this._queue.push(request) + this._queue.addToQueue(request).catch(() => undefined) } scheduleVideoDeletion(videoId: string) { + if (!this._enabled) { + return + } const actionObject = new ClientRequests.DeleteItem(`${videoId}-video`) - this._queue.push(actionObject) + this._queue.addToQueue(actionObject).catch(() => undefined) } scheduleChannelUpsert(channel: Channel) { @@ -121,12 +127,16 @@ export class RecommendationServiceManager { const request = new ClientRequests.SetItemValues(`${channel.id}-channel`, actionObject, { cascadeCreate: true, }) - this._queue.push(request) + this._queue.addToQueue(request).catch(() => undefined) } scheduleChannelDeletion(channelId: string) { + if (!this._enabled) { + return + } + const actionObject = new ClientRequests.DeleteItem(`${channelId}-channel`) - this._queue.push(actionObject) + this._queue.addToQueue(actionObject).catch(() => undefined) } scheduleUserUpsert(user: RSUser) { @@ -141,7 +151,7 @@ export class RecommendationServiceManager { cascadeCreate: true, } ) - this._queue.push(actionObject) + this._queue.addToQueue(actionObject).catch(() => undefined) } // this interaction has big model value and should we used for @@ -156,7 +166,7 @@ export class RecommendationServiceManager { cascadeCreate: true, recommId, }) - this._queue.push(actionObject) + this._queue.addToQueue(actionObject).catch(() => undefined) } // this interaction should be dispatched when user clicks a video to see it @@ -176,7 +186,7 @@ export class RecommendationServiceManager { recommId, duration, }) - this._queue.push(actionObject) + this._queue.addToQueue(actionObject).catch(() => undefined) } // this interaction is for user engagement level @@ -201,7 +211,7 @@ export class RecommendationServiceManager { recommId, } ) - this._queue.push(actionObject) + this._queue.addToQueue(actionObject).catch(() => undefined) } scheduleItemBookmark(itemId: RecommendationItemId, userId: string, recommId?: string) { @@ -214,7 +224,7 @@ export class RecommendationServiceManager { cascadeCreate: !isDevEnv, recommId, }) - this._queue.push(actionObject) + this._queue.addToQueue(actionObject).catch(() => undefined) } deleteItemBookmark(itemId: RecommendationItemId, userId: string) { @@ -223,7 +233,7 @@ export class RecommendationServiceManager { } const actionObject = new ClientRequests.DeleteBookmark(this.mapUserId(userId), itemId) - this._queue.push(actionObject) + this._queue.addToQueue(actionObject).catch(() => undefined) } scheduleItemRating( @@ -244,7 +254,7 @@ export class RecommendationServiceManager { cascadeCreate: !isDevEnv, recommId, }) - this._queue.push(actionObject) + this._queue.addToQueue(actionObject).catch(() => undefined) } deleteItemRating(itemId: RecommendationItemId, userId: string) { @@ -253,7 +263,7 @@ export class RecommendationServiceManager { } const actionObject = new ClientRequests.DeleteRating(userId, itemId) - this._queue.push(actionObject) + this._queue.addToQueue(actionObject).catch(() => undefined) } enableExport() { @@ -298,6 +308,10 @@ export class RecommendationServiceManager { } async recommendItemsToUser(userId?: string, opts?: CommonOptions) { + if (!this._enabled) { + return + } + recommendationServiceLogger.info( `Getting items recommendations to ${userId || 'empty user'}(${this.mapUserId(userId ?? '')})` ) @@ -322,6 +336,10 @@ export class RecommendationServiceManager { } async recommendNextItems(recommId: string, opts?: CommonOptions) { + if (!this._enabled) { + return + } + const request = new ClientRequests.RecommendNextItems(recommId, opts?.limit ?? 10) const res = await this.client?.send(request) if (!res) { @@ -332,6 +350,10 @@ export class RecommendationServiceManager { } async recommendItemsToItem(itemId: RecommendationItemId, userId?: string, opts?: CommonOptions) { + if (!this._enabled) { + return + } + const request = new ClientRequests.RecommendItemsToItem( itemId, userId ? this.mapUserId(userId) : randomUUID(), @@ -354,6 +376,10 @@ export class RecommendationServiceManager { } async recommendNextVideo(itemId: RecommendationItemId, userId?: string, opts?: CommonOptions) { + if (!this._enabled) { + return + } + const request = new ClientRequests.RecommendItemsToItem( itemId, userId ? this.mapUserId(userId) : randomUUID(), @@ -414,14 +440,20 @@ export class RecommendationServiceManager { private async _batchUpdateLoop(intervalMs: number): Promise { while (true) { - if (this._queue.length) { - const batchArray = [...this._queue] - this._queue.length = 0 - await this.sendBatchRequest(batchArray) + const queue = this._queue.getQueue() + if (queue.length) { + await this._queue.lockQueue(async () => { + await this.sendBatchRequest(queue) + }) + await this._queue.flushQueue() } await new Promise((resolve) => setTimeout(resolve, intervalMs)) } } } -export const recommendationServiceManager = new RecommendationServiceManager() +const dirPath = path.resolve('../app/persistedQueue') + +export const recommendationServiceManager = new RecommendationServiceManager( + `${dirPath}/interactions` +) diff --git a/src/utils/persistingQueue.ts b/src/utils/persistingQueue.ts new file mode 100644 index 000000000..75672c68f --- /dev/null +++ b/src/utils/persistingQueue.ts @@ -0,0 +1,88 @@ +import fs from 'fs' +import AsyncLock from 'async-lock' + +export class PersistentQueue { + private queue: T[] = [] + private writeStream: fs.WriteStream | null = null + private initialized = false + private asyncLock = new AsyncLock({ maxPending: Number.MAX_SAFE_INTEGER }) + filepath = '' + + constructor(filepath: string) { + this.filepath = filepath + this.queue = [] + this.loadQueue() + this.writeStream = fs.createWriteStream(this.filepath, { flags: 'a' }) + this.initialized = true + } + + loadQueue() { + try { + if (!fs.existsSync(this.filepath)) { + return + } + const data = fs.readFileSync(this.filepath, { encoding: 'utf-8' }) + this.queue = JSON.parse(`[${data.split('\n').slice(0, -1).join(',')}]`) + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { + console.error('Failed to load queue:', error) + } + } + } + + lockQueue(cb: () => Promise) { + return this.asyncLock.acquire( + this.filepath, + async (done) => { + await cb() + done() + }, + { skipQueue: true } + ) + } + + flushQueue() { + return this.asyncLock.acquire( + this.filepath, + async (done) => { + this.writeStream?.end() + fs.truncate(this.filepath, 0, (err) => { + if (err) { + done() + throw err + } + this.queue = [] + this.writeStream = fs.createWriteStream(this.filepath, { flags: 'a' }) + done() + }) + }, + { skipQueue: true } + ) + } + + async addToQueue(item: T) { + if (!this.initialized) { + return + } + await this.writeItemToFile(item) + this.queue.push(item) + } + + writeItemToFile(item: T) { + return this.asyncLock.acquire(this.filepath, async (done) => { + const dataToWrite = JSON.stringify(item).trim().replace('\n', '') + '\n' + this.writeStream?.write(dataToWrite, 'utf8', (err) => { + if (err) { + done() + throw err + } else { + done() + } + }) + }) + } + + getQueue() { + return this.queue + } +}