Skip to content

Commit

Permalink
fix: slightly perf improvements, more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
AVVS committed Mar 25, 2022
1 parent 18464e8 commit 4223c74
Showing 1 changed file with 47 additions and 38 deletions.
85 changes: 47 additions & 38 deletions packages/transport-amqp/src/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,13 @@ export class AMQPTransport extends EventEmitter {
assert(multiAckEvery <= neck, 'multiAckEvery must be less than prefetchCount/neck')
}

let consumerTag = consumer.consumerTag
const consumerTag = consumer.consumerTag
let interval: NodeJS.Timer | null = null
let sortedList: number[] = []
let latestConfirm = 0
let smallestUnconfirmedDeliveryTag = Number.MAX_SAFE_INTEGER

this.log.warn({ multiAckAfter, multiAckEvery, preEvent, postEvent, consumerTag }, 'setting up multi-ack')

const comparator = (val: number, idx: number, it: number[]) => {
const next = it[idx + 1]
Expand All @@ -315,48 +317,47 @@ export class AMQPTransport extends EventEmitter {
return true
}

const confirmAfter = () => {
if (!multiAckAfter ||
latestConfirm >= Date.now() - multiAckAfter ||
sortedList.length === 0 ||
sortedList[0] !== smallestUnconfirmedDeliveryTag) {
return
}

const largestUninterruptedTagIndex = sortedList.findIndex(comparator)
const tag = sortedList[largestUninterruptedTagIndex]
const before = sortedList.length
sortedList = sortedList.slice(largestUninterruptedTagIndex + 1)
const after = sortedList.length
const confirmAfter = multiAckAfter
? () => {
if (latestConfirm >= Date.now() - multiAckAfter ||
sortedList.length === 0 ||
sortedList[0] !== smallestUnconfirmedDeliveryTag) {
this.log.trace({ sortedList, latestConfirm, multiAckAfter, smallestUnconfirmedDeliveryTag }, 'skipping confirmAfter')
return
}

this.log.warn({ remove: before - after, tag, before, after, sortedList, multiAckAfter, state: consumer.state, consuming: consumer.consumerState }, 'confirmed elements')

consumer.multiAck(tag)
smallestUnconfirmedDeliveryTag = tag + 1
latestConfirm = Date.now()
}
const largestUninterruptedTagIndex = sortedList.findIndex(comparator)
const tag = sortedList[largestUninterruptedTagIndex]
const before = sortedList.length
sortedList = sortedList.slice(largestUninterruptedTagIndex + 1)
const after = sortedList.length

this.log.warn({ remove: before - after, tag, before, after, sortedList, multiAckAfter, state: consumer.state, consuming: consumer.consumerState }, 'confirmed elements')

consumer.multiAck(tag)
smallestUnconfirmedDeliveryTag = tag + 1
latestConfirm = Date.now()
}
: noop

const confirm = (tag: number, cut: number) => {
if (!multiAckEvery) return
if (interval) interval.refresh()
const confirm = multiAckEvery
? (tag: number, cut: number) => {
if (interval) interval.refresh()

const before = sortedList.length
sortedList = sortedList.slice(cut)
const after = sortedList.length
const before = sortedList.length
sortedList = sortedList.slice(cut)
const after = sortedList.length

this.log.warn({ remove: before - after, tag, sortedList, multiAckEvery, state: consumer.state, consuming: consumer.consumerState }, 'confirmed elements')
this.log.warn({ remove: before - after, tag, sortedList, multiAckEvery, state: consumer.state, consuming: consumer.consumerState }, 'confirmed elements')

consumer.multiAck(tag)
smallestUnconfirmedDeliveryTag = tag + 1
latestConfirm = Date.now()
}
consumer.multiAck(tag)
smallestUnconfirmedDeliveryTag = tag + 1
latestConfirm = Date.now()
}
: noop

// it's ok if messages come earlier as we have stable consume tag & cleanup on close
consumer.on('consuming', () => {
consumerTag = consumer.consumerTag
latestConfirm = 0
smallestUnconfirmedDeliveryTag = Number.MAX_SAFE_INTEGER
sortedList = []

if (multiAckAfter) {
interval = setInterval(confirmAfter, multiAckAfter).unref()
}
Expand Down Expand Up @@ -392,13 +393,15 @@ export class AMQPTransport extends EventEmitter {

const evaluateConfirmAndInvoke = (sliceSize: number, base: number): null | [number, number] => {
if (sortedList.length < sliceSize) {
this.log.trace({ sliceSize, sortedList }, 'slice too large')
return null
}

const firstTag = sortedList[0]
const lastTag = sortedList[sliceSize - 1]
const coveredRange = lastTag - firstTag === sliceSize - 1
if (!coveredRange) {
this.log.trace({ firstTag, lastTag, coveredRange, sliceSize, sortedList }, 'covered range skip')
return null
}

Expand All @@ -416,13 +419,19 @@ export class AMQPTransport extends EventEmitter {
// must be added after its processed
sorted.add(sortedList, deliveryTag)

if (!multiAckEvery) {
return
}

// in case we have multiple batches to be processed
if (sortedList[0] === smallestUnconfirmedDeliveryTag && multiAckEvery) {
if (sortedList[0] === smallestUnconfirmedDeliveryTag) {
this.log.trace({ sortedList, smallestUnconfirmedDeliveryTag, multiAckEvery }, 'evaluating')
const s = evaluateConfirmAndInvoke(multiAckEvery, multiAckEvery)
if (s) {
confirm(...s)
confirm(s[0], s[1])
}
} else {
this.log.trace({ sortedList, latestConfirm, multiAckEvery, smallestUnconfirmedDeliveryTag }, 'skipping confirm')
}
})
}
Expand Down

0 comments on commit 4223c74

Please sign in to comment.