Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update rdr codes for dropped message #232

Merged
merged 2 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 37 additions & 28 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (

var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes")

const (
// https://xmidt.io/docs/wrp/basics/#request-delivery-response-rdr-codes
messageIsTooLarge int64 = 4
denopink marked this conversation as resolved.
Show resolved Hide resolved
higherPriorityMessageTookTheSpot int64 = 102
)

// priorityQueue implements heap.Interface and holds wrp Message, using wrp.QOSValue as its priority.
// https://xmidt.io/docs/wrp/basics/#qos-description-qos
type priorityQueue struct {
Expand Down Expand Up @@ -56,23 +62,29 @@ type item struct {
discard bool
}

func (itm *item) dispose() (payloadSize int64) {
var rdr = higherPriorityMessageTookTheSpot

payloadSize = int64(len(itm.msg.Payload))
// Mark itm to be discarded.
itm.discard = true
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
itm.msg.RequestDeliveryResponse = &rdr

return payloadSize
}

// Dequeue returns the next highest priority message.
func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
var (
msg wrp.Message
ok bool
)
for pq.Len() != 0 {
itm := heap.Pop(pq).(item)
// itm.discard will be true if `itm` has been marked to be discarded,
// i.e. trimmed by `pq.trim()'.
if itm.discard {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we were discarding the message without sending any response back to xmidt

continue
}
func (pq *priorityQueue) Dequeue() (msg wrp.Message, ok bool) {
if pq.Len() == 0 {
return msg, false
}

itm, ok := heap.Pop(pq).(item)
if ok {
msg = *itm.msg
ok = true
break
}

// ok will be false if no message was found, otherwise ok will be true.
Expand All @@ -81,15 +93,22 @@ func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {

// Enqueue queues the given message.
func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
var err error

// Check whether msg violates maxMessageBytes.
// The zero value of `pq.maxMessageBytes` will disable individual message size validation.
if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes {
return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes)
var rdr = messageIsTooLarge

msg.Payload = nil
msg.RequestDeliveryResponse = &rdr
err = fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes)
}

heap.Push(pq, msg)
pq.trim()
return nil

return err
}

// trim removes messages with the lowest QualityOfService until the queue no longer violates `maxQueueSize“.
Expand All @@ -111,12 +130,7 @@ func (pq *priorityQueue) trim() {
}
if now.After(itm.expires) {
// Mark itm to be discarded.
// `pq.Dequeue()` will fully discard itm.
itm.discard = true
pq.sizeBytes -= int64(len(itm.msg.Payload))
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
pq.sizeBytes -= itm.dispose()
continue
}

Expand Down Expand Up @@ -153,12 +167,7 @@ func (pq *priorityQueue) trim() {
}

// Mark itm to be discarded.
// `pq.Dequeue()` will fully discard itm.
itm.discard = true
pq.sizeBytes -= int64(len(itm.msg.Payload))
// Preemptively discard itm's payload to reduce
// resource usage, since itm will be discarded,
itm.msg.Payload = nil
pq.sizeBytes -= itm.dispose()
}

}
Expand Down
43 changes: 32 additions & 11 deletions internal/wrphandlers/qos/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ func testEnqueueDequeueAgePriority(t *testing.T) {
require.NoError(err)

for _, msg := range messages {
pq.Enqueue(msg)
err = pq.Enqueue(msg)
if len(msg.Payload) > pq.maxMessageBytes && pq.maxMessageBytes != 0 {
assert.Error(err)
} else {
assert.NoError(err)
}
}

actualMsg, ok := pq.Dequeue()
Expand All @@ -100,6 +105,7 @@ func testEnqueueDequeueAgePriority(t *testing.T) {
}

func testEnqueueDequeue(t *testing.T) {
var rdr = messageIsTooLarge
emptyLowQOSMsg := wrp.Message{
Destination: "mac:00deadbeef00/config",
QualityOfService: 10,
Expand All @@ -124,8 +130,17 @@ func testEnqueueDequeue(t *testing.T) {
Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"),
QualityOfService: wrp.QOSCriticalValue,
}
xLargeCriticalQOSMsg := wrp.Message{
Destination: "mac:00deadbeef04/config",
Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameterXL\"]}"),
QualityOfService: wrp.QOSCriticalValue,
}
emptyXLargeCriticalQOSMsg := wrp.Message{
Destination: "mac:00deadbeef04/config",
QualityOfService: wrp.QOSCriticalValue,
RequestDeliveryResponse: &rdr,
}
enqueueSequenceTest := []wrp.Message{
largeCriticalQOSMsg,
mediumMediumQosMsg,
smallLowQOSMsg,
largeCriticalQOSMsg,
Expand All @@ -136,7 +151,6 @@ func testEnqueueDequeue(t *testing.T) {
mediumHighQosMsg,
}
dequeueSequenceTest := []wrp.Message{
largeCriticalQOSMsg,
largeCriticalQOSMsg,
largeCriticalQOSMsg,
mediumHighQosMsg,
Expand All @@ -152,8 +166,9 @@ func testEnqueueDequeue(t *testing.T) {
queueSizeSequenceTest += len(msg.Payload)
}

// expect 1 message to be drop
enqueueSequenceTest = append(enqueueSequenceTest, smallLowQOSMsg)
// test message payload drop
enqueueSequenceTest = append(enqueueSequenceTest, xLargeCriticalQOSMsg)
dequeueSequenceTest = append([]wrp.Message{emptyXLargeCriticalQOSMsg}, dequeueSequenceTest...)

tests := []struct {
description string
Expand Down Expand Up @@ -192,11 +207,12 @@ func testEnqueueDequeue(t *testing.T) {
expectedQueueSize: 1,
},
{
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
maxMessageBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, xLargeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
maxMessageBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
expectedDequeueSequence: []wrp.Message{emptyXLargeCriticalQOSMsg, largeCriticalQOSMsg},
},
{
description: "drop incoming low priority messages",
Expand Down Expand Up @@ -247,7 +263,12 @@ func testEnqueueDequeue(t *testing.T) {
require.NoError(err)

for _, msg := range tc.messages {
pq.Enqueue(msg)
err = pq.Enqueue(msg)
if len(msg.Payload) > pq.maxMessageBytes && pq.maxMessageBytes != 0 {
assert.Error(err)
} else {
assert.NoError(err)
}
}

if len(tc.expectedDequeueSequence) == 0 {
Expand Down