Skip to content

Commit

Permalink
feat: update rdr codes for dropped message
Browse files Browse the repository at this point in the history
For dropped messages, dispose of the payload and update the rdr code to the following:
- rdr value of 4 for payloads too large
- rdr value of 102 for lower priority messages
  • Loading branch information
denopink committed Sep 13, 2024
1 parent d2319bd commit fcaeb57
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 41 deletions.
59 changes: 29 additions & 30 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package qos
import (
"container/heap"
"errors"
"fmt"
"slices"
"time"

Expand All @@ -15,6 +14,12 @@ import (

var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes")

var (
// https://xmidt.io/docs/wrp/basics/#request-delivery-response-rdr-codes
messageIsTooLarge int64 = 4
higherPriorityMessageTookTheSpot int64 = 102
)

// priorityQueue implements heap.Interface and holds wrp Message, using wrp.QOSValue as its priority.
// https://xmidt.io/docs/wrp/basics/#qos-description-qos
type priorityQueue struct {
Expand Down Expand Up @@ -56,40 +61,44 @@ type item struct {
discard bool
}

func (itm *item) dispose() (payloadSize int64) {
payloadSize = int64(len(itm.msg.Payload))
// Mark itm to be discarded.
itm.discard = true
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
itm.msg.RequestDeliveryResponse = &higherPriorityMessageTookTheSpot

return payloadSize
}

// Dequeue returns the next highest priority message.
func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
var (
msg wrp.Message
ok bool
)
for pq.Len() != 0 {
itm := heap.Pop(pq).(item)
// itm.discard will be true if `itm` has been marked to be discarded,
// i.e. trimmed by `pq.trim()'.
if itm.discard {
continue
}
func (pq *priorityQueue) Dequeue() (msg wrp.Message, ok bool) {
if pq.Len() == 0 {
return msg, false
}

itm, ok := heap.Pop(pq).(item)
if ok {
msg = *itm.msg
ok = true
break
}

// ok will be false if no message was found, otherwise ok will be true.
return msg, ok
}

// Enqueue queues the given message.
func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
func (pq *priorityQueue) Enqueue(msg wrp.Message) {
// Check whether msg violates maxMessageBytes.
// The zero value of `pq.maxMessageBytes` will disable individual message size validation.
if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes {
return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes)
msg.Payload = nil
msg.RequestDeliveryResponse = &messageIsTooLarge
}

heap.Push(pq, msg)
pq.trim()
return nil
}

// trim removes messages with the lowest QualityOfService until the queue no longer violates `maxQueueSize“.
Expand All @@ -111,12 +120,7 @@ func (pq *priorityQueue) trim() {
}
if now.After(itm.expires) {
// Mark itm to be discarded.
// `pq.Dequeue()` will fully discard itm.
itm.discard = true
pq.sizeBytes -= int64(len(itm.msg.Payload))
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
pq.sizeBytes -= itm.dispose()
continue
}

Expand Down Expand Up @@ -153,12 +157,7 @@ func (pq *priorityQueue) trim() {
}

// Mark itm to be discarded.
// `pq.Dequeue()` will fully discard itm.
itm.discard = true
pq.sizeBytes -= int64(len(itm.msg.Payload))
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
pq.sizeBytes -= itm.dispose()
}

}
Expand Down
28 changes: 19 additions & 9 deletions internal/wrphandlers/qos/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,17 @@ func testEnqueueDequeue(t *testing.T) {
Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"),
QualityOfService: wrp.QOSCriticalValue,
}
xLargeCriticalQOSMsg := wrp.Message{
Destination: "mac:00deadbeef04/config",
Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameterXL\"]}"),
QualityOfService: wrp.QOSCriticalValue,
}
emptyXLargeCriticalQOSMsg := wrp.Message{
Destination: "mac:00deadbeef04/config",
QualityOfService: wrp.QOSCriticalValue,
RequestDeliveryResponse: &messageIsTooLarge,
}
enqueueSequenceTest := []wrp.Message{
largeCriticalQOSMsg,
mediumMediumQosMsg,
smallLowQOSMsg,
largeCriticalQOSMsg,
Expand All @@ -136,7 +145,6 @@ func testEnqueueDequeue(t *testing.T) {
mediumHighQosMsg,
}
dequeueSequenceTest := []wrp.Message{
largeCriticalQOSMsg,
largeCriticalQOSMsg,
largeCriticalQOSMsg,
mediumHighQosMsg,
Expand All @@ -152,8 +160,9 @@ func testEnqueueDequeue(t *testing.T) {
queueSizeSequenceTest += len(msg.Payload)
}

// expect 1 message to be drop
enqueueSequenceTest = append(enqueueSequenceTest, smallLowQOSMsg)
// test message payload drop
enqueueSequenceTest = append(enqueueSequenceTest, xLargeCriticalQOSMsg)
dequeueSequenceTest = append([]wrp.Message{emptyXLargeCriticalQOSMsg}, dequeueSequenceTest...)

tests := []struct {
description string
Expand Down Expand Up @@ -192,11 +201,12 @@ func testEnqueueDequeue(t *testing.T) {
expectedQueueSize: 1,
},
{
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
maxMessageBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, xLargeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
maxMessageBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
expectedDequeueSequence: []wrp.Message{emptyXLargeCriticalQOSMsg, largeCriticalQOSMsg},
},
{
description: "drop incoming low priority messages",
Expand Down
4 changes: 2 additions & 2 deletions internal/wrphandlers/qos/qos.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ func (h *Handler) serviceQOS(queue <-chan wrp.Message) {
}

// ErrMaxMessageBytes errrors are ignored.
_ = pq.Enqueue(msg)
pq.Enqueue(msg)
case <-ready:
// Previous Handler.wrpHandler has finished, check whether it
// was successful or not.
if msg, ok := <-failedMsg; ok {
// Delivery failed, re-enqueue message and try again later.
// ErrMaxMessageBytes errrors are ignored.
_ = pq.Enqueue(msg)
pq.Enqueue(msg)
}

ready, failedMsg = nil, nil
Expand Down

0 comments on commit fcaeb57

Please sign in to comment.