Skip to content

Commit

Permalink
various optimisations
Browse files Browse the repository at this point in the history
  • Loading branch information
Bossett committed Dec 31, 2024
1 parent 353bba4 commit d724811
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 93 deletions.
19 changes: 19 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
  // Use IntelliSense to learn about possible Node.js debug attributes.
  // Hover to view descriptions of existing attributes.
  // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
  "version": "0.2.0",
  "configurations": [
    {
      // Starts `yarn dev` under the Node.js debugger.
      "type": "node",
      "request": "launch",
      "name": "yarn",
      "runtimeExecutable": "yarn",
      "runtimeArgs": [
        "dev"
      ],
      // ${workspaceFolder} is the supported replacement for the deprecated
      // ${workspaceRoot} variable.
      "cwd": "${workspaceFolder}",
      // NOTE(review): presumably the debugger attach timeout in milliseconds — confirm.
      "timeout": 10000
    }
  ]
}
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
"dotenv": "^16.4.5",
"express": "^4.18.2",
"follow-redirects": "1.15.8",
"moize": "^6.1.6",
"mongodb": "^6.3.0",
"multiformats": "^9.9.0",
"node-fetch-native": "^1.6.2",
Expand Down
2 changes: 1 addition & 1 deletion src/addn/rateLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { pRateLimit } from 'p-ratelimit'
const _limit = pRateLimit({
interval: undefined,
rate: undefined,
concurrency: 96,
concurrency: 192,
maxDelay: undefined,
})

Expand Down
39 changes: 23 additions & 16 deletions src/algos/cats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { AlgoManager } from '../addn/algoManager'
import dotenv from 'dotenv'
import { Post } from '../db/schema'
import dbClient from '../db/dbClient'
import { Database } from '../db'
import { BskyAgent } from '@atproto/api'

dotenv.config()

Expand Down Expand Up @@ -38,8 +40,15 @@ export const handler = async (ctx: AppContext, params: QueryParams) => {

export class manager extends AlgoManager {
public name: string = shortname
public re =
/^(?!.*((\b(cat( |-)girl|cat( |-)ears|cat( |-)suit|fursona|nsfw|cat-like|furryart|doja|dojacat|anthro|anthropomorphic)\b)|#furry|#furryart|fursuit)).*\b(cat|cats|catsofbluesky|kitty|kitten|kitties)\b.*$/ims
public re: RegExp

/**
 * Creates the manager and compiles the pattern used to recognise cat posts.
 *
 * The pattern runs with the `i`, `m` and `s` flags. Its leading negative
 * lookahead rejects any text containing one of the excluded terms
 * (cat-girl / cat ears / cat suit variants, fursona, nsfw, #furry, fursuit,
 * ...); the remainder requires one of the whole words cat, cats,
 * catsofbluesky, kitty, kitten or kitties.
 *
 * @param db    database handle forwarded to the base class
 * @param agent Bluesky agent forwarded to the base class
 */
constructor(db: Database, agent: BskyAgent) {
  super(db, agent)

  const catPattern =
    /^(?!.*((\b(cat( |-)girl|cat( |-)ears|cat( |-)suit|fursona|nsfw|cat-like|furryart|doja|dojacat|anthro|anthropomorphic)\b)|#furry|#furryart|fursuit)).*\b(cat|cats|catsofbluesky|kitty|kitten|kitties)\b.*$/ims

  this.re = new RegExp(catPattern)
}

public async periodicTask() {
await this.db.removeTagFromOldPosts(
Expand All @@ -57,41 +66,39 @@ export class manager extends AlgoManager {
].includes(post.author)
)
return false

if (post.replyRoot !== null) return false

if (this.agent === null) {
await this.start()
}
if (this.agent === null) return false

let return_value: Boolean | undefined = undefined
if (this.agent === null) return false

let match = false

let matchString = ''
let matchParts: string[] = []

if (post.embed?.images) {
const imagesArr = post.embed.images
imagesArr.forEach((image) => {
matchString = `${matchString} ${image.alt}`.replace('\n', ' ')
post.embed.images.forEach((image) => {
matchParts.push(image.alt.replace('\n', ' '))
})
}

if (post.embed?.alt) {
matchString = `${matchString} ${post.embed.alt}`.replace('\n', ' ')
matchParts.push(post.embed.alt.replace('\n', ' '))
}

if (post.embed?.media?.alt) {
matchString = `${matchString} ${post.embed?.media?.alt}`.replace(
'\n',
' ',
)
matchParts.push(post.embed.media.alt.replace('\n', ' '))
}

if (post.tags) {
matchString = `${post.tags.join(' ')} ${matchString}`
matchParts.push(post.tags.join(' '))
}

matchString = `${post.text} ${matchString}`.replace('\n', ' ')
matchParts.push(post.text.replace('\n', ' '))

const matchString = matchParts.join(' ')

if (matchString.match(this.re) !== null) {
match = true
Expand Down
20 changes: 14 additions & 6 deletions src/algos/keyboards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import dotenv from 'dotenv'
import { Post } from '../db/schema'
import dbClient from '../db/dbClient'

import { BskyAgent } from '@atproto/api'
import { Database } from '../db'

dotenv.config()

// max 15 chars
Expand Down Expand Up @@ -159,12 +162,17 @@ export class manager extends AlgoManager {
'ZMK',
]

public re = new RegExp(
`^(?!.*\\b((swiss|french|italian|austrian) alps|mountain(s)?|dice)\\b).*\\b(${this.matchTerms.join(
'|',
)})(es|s)?\\b.*$`,
'ims',
)
public re: RegExp

/**
 * Creates the manager and compiles the keyword pattern from `matchTerms`.
 *
 * The pattern runs with the `i`, `m` and `s` flags. Its leading negative
 * lookahead rejects text mentioning "(swiss|french|italian|austrian) alps",
 * "mountain(s)" or "dice"; the remainder requires one of the match terms as
 * a whole word, optionally followed by "es" or "s".
 *
 * @param db    database handle forwarded to the base class
 * @param agent Bluesky agent forwarded to the base class
 */
constructor(db: Database, agent: BskyAgent) {
  super(db, agent)

  // Terms are inserted verbatim (no escaping), so any regex metacharacters
  // they contain are interpreted as pattern syntax.
  const termAlternation = this.matchTerms.join('|')
  const source = `^(?!.*\\b((swiss|french|italian|austrian) alps|mountain(s)?|dice)\\b).*\\b(${termAlternation})(es|s)?\\b.*$`

  this.re = new RegExp(source, 'ims')
}

public async periodicTask() {
await this.db.removeTagFromOldPosts(
Expand Down
21 changes: 9 additions & 12 deletions src/algos/paxaus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,29 @@ export class manager extends AlgoManager {
if (this.agent === null) return false

let match = false

let matchString = ''
let matchParts: string[] = []

if (post.embed?.images) {
const imagesArr = post.embed.images
imagesArr.forEach((image) => {
matchString = `${matchString} ${image.alt}`.replace('\n', ' ')
post.embed.images.forEach((image) => {
matchParts.push(image.alt.replace('\n', ' '))
})
}

if (post.embed?.alt) {
matchString = `${matchString} ${post.embed.alt}`.replace('\n', ' ')
matchParts.push(post.embed.alt.replace('\n', ' '))
}

if (post.embed?.media?.alt) {
matchString = `${matchString} ${post.embed?.media?.alt}`.replace(
'\n',
' ',
)
matchParts.push(post.embed.media.alt.replace('\n', ' '))
}

if (post.tags) {
matchString = `${post.tags.join(' ')} ${matchString}`
matchParts.push(post.tags.join(' '))
}

matchString = `${post.text} ${matchString}`.replace('\n', ' ')
matchParts.push(post.text.replace('\n', ' '))

const matchString = matchParts.join(' ')

if (matchString.match(this.re) !== null) {
match = true
Expand Down
62 changes: 35 additions & 27 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {
this.algoManagers.push(new algos[algo].manager(db, agent))
})

const startAlgosSequentially = async () => {
for (const algo of this.algoManagers) {
const startAlgosConcurrently = async () => {
const startPromises = this.algoManagers.map(async (algo) => {
if (await algo._start()) {
console.log(`${algo.name}: Started`)
}
}
})
await Promise.all(startPromises)
}

startAlgosSequentially()
startAlgosConcurrently()
}

public authorList: string[]
Expand Down Expand Up @@ -87,32 +88,38 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {
}))

const postsToCreatePromises = postsCreated.map(async (post) => {
const algoTagsPromises = this.algoManagers.map(async (manager) => {
try {
const includeAlgo = await manager.filter_post(post)
return includeAlgo ? manager.name : null
} catch (err) {
console.error(`${manager.name}: filter failed`, err)
return null
}
})

const algoTagsResults = await Promise.all(algoTagsPromises)
const algoTags = algoTagsResults.filter((tag) => tag !== null)
try {
const algoTags = (
await Promise.all(
this.algoManagers.map(async (manager) => {
try {
const includeAlgo = await manager.filter_post(post)
return includeAlgo ? manager.name : null
} catch (err) {
console.error(`${manager.name}: filter failed`, err)
return null
}
}),
)
).filter((tag) => tag !== null)

if (algoTags.length === 0) return null
if (algoTags.length === 0) return null

const hash = crypto
.createHash('shake256', { outputLength: 12 })
.update(post.uri)
.digest('hex')
.toString()
const hash = crypto
.createHash('shake256', { outputLength: 12 })
.update(post.uri)
.digest('hex')
.toString()

return {
...post,
_id: hash,
algoTags: algoTags,
earliestCreatedIndexedAt: Math.min(post.createdAt, post.indexedAt),
return {
...post,
_id: hash,
algoTags,
earliestCreatedIndexedAt: Math.min(post.createdAt, post.indexedAt),
}
} catch (err) {
console.error('Post processing failed', err)
return null
}
})

Expand All @@ -134,6 +141,7 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {
)
})
}

await Promise.all(dbOperations)
}
}
35 changes: 23 additions & 12 deletions src/util/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class Semaphore {

export abstract class FirehoseSubscriptionBase {
public sub: Subscription<RepoEvent>
private eventQueue: RepoEvent[] = []
private semaphore: Semaphore

constructor(public db: Database, public service: string) {
this.sub = new Subscription({
Expand All @@ -53,14 +55,12 @@ export abstract class FirehoseSubscriptionBase {
},
heartbeatIntervalMs: 30000,
})
this.semaphore = new Semaphore(64)
}

abstract handleEvent(evt: RepoEvent): Promise<void>

async run(subscriptionReconnectDelay: number) {
const maxConcurrentEvents = 16
const semaphore = new Semaphore(maxConcurrentEvents)

let handledEvents = 0
try {
for await (const evt of this.sub) {
Expand All @@ -72,15 +72,10 @@ export abstract class FirehoseSubscriptionBase {

if (includedRecords.has(collection)) {
handledEvents++
await semaphore.acquire().then(() => {
this.handleEvent(evt) // no longer awaiting this
.catch((err) => {
console.log(`err in handleEvent ${err}`)
})
.finally(() => {
semaphore.release()
})
})
this.eventQueue.push(evt)
if (this.eventQueue.length >= 10) {
await this.processEventQueue()
}
}
}
}
Expand All @@ -100,6 +95,22 @@ export abstract class FirehoseSubscriptionBase {
}
}

/**
 * Removes up to ten events from the front of `eventQueue` and starts a
 * handler for each of them.
 *
 * The returned promise settles once a semaphore permit has been acquired for
 * every event in the batch. It does not wait for the handlers themselves:
 * each `handleEvent` call is deliberately left un-awaited, and its permit is
 * released when that handler settles.
 */
private async processEventQueue() {
  const batch = this.eventQueue.splice(0, 10)

  const dispatch = async (evt: RepoEvent) => {
    await this.semaphore.acquire()
    // Fire and forget: rejections are logged, and the permit is returned
    // whether the handler succeeded or failed.
    void this.handleEvent(evt)
      .catch((err) => {
        console.log(`err in handleEvent ${err}`)
      })
      .finally(() => {
        this.semaphore.release()
      })
  }

  await Promise.all(batch.map((evt) => dispatch(evt)))
}

async updateCursor(cursor: number) {
await this.db.updateSubStateCursor(this.service, cursor)
}
Expand Down
18 changes: 0 additions & 18 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -769,11 +769,6 @@ express@^4.17.2, express@^4.18.2:
utils-merge "1.0.1"
vary "~1.1.2"

fast-equals@^3.0.1:
version "3.0.3"
resolved "https://registry.yarnpkg.com/fast-equals/-/fast-equals-3.0.3.tgz#8e6cb4e51ca1018d87dd41982ef92758b3e4197f"
integrity sha512-NCe8qxnZFARSHGztGMZOO/PC1qa5MIFB5Hp66WdzbCRAz8U8US3bx1UTgLS49efBQPcUtO9gf5oVEY8o7y/7Kg==

fast-glob@^3.2.9:
version "3.3.2"
resolved "https://registry.yarnpkg.com/fast-glob/-/fast-glob-3.3.2.tgz#a904501e57cfdd2ffcded45e99a54fef55e46129"
Expand Down Expand Up @@ -1016,11 +1011,6 @@ methods@~1.1.2:
resolved "https://registry.yarnpkg.com/methods/-/methods-1.1.2.tgz#5529a4d67654134edcc5266656835b0f851afcee"
integrity sha512-iclAHeNqNm68zFtnZ0e+1L2yUIdvzNoauKU4WBA3VvH/vPFieF7qfRlwUZU+DA9P9bPXIS90ulxoUoCH23sV2w==

micro-memoize@^4.1.2:
version "4.1.2"
resolved "https://registry.yarnpkg.com/micro-memoize/-/micro-memoize-4.1.2.tgz#ce719c1ba1e41592f1cd91c64c5f41dcbf135f36"
integrity sha512-+HzcV2H+rbSJzApgkj0NdTakkC+bnyeiUxgT6/m7mjcz1CmM22KYFKp+EVj1sWe4UYcnriJr5uqHQD/gMHLD+g==

micromatch@^4.0.4:
version "4.0.8"
resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-4.0.8.tgz#d66fa18f3a47076789320b9b1af32bd86d9fa202"
Expand All @@ -1046,14 +1036,6 @@ [email protected]:
resolved "https://registry.yarnpkg.com/mime/-/mime-1.6.0.tgz#32cd9e5c64553bd58d19a568af452acff04981b1"
integrity sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg==

moize@^6.1.6:
version "6.1.6"
resolved "https://registry.yarnpkg.com/moize/-/moize-6.1.6.tgz#ac2e723e74b951875fe2c0c3433405c2b098c3e6"
integrity sha512-vSKdIUO61iCmTqhdoIDrqyrtp87nWZUmBPniNjO0fX49wEYmyDO4lvlnFXiGcaH1JLE/s/9HbiK4LSHsbiUY6Q==
dependencies:
fast-equals "^3.0.1"
micro-memoize "^4.1.2"

mongodb-connection-string-url@^3.0.0:
version "3.0.1"
resolved "https://registry.yarnpkg.com/mongodb-connection-string-url/-/mongodb-connection-string-url-3.0.1.tgz#c13e6ac284ae401752ebafdb8cd7f16c6723b141"
Expand Down

0 comments on commit d724811

Please sign in to comment.