From bdf4c0c55aff2b788385d496f99c5152cce02a9b Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Tue, 4 Jun 2024 19:25:02 -0400 Subject: [PATCH 1/2] feat: qos queue trimming - 100% test coverage - implement `priorityQueue.trim()` to remove messages with the lowest qos value while honoring the configured qos `priority` [newest, oldest message] - the compute cost of trim will be O(n) --- cmd/xmidt-agent/config.go | 10 +- cmd/xmidt-agent/wrphandlers.go | 4 + internal/wrphandlers/qos/options.go | 94 ++++++- internal/wrphandlers/qos/priority_queue.go | 149 +++++++++-- .../wrphandlers/qos/priority_queue_test.go | 244 +++++++++++++----- internal/wrphandlers/qos/qos.go | 19 +- internal/wrphandlers/qos/qos_test.go | 189 +++++++++----- 7 files changed, 541 insertions(+), 168 deletions(-) diff --git a/cmd/xmidt-agent/config.go b/cmd/xmidt-agent/config.go index 0085982..e5ec61f 100644 --- a/cmd/xmidt-agent/config.go +++ b/cmd/xmidt-agent/config.go @@ -64,9 +64,17 @@ type QOS struct { MaxQueueBytes int64 // MaxMessageBytes is the largest allowable wrp message payload. MaxMessageBytes int - // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers, + // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, // with the default being to prioritize the newest messages. Priority qos.PriorityType + // LowQOSExpires determines when low qos messages are trimmed. + LowQOSExpires time.Duration + // MediumQOSExpires determines when medium qos messages are trimmed. + MediumQOSExpires time.Duration + // HighQOSExpires determines when high qos messages are trimmed. + HighQOSExpires time.Duration + // CriticalQOSExpires determines when critical qos messages are trimmed. + CriticalQOSExpires time.Duration } type Pubsub struct { diff --git a/cmd/xmidt-agent/wrphandlers.go b/cmd/xmidt-agent/wrphandlers.go index dfd1d11..7c24f98 100644 --- a/cmd/xmidt-agent/wrphandlers.go +++ b/cmd/xmidt-agent/wrphandlers.go @@ -75,6 +75,10 @@ func provideQOSHandler(in qosIn) (*qos.Handler, error) { qos.MaxQueueBytes(in.QOS.MaxQueueBytes), qos.MaxMessageBytes(in.QOS.MaxMessageBytes), qos.Priority(in.QOS.Priority), + qos.LowQOSExpires(in.QOS.LowQOSExpires), + qos.MediumQOSExpires(in.QOS.MediumQOSExpires), + qos.HighQOSExpires(in.QOS.HighQOSExpires), + qos.CriticalQOSExpires(in.QOS.CriticalQOSExpires), ) } diff --git a/internal/wrphandlers/qos/options.go b/internal/wrphandlers/qos/options.go index 4904f5b..c441df7 100644 --- a/internal/wrphandlers/qos/options.go +++ b/internal/wrphandlers/qos/options.go @@ -6,11 +6,19 @@ package qos import ( "errors" "fmt" + "time" ) const ( + // Priority queue defaults. DefaultMaxQueueBytes = 1 * 1024 * 1024 // 1MB max/queue DefaultMaxMessageBytes = 256 * 1024 // 256 KB + + // QOS expires defaults. + DefaultLowQOSExpires = time.Minute * 15 + DefaultMediumQOSExpires = time.Minute * 20 + DefaultHighQOSExpires = time.Minute * 25 + DefaultCriticalQOSExpires = time.Minute * 30 ) // MaxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload. @@ -47,25 +55,83 @@ func MaxMessageBytes(s int) Option { }) } -// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers, +// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, // with the default being to prioritize the newest messages. func Priority(p PriorityType) Option { return optionFunc( - func(h *Handler) error { - // Determine what will be used as a QualityOfService tie breaker. - switch p { - case NewestType: - // Prioritize the newest messages. - h.tieBreaker = PriorityNewestMsg - case OldestType: - // Prioritize the oldest messages. - h.tieBreaker = PriorityOldestMsg - default: - return errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, h.priority), ErrMisconfiguredQOS) + func(h *Handler) (err error) { + h.tieBreaker, err = priority(p) + h.priority = p + + return err + }) +} + +// priority determines which tie breakers are used during normal enqueueing. +func priority(p PriorityType) (enqueueTieBreaker tieBreaker, err error) { + // Determine what will be used as a QualityOfService tie breaker during normal enqueueing. + switch p { + case NewestType: + // Prioritize the newest messages. + enqueueTieBreaker = PriorityNewestMsg + case OldestType: + // Prioritize the oldest messages. + enqueueTieBreaker = PriorityOldestMsg + default: + return nil, errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, p), ErrMisconfiguredQOS) + } + + return enqueueTieBreaker, nil +} + +// LowQOSExpires determines when low qos messages are trimmed. +func LowQOSExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative LowQOSExpires", ErrMisconfiguredQOS) } - h.priority = p + h.lowQOSExpires = t + return err + }) +} - return nil +// MediumQOSExpires determines when medium qos messages are trimmed. +func MediumQOSExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative MediumQOSExpires", ErrMisconfiguredQOS) + } + + h.mediumQOSExpires = t + return err + }) +} + +// HighQOSExpires determines when high qos messages are trimmed. +func HighQOSExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative HighQOSExpires", ErrMisconfiguredQOS) + } + + h.highQOSExpires = t + return err + }) +} + +// CriticalQOSExpires determines when critical qos messages are trimmed. +func CriticalQOSExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative CriticalQOSExpires", ErrMisconfiguredQOS) + } + + h.criticalQOSExpires = t + return err }) } diff --git a/internal/wrphandlers/qos/priority_queue.go b/internal/wrphandlers/qos/priority_queue.go index 39a59cb..99dccbd 100644 --- a/internal/wrphandlers/qos/priority_queue.go +++ b/internal/wrphandlers/qos/priority_queue.go @@ -7,6 +7,7 @@ import ( "container/heap" "errors" "fmt" + "slices" "time" "github.com/xmidt-org/wrp-go/v3" @@ -19,6 +20,9 @@ var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes type priorityQueue struct { // queue for wrp messages, ingested by serviceQOS queue []item + // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, + // with the default being to prioritize the newest messages. + priority PriorityType // tieBreaker breaks any QualityOfService ties. tieBreaker tieBreaker // maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads @@ -28,24 +32,49 @@ type priorityQueue struct { // sizeBytes is the sum of all queued wrp message's payloads. // An int64 overflow is unlikely since that'll be over 9*10^18 bytes sizeBytes int64 + + // QOS expiries. + // lowQOSExpires determines when low qos messages are trimmed. + lowQOSExpires time.Duration + // mediumQOSExpires determines when medium qos messages are trimmed. + mediumQOSExpires time.Duration + // highQOSExpires determines when high qos messages are trimmed. + highQOSExpires time.Duration + // criticalQOSExpires determines when critical qos messages are trimmed. + criticalQOSExpires time.Duration } type tieBreaker func(i, j item) bool type item struct { - msg wrp.Message - timestamp time.Time + // msg is the message queued for delivery. + msg *wrp.Message + // expires is the time the messge is good upto before it is eligible to be trimmed. + expires time.Time + // discard determines whether a message should be discarded or not + discard bool } // Dequeue returns the next highest priority message. func (pq *priorityQueue) Dequeue() (wrp.Message, bool) { - // Required, otherwise heap.Pop will panic during an internal Swap call. - if pq.Len() == 0 { - return wrp.Message{}, false + var ( + msg wrp.Message + ok bool + ) + for pq.Len() != 0 { + itm := heap.Pop(pq).(item) + // itm.discard will be true if `itm` has been marked to be discarded, + // i.e. trimmed by `pq.trim()'. + if itm.discard { + continue + } + + msg = *itm.msg + ok = true + break } - msg, ok := heap.Pop(pq).(wrp.Message) - + // ok will be false if no message was found, otherwise ok will be true. return msg, ok } @@ -61,17 +90,75 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error { return nil } +// trim removes messages with the lowest QualityOfService until the queue no longer violates `maxQueueSize“. func (pq *priorityQueue) trim() { - // trim until the queue no longer violates maxQueueBytes. - for pq.sizeBytes > pq.maxQueueBytes { - // Note, priorityQueue.drop does not drop the least prioritized queued message. - // i.e.: a high priority queued message may be dropped instead of a lesser queued message. - pq.drop() + // If priorityQueue.queue doesn't violates `maxQueueSize`, then return. + if pq.sizeBytes <= pq.maxQueueBytes { + return + } + + itemsCache := make([]*item, len(pq.queue)) + // Remove all expired messages before trimming unexpired lower priority messages. + now := time.Now() + iCache := 0 + for i := range pq.queue { + itm := &pq.queue[i] + // itm has already been marked to be discarded. + if itm.discard { + continue + } + if now.After(itm.expires) { + // Mark itm to be discarded. + // `pq.Dequeue()` will fully discard itm. + itm.discard = true + pq.sizeBytes -= int64(len(itm.msg.Payload)) + // Preemptively discard itm's payload to reduce + // resource usage, since itm will be discarded, + itm.msg.Payload = nil + continue + } + + itemsCache[iCache] = itm + iCache += 1 + } + + // Resize itemsCache. + itemsCache = itemsCache[:iCache] + slices.SortFunc(itemsCache, func(i, j *item) int { + if i.msg.QualityOfService < j.msg.QualityOfService { + return -1 + } else if i.msg.QualityOfService > j.msg.QualityOfService { + return 1 + } + + // Tiebreaker. + switch pq.priority { + case NewestType: + // Prioritize the newest messages. + return i.expires.Compare(j.expires) + default: + // Prioritize the oldest messages. + return j.expires.Compare(i.expires) + } + }) + + // Continue trimming until the pq.queue no longer violates maxQueueBytes. + // Remove the messages with the lowest priority. + for _, itm := range itemsCache { + // If pq.queue doesn't violates `maxQueueSize`, then return. + if pq.sizeBytes <= pq.maxQueueBytes { + break + } + + // Mark itm to be discarded. + // `pq.Dequeue()` will fully discard itm. + itm.discard = true + pq.sizeBytes -= int64(len(itm.msg.Payload)) + // Preemptively discard itm's payload to reduce + // resource usage, since itm will be discarded, + itm.msg.Payload = nil } -} -func (pq *priorityQueue) drop() { - _ = heap.Remove(pq, pq.Len()-1).(wrp.Message) } // heap.Interface related implementations https://pkg.go.dev/container/heap#Interface @@ -95,9 +182,25 @@ func (pq *priorityQueue) Swap(i, j int) { } func (pq *priorityQueue) Push(x any) { - item := item{msg: x.(wrp.Message), timestamp: time.Now()} - pq.sizeBytes += int64(len(item.msg.Payload)) - pq.queue = append(pq.queue, item) + msg := x.(wrp.Message) + pq.sizeBytes += int64(len(msg.Payload)) + + var qosExpires time.Duration + switch msg.QualityOfService.Level() { + case wrp.QOSLow: + qosExpires = pq.lowQOSExpires + case wrp.QOSMedium: + qosExpires = pq.mediumQOSExpires + case wrp.QOSHigh: + qosExpires = pq.highQOSExpires + case wrp.QOSCritical: + qosExpires = pq.criticalQOSExpires + } + + pq.queue = append(pq.queue, item{ + msg: &msg, + expires: time.Now().Add(qosExpires), + discard: false}) } func (pq *priorityQueue) Pop() any { @@ -106,19 +209,19 @@ func (pq *priorityQueue) Pop() any { return nil } - msg := pq.queue[last].msg - pq.sizeBytes -= int64(len(msg.Payload)) + itm := pq.queue[last] + pq.sizeBytes -= int64(len(itm.msg.Payload)) // avoid memory leak pq.queue[last] = item{} pq.queue = pq.queue[0:last] - return msg + return itm } func PriorityNewestMsg(i, j item) bool { - return i.timestamp.After(j.timestamp) + return i.expires.After(j.expires) } func PriorityOldestMsg(i, j item) bool { - return i.timestamp.Before(j.timestamp) + return i.expires.Before(j.expires) } diff --git a/internal/wrphandlers/qos/priority_queue_test.go b/internal/wrphandlers/qos/priority_queue_test.go index e6ca034..b89501c 100644 --- a/internal/wrphandlers/qos/priority_queue_test.go +++ b/internal/wrphandlers/qos/priority_queue_test.go @@ -22,6 +22,7 @@ func TestPriorityQueue(t *testing.T) { {"Size", testSize}, {"Len", testLen}, {"Less", testLess}, + {"Trim", testTrim}, {"Swap", testSwap}, {"Push", testPush}, {"Pop", testPop}, @@ -52,17 +53,17 @@ func testEnqueueDequeueAgePriority(t *testing.T) { } tests := []struct { description string - tieBreaker tieBreaker + priority PriorityType expectedMsg wrp.Message }{ { description: "drop incoming low priority messages while prioritizing older messages", - tieBreaker: PriorityOldestMsg, + priority: OldestType, expectedMsg: smallLowQOSMsgOldest, }, { description: "drop incoming low priority messages while prioritizing newer messages", - tieBreaker: PriorityNewestMsg, + priority: NewestType, expectedMsg: smallLowQOSMsgNewest, }, } @@ -72,20 +73,28 @@ func testEnqueueDequeueAgePriority(t *testing.T) { assert := assert.New(t) require := require.New(t) pq := priorityQueue{ - maxQueueBytes: int64(len(smallLowQOSMsgOldest.Payload)), - maxMessageBytes: len(smallLowQOSMsgOldest.Payload), - tieBreaker: tc.tieBreaker, + maxQueueBytes: int64(len(smallLowQOSMsgOldest.Payload)), + maxMessageBytes: len(smallLowQOSMsgOldest.Payload), + lowQOSExpires: DefaultLowQOSExpires, + mediumQOSExpires: DefaultMediumQOSExpires, + highQOSExpires: DefaultHighQOSExpires, + criticalQOSExpires: DefaultCriticalQOSExpires, + priority: tc.priority, } + + var err error + pq.tieBreaker, err = priority(tc.priority) + require.NoError(err) + for _, msg := range messages { pq.Enqueue(msg) } - assert.Equal(1, pq.Len()) - actualMsg, ok := pq.Dequeue() require.True(ok) require.NotEmpty(actualMsg) assert.Equal(tc.expectedMsg, actualMsg) + assert.Equal(int64(0), pq.sizeBytes) }) } } @@ -93,18 +102,23 @@ func testEnqueueDequeueAgePriority(t *testing.T) { func testEnqueueDequeue(t *testing.T) { emptyLowQOSMsg := wrp.Message{ Destination: "mac:00deadbeef00/config", - QualityOfService: wrp.QOSLowValue, + QualityOfService: 10, } smallLowQOSMsg := wrp.Message{ Destination: "mac:00deadbeef01/config", Payload: []byte("{}"), - QualityOfService: wrp.QOSLowValue, + QualityOfService: 10, } mediumMediumQosMsg := wrp.Message{ Destination: "mac:00deadbeef02/config", Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), QualityOfService: wrp.QOSMediumValue, } + mediumHighQosMsg := wrp.Message{ + Destination: "mac:00deadbeef02/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), + QualityOfService: wrp.QOSHighValue, + } largeCriticalQOSMsg := wrp.Message{ Destination: "mac:00deadbeef03/config", Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"), @@ -119,11 +133,13 @@ func testEnqueueDequeue(t *testing.T) { largeCriticalQOSMsg, smallLowQOSMsg, mediumMediumQosMsg, + mediumHighQosMsg, } dequeueSequenceTest := []wrp.Message{ largeCriticalQOSMsg, largeCriticalQOSMsg, largeCriticalQOSMsg, + mediumHighQosMsg, mediumMediumQosMsg, mediumMediumQosMsg, smallLowQOSMsg, @@ -136,6 +152,9 @@ func testEnqueueDequeue(t *testing.T) { queueSizeSequenceTest += len(msg.Payload) } + // expect 1 message to be drop + enqueueSequenceTest = append(enqueueSequenceTest, smallLowQOSMsg) + tests := []struct { description string messages []wrp.Message @@ -173,10 +192,18 @@ func testEnqueueDequeue(t *testing.T) { maxMessageBytes: len(largeCriticalQOSMsg.Payload), expectedQueueSize: 1, }, + { + description: "drop incoming low priority messages", + messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg, smallLowQOSMsg, mediumMediumQosMsg}, + maxQueueBytes: len(largeCriticalQOSMsg.Payload) * 2, + maxMessageBytes: len(largeCriticalQOSMsg.Payload), + expectedQueueSize: 2, + expectedDequeueSequence: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg}, + }, { description: "remove some low priority messages to fit a higher priority message", - messages: []wrp.Message{mediumMediumQosMsg, mediumMediumQosMsg, mediumMediumQosMsg, largeCriticalQOSMsg}, - maxQueueBytes: len(mediumMediumQosMsg.Payload) * 3, + messages: []wrp.Message{mediumMediumQosMsg, smallLowQOSMsg, mediumMediumQosMsg, smallLowQOSMsg, mediumMediumQosMsg, smallLowQOSMsg, largeCriticalQOSMsg, smallLowQOSMsg, mediumMediumQosMsg}, + maxQueueBytes: len(mediumMediumQosMsg.Payload)*2 + len(largeCriticalQOSMsg.Payload), maxMessageBytes: len(largeCriticalQOSMsg.Payload), expectedQueueSize: 2, }, @@ -201,16 +228,22 @@ func testEnqueueDequeue(t *testing.T) { assert := assert.New(t) require := require.New(t) pq := priorityQueue{ - maxQueueBytes: int64(tc.maxQueueBytes), - maxMessageBytes: tc.maxMessageBytes, - tieBreaker: PriorityNewestMsg, + maxQueueBytes: int64(tc.maxQueueBytes), + maxMessageBytes: tc.maxMessageBytes, + lowQOSExpires: DefaultLowQOSExpires, + mediumQOSExpires: DefaultMediumQOSExpires, + highQOSExpires: DefaultHighQOSExpires, + criticalQOSExpires: DefaultCriticalQOSExpires, } + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) + for _, msg := range tc.messages { pq.Enqueue(msg) } - assert.Equal(tc.expectedQueueSize, pq.Len()) - if len(tc.expectedDequeueSequence) == 0 { return } @@ -221,96 +254,105 @@ func testEnqueueDequeue(t *testing.T) { require.NotEmpty(actualMsg) assert.Equal(expectedMsg, actualMsg) } + + assert.Equal(int64(0), pq.sizeBytes) + }) } } func testSize(t *testing.T) { assert := assert.New(t) + require := require.New(t) msg := wrp.Message{ Destination: "mac:00deadbeef00/config", Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"), } - pq := priorityQueue{tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) assert.Equal(int64(0), pq.sizeBytes) pq.Push(msg) pq.Push(msg) assert.Equal(int64(len(msg.Payload)*2), pq.sizeBytes) } + func testLen(t *testing.T) { assert := assert.New(t) - pq := priorityQueue{queue: []item{ + require := require.New(t) + pq := priorityQueue{} + pq.queue = []item{ { - msg: wrp.Message{ - + msg: &wrp.Message{ Destination: "mac:00deadbeef00/config", }, - timestamp: time.Now(), + expires: time.Now(), }, { - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef01/config", }, - timestamp: time.Now(), + expires: time.Now(), }, - }, - tieBreaker: PriorityNewestMsg, } + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) assert.Equal(len(pq.queue), pq.Len()) } func testLess(t *testing.T) { oldestMsg := item{ - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef00/config", QualityOfService: wrp.QOSCriticalValue, }, - timestamp: time.Now(), + expires: time.Now(), } newestMsg := item{ - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef01/config", QualityOfService: wrp.QOSLowValue, }, - timestamp: time.Now(), + expires: time.Now(), } tieBreakerMsg := item{ - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef02/config", QualityOfService: wrp.QOSCriticalValue, }, - timestamp: time.Now(), + expires: time.Now(), } tests := []struct { description string priority PriorityType - tieBreaker tieBreaker }{ { description: "less", priority: NewestType, - tieBreaker: PriorityNewestMsg, }, { description: "tie breaker prioritizing newer messages", priority: NewestType, - tieBreaker: PriorityNewestMsg, }, { description: "tie breaker prioritizing older messages", priority: OldestType, - tieBreaker: PriorityOldestMsg, }, } for _, tc := range tests { t.Run(tc.description, func(t *testing.T) { assert := assert.New(t) + require := require.New(t) + pq := priorityQueue{} + pq.queue = []item{oldestMsg, newestMsg, tieBreakerMsg} - pq := priorityQueue{ - queue: []item{oldestMsg, newestMsg, tieBreakerMsg}, - tieBreaker: tc.tieBreaker, - } + var err error + pq.tieBreaker, err = priority(tc.priority) + require.NoError(err) // wrp.QOSCriticalValue > wrp.QOSLowValue assert.True(pq.Less(0, 1)) @@ -332,42 +374,109 @@ func testLess(t *testing.T) { } } +func testTrim(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + msg0 := wrp.Message{ + Destination: "mac:00deadbeef02/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), + QualityOfService: wrp.QOSMediumValue, + } + msg1 := wrp.Message{ + Destination: "mac:00deadbeef02/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), + QualityOfService: wrp.QOSHighValue, + } + msg2 := wrp.Message{ + Destination: "mac:00deadbeef02/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), + QualityOfService: wrp.QOSHighValue, + } + msg3 := wrp.Message{ + Destination: "mac:00deadbeef03/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"), + QualityOfService: wrp.QOSCriticalValue, + } + + pq := priorityQueue{ + maxQueueBytes: int64(len(msg0.Payload) + len(msg3.Payload)), + maxMessageBytes: len(msg3.Payload), + sizeBytes: int64(len(msg0.Payload) + len(msg1.Payload)*2 + len(msg3.Payload)), + priority: NewestType, + } + pq.queue = []item{ + { + msg: &msg0, + expires: time.Now().Add(DefaultCriticalQOSExpires), + }, + { + msg: &msg1, + expires: time.Now(), + }, + { + msg: &msg2, + expires: time.Now(), + }, + { + msg: &msg3, + expires: time.Now().Add(DefaultCriticalQOSExpires), + }, + } + + var err error + require.NoError(err) + pq.trim() + assert.Nil(pq.queue[1].msg.Payload) + assert.Nil(pq.queue[2].msg.Payload) + assert.NotEmpty(pq.queue[0].msg.Payload) + assert.NotEmpty(pq.queue[3].msg.Payload) + assert.Equal(pq.maxQueueBytes, pq.sizeBytes) + assert.Equal(*pq.queue[0].msg, msg0) + assert.Equal(*pq.queue[2].msg, msg1) + assert.Equal(*pq.queue[2].msg, msg2) + assert.Equal(*pq.queue[3].msg, msg3) +} + func testSwap(t *testing.T) { assert := assert.New(t) + require := require.New(t) msg0 := wrp.Message{ Destination: "mac:00deadbeef00/config", } msg2 := wrp.Message{ Destination: "mac:00deadbeef02/config", } - pq := priorityQueue{queue: []item{ + pq := priorityQueue{} + pq.queue = []item{ { - msg: msg0, - timestamp: time.Now(), + msg: &msg0, + expires: time.Now(), }, { - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef01/config", }, - timestamp: time.Now(), + expires: time.Now(), }, { - msg: msg2, - timestamp: time.Now(), + msg: &msg2, + expires: time.Now(), }, - }, - tieBreaker: PriorityNewestMsg, } + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) pq.Swap(0, 2) // pq.queue[0] should contain msg2 - assert.Equal(msg2, pq.queue[0].msg) + assert.Equal(msg2, *pq.queue[0].msg) // pq.queue[2] should contain msg0 - assert.Equal(msg0, pq.queue[2].msg) + assert.Equal(msg0, *pq.queue[2].msg) } func testPush(t *testing.T) { assert := assert.New(t) + require := require.New(t) messages := []wrp.Message{ { Destination: "mac:00deadbeef00/config", @@ -379,10 +488,15 @@ func testPush(t *testing.T) { Destination: "mac:00deadbeef02/config", }, } - pq := priorityQueue{tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) + for _, msg := range messages { pq.Push(msg) - assert.Equal(msg, pq.queue[pq.Len()-1].msg) + assert.Equal(msg, *pq.queue[pq.Len()-1].msg) } } @@ -409,7 +523,7 @@ func testPop(t *testing.T) { description: "single message with memory leak check", items: []item{ { - msg: msg0, + msg: &msg0, }, }, expectedMessage: msg0, @@ -418,13 +532,13 @@ func testPop(t *testing.T) { description: "multiple messages with memory leak check", items: []item{ { - msg: msg0, + msg: &msg0, }, { - msg: msg1, + msg: &msg1, }, { - msg: msg2, + msg: &msg2, }, }, expectedMessage: msg2, @@ -435,18 +549,22 @@ func testPop(t *testing.T) { assert := assert.New(t) require := require.New(t) - pq := priorityQueue{queue: tc.items, tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + pq.queue = tc.items + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) + // no sorting is applied, Pop will pop the last message from priorityQueue's queue - switch msg := pq.Pop().(type) { + switch itm := pq.Pop().(type) { case nil: assert.Len(tc.items, 0) - case wrp.Message: - assert.Equal(tc.expectedMessage, msg) + case item: + assert.Equal(tc.expectedMessage, *itm.msg) require.NotEmpty(tc.items, "Pop() should have returned a nil instead of a wrp.Message") // check for memory leak assert.Empty(tc.items[len(tc.items)-1]) - assert.Equal(wrp.Message{}, tc.items[len(tc.items)-1].msg) - assert.True(tc.items[len(tc.items)-1].timestamp.IsZero()) default: require.Fail("Pop() returned an unknown type") } diff --git a/internal/wrphandlers/qos/qos.go b/internal/wrphandlers/qos/qos.go index ff94e96..9c520c1 100644 --- a/internal/wrphandlers/qos/qos.go +++ b/internal/wrphandlers/qos/qos.go @@ -6,6 +6,7 @@ package qos import ( "errors" "sync" + "time" "github.com/xmidt-org/wrp-go/v3" "github.com/xmidt-org/xmidt-agent/internal/wrpkit" @@ -33,7 +34,7 @@ type Handler struct { next wrpkit.Handler // queue for wrp messages, ingested by serviceQOS queue chan wrp.Message - // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers, + // priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, // with the default being to prioritize the newest messages. priority PriorityType // tieBreaker breaks any QualityOfService ties. @@ -43,6 +44,16 @@ type Handler struct { // MaxMessageBytes is the largest allowable wrp message payload. maxMessageBytes int + // QOS expiries. + // lowQOSExpires determines when low qos messages are trimmed. + lowQOSExpires time.Duration + // mediumQOSExpires determines when medium qos messages are trimmed. + mediumQOSExpires time.Duration + // highQOSExpires determines when high qos messages are trimmed. + highQOSExpires time.Duration + // criticalQOSExpires determines when critical qos messages are trimmed. + criticalQOSExpires time.Duration + lock sync.Mutex } @@ -59,7 +70,11 @@ func New(next wrpkit.Handler, opts ...Option) (*Handler, error) { opts = append(opts, validateQueueConstraints(), validatePriority(), validateTieBreaker()) h := Handler{ - next: next, + next: next, + lowQOSExpires: DefaultLowQOSExpires, + mediumQOSExpires: DefaultMediumQOSExpires, + highQOSExpires: DefaultHighQOSExpires, + criticalQOSExpires: DefaultCriticalQOSExpires, } var errs error diff --git a/internal/wrphandlers/qos/qos_test.go b/internal/wrphandlers/qos/qos_test.go index c643718..5f6f7c9 100644 --- a/internal/wrphandlers/qos/qos_test.go +++ b/internal/wrphandlers/qos/qos_test.go @@ -31,10 +31,9 @@ func TestHandler_HandleWrp(t *testing.T) { ) tests := []struct { - description string - maxQueueBytes int - maxMessageBytes int - priority qos.PriorityType + description string + options []qos.Option + priority qos.PriorityType // int64 required for nextCallCount atomic.Int64 comparison nextCallCount int64 next wrpkit.Handler @@ -46,11 +45,9 @@ func TestHandler_HandleWrp(t *testing.T) { }{ // success cases { - description: "enqueued and delivered message prioritizing newer messages", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 1, + description: "enqueued and delivered message prioritizing newer messages", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -58,11 +55,9 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "enqueued and delivered message prioritizing older messages", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.OldestType, - nextCallCount: 1, + description: "enqueued and delivered message prioritizing older messages", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.OldestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -70,11 +65,9 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "re-enqueue message that failed its delivery", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 2, + description: "re-enqueue message that failed its delivery", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 2, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) if nextCallCount.Load() < 2 { @@ -87,11 +80,10 @@ func TestHandler_HandleWrp(t *testing.T) { failDeliveryOnce: true, }, { - description: "queue messages while message delivery is blocked", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 0, + description: "queue messages while message delivery is blocked", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + + nextCallCount: 0, next: wrpkit.HandlerFunc(func(wrp.Message) error { // halt qos message delivery time.Sleep(1 * time.Second) @@ -101,11 +93,30 @@ func TestHandler_HandleWrp(t *testing.T) { shouldHalt: true, }, { - description: "zero MaxQueueBytes option value", - maxQueueBytes: 0, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 1, + description: "zero MaxQueueBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "zero MaxMessageBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(0)), qos.MaxMessageBytes(0), qos.Priority(qos.NewestType)}, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "non-negative LowQOSExpires option value", + options: []qos.Option{qos.LowQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -113,11 +124,29 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "zero MaxMessageBytes option value", - maxQueueBytes: qos.DefaultMaxQueueBytes, - maxMessageBytes: 0, - priority: qos.NewestType, - nextCallCount: 1, + description: "non-negative MediumQOSExpires option value", + options: []qos.Option{qos.MediumQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "non-negative HighQOSExpires option value", + options: []qos.Option{qos.HighQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "non-negative CriticalQOSExpires option value", + options: []qos.Option{qos.CriticalQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -126,17 +155,13 @@ func TestHandler_HandleWrp(t *testing.T) { }, // failure cases { - description: "invalid inputs for qos.New", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - expectedNewErr: qos.ErrInvalidInput, + description: "invalid inputs for qos.New", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + expectedNewErr: qos.ErrInvalidInput, }, { - description: "negative MaxQueueBytes option value", - maxQueueBytes: -1, - maxMessageBytes: 50, - priority: qos.NewestType, + description: "negative MaxQueueBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(-1)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -145,10 +170,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "negative MaxMessageBytes option value", - maxQueueBytes: 100, - maxMessageBytes: -1, - priority: qos.NewestType, + description: "negative MaxMessageBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(-1), qos.Priority(qos.NewestType)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -157,10 +180,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "negative invalid priority type option value", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: -1, + description: "negative invalid priority type option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(-1)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -169,10 +190,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "positive invalid priority type option value", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: math.MaxInt64, + description: "positive invalid priority type option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(math.MaxInt64)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -181,10 +200,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "unknown priority type option value", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.UnknownType, + description: "unknown priority type option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.UnknownType)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -193,11 +210,9 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "qos has stopped", - maxQueueBytes: 100, - maxMessageBytes: 50, - nextCallCount: 0, - priority: qos.NewestType, + description: "qos has stopped", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -206,13 +221,57 @@ func TestHandler_HandleWrp(t *testing.T) { shutdown: true, expectedHandleWRPErr: qos.ErrQOSHasShutdown, }, + { + description: "negative LowQOSExpires option value", + options: []qos.Option{qos.LowQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, + { + description: "negative MediumQOSExpires option value", + options: []qos.Option{qos.MediumQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, + { + description: "negative HighQOSExpires option value", + options: []qos.Option{qos.HighQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, + { + description: "negative CriticalQOSExpires option value", + options: []qos.Option{qos.CriticalQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, } for _, tc := range tests { t.Run(tc.description, func(t *testing.T) { assert := assert.New(t) require := require.New(t) - h, err := qos.New(tc.next, qos.MaxQueueBytes(int64(tc.maxQueueBytes)), qos.MaxMessageBytes(tc.maxMessageBytes), qos.Priority(tc.priority)) + h, err := qos.New(tc.next, tc.options...) if tc.expectedNewErr != nil { assert.ErrorIs(err, tc.expectedNewErr) assert.Nil(h) From 19bd918768252cae490272c563862dcbdf27c124 Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Mon, 5 Aug 2024 17:35:39 -0400 Subject: [PATCH 2/2] chore: update based on pr feedback --- cmd/xmidt-agent/config.go | 16 ++++---- cmd/xmidt-agent/wrphandlers.go | 8 ++-- internal/wrphandlers/qos/options.go | 40 +++++++++---------- internal/wrphandlers/qos/priority_queue.go | 24 +++++------ .../wrphandlers/qos/priority_queue_test.go | 30 +++++++------- internal/wrphandlers/qos/qos.go | 26 ++++++------ internal/wrphandlers/qos/qos_test.go | 32 +++++++-------- 7 files changed, 88 insertions(+), 88 deletions(-) diff --git a/cmd/xmidt-agent/config.go b/cmd/xmidt-agent/config.go index e5ec61f..415e679 100644 --- a/cmd/xmidt-agent/config.go +++ b/cmd/xmidt-agent/config.go @@ -67,14 +67,14 @@ type QOS struct { // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, // with the default being to prioritize the newest messages. Priority qos.PriorityType - // LowQOSExpires determines when low qos messages are trimmed. - LowQOSExpires time.Duration - // MediumQOSExpires determines when medium qos messages are trimmed. - MediumQOSExpires time.Duration - // HighQOSExpires determines when high qos messages are trimmed. - HighQOSExpires time.Duration - // CriticalQOSExpires determines when critical qos messages are trimmed. - CriticalQOSExpires time.Duration + // LowExpires determines when low qos messages are trimmed. + LowExpires time.Duration + // MediumExpires determines when medium qos messages are trimmed. + MediumExpires time.Duration + // HighExpires determines when high qos messages are trimmed. + HighExpires time.Duration + // CriticalExpires determines when critical qos messages are trimmed. + CriticalExpires time.Duration } type Pubsub struct { diff --git a/cmd/xmidt-agent/wrphandlers.go b/cmd/xmidt-agent/wrphandlers.go index 7c24f98..8a6a972 100644 --- a/cmd/xmidt-agent/wrphandlers.go +++ b/cmd/xmidt-agent/wrphandlers.go @@ -75,10 +75,10 @@ func provideQOSHandler(in qosIn) (*qos.Handler, error) { qos.MaxQueueBytes(in.QOS.MaxQueueBytes), qos.MaxMessageBytes(in.QOS.MaxMessageBytes), qos.Priority(in.QOS.Priority), - qos.LowQOSExpires(in.QOS.LowQOSExpires), - qos.MediumQOSExpires(in.QOS.MediumQOSExpires), - qos.HighQOSExpires(in.QOS.HighQOSExpires), - qos.CriticalQOSExpires(in.QOS.CriticalQOSExpires), + qos.LowExpires(in.QOS.LowExpires), + qos.MediumExpires(in.QOS.MediumExpires), + qos.HighExpires(in.QOS.HighExpires), + qos.CriticalExpires(in.QOS.CriticalExpires), ) } diff --git a/internal/wrphandlers/qos/options.go b/internal/wrphandlers/qos/options.go index c441df7..27dac54 100644 --- a/internal/wrphandlers/qos/options.go +++ b/internal/wrphandlers/qos/options.go @@ -15,10 +15,10 @@ const ( DefaultMaxMessageBytes = 256 * 1024 // 256 KB // QOS expires defaults. - DefaultLowQOSExpires = time.Minute * 15 - DefaultMediumQOSExpires = time.Minute * 20 - DefaultHighQOSExpires = time.Minute * 25 - DefaultCriticalQOSExpires = time.Minute * 30 + DefaultLowExpires = time.Minute * 15 + DefaultMediumExpires = time.Minute * 20 + DefaultHighExpires = time.Minute * 25 + DefaultCriticalExpires = time.Minute * 30 ) // MaxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload. @@ -84,54 +84,54 @@ func priority(p PriorityType) (enqueueTieBreaker tieBreaker, err error) { return enqueueTieBreaker, nil } -// LowQOSExpires determines when low qos messages are trimmed. -func LowQOSExpires(t time.Duration) Option { +// LowExpires determines when low qos messages are trimmed. +func LowExpires(t time.Duration) Option { return optionFunc( func(h *Handler) (err error) { if t < 0 { - return fmt.Errorf("%w: negative LowQOSExpires", ErrMisconfiguredQOS) + return fmt.Errorf("%w: negative LowExpires", ErrMisconfiguredQOS) } - h.lowQOSExpires = t + h.lowExpires = t return err }) } -// MediumQOSExpires determines when medium qos messages are trimmed. -func MediumQOSExpires(t time.Duration) Option { +// MediumExpires determines when medium qos messages are trimmed. +func MediumExpires(t time.Duration) Option { return optionFunc( func(h *Handler) (err error) { if t < 0 { - return fmt.Errorf("%w: negative MediumQOSExpires", ErrMisconfiguredQOS) + return fmt.Errorf("%w: negative MediumExpires", ErrMisconfiguredQOS) } - h.mediumQOSExpires = t + h.mediumExpires = t return err }) } -// HighQOSExpires determines when high qos messages are trimmed. -func HighQOSExpires(t time.Duration) Option { +// HighExpires determines when high qos messages are trimmed. +func HighExpires(t time.Duration) Option { return optionFunc( func(h *Handler) (err error) { if t < 0 { - return fmt.Errorf("%w: negative HighQOSExpires", ErrMisconfiguredQOS) + return fmt.Errorf("%w: negative HighExpires", ErrMisconfiguredQOS) } - h.highQOSExpires = t + h.highExpires = t return err }) } -// CriticalQOSExpires determines when critical qos messages are trimmed. -func CriticalQOSExpires(t time.Duration) Option { +// CriticalExpires determines when critical qos messages are trimmed. +func CriticalExpires(t time.Duration) Option { return optionFunc( func(h *Handler) (err error) { if t < 0 { - return fmt.Errorf("%w: negative CriticalQOSExpires", ErrMisconfiguredQOS) + return fmt.Errorf("%w: negative CriticalExpires", ErrMisconfiguredQOS) } - h.criticalQOSExpires = t + h.criticalExpires = t return err }) } diff --git a/internal/wrphandlers/qos/priority_queue.go b/internal/wrphandlers/qos/priority_queue.go index 99dccbd..e1781df 100644 --- a/internal/wrphandlers/qos/priority_queue.go +++ b/internal/wrphandlers/qos/priority_queue.go @@ -34,14 +34,14 @@ type priorityQueue struct { sizeBytes int64 // QOS expiries. - // lowQOSExpires determines when low qos messages are trimmed. - lowQOSExpires time.Duration - // mediumQOSExpires determines when medium qos messages are trimmed. - mediumQOSExpires time.Duration - // highQOSExpires determines when high qos messages are trimmed. - highQOSExpires time.Duration - // criticalQOSExpires determines when critical qos messages are trimmed. - criticalQOSExpires time.Duration + // lowExpires determines when low qos messages are trimmed. + lowExpires time.Duration + // mediumExpires determines when medium qos messages are trimmed. + mediumExpires time.Duration + // highExpires determines when high qos messages are trimmed. + highExpires time.Duration + // criticalExpires determines when critical qos messages are trimmed. + criticalExpires time.Duration } type tieBreaker func(i, j item) bool @@ -188,13 +188,13 @@ func (pq *priorityQueue) Push(x any) { var qosExpires time.Duration switch msg.QualityOfService.Level() { case wrp.QOSLow: - qosExpires = pq.lowQOSExpires + qosExpires = pq.lowExpires case wrp.QOSMedium: - qosExpires = pq.mediumQOSExpires + qosExpires = pq.mediumExpires case wrp.QOSHigh: - qosExpires = pq.highQOSExpires + qosExpires = pq.highExpires case wrp.QOSCritical: - qosExpires = pq.criticalQOSExpires + qosExpires = pq.criticalExpires } pq.queue = append(pq.queue, item{ diff --git a/internal/wrphandlers/qos/priority_queue_test.go b/internal/wrphandlers/qos/priority_queue_test.go index b89501c..ce19e53 100644 --- a/internal/wrphandlers/qos/priority_queue_test.go +++ b/internal/wrphandlers/qos/priority_queue_test.go @@ -73,13 +73,13 @@ func testEnqueueDequeueAgePriority(t *testing.T) { assert := assert.New(t) require := require.New(t) pq := priorityQueue{ - maxQueueBytes: int64(len(smallLowQOSMsgOldest.Payload)), - maxMessageBytes: len(smallLowQOSMsgOldest.Payload), - lowQOSExpires: DefaultLowQOSExpires, - mediumQOSExpires: DefaultMediumQOSExpires, - highQOSExpires: DefaultHighQOSExpires, - criticalQOSExpires: DefaultCriticalQOSExpires, - priority: tc.priority, + maxQueueBytes: int64(len(smallLowQOSMsgOldest.Payload)), + maxMessageBytes: len(smallLowQOSMsgOldest.Payload), + lowExpires: DefaultLowExpires, + mediumExpires: DefaultMediumExpires, + highExpires: DefaultHighExpires, + criticalExpires: DefaultCriticalExpires, + priority: tc.priority, } var err error @@ -228,12 +228,12 @@ func testEnqueueDequeue(t *testing.T) { assert := assert.New(t) require := require.New(t) pq := priorityQueue{ - maxQueueBytes: int64(tc.maxQueueBytes), - maxMessageBytes: tc.maxMessageBytes, - lowQOSExpires: DefaultLowQOSExpires, - mediumQOSExpires: DefaultMediumQOSExpires, - highQOSExpires: DefaultHighQOSExpires, - criticalQOSExpires: DefaultCriticalQOSExpires, + maxQueueBytes: int64(tc.maxQueueBytes), + maxMessageBytes: tc.maxMessageBytes, + lowExpires: DefaultLowExpires, + mediumExpires: DefaultMediumExpires, + highExpires: DefaultHighExpires, + criticalExpires: DefaultCriticalExpires, } var err error @@ -407,7 +407,7 @@ func testTrim(t *testing.T) { pq.queue = []item{ { msg: &msg0, - expires: time.Now().Add(DefaultCriticalQOSExpires), + expires: time.Now().Add(DefaultCriticalExpires), }, { msg: &msg1, @@ -419,7 +419,7 @@ func testTrim(t *testing.T) { }, { msg: &msg3, - expires: time.Now().Add(DefaultCriticalQOSExpires), + expires: time.Now().Add(DefaultCriticalExpires), }, } diff --git a/internal/wrphandlers/qos/qos.go b/internal/wrphandlers/qos/qos.go index 9c520c1..f20ef62 100644 --- a/internal/wrphandlers/qos/qos.go +++ b/internal/wrphandlers/qos/qos.go @@ -45,14 +45,14 @@ type Handler struct { maxMessageBytes int // QOS expiries. - // lowQOSExpires determines when low qos messages are trimmed. - lowQOSExpires time.Duration - // mediumQOSExpires determines when medium qos messages are trimmed. - mediumQOSExpires time.Duration - // highQOSExpires determines when high qos messages are trimmed. - highQOSExpires time.Duration - // criticalQOSExpires determines when critical qos messages are trimmed. - criticalQOSExpires time.Duration + // lowExpires determines when low qos messages are trimmed. + lowExpires time.Duration + // mediumExpires determines when medium qos messages are trimmed. + mediumExpires time.Duration + // highExpires determines when high qos messages are trimmed. + highExpires time.Duration + // criticalExpires determines when critical qos messages are trimmed. + criticalExpires time.Duration lock sync.Mutex } @@ -70,11 +70,11 @@ func New(next wrpkit.Handler, opts ...Option) (*Handler, error) { opts = append(opts, validateQueueConstraints(), validatePriority(), validateTieBreaker()) h := Handler{ - next: next, - lowQOSExpires: DefaultLowQOSExpires, - mediumQOSExpires: DefaultMediumQOSExpires, - highQOSExpires: DefaultHighQOSExpires, - criticalQOSExpires: DefaultCriticalQOSExpires, + next: next, + lowExpires: DefaultLowExpires, + mediumExpires: DefaultMediumExpires, + highExpires: DefaultHighExpires, + criticalExpires: DefaultCriticalExpires, } var errs error diff --git a/internal/wrphandlers/qos/qos_test.go b/internal/wrphandlers/qos/qos_test.go index 5f6f7c9..eaddd51 100644 --- a/internal/wrphandlers/qos/qos_test.go +++ b/internal/wrphandlers/qos/qos_test.go @@ -114,8 +114,8 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "non-negative LowQOSExpires option value", - options: []qos.Option{qos.LowQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + description: "non-negative LowExpires option value", + options: []qos.Option{qos.LowExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -124,8 +124,8 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "non-negative MediumQOSExpires option value", - options: []qos.Option{qos.MediumQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + description: "non-negative MediumExpires option value", + options: []qos.Option{qos.MediumExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -134,8 +134,8 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "non-negative HighQOSExpires option value", - options: []qos.Option{qos.HighQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + description: "non-negative HighExpires option value", + options: []qos.Option{qos.HighExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -144,8 +144,8 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "non-negative CriticalQOSExpires option value", - options: []qos.Option{qos.CriticalQOSExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + description: "non-negative CriticalExpires option value", + options: []qos.Option{qos.CriticalExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -222,8 +222,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedHandleWRPErr: qos.ErrQOSHasShutdown, }, { - description: "negative LowQOSExpires option value", - options: []qos.Option{qos.LowQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + description: "negative LowExpires option value", + options: []qos.Option{qos.LowExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, nextCallCount: 0, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -233,8 +233,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "negative MediumQOSExpires option value", - options: []qos.Option{qos.MediumQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + description: "negative MediumExpires option value", + options: []qos.Option{qos.MediumExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, nextCallCount: 0, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -244,8 +244,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "negative HighQOSExpires option value", - options: []qos.Option{qos.HighQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + description: "negative HighExpires option value", + options: []qos.Option{qos.HighExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, nextCallCount: 0, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -255,8 +255,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "negative CriticalQOSExpires option value", - options: []qos.Option{qos.CriticalQOSExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + description: "negative CriticalExpires option value", + options: []qos.Option{qos.CriticalExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, nextCallCount: 0, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1)