Skip to content

Commit

Permalink
feat: qos queue trimming
Browse files Browse the repository at this point in the history
- 100% test coverage
- implement `priorityQueue.trim()` to remove messages with the lowest qos value while honoring the configured qos `priority` [newest, oldest message]
- the compute cost of trim will be O(n)
  • Loading branch information
denopink committed Aug 4, 2024
1 parent 50caded commit bdf4c0c
Show file tree
Hide file tree
Showing 7 changed files with 541 additions and 168 deletions.
10 changes: 9 additions & 1 deletion cmd/xmidt-agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,17 @@ type QOS struct {
MaxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
MaxMessageBytes int
// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers,
// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming,
// with the default being to prioritize the newest messages.
Priority qos.PriorityType
// LowQOSExpires determines when low qos messages are trimmed.
LowQOSExpires time.Duration
// MediumQOSExpires determines when medium qos messages are trimmed.
MediumQOSExpires time.Duration
// HighQOSExpires determines when high qos messages are trimmed.
HighQOSExpires time.Duration
// CriticalQOSExpires determines when critical qos messages are trimmed.
CriticalQOSExpires time.Duration
}

type Pubsub struct {
Expand Down
4 changes: 4 additions & 0 deletions cmd/xmidt-agent/wrphandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func provideQOSHandler(in qosIn) (*qos.Handler, error) {
qos.MaxQueueBytes(in.QOS.MaxQueueBytes),
qos.MaxMessageBytes(in.QOS.MaxMessageBytes),
qos.Priority(in.QOS.Priority),
qos.LowQOSExpires(in.QOS.LowQOSExpires),
qos.MediumQOSExpires(in.QOS.MediumQOSExpires),
qos.HighQOSExpires(in.QOS.HighQOSExpires),
qos.CriticalQOSExpires(in.QOS.CriticalQOSExpires),
)
}

Expand Down
94 changes: 80 additions & 14 deletions internal/wrphandlers/qos/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,19 @@ package qos
import (
"errors"
"fmt"
"time"
)

const (
// Priority queue defaults.
DefaultMaxQueueBytes = 1 * 1024 * 1024 // 1MB max/queue
DefaultMaxMessageBytes = 256 * 1024 // 256 KB

// QOS expires defaults.
DefaultLowQOSExpires = time.Minute * 15
DefaultMediumQOSExpires = time.Minute * 20
DefaultHighQOSExpires = time.Minute * 25
DefaultCriticalQOSExpires = time.Minute * 30
)

// MaxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload.
Expand Down Expand Up @@ -47,25 +55,83 @@ func MaxMessageBytes(s int) Option {
})
}

// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers,
// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming,
// with the default being to prioritize the newest messages.
func Priority(p PriorityType) Option {
return optionFunc(
func(h *Handler) error {
// Determine what will be used as a QualityOfService tie breaker.
switch p {
case NewestType:
// Prioritize the newest messages.
h.tieBreaker = PriorityNewestMsg
case OldestType:
// Prioritize the oldest messages.
h.tieBreaker = PriorityOldestMsg
default:
return errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, h.priority), ErrMisconfiguredQOS)
func(h *Handler) (err error) {
h.tieBreaker, err = priority(p)
h.priority = p

return err
})
}

// priority determines which tie breakers are used during normal enqueueing.
func priority(p PriorityType) (enqueueTieBreaker tieBreaker, err error) {
// Determine what will be used as a QualityOfService tie breaker during normal enqueueing.
switch p {
case NewestType:
// Prioritize the newest messages.
enqueueTieBreaker = PriorityNewestMsg
case OldestType:
// Prioritize the oldest messages.
enqueueTieBreaker = PriorityOldestMsg
default:
return nil, errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, p), ErrMisconfiguredQOS)
}

return enqueueTieBreaker, nil
}

// LowQOSExpires determines when low qos messages are trimmed.
func LowQOSExpires(t time.Duration) Option {
return optionFunc(
func(h *Handler) (err error) {
if t < 0 {
return fmt.Errorf("%w: negative LowQOSExpires", ErrMisconfiguredQOS)
}

h.priority = p
h.lowQOSExpires = t
return err
})
}

return nil
// MediumQOSExpires determines when medium qos messages are trimmed.
func MediumQOSExpires(t time.Duration) Option {
return optionFunc(
func(h *Handler) (err error) {
if t < 0 {
return fmt.Errorf("%w: negative MediumQOSExpires", ErrMisconfiguredQOS)
}

h.mediumQOSExpires = t
return err
})
}

// HighQOSExpires determines when high qos messages are trimmed.
func HighQOSExpires(t time.Duration) Option {
return optionFunc(
func(h *Handler) (err error) {
if t < 0 {
return fmt.Errorf("%w: negative HighQOSExpires", ErrMisconfiguredQOS)
}

h.highQOSExpires = t
return err
})
}

// CriticalQOSExpires determines when critical qos messages are trimmed.
func CriticalQOSExpires(t time.Duration) Option {
return optionFunc(
func(h *Handler) (err error) {
if t < 0 {
return fmt.Errorf("%w: negative CriticalQOSExpires", ErrMisconfiguredQOS)
}

h.criticalQOSExpires = t
return err
})
}
149 changes: 126 additions & 23 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"container/heap"
"errors"
"fmt"
"slices"
"time"

"github.com/xmidt-org/wrp-go/v3"
Expand All @@ -19,6 +20,9 @@ var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes
type priorityQueue struct {
// queue for wrp messages, ingested by serviceQOS
queue []item
// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming,
// with the default being to prioritize the newest messages.
priority PriorityType
// tieBreaker breaks any QualityOfService ties.
tieBreaker tieBreaker
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads
Expand All @@ -28,24 +32,49 @@ type priorityQueue struct {
// sizeBytes is the sum of all queued wrp message's payloads.
// An int64 overflow is unlikely since that'll be over 9*10^18 bytes
sizeBytes int64

// QOS expiries.
// lowQOSExpires determines when low qos messages are trimmed.
lowQOSExpires time.Duration
// mediumQOSExpires determines when medium qos messages are trimmed.
mediumQOSExpires time.Duration
// highQOSExpires determines when high qos messages are trimmed.
highQOSExpires time.Duration
// criticalQOSExpires determines when critical qos messages are trimmed.
criticalQOSExpires time.Duration
}

type tieBreaker func(i, j item) bool

type item struct {
msg wrp.Message
timestamp time.Time
// msg is the message queued for delivery.
msg *wrp.Message
// expires is the time the messge is good upto before it is eligible to be trimmed.
expires time.Time
// discard determines whether a message should be discarded or not
discard bool
}

// Dequeue returns the next highest priority message.
func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
// Required, otherwise heap.Pop will panic during an internal Swap call.
if pq.Len() == 0 {
return wrp.Message{}, false
var (
msg wrp.Message
ok bool
)
for pq.Len() != 0 {
itm := heap.Pop(pq).(item)
// itm.discard will be true if `itm` has been marked to be discarded,
// i.e. trimmed by `pq.trim()'.
if itm.discard {
continue
}

msg = *itm.msg
ok = true
break
}

msg, ok := heap.Pop(pq).(wrp.Message)

// ok will be false if no message was found, otherwise ok will be true.
return msg, ok
}

Expand All @@ -61,17 +90,75 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
return nil
}

// trim removes messages with the lowest QualityOfService until the queue no longer violates `maxQueueSize“.
func (pq *priorityQueue) trim() {
// trim until the queue no longer violates maxQueueBytes.
for pq.sizeBytes > pq.maxQueueBytes {
// Note, priorityQueue.drop does not drop the least prioritized queued message.
// i.e.: a high priority queued message may be dropped instead of a lesser queued message.
pq.drop()
// If priorityQueue.queue doesn't violates `maxQueueSize`, then return.
if pq.sizeBytes <= pq.maxQueueBytes {
return
}

itemsCache := make([]*item, len(pq.queue))
// Remove all expired messages before trimming unexpired lower priority messages.
now := time.Now()
iCache := 0
for i := range pq.queue {
itm := &pq.queue[i]
// itm has already been marked to be discarded.
if itm.discard {
continue
}
if now.After(itm.expires) {
// Mark itm to be discarded.
// `pq.Dequeue()` will fully discard itm.
itm.discard = true
pq.sizeBytes -= int64(len(itm.msg.Payload))
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
continue
}

itemsCache[iCache] = itm
iCache += 1
}

// Resize itemsCache.
itemsCache = itemsCache[:iCache]
slices.SortFunc(itemsCache, func(i, j *item) int {
if i.msg.QualityOfService < j.msg.QualityOfService {
return -1
} else if i.msg.QualityOfService > j.msg.QualityOfService {
return 1
}

// Tiebreaker.
switch pq.priority {
case NewestType:
// Prioritize the newest messages.
return i.expires.Compare(j.expires)
default:
// Prioritize the oldest messages.
return j.expires.Compare(i.expires)
}
})

// Continue trimming until the pq.queue no longer violates maxQueueBytes.
// Remove the messages with the lowest priority.
for _, itm := range itemsCache {
// If pq.queue doesn't violates `maxQueueSize`, then return.
if pq.sizeBytes <= pq.maxQueueBytes {
break
}

// Mark itm to be discarded.
// `pq.Dequeue()` will fully discard itm.
itm.discard = true
pq.sizeBytes -= int64(len(itm.msg.Payload))
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
}
}

func (pq *priorityQueue) drop() {
_ = heap.Remove(pq, pq.Len()-1).(wrp.Message)
}

// heap.Interface related implementations https://pkg.go.dev/container/heap#Interface
Expand All @@ -95,9 +182,25 @@ func (pq *priorityQueue) Swap(i, j int) {
}

func (pq *priorityQueue) Push(x any) {
item := item{msg: x.(wrp.Message), timestamp: time.Now()}
pq.sizeBytes += int64(len(item.msg.Payload))
pq.queue = append(pq.queue, item)
msg := x.(wrp.Message)
pq.sizeBytes += int64(len(msg.Payload))

var qosExpires time.Duration
switch msg.QualityOfService.Level() {
case wrp.QOSLow:
qosExpires = pq.lowQOSExpires
case wrp.QOSMedium:
qosExpires = pq.mediumQOSExpires
case wrp.QOSHigh:
qosExpires = pq.highQOSExpires
case wrp.QOSCritical:
qosExpires = pq.criticalQOSExpires
}

pq.queue = append(pq.queue, item{
msg: &msg,
expires: time.Now().Add(qosExpires),
discard: false})
}

func (pq *priorityQueue) Pop() any {
Expand All @@ -106,19 +209,19 @@ func (pq *priorityQueue) Pop() any {
return nil
}

msg := pq.queue[last].msg
pq.sizeBytes -= int64(len(msg.Payload))
itm := pq.queue[last]
pq.sizeBytes -= int64(len(itm.msg.Payload))
// avoid memory leak
pq.queue[last] = item{}
pq.queue = pq.queue[0:last]

return msg
return itm
}

func PriorityNewestMsg(i, j item) bool {
return i.timestamp.After(j.timestamp)
return i.expires.After(j.expires)
}

func PriorityOldestMsg(i, j item) bool {
return i.timestamp.Before(j.timestamp)
return i.expires.Before(j.expires)
}
Loading

0 comments on commit bdf4c0c

Please sign in to comment.