diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..c87433f --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,19 @@ +{ + // Use IntelliSense to learn about possible Node.js debug attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "yarn", + "runtimeExecutable": "yarn", + "runtimeArgs": [ + "dev" + ], + "cwd": "${workspaceRoot}", + "timeout": 10000 + } + ] +} \ No newline at end of file diff --git a/package.json b/package.json index 975ecf3..c56eec6 100644 --- a/package.json +++ b/package.json @@ -37,7 +37,6 @@ "dotenv": "^16.4.5", "express": "^4.18.2", "follow-redirects": "1.15.8", - "moize": "^6.1.6", "mongodb": "^6.3.0", "multiformats": "^9.9.0", "node-fetch-native": "^1.6.2", diff --git a/src/addn/rateLimit.ts b/src/addn/rateLimit.ts index 96de547..b789efa 100644 --- a/src/addn/rateLimit.ts +++ b/src/addn/rateLimit.ts @@ -3,7 +3,7 @@ import { pRateLimit } from 'p-ratelimit' const _limit = pRateLimit({ interval: undefined, rate: undefined, - concurrency: 96, + concurrency: 192, maxDelay: undefined, }) diff --git a/src/algos/cats.ts b/src/algos/cats.ts index ab1154e..ad7e84d 100644 --- a/src/algos/cats.ts +++ b/src/algos/cats.ts @@ -4,6 +4,8 @@ import { AlgoManager } from '../addn/algoManager' import dotenv from 'dotenv' import { Post } from '../db/schema' import dbClient from '../db/dbClient' +import { Database } from '../db' +import { BskyAgent } from '@atproto/api' dotenv.config() @@ -38,8 +40,15 @@ export const handler = async (ctx: AppContext, params: QueryParams) => { export class manager extends AlgoManager { public name: string = shortname - public re = - /^(?!.*((\b(cat( |-)girl|cat( |-)ears|cat( |-)suit|fursona|nsfw|cat-like|furryart|doja|dojacat|anthro|anthropomorphic)\b)|#furry|#furryart|fursuit)).*\b(cat|cats|catsofbluesky|kitty|kitten|kitties)\b.*$/ims + public re: RegExp + + constructor(db: Database, agent: BskyAgent) { + super(db, agent) + + this.re = new RegExp( + /^(?!.*((\b(cat( |-)girl|cat( |-)ears|cat( |-)suit|fursona|nsfw|cat-like|furryart|doja|dojacat|anthro|anthropomorphic)\b)|#furry|#furryart|fursuit)).*\b(cat|cats|catsofbluesky|kitty|kitten|kitties)\b.*$/ims, + ) + } public async periodicTask() { await this.db.removeTagFromOldPosts( @@ -57,41 +66,39 @@ export class manager extends AlgoManager { ].includes(post.author) ) return false + if (post.replyRoot !== null) return false + if (this.agent === null) { await this.start() } - if (this.agent === null) return false - let return_value: Boolean | undefined = undefined + if (this.agent === null) return false let match = false - - let matchString = '' + let matchParts: string[] = [] if (post.embed?.images) { - const imagesArr = post.embed.images - imagesArr.forEach((image) => { - matchString = `${matchString} ${image.alt}`.replace('\n', ' ') + post.embed.images.forEach((image) => { + matchParts.push(image.alt.replace('\n', ' ')) }) } if (post.embed?.alt) { - matchString = `${matchString} ${post.embed.alt}`.replace('\n', ' ') + matchParts.push(post.embed.alt.replace('\n', ' ')) } if (post.embed?.media?.alt) { - matchString = `${matchString} ${post.embed?.media?.alt}`.replace( - '\n', - ' ', - ) + matchParts.push(post.embed.media.alt.replace('\n', ' ')) } if (post.tags) { - matchString = `${post.tags.join(' ')} ${matchString}` + matchParts.push(post.tags.join(' ')) } - matchString = `${post.text} ${matchString}`.replace('\n', ' ') + matchParts.push(post.text.replace('\n', ' ')) + + const matchString = matchParts.join(' ') if (matchString.match(this.re) !== null) { match = true diff --git a/src/algos/keyboards.ts b/src/algos/keyboards.ts index 61acc24..e84acbb 100644 --- a/src/algos/keyboards.ts +++ b/src/algos/keyboards.ts @@ -5,6 +5,9 @@ import dotenv from 'dotenv' import { Post } from '../db/schema' import dbClient from '../db/dbClient' +import { BskyAgent } from '@atproto/api' +import { Database } from '../db' + dotenv.config() // max 15 chars @@ -159,12 +162,17 @@ export class manager extends AlgoManager { 'ZMK', ] - public re = new RegExp( - `^(?!.*\\b((swiss|french|italian|austrian) alps|mountain(s)?|dice)\\b).*\\b(${this.matchTerms.join( - '|', - )})(es|s)?\\b.*$`, - 'ims', - ) + public re: RegExp + + constructor(db: Database, agent: BskyAgent) { + super(db, agent) + this.re = new RegExp( + `^(?!.*\\b((swiss|french|italian|austrian) alps|mountain(s)?|dice)\\b).*\\b(${this.matchTerms.join( + '|', + )})(es|s)?\\b.*$`, + 'ims', + ) + } public async periodicTask() { await this.db.removeTagFromOldPosts( diff --git a/src/algos/paxaus.ts b/src/algos/paxaus.ts index b73368d..aeb5d4f 100644 --- a/src/algos/paxaus.ts +++ b/src/algos/paxaus.ts @@ -58,32 +58,29 @@ export class manager extends AlgoManager { if (this.agent === null) return false let match = false - - let matchString = '' + let matchParts: string[] = [] if (post.embed?.images) { - const imagesArr = post.embed.images - imagesArr.forEach((image) => { - matchString = `${matchString} ${image.alt}`.replace('\n', ' ') + post.embed.images.forEach((image) => { + matchParts.push(image.alt.replace('\n', ' ')) }) } if (post.embed?.alt) { - matchString = `${matchString} ${post.embed.alt}`.replace('\n', ' ') + matchParts.push(post.embed.alt.replace('\n', ' ')) } if (post.embed?.media?.alt) { - matchString = `${matchString} ${post.embed?.media?.alt}`.replace( - '\n', - ' ', - ) + matchParts.push(post.embed.media.alt.replace('\n', ' ')) } if (post.tags) { - matchString = `${post.tags.join(' ')} ${matchString}` + matchParts.push(post.tags.join(' ')) } - matchString = `${post.text} ${matchString}`.replace('\n', ' ') + matchParts.push(post.text.replace('\n', ' ')) + + const matchString = matchParts.join(' ') if (matchString.match(this.re) !== null) { match = true diff --git a/src/subscription.ts b/src/subscription.ts index cdd24c9..5ce48a5 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -39,15 +39,16 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { this.algoManagers.push(new algos[algo].manager(db, agent)) }) - const startAlgosSequentially = async () => { - for (const algo of this.algoManagers) { + const startAlgosConcurrently = async () => { + const startPromises = this.algoManagers.map(async (algo) => { if (await algo._start()) { console.log(`${algo.name}: Started`) } - } + }) + await Promise.all(startPromises) } - startAlgosSequentially() + startAlgosConcurrently() } public authorList: string[] @@ -87,32 +88,38 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { })) const postsToCreatePromises = postsCreated.map(async (post) => { - const algoTagsPromises = this.algoManagers.map(async (manager) => { - try { - const includeAlgo = await manager.filter_post(post) - return includeAlgo ? manager.name : null - } catch (err) { - console.error(`${manager.name}: filter failed`, err) - return null - } - }) - - const algoTagsResults = await Promise.all(algoTagsPromises) - const algoTags = algoTagsResults.filter((tag) => tag !== null) + try { + const algoTags = ( + await Promise.all( + this.algoManagers.map(async (manager) => { + try { + const includeAlgo = await manager.filter_post(post) + return includeAlgo ? manager.name : null + } catch (err) { + console.error(`${manager.name}: filter failed`, err) + return null + } + }), + ) + ).filter((tag) => tag !== null) - if (algoTags.length === 0) return null + if (algoTags.length === 0) return null - const hash = crypto - .createHash('shake256', { outputLength: 12 }) - .update(post.uri) - .digest('hex') - .toString() + const hash = crypto + .createHash('shake256', { outputLength: 12 }) + .update(post.uri) + .digest('hex') + .toString() - return { - ...post, - _id: hash, - algoTags: algoTags, - earliestCreatedIndexedAt: Math.min(post.createdAt, post.indexedAt), + return { + ...post, + _id: hash, + algoTags, + earliestCreatedIndexedAt: Math.min(post.createdAt, post.indexedAt), + } + } catch (err) { + console.error('Post processing failed', err) + return null } }) @@ -134,6 +141,7 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { ) }) } + await Promise.all(dbOperations) } } diff --git a/src/util/subscription.ts b/src/util/subscription.ts index d5e6cae..6e5cfbc 100755 --- a/src/util/subscription.ts +++ b/src/util/subscription.ts @@ -42,6 +42,8 @@ class Semaphore { export abstract class FirehoseSubscriptionBase { public sub: Subscription + private eventQueue: RepoEvent[] = [] + private semaphore: Semaphore constructor(public db: Database, public service: string) { this.sub = new Subscription({ @@ -53,14 +55,12 @@ export abstract class FirehoseSubscriptionBase { }, heartbeatIntervalMs: 30000, }) + this.semaphore = new Semaphore(64) } abstract handleEvent(evt: RepoEvent): Promise async run(subscriptionReconnectDelay: number) { - const maxConcurrentEvents = 16 - const semaphore = new Semaphore(maxConcurrentEvents) - let handledEvents = 0 try { for await (const evt of this.sub) { @@ -72,15 +72,10 @@ export abstract class FirehoseSubscriptionBase { if (includedRecords.has(collection)) { handledEvents++ - await semaphore.acquire().then(() => { - this.handleEvent(evt) // no longer awaiting this - .catch((err) => { - console.log(`err in handleEvent ${err}`) - }) - .finally(() => { - semaphore.release() - }) - }) + this.eventQueue.push(evt) + if (this.eventQueue.length >= 10) { + await this.processEventQueue() + } } } } @@ -100,6 +95,22 @@ export abstract class FirehoseSubscriptionBase { } } + private async processEventQueue() { + const eventsToProcess = this.eventQueue.splice(0, 10) + await Promise.all( + eventsToProcess.map(async (evt) => { + await this.semaphore.acquire() + this.handleEvent(evt) + .catch((err) => { + console.log(`err in handleEvent ${err}`) + }) + .finally(() => { + this.semaphore.release() + }) + }), + ) + } + async updateCursor(cursor: number) { await this.db.updateSubStateCursor(this.service, cursor) } diff --git a/yarn.lock b/yarn.lock index 6e8824a..cad6f46 100644 --- a/yarn.lock +++ b/yarn.lock @@ -769,11 +769,6 @@ express@^4.17.2, express@^4.18.2: utils-merge "1.0.1" vary "~1.1.2" -fast-equals@^3.0.1: - version "3.0.3" - resolved "https://registry.yarnpkg.com/fast-equals/-/fast-equals-3.0.3.tgz#8e6cb4e51ca1018d87dd41982ef92758b3e4197f" - integrity sha512-NCe8qxnZFARSHGztGMZOO/PC1qa5MIFB5Hp66WdzbCRAz8U8US3bx1UTgLS49efBQPcUtO9gf5oVEY8o7y/7Kg== - fast-glob@^3.2.9: version "3.3.2" resolved "https://registry.yarnpkg.com/fast-glob/-/fast-glob-3.3.2.tgz#a904501e57cfdd2ffcded45e99a54fef55e46129" @@ -1016,11 +1011,6 @@ methods@~1.1.2: resolved "https://registry.yarnpkg.com/methods/-/methods-1.1.2.tgz#5529a4d67654134edcc5266656835b0f851afcee" integrity sha512-iclAHeNqNm68zFtnZ0e+1L2yUIdvzNoauKU4WBA3VvH/vPFieF7qfRlwUZU+DA9P9bPXIS90ulxoUoCH23sV2w== -micro-memoize@^4.1.2: - version "4.1.2" - resolved "https://registry.yarnpkg.com/micro-memoize/-/micro-memoize-4.1.2.tgz#ce719c1ba1e41592f1cd91c64c5f41dcbf135f36" - integrity sha512-+HzcV2H+rbSJzApgkj0NdTakkC+bnyeiUxgT6/m7mjcz1CmM22KYFKp+EVj1sWe4UYcnriJr5uqHQD/gMHLD+g== - micromatch@^4.0.4: version "4.0.8" resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-4.0.8.tgz#d66fa18f3a47076789320b9b1af32bd86d9fa202" @@ -1046,14 +1036,6 @@ mime@1.6.0: resolved "https://registry.yarnpkg.com/mime/-/mime-1.6.0.tgz#32cd9e5c64553bd58d19a568af452acff04981b1" integrity sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg== -moize@^6.1.6: - version "6.1.6" - resolved "https://registry.yarnpkg.com/moize/-/moize-6.1.6.tgz#ac2e723e74b951875fe2c0c3433405c2b098c3e6" - integrity sha512-vSKdIUO61iCmTqhdoIDrqyrtp87nWZUmBPniNjO0fX49wEYmyDO4lvlnFXiGcaH1JLE/s/9HbiK4LSHsbiUY6Q== - dependencies: - fast-equals "^3.0.1" - micro-memoize "^4.1.2" - mongodb-connection-string-url@^3.0.0: version "3.0.1" resolved "https://registry.yarnpkg.com/mongodb-connection-string-url/-/mongodb-connection-string-url-3.0.1.tgz#c13e6ac284ae401752ebafdb8cd7f16c6723b141"