From fcaeb57b13c199b2b9fe036b68ee85bb8ead84b8 Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Fri, 13 Sep 2024 10:58:51 -0400 Subject: [PATCH] feat: update rdr codes for dropped message For dropped messages, dispose of the payload and update the rdr code to the following: - rdr value of 4 for payloads too large - rdr value of 102 for lower priority messages --- internal/wrphandlers/qos/priority_queue.go | 59 +++++++++---------- .../wrphandlers/qos/priority_queue_test.go | 28 ++++++--- internal/wrphandlers/qos/qos.go | 4 +- 3 files changed, 50 insertions(+), 41 deletions(-) diff --git a/internal/wrphandlers/qos/priority_queue.go b/internal/wrphandlers/qos/priority_queue.go index 71626ad..2cb5d7b 100644 --- a/internal/wrphandlers/qos/priority_queue.go +++ b/internal/wrphandlers/qos/priority_queue.go @@ -6,7 +6,6 @@ package qos import ( "container/heap" "errors" - "fmt" "slices" "time" @@ -15,6 +14,12 @@ import ( var ErrMaxMessageBytes = errors.New("wrp message payload exceeds maxMessageBytes") +var ( + // https://xmidt.io/docs/wrp/basics/#request-delivery-response-rdr-codes + messageIsTooLarge int64 = 4 + higherPriorityMessageTookTheSpot int64 = 102 +) + // priorityQueue implements heap.Interface and holds wrp Message, using wrp.QOSValue as its priority. // https://xmidt.io/docs/wrp/basics/#qos-description-qos type priorityQueue struct { @@ -56,23 +61,27 @@ type item struct { discard bool } +func (itm *item) dispose() (payloadSize int64) { + payloadSize = int64(len(itm.msg.Payload)) + // Mark itm to be discarded. + itm.discard = true + // Preemptively discard itm's payload to reduce + // resource usage, since itm will be discarded, + itm.msg.Payload = nil + itm.msg.RequestDeliveryResponse = &higherPriorityMessageTookTheSpot + + return payloadSize +} + // Dequeue returns the next highest priority message. -func (pq *priorityQueue) Dequeue() (wrp.Message, bool) { - var ( - msg wrp.Message - ok bool - ) - for pq.Len() != 0 { - itm := heap.Pop(pq).(item) - // itm.discard will be true if `itm` has been marked to be discarded, - // i.e. trimmed by `pq.trim()'. - if itm.discard { - continue - } +func (pq *priorityQueue) Dequeue() (msg wrp.Message, ok bool) { + if pq.Len() == 0 { + return msg, false + } + itm, ok := heap.Pop(pq).(item) + if ok { msg = *itm.msg - ok = true - break } // ok will be false if no message was found, otherwise ok will be true. @@ -80,16 +89,16 @@ func (pq *priorityQueue) Dequeue() (wrp.Message, bool) { } // Enqueue queues the given message. -func (pq *priorityQueue) Enqueue(msg wrp.Message) error { +func (pq *priorityQueue) Enqueue(msg wrp.Message) { // Check whether msg violates maxMessageBytes. // The zero value of `pq.maxMessageBytes` will disable individual message size validation. if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes { - return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes) + msg.Payload = nil + msg.RequestDeliveryResponse = &messageIsTooLarge } heap.Push(pq, msg) pq.trim() - return nil } // trim removes messages with the lowest QualityOfService until the queue no longer violates `maxQueueSize“. @@ -111,12 +120,7 @@ func (pq *priorityQueue) trim() { } if now.After(itm.expires) { // Mark itm to be discarded. - // `pq.Dequeue()` will fully discard itm. - itm.discard = true - pq.sizeBytes -= int64(len(itm.msg.Payload)) - // Preemptively discard itm's payload to reduce - // resource usage, since itm will be discarded, - itm.msg.Payload = nil + pq.sizeBytes -= itm.dispose() continue } @@ -153,12 +157,7 @@ func (pq *priorityQueue) trim() { } // Mark itm to be discarded. - // `pq.Dequeue()` will fully discard itm. - itm.discard = true - pq.sizeBytes -= int64(len(itm.msg.Payload)) - // Preemptively discard itm's payload to reduce - // resource usage, since itm will be discarded, - itm.msg.Payload = nil + pq.sizeBytes -= itm.dispose() } } diff --git a/internal/wrphandlers/qos/priority_queue_test.go b/internal/wrphandlers/qos/priority_queue_test.go index feb2215..bb16864 100644 --- a/internal/wrphandlers/qos/priority_queue_test.go +++ b/internal/wrphandlers/qos/priority_queue_test.go @@ -124,8 +124,17 @@ func testEnqueueDequeue(t *testing.T) { Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameter\"]}"), QualityOfService: wrp.QOSCriticalValue, } + xLargeCriticalQOSMsg := wrp.Message{ + Destination: "mac:00deadbeef04/config", + Payload: []byte("{\"command\":\"GET\",\"names\":[\"NoSuchParameterXL\"]}"), + QualityOfService: wrp.QOSCriticalValue, + } + emptyXLargeCriticalQOSMsg := wrp.Message{ + Destination: "mac:00deadbeef04/config", + QualityOfService: wrp.QOSCriticalValue, + RequestDeliveryResponse: &messageIsTooLarge, + } enqueueSequenceTest := []wrp.Message{ - largeCriticalQOSMsg, mediumMediumQosMsg, smallLowQOSMsg, largeCriticalQOSMsg, @@ -136,7 +145,6 @@ func testEnqueueDequeue(t *testing.T) { mediumHighQosMsg, } dequeueSequenceTest := []wrp.Message{ - largeCriticalQOSMsg, largeCriticalQOSMsg, largeCriticalQOSMsg, mediumHighQosMsg, @@ -152,8 +160,9 @@ func testEnqueueDequeue(t *testing.T) { queueSizeSequenceTest += len(msg.Payload) } - // expect 1 message to be drop - enqueueSequenceTest = append(enqueueSequenceTest, smallLowQOSMsg) + // test message payload drop + enqueueSequenceTest = append(enqueueSequenceTest, xLargeCriticalQOSMsg) + dequeueSequenceTest = append([]wrp.Message{emptyXLargeCriticalQOSMsg}, dequeueSequenceTest...) tests := []struct { description string @@ -192,11 +201,12 @@ func testEnqueueDequeue(t *testing.T) { expectedQueueSize: 1, }, { - description: "message too large with a nonempty queue", - messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg}, - maxQueueBytes: len(largeCriticalQOSMsg.Payload), - maxMessageBytes: len(largeCriticalQOSMsg.Payload), - expectedQueueSize: 1, + description: "message too large with a nonempty queue", + messages: []wrp.Message{largeCriticalQOSMsg, xLargeCriticalQOSMsg}, + maxQueueBytes: len(largeCriticalQOSMsg.Payload), + maxMessageBytes: len(largeCriticalQOSMsg.Payload), + expectedQueueSize: 1, + expectedDequeueSequence: []wrp.Message{emptyXLargeCriticalQOSMsg, largeCriticalQOSMsg}, }, { description: "drop incoming low priority messages", diff --git a/internal/wrphandlers/qos/qos.go b/internal/wrphandlers/qos/qos.go index f20ef62..612c46b 100644 --- a/internal/wrphandlers/qos/qos.go +++ b/internal/wrphandlers/qos/qos.go @@ -155,14 +155,14 @@ func (h *Handler) serviceQOS(queue <-chan wrp.Message) { } // ErrMaxMessageBytes errrors are ignored. - _ = pq.Enqueue(msg) + pq.Enqueue(msg) case <-ready: // Previous Handler.wrpHandler has finished, check whether it // was successful or not. if msg, ok := <-failedMsg; ok { // Delivery failed, re-enqueue message and try again later. // ErrMaxMessageBytes errrors are ignored. - _ = pq.Enqueue(msg) + pq.Enqueue(msg) } ready, failedMsg = nil, nil