diff --git a/cmd/xmidt-agent/config.go b/cmd/xmidt-agent/config.go index 2d89ef8..a0f7e07 100644 --- a/cmd/xmidt-agent/config.go +++ b/cmd/xmidt-agent/config.go @@ -64,9 +64,17 @@ type QOS struct { MaxQueueBytes int64 // MaxMessageBytes is the largest allowable wrp message payload. MaxMessageBytes int - // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers, + // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, // with the default being to prioritize the newest messages. Priority qos.PriorityType + // LowExpires determines when low qos messages are trimmed. + LowExpires time.Duration + // MediumExpires determines when medium qos messages are trimmed. + MediumExpires time.Duration + // HighExpires determines when high qos messages are trimmed. + HighExpires time.Duration + // CriticalExpires determines when critical qos messages are trimmed. + CriticalExpires time.Duration } type Pubsub struct { diff --git a/cmd/xmidt-agent/wrphandlers.go b/cmd/xmidt-agent/wrphandlers.go index dfd1d11..8a6a972 100644 --- a/cmd/xmidt-agent/wrphandlers.go +++ b/cmd/xmidt-agent/wrphandlers.go @@ -75,6 +75,10 @@ func provideQOSHandler(in qosIn) (*qos.Handler, error) { qos.MaxQueueBytes(in.QOS.MaxQueueBytes), qos.MaxMessageBytes(in.QOS.MaxMessageBytes), qos.Priority(in.QOS.Priority), + qos.LowExpires(in.QOS.LowExpires), + qos.MediumExpires(in.QOS.MediumExpires), + qos.HighExpires(in.QOS.HighExpires), + qos.CriticalExpires(in.QOS.CriticalExpires), ) } diff --git a/internal/wrphandlers/qos/options.go b/internal/wrphandlers/qos/options.go index bc3ce2a..62d0c9b 100644 --- a/internal/wrphandlers/qos/options.go +++ b/internal/wrphandlers/qos/options.go @@ -6,11 +6,19 @@ package qos import ( "errors" "fmt" + "time" ) const ( + // Priority queue defaults. DefaultMaxQueueBytes = 1 * 1024 * 1024 // 1MB max/queue DefaultMaxMessageBytes = 256 * 1024 // 256 KB + + // QOS expires defaults. + DefaultLowExpires = time.Minute * 15 + DefaultMediumExpires = time.Minute * 20 + DefaultHighExpires = time.Minute * 25 + DefaultCriticalExpires = time.Minute * 30 ) // MaxQueueBytes is the allowable max size of the qos' priority queue, based on the sum of all queued wrp message's payload. @@ -45,25 +53,83 @@ func MaxMessageBytes(s int) Option { }) } -// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers, +// Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, // with the default being to prioritize the newest messages. func Priority(p PriorityType) Option { return optionFunc( - func(h *Handler) error { - // Determine what will be used as a QualityOfService tie breaker. - switch p { - case NewestType: - // Prioritize the newest messages. - h.tieBreaker = PriorityNewestMsg - case OldestType: - // Prioritize the oldest messages. - h.tieBreaker = PriorityOldestMsg - default: - return errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, h.priority), ErrMisconfiguredQOS) + func(h *Handler) (err error) { + h.tieBreaker, err = priority(p) + h.priority = p + + return err + }) +} + +// priority determines which tie breakers are used during normal enqueueing. +func priority(p PriorityType) (enqueueTieBreaker tieBreaker, err error) { + // Determine what will be used as a QualityOfService tie breaker during normal enqueueing. + switch p { + case NewestType: + // Prioritize the newest messages. + enqueueTieBreaker = PriorityNewestMsg + case OldestType: + // Prioritize the oldest messages. + enqueueTieBreaker = PriorityOldestMsg + default: + return nil, errors.Join(fmt.Errorf("%w: %s", ErrPriorityTypeInvalid, p), ErrMisconfiguredQOS) + } + + return enqueueTieBreaker, nil +} + +// LowExpires determines when low qos messages are trimmed. +func LowExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative LowExpires", ErrMisconfiguredQOS) } - h.priority = p + h.lowExpires = t + return err + }) +} - return nil +// MediumExpires determines when medium qos messages are trimmed. +func MediumExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative MediumExpires", ErrMisconfiguredQOS) + } + + h.mediumExpires = t + return err + }) +} + +// HighExpires determines when high qos messages are trimmed. +func HighExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative HighExpires", ErrMisconfiguredQOS) + } + + h.highExpires = t + return err + }) +} + +// CriticalExpires determines when critical qos messages are trimmed. +func CriticalExpires(t time.Duration) Option { + return optionFunc( + func(h *Handler) (err error) { + if t < 0 { + return fmt.Errorf("%w: negative CriticalExpires", ErrMisconfiguredQOS) + } + + h.criticalExpires = t + return err }) } diff --git a/internal/wrphandlers/qos/priority_queue.go b/internal/wrphandlers/qos/priority_queue.go index 9397306..71626ad 100644 --- a/internal/wrphandlers/qos/priority_queue.go +++ b/internal/wrphandlers/qos/priority_queue.go @@ -7,6 +7,7 @@ import ( "container/heap" "errors" "fmt" + "slices" "time" "github.com/xmidt-org/wrp-go/v3" @@ -19,6 +20,9 @@ var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes type priorityQueue struct { // queue for wrp messages, ingested by serviceQOS queue []item + // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, + // with the default being to prioritize the newest messages. + priority PriorityType // tieBreaker breaks any QualityOfService ties. tieBreaker tieBreaker // maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads. @@ -29,24 +33,49 @@ type priorityQueue struct { // sizeBytes is the sum of all queued wrp message's payloads. // An int64 overflow is unlikely since that'll be over 9*10^18 bytes sizeBytes int64 + + // QOS expiries. + // lowExpires determines when low qos messages are trimmed. + lowExpires time.Duration + // mediumExpires determines when medium qos messages are trimmed. + mediumExpires time.Duration + // highExpires determines when high qos messages are trimmed. + highExpires time.Duration + // criticalExpires determines when critical qos messages are trimmed. + criticalExpires time.Duration } type tieBreaker func(i, j item) bool type item struct { - msg wrp.Message - timestamp time.Time + // msg is the message queued for delivery. + msg *wrp.Message + // expires is the time the messge is good upto before it is eligible to be trimmed. + expires time.Time + // discard determines whether a message should be discarded or not + discard bool } // Dequeue returns the next highest priority message. func (pq *priorityQueue) Dequeue() (wrp.Message, bool) { - // Required, otherwise heap.Pop will panic during an internal Swap call. - if pq.Len() == 0 { - return wrp.Message{}, false + var ( + msg wrp.Message + ok bool + ) + for pq.Len() != 0 { + itm := heap.Pop(pq).(item) + // itm.discard will be true if `itm` has been marked to be discarded, + // i.e. trimmed by `pq.trim()'. + if itm.discard { + continue + } + + msg = *itm.msg + ok = true + break } - msg, ok := heap.Pop(pq).(wrp.Message) - + // ok will be false if no message was found, otherwise ok will be true. return msg, ok } @@ -63,17 +92,75 @@ func (pq *priorityQueue) Enqueue(msg wrp.Message) error { return nil } +// trim removes messages with the lowest QualityOfService until the queue no longer violates `maxQueueSize“. func (pq *priorityQueue) trim() { - // trim until the queue no longer violates maxQueueBytes. - for pq.sizeBytes > pq.maxQueueBytes { - // Note, priorityQueue.drop does not drop the least prioritized queued message. - // i.e.: a high priority queued message may be dropped instead of a lesser queued message. - pq.drop() + // If priorityQueue.queue doesn't violates `maxQueueSize`, then return. + if pq.sizeBytes <= pq.maxQueueBytes { + return + } + + itemsCache := make([]*item, len(pq.queue)) + // Remove all expired messages before trimming unexpired lower priority messages. + now := time.Now() + iCache := 0 + for i := range pq.queue { + itm := &pq.queue[i] + // itm has already been marked to be discarded. + if itm.discard { + continue + } + if now.After(itm.expires) { + // Mark itm to be discarded. + // `pq.Dequeue()` will fully discard itm. + itm.discard = true + pq.sizeBytes -= int64(len(itm.msg.Payload)) + // Preemptively discard itm's payload to reduce + // resource usage, since itm will be discarded, + itm.msg.Payload = nil + continue + } + + itemsCache[iCache] = itm + iCache += 1 + } + + // Resize itemsCache. + itemsCache = itemsCache[:iCache] + slices.SortFunc(itemsCache, func(i, j *item) int { + if i.msg.QualityOfService < j.msg.QualityOfService { + return -1 + } else if i.msg.QualityOfService > j.msg.QualityOfService { + return 1 + } + + // Tiebreaker. + switch pq.priority { + case NewestType: + // Prioritize the newest messages. + return i.expires.Compare(j.expires) + default: + // Prioritize the oldest messages. + return j.expires.Compare(i.expires) + } + }) + + // Continue trimming until the pq.queue no longer violates maxQueueBytes. + // Remove the messages with the lowest priority. + for _, itm := range itemsCache { + // If pq.queue doesn't violates `maxQueueSize`, then return. + if pq.sizeBytes <= pq.maxQueueBytes { + break + } + + // Mark itm to be discarded. + // `pq.Dequeue()` will fully discard itm. + itm.discard = true + pq.sizeBytes -= int64(len(itm.msg.Payload)) + // Preemptively discard itm's payload to reduce + // resource usage, since itm will be discarded, + itm.msg.Payload = nil } -} -func (pq *priorityQueue) drop() { - _ = heap.Remove(pq, pq.Len()-1).(wrp.Message) } // heap.Interface related implementations https://pkg.go.dev/container/heap#Interface @@ -97,9 +184,25 @@ func (pq *priorityQueue) Swap(i, j int) { } func (pq *priorityQueue) Push(x any) { - item := item{msg: x.(wrp.Message), timestamp: time.Now()} - pq.sizeBytes += int64(len(item.msg.Payload)) - pq.queue = append(pq.queue, item) + msg := x.(wrp.Message) + pq.sizeBytes += int64(len(msg.Payload)) + + var qosExpires time.Duration + switch msg.QualityOfService.Level() { + case wrp.QOSLow: + qosExpires = pq.lowExpires + case wrp.QOSMedium: + qosExpires = pq.mediumExpires + case wrp.QOSHigh: + qosExpires = pq.highExpires + case wrp.QOSCritical: + qosExpires = pq.criticalExpires + } + + pq.queue = append(pq.queue, item{ + msg: &msg, + expires: time.Now().Add(qosExpires), + discard: false}) } func (pq *priorityQueue) Pop() any { @@ -108,19 +211,19 @@ func (pq *priorityQueue) Pop() any { return nil } - msg := pq.queue[last].msg - pq.sizeBytes -= int64(len(msg.Payload)) + itm := pq.queue[last] + pq.sizeBytes -= int64(len(itm.msg.Payload)) // avoid memory leak pq.queue[last] = item{} pq.queue = pq.queue[0:last] - return msg + return itm } func PriorityNewestMsg(i, j item) bool { - return i.timestamp.After(j.timestamp) + return i.expires.After(j.expires) } func PriorityOldestMsg(i, j item) bool { - return i.timestamp.Before(j.timestamp) + return i.expires.Before(j.expires) } diff --git a/internal/wrphandlers/qos/priority_queue_test.go b/internal/wrphandlers/qos/priority_queue_test.go index 817b149..feb2215 100644 --- a/internal/wrphandlers/qos/priority_queue_test.go +++ b/internal/wrphandlers/qos/priority_queue_test.go @@ -22,6 +22,7 @@ func TestPriorityQueue(t *testing.T) { {"Size", testSize}, {"Len", testLen}, {"Less", testLess}, + {"Trim", testTrim}, {"Swap", testSwap}, {"Push", testPush}, {"Pop", testPop}, @@ -52,17 +53,17 @@ func testEnqueueDequeueAgePriority(t *testing.T) { } tests := []struct { description string - tieBreaker tieBreaker + priority PriorityType expectedMsg wrp.Message }{ { description: "drop incoming low priority messages while prioritizing older messages", - tieBreaker: PriorityOldestMsg, + priority: OldestType, expectedMsg: smallLowQOSMsgOldest, }, { description: "drop incoming low priority messages while prioritizing newer messages", - tieBreaker: PriorityNewestMsg, + priority: NewestType, expectedMsg: smallLowQOSMsgNewest, }, } @@ -74,18 +75,26 @@ func testEnqueueDequeueAgePriority(t *testing.T) { pq := priorityQueue{ maxQueueBytes: int64(len(smallLowQOSMsgOldest.Payload)), maxMessageBytes: len(smallLowQOSMsgOldest.Payload), - tieBreaker: tc.tieBreaker, + lowExpires: DefaultLowExpires, + mediumExpires: DefaultMediumExpires, + highExpires: DefaultHighExpires, + criticalExpires: DefaultCriticalExpires, + priority: tc.priority, } + + var err error + pq.tieBreaker, err = priority(tc.priority) + require.NoError(err) + for _, msg := range messages { pq.Enqueue(msg) } - assert.Equal(1, pq.Len()) - actualMsg, ok := pq.Dequeue() require.True(ok) require.NotEmpty(actualMsg) assert.Equal(tc.expectedMsg, actualMsg) + assert.Equal(int64(0), pq.sizeBytes) }) } } @@ -93,18 +102,23 @@ func testEnqueueDequeueAgePriority(t *testing.T) { func testEnqueueDequeue(t *testing.T) { emptyLowQOSMsg := wrp.Message{ Destination: "mac:00deadbeef00/config", - QualityOfService: wrp.QOSLowValue, + QualityOfService: 10, } smallLowQOSMsg := wrp.Message{ Destination: "mac:00deadbeef01/config", Payload: []byte("{}"), - QualityOfService: wrp.QOSLowValue, + QualityOfService: 10, } mediumMediumQosMsg := wrp.Message{ Destination: "mac:00deadbeef02/config", Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), QualityOfService: wrp.QOSMediumValue, } + mediumHighQosMsg := wrp.Message{ + Destination: "mac:00deadbeef02/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), + QualityOfService: wrp.QOSHighValue, + } largeCriticalQOSMsg := wrp.Message{ Destination: "mac:00deadbeef03/config", Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"), @@ -119,11 +133,13 @@ func testEnqueueDequeue(t *testing.T) { largeCriticalQOSMsg, smallLowQOSMsg, mediumMediumQosMsg, + mediumHighQosMsg, } dequeueSequenceTest := []wrp.Message{ largeCriticalQOSMsg, largeCriticalQOSMsg, largeCriticalQOSMsg, + mediumHighQosMsg, mediumMediumQosMsg, mediumMediumQosMsg, smallLowQOSMsg, @@ -136,6 +152,9 @@ func testEnqueueDequeue(t *testing.T) { queueSizeSequenceTest += len(msg.Payload) } + // expect 1 message to be drop + enqueueSequenceTest = append(enqueueSequenceTest, smallLowQOSMsg) + tests := []struct { description string messages []wrp.Message @@ -179,10 +198,18 @@ func testEnqueueDequeue(t *testing.T) { maxMessageBytes: len(largeCriticalQOSMsg.Payload), expectedQueueSize: 1, }, + { + description: "drop incoming low priority messages", + messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg, smallLowQOSMsg, mediumMediumQosMsg}, + maxQueueBytes: len(largeCriticalQOSMsg.Payload) * 2, + maxMessageBytes: len(largeCriticalQOSMsg.Payload), + expectedQueueSize: 2, + expectedDequeueSequence: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg}, + }, { description: "remove some low priority messages to fit a higher priority message", - messages: []wrp.Message{mediumMediumQosMsg, mediumMediumQosMsg, mediumMediumQosMsg, largeCriticalQOSMsg}, - maxQueueBytes: len(mediumMediumQosMsg.Payload) * 3, + messages: []wrp.Message{mediumMediumQosMsg, smallLowQOSMsg, mediumMediumQosMsg, smallLowQOSMsg, mediumMediumQosMsg, smallLowQOSMsg, largeCriticalQOSMsg, smallLowQOSMsg, mediumMediumQosMsg}, + maxQueueBytes: len(mediumMediumQosMsg.Payload)*2 + len(largeCriticalQOSMsg.Payload), maxMessageBytes: len(largeCriticalQOSMsg.Payload), expectedQueueSize: 2, }, @@ -209,14 +236,20 @@ func testEnqueueDequeue(t *testing.T) { pq := priorityQueue{ maxQueueBytes: int64(tc.maxQueueBytes), maxMessageBytes: tc.maxMessageBytes, - tieBreaker: PriorityNewestMsg, + lowExpires: DefaultLowExpires, + mediumExpires: DefaultMediumExpires, + highExpires: DefaultHighExpires, + criticalExpires: DefaultCriticalExpires, } + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) + for _, msg := range tc.messages { pq.Enqueue(msg) } - assert.Equal(tc.expectedQueueSize, pq.Len()) - if len(tc.expectedDequeueSequence) == 0 { return } @@ -227,96 +260,105 @@ func testEnqueueDequeue(t *testing.T) { require.NotEmpty(actualMsg) assert.Equal(expectedMsg, actualMsg) } + + assert.Equal(int64(0), pq.sizeBytes) + }) } } func testSize(t *testing.T) { assert := assert.New(t) + require := require.New(t) msg := wrp.Message{ Destination: "mac:00deadbeef00/config", Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"), } - pq := priorityQueue{tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) assert.Equal(int64(0), pq.sizeBytes) pq.Push(msg) pq.Push(msg) assert.Equal(int64(len(msg.Payload)*2), pq.sizeBytes) } + func testLen(t *testing.T) { assert := assert.New(t) - pq := priorityQueue{queue: []item{ + require := require.New(t) + pq := priorityQueue{} + pq.queue = []item{ { - msg: wrp.Message{ - + msg: &wrp.Message{ Destination: "mac:00deadbeef00/config", }, - timestamp: time.Now(), + expires: time.Now(), }, { - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef01/config", }, - timestamp: time.Now(), + expires: time.Now(), }, - }, - tieBreaker: PriorityNewestMsg, } + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) assert.Equal(len(pq.queue), pq.Len()) } func testLess(t *testing.T) { oldestMsg := item{ - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef00/config", QualityOfService: wrp.QOSCriticalValue, }, - timestamp: time.Now(), + expires: time.Now(), } newestMsg := item{ - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef01/config", QualityOfService: wrp.QOSLowValue, }, - timestamp: time.Now(), + expires: time.Now(), } tieBreakerMsg := item{ - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef02/config", QualityOfService: wrp.QOSCriticalValue, }, - timestamp: time.Now(), + expires: time.Now(), } tests := []struct { description string priority PriorityType - tieBreaker tieBreaker }{ { description: "less", priority: NewestType, - tieBreaker: PriorityNewestMsg, }, { description: "tie breaker prioritizing newer messages", priority: NewestType, - tieBreaker: PriorityNewestMsg, }, { description: "tie breaker prioritizing older messages", priority: OldestType, - tieBreaker: PriorityOldestMsg, }, } for _, tc := range tests { t.Run(tc.description, func(t *testing.T) { assert := assert.New(t) + require := require.New(t) + pq := priorityQueue{} + pq.queue = []item{oldestMsg, newestMsg, tieBreakerMsg} - pq := priorityQueue{ - queue: []item{oldestMsg, newestMsg, tieBreakerMsg}, - tieBreaker: tc.tieBreaker, - } + var err error + pq.tieBreaker, err = priority(tc.priority) + require.NoError(err) // wrp.QOSCriticalValue > wrp.QOSLowValue assert.True(pq.Less(0, 1)) @@ -338,42 +380,109 @@ func testLess(t *testing.T) { } } +func testTrim(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + msg0 := wrp.Message{ + Destination: "mac:00deadbeef02/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), + QualityOfService: wrp.QOSMediumValue, + } + msg1 := wrp.Message{ + Destination: "mac:00deadbeef02/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), + QualityOfService: wrp.QOSHighValue, + } + msg2 := wrp.Message{ + Destination: "mac:00deadbeef02/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[]}"), + QualityOfService: wrp.QOSHighValue, + } + msg3 := wrp.Message{ + Destination: "mac:00deadbeef03/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"), + QualityOfService: wrp.QOSCriticalValue, + } + + pq := priorityQueue{ + maxQueueBytes: int64(len(msg0.Payload) + len(msg3.Payload)), + maxMessageBytes: len(msg3.Payload), + sizeBytes: int64(len(msg0.Payload) + len(msg1.Payload)*2 + len(msg3.Payload)), + priority: NewestType, + } + pq.queue = []item{ + { + msg: &msg0, + expires: time.Now().Add(DefaultCriticalExpires), + }, + { + msg: &msg1, + expires: time.Now(), + }, + { + msg: &msg2, + expires: time.Now(), + }, + { + msg: &msg3, + expires: time.Now().Add(DefaultCriticalExpires), + }, + } + + var err error + require.NoError(err) + pq.trim() + assert.Nil(pq.queue[1].msg.Payload) + assert.Nil(pq.queue[2].msg.Payload) + assert.NotEmpty(pq.queue[0].msg.Payload) + assert.NotEmpty(pq.queue[3].msg.Payload) + assert.Equal(pq.maxQueueBytes, pq.sizeBytes) + assert.Equal(*pq.queue[0].msg, msg0) + assert.Equal(*pq.queue[2].msg, msg1) + assert.Equal(*pq.queue[2].msg, msg2) + assert.Equal(*pq.queue[3].msg, msg3) +} + func testSwap(t *testing.T) { assert := assert.New(t) + require := require.New(t) msg0 := wrp.Message{ Destination: "mac:00deadbeef00/config", } msg2 := wrp.Message{ Destination: "mac:00deadbeef02/config", } - pq := priorityQueue{queue: []item{ + pq := priorityQueue{} + pq.queue = []item{ { - msg: msg0, - timestamp: time.Now(), + msg: &msg0, + expires: time.Now(), }, { - msg: wrp.Message{ + msg: &wrp.Message{ Destination: "mac:00deadbeef01/config", }, - timestamp: time.Now(), + expires: time.Now(), }, { - msg: msg2, - timestamp: time.Now(), + msg: &msg2, + expires: time.Now(), }, - }, - tieBreaker: PriorityNewestMsg, } + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) pq.Swap(0, 2) // pq.queue[0] should contain msg2 - assert.Equal(msg2, pq.queue[0].msg) + assert.Equal(msg2, *pq.queue[0].msg) // pq.queue[2] should contain msg0 - assert.Equal(msg0, pq.queue[2].msg) + assert.Equal(msg0, *pq.queue[2].msg) } func testPush(t *testing.T) { assert := assert.New(t) + require := require.New(t) messages := []wrp.Message{ { Destination: "mac:00deadbeef00/config", @@ -385,10 +494,15 @@ func testPush(t *testing.T) { Destination: "mac:00deadbeef02/config", }, } - pq := priorityQueue{tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) + for _, msg := range messages { pq.Push(msg) - assert.Equal(msg, pq.queue[pq.Len()-1].msg) + assert.Equal(msg, *pq.queue[pq.Len()-1].msg) } } @@ -415,7 +529,7 @@ func testPop(t *testing.T) { description: "single message with memory leak check", items: []item{ { - msg: msg0, + msg: &msg0, }, }, expectedMessage: msg0, @@ -424,13 +538,13 @@ func testPop(t *testing.T) { description: "multiple messages with memory leak check", items: []item{ { - msg: msg0, + msg: &msg0, }, { - msg: msg1, + msg: &msg1, }, { - msg: msg2, + msg: &msg2, }, }, expectedMessage: msg2, @@ -441,18 +555,22 @@ func testPop(t *testing.T) { assert := assert.New(t) require := require.New(t) - pq := priorityQueue{queue: tc.items, tieBreaker: PriorityNewestMsg} + pq := priorityQueue{} + pq.queue = tc.items + + var err error + pq.tieBreaker, err = priority(NewestType) + require.NoError(err) + // no sorting is applied, Pop will pop the last message from priorityQueue's queue - switch msg := pq.Pop().(type) { + switch itm := pq.Pop().(type) { case nil: assert.Len(tc.items, 0) - case wrp.Message: - assert.Equal(tc.expectedMessage, msg) + case item: + assert.Equal(tc.expectedMessage, *itm.msg) require.NotEmpty(tc.items, "Pop() should have returned a nil instead of a wrp.Message") // check for memory leak assert.Empty(tc.items[len(tc.items)-1]) - assert.Equal(wrp.Message{}, tc.items[len(tc.items)-1].msg) - assert.True(tc.items[len(tc.items)-1].timestamp.IsZero()) default: require.Fail("Pop() returned an unknown type") } diff --git a/internal/wrphandlers/qos/qos.go b/internal/wrphandlers/qos/qos.go index ff94e96..f20ef62 100644 --- a/internal/wrphandlers/qos/qos.go +++ b/internal/wrphandlers/qos/qos.go @@ -6,6 +6,7 @@ package qos import ( "errors" "sync" + "time" "github.com/xmidt-org/wrp-go/v3" "github.com/xmidt-org/xmidt-agent/internal/wrpkit" @@ -33,7 +34,7 @@ type Handler struct { next wrpkit.Handler // queue for wrp messages, ingested by serviceQOS queue chan wrp.Message - // Priority determines what is used [newest, oldest message] for QualityOfService tie breakers, + // priority determines what is used [newest, oldest message] for QualityOfService tie breakers and trimming, // with the default being to prioritize the newest messages. priority PriorityType // tieBreaker breaks any QualityOfService ties. @@ -43,6 +44,16 @@ type Handler struct { // MaxMessageBytes is the largest allowable wrp message payload. maxMessageBytes int + // QOS expiries. + // lowExpires determines when low qos messages are trimmed. + lowExpires time.Duration + // mediumExpires determines when medium qos messages are trimmed. + mediumExpires time.Duration + // highExpires determines when high qos messages are trimmed. + highExpires time.Duration + // criticalExpires determines when critical qos messages are trimmed. + criticalExpires time.Duration + lock sync.Mutex } @@ -59,7 +70,11 @@ func New(next wrpkit.Handler, opts ...Option) (*Handler, error) { opts = append(opts, validateQueueConstraints(), validatePriority(), validateTieBreaker()) h := Handler{ - next: next, + next: next, + lowExpires: DefaultLowExpires, + mediumExpires: DefaultMediumExpires, + highExpires: DefaultHighExpires, + criticalExpires: DefaultCriticalExpires, } var errs error diff --git a/internal/wrphandlers/qos/qos_test.go b/internal/wrphandlers/qos/qos_test.go index 38a0069..61c88b9 100644 --- a/internal/wrphandlers/qos/qos_test.go +++ b/internal/wrphandlers/qos/qos_test.go @@ -31,10 +31,8 @@ func TestHandler_HandleWrp(t *testing.T) { ) tests := []struct { - description string - maxQueueBytes int - maxMessageBytes int - priority qos.PriorityType + description string + options []qos.Option // int64 required for nextCallCount atomic.Int64 comparison nextCallCount int64 next wrpkit.Handler @@ -46,9 +44,8 @@ func TestHandler_HandleWrp(t *testing.T) { }{ // success cases { - description: "enqueued and delivered message prioritizing newer messages with no message size restriction", - maxQueueBytes: 100, - priority: qos.NewestType, + description: "enqueued and delivered message prioritizing newer messages", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -57,11 +54,9 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "enqueued and delivered message prioritizing newer messages", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 1, + description: "enqueued and delivered message prioritizing newer messages with no message size restriction", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.Priority(qos.NewestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -69,11 +64,9 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "enqueued and delivered message prioritizing older messages", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.OldestType, - nextCallCount: 1, + description: "enqueued and delivered message prioritizing older messages", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.OldestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -81,11 +74,9 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "re-enqueue message that failed its delivery", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 2, + description: "re-enqueue message that failed its delivery", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 2, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) if nextCallCount.Load() < 2 { @@ -98,11 +89,10 @@ func TestHandler_HandleWrp(t *testing.T) { failDeliveryOnce: true, }, { - description: "queue messages while message delivery is blocked", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - nextCallCount: 0, + description: "queue messages while message delivery is blocked", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + + nextCallCount: 0, next: wrpkit.HandlerFunc(func(wrp.Message) error { // halt qos message delivery time.Sleep(1 * time.Second) @@ -112,11 +102,39 @@ func TestHandler_HandleWrp(t *testing.T) { shouldHalt: true, }, { - description: "zero MaxQueueBytes option value", - maxQueueBytes: 0, - maxMessageBytes: qos.DefaultMaxMessageBytes, - priority: qos.NewestType, - nextCallCount: 1, + description: "zero MaxQueueBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "zero MaxMessageBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(0)), qos.MaxMessageBytes(0), qos.Priority(qos.NewestType)}, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "non-negative LowExpires option value", + options: []qos.Option{qos.LowExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "non-negative MediumExpires option value", + options: []qos.Option{qos.MediumExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -124,11 +142,19 @@ func TestHandler_HandleWrp(t *testing.T) { }), }, { - description: "zero MaxMessageBytes option value", - maxQueueBytes: qos.DefaultMaxQueueBytes, - maxMessageBytes: 0, - priority: qos.NewestType, - nextCallCount: 1, + description: "non-negative HighExpires option value", + options: []qos.Option{qos.HighExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, + { + description: "non-negative CriticalExpires option value", + options: []qos.Option{qos.CriticalExpires(0), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -137,17 +163,13 @@ func TestHandler_HandleWrp(t *testing.T) { }, // failure cases { - description: "invalid inputs for qos.New", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.NewestType, - expectedNewErr: qos.ErrInvalidInput, + description: "invalid inputs for qos.New", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + expectedNewErr: qos.ErrInvalidInput, }, { - description: "negative MaxQueueBytes option value", - maxQueueBytes: -1, - maxMessageBytes: 50, - priority: qos.NewestType, + description: "negative MaxQueueBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(-1)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -156,10 +178,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "negative MaxMessageBytes option value", - maxQueueBytes: 100, - maxMessageBytes: -1, - priority: qos.NewestType, + description: "negative MaxMessageBytes option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(-1), qos.Priority(qos.NewestType)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -168,10 +188,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "negative invalid priority type option value", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: -1, + description: "negative invalid priority type option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(-1)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -180,10 +198,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "positive invalid priority type option value", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: math.MaxInt64, + description: "positive invalid priority type option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(math.MaxInt64)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -192,10 +208,8 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "unknown priority type option value", - maxQueueBytes: 100, - maxMessageBytes: 50, - priority: qos.UnknownType, + description: "unknown priority type option value", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.UnknownType)}, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -204,11 +218,9 @@ func TestHandler_HandleWrp(t *testing.T) { expectedNewErr: qos.ErrMisconfiguredQOS, }, { - description: "qos has stopped", - maxQueueBytes: 100, - maxMessageBytes: 50, - nextCallCount: 0, - priority: qos.NewestType, + description: "qos has stopped", + options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, next: wrpkit.HandlerFunc(func(wrp.Message) error { nextCallCount.Add(1) @@ -217,13 +229,57 @@ func TestHandler_HandleWrp(t *testing.T) { shutdown: true, expectedHandleWRPErr: qos.ErrQOSHasShutdown, }, + { + description: "negative LowExpires option value", + options: []qos.Option{qos.LowExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, + { + description: "negative MediumExpires option value", + options: []qos.Option{qos.MediumExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, + { + description: "negative HighExpires option value", + options: []qos.Option{qos.HighExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, + { + description: "negative CriticalExpires option value", + options: []qos.Option{qos.CriticalExpires(-1), qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)}, + nextCallCount: 0, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + expectedNewErr: qos.ErrMisconfiguredQOS, + }, } for _, tc := range tests { t.Run(tc.description, func(t *testing.T) { assert := assert.New(t) require := require.New(t) - h, err := qos.New(tc.next, qos.MaxQueueBytes(int64(tc.maxQueueBytes)), qos.MaxMessageBytes(tc.maxMessageBytes), qos.Priority(tc.priority)) + h, err := qos.New(tc.next, tc.options...) if tc.expectedNewErr != nil { assert.ErrorIs(err, tc.expectedNewErr) assert.Nil(h)