Skip to content

Commit

Permalink
remove logging when not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
Bossett committed Sep 5, 2024
1 parent c42ee50 commit 065a754
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/env/limits.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const limits = {
// ***** APPLICATION CONFIG *****
AUTHOR_FEED_MAX_RESULTS: 30, // sets the limit parameter requesting an author's posts - 30 is what bsky.app uses so the cache should be fresher
DB_WRITE_INTERVAL_MS: 15 * 60 * 1000, // time between pauses to update firehose sequence and scavenge cache - higher is generally better but you will have to reprocess this much on restart
MAX_CONCURRENT_PROCESSCOMMITS: 256, // this influences # of http requests, so lower can be faster
MAX_CONCURRENT_PROCESSCOMMITS: 128, // this influences # of http requests, so lower can be faster
MAX_FIREHOSE_DELAY: 3 * 60 * 1000, // how long between events before considering the firehose stalled
MIN_FIREHOSE_OPS: 30, // the minimum number of operations per interval before considering the firehose stalled
MAX_PENDING_INSERTS_WAIT_MS: 30 * 1000, // the maximum amount of time between inserting pending label events
Expand Down
41 changes: 26 additions & 15 deletions src/lib/processCommit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import wait from '@/helpers/wait.js'
import Denque from 'denque'
import { insertListItemOperation } from '@/lib/insertListItemOperation.js'

const debug: Boolean = env.DANGEROUSLY_EXPOSE_SECRETS

type Commit = {
record?: any
atURL: any
Expand Down Expand Up @@ -53,7 +55,7 @@ export function validateCommit(commit: Commit): { seq?: number; did?: string } {

const matchDid = did.match(regexDid)
if (!matchDid) {
logger.debug(`${seq}: invalid did at ${commit.repo}`)
if (debug) logger.debug(`${seq}: invalid did at ${commit.repo}`)
return {}
}
return { seq: seq, did: did }
Expand Down Expand Up @@ -83,7 +85,7 @@ export function _processCommit(commit: Commit): Promise<void> {
}

failTimeout = setTimeout(() => {
logger.debug(`${seq}: took too long, failing...`)
if (debug) logger.debug(`${seq}: took too long, failing...`)
reject(new Error(`ProcessCommitTimeout`))
}, env.limits.MAX_PROCESSING_TIME_MS)

Expand All @@ -96,14 +98,18 @@ export function _processCommit(commit: Commit): Promise<void> {
commit.meta['$type'] == 'com.atproto.sync.subscribeRepos#identity'
) {
if (purgePlcDirectoryCache(did, time)) {
logger.debug(`got identity change, refreshing plc cache of ${did}`)
if (debug)
logger.debug(
`got identity change, refreshing plc cache of ${did}`,
)
}
}
if (commit.record['$type'] === 'app.bsky.actor.profile') {
if (purgeDetailsCache(did, time)) {
logger.debug(
`got profile change, refreshing profile cache of ${did}`,
)
if (debug)
logger.debug(
`got profile change, refreshing profile cache of ${did}`,
)
}
}
}
Expand All @@ -113,7 +119,8 @@ export function _processCommit(commit: Commit): Promise<void> {
await getUserDetails(did)

if (tmpData.error) {
logger.debug(`${seq}: error ${tmpData.error} retreiving ${did}`)
if (debug)
logger.debug(`${seq}: error ${tmpData.error} retreiving ${did}`)

clearTimeout(failTimeout)
reject()
Expand All @@ -139,7 +146,8 @@ export function _processCommit(commit: Commit): Promise<void> {
/at:\/\/(did:[^:]+:[^\/]+)\/app\.bsky\.feed\.post\/([^\/]+)/
const match = commit.atURL.match(regex)
if (!match) {
logger.debug(`${seq}: invalid commit URL ${commit.atURL}`)
if (debug)
logger.debug(`${seq}: invalid commit URL ${commit.atURL}`)

clearTimeout(failTimeout)
reject()
Expand Down Expand Up @@ -253,7 +261,7 @@ export function _processCommit(commit: Commit): Promise<void> {

if (labelOperations.remove.length > 0) {
for (const newLabel of labelOperations.remove) {
logger.debug(`${seq}: unlabel ${did} ${newLabel}`)
if (debug) logger.debug(`${seq}: unlabel ${did} ${newLabel}`)
operations.push({
label: newLabel,
action: 'remove',
Expand All @@ -264,7 +272,7 @@ export function _processCommit(commit: Commit): Promise<void> {
}
if (labelOperations.create.length > 0) {
for (const newLabel of labelOperations.create) {
logger.debug(`${seq}: label ${did} ${newLabel}`)
if (debug) logger.debug(`${seq}: label ${did} ${newLabel}`)
operations.push({
label: newLabel,
action: 'create',
Expand All @@ -279,7 +287,7 @@ export function _processCommit(commit: Commit): Promise<void> {
labelOperations.create.length > 0 ||
labelOperations.remove.length > 0
) {
insertOperations(operations)
await insertOperations(operations)
}

clearTimeout(failTimeout)
Expand All @@ -298,7 +306,7 @@ export function _processCommit(commit: Commit): Promise<void> {
const processQueue = new Denque<Commit>()

const maxActiveTasks = env.limits.MAX_CONCURRENT_PROCESSCOMMITS
const taskBuffer = maxActiveTasks * 2
const taskBuffer = maxActiveTasks * 3
let runningCommits = new Set<number>()

const knownBadCommits = new Set<number>()
Expand All @@ -322,19 +330,22 @@ async function queueManager() {
_processCommit(commit)
.then(() => {
if (knownBadCommits.has(seq)) {
logger.debug(`commit (seq ${seq}) succeeded after retry`)
if (debug)
logger.debug(`commit (seq ${seq}) succeeded after retry`)
knownBadCommits.delete(seq)
}
})
.catch((e) => {
if (e.message === 'ProcessCommitTimeout') {
if (!knownBadCommits.has(seq)) {
logger.debug(`${seq} timed out and will be retried`)
if (debug)
logger.debug(`${seq} timed out and will be retried`)
processCommit(commit)
// will know to fail it next time:
knownBadCommits.add(seq)
} else {
logger.warn(`commit (seq ${seq}) failed after retry`)
if (debug)
logger.warn(`commit (seq ${seq}) failed after retry`)
knownBadCommits.delete(seq)
}
} else {
Expand Down

0 comments on commit 065a754

Please sign in to comment.