From b151051ff68eb6878331edabb4fd997314c09c42 Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Thu, 1 Aug 2024 13:09:45 -0400 Subject: [PATCH 1/3] patch: fix mocktr181 unknow command response message typo - fix response message typo for unknown mocktr181 cmds --- internal/wrphandlers/mocktr181/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/wrphandlers/mocktr181/handler.go b/internal/wrphandlers/mocktr181/handler.go index ab593a0..0bb7b77 100644 --- a/internal/wrphandlers/mocktr181/handler.go +++ b/internal/wrphandlers/mocktr181/handler.go @@ -143,7 +143,7 @@ func (h Handler) HandleWrp(msg wrp.Message) error { default: // currently only get and set are implemented for existing mocktr181 statusCode = 520 - payloadResponse = []byte(fmt.Sprintf(`{"message": "command %s is not support", "statusCode": %d}`, command, statusCode)) + payloadResponse = []byte(fmt.Sprintf(`{"message": "command '%s' is not supported", "statusCode": %d}`, command, statusCode)) } response := msg From 4139f13d5fbca090f925b072da4dc1af3d17ad8c Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Thu, 1 Aug 2024 13:14:10 -0400 Subject: [PATCH 2/3] patch: match parodus' default qos queue behavior - disable individual message size validation - increase the default for max_queue_bytes (required for qa testing) --- cmd/xmidt-agent/default-config.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/xmidt-agent/default-config.yaml b/cmd/xmidt-agent/default-config.yaml index 080e92a..ef45d42 100644 --- a/cmd/xmidt-agent/default-config.yaml +++ b/cmd/xmidt-agent/default-config.yaml @@ -127,8 +127,7 @@ mock_tr_181: xmidt_agent_crud: service_name: xmidt_agent qos: - max_queue_bytes: 1048576 # 1 * 1024 * 1024 // 1MB max/queue, - max_message_bytes: 262144 # 256 * 1024 // 256 KB + max_queue_bytes: 104857600 # 100 * 1024 * 1024 // 100MB max/queue, priority: newest metadata: fields: From 5eed84a6ef3ab6706a12251da8589c097d1b256e Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Thu, 1 Aug 2024 16:16:47 -0400 Subject: [PATCH 3/3] feat: support disableable qos message size validation - disableable qos message size validation - this will match parodus's default behavior - no individual message size validation --- internal/wrphandlers/qos/options.go | 2 -- internal/wrphandlers/qos/priority_queue.go | 6 ++++-- internal/wrphandlers/qos/priority_queue_test.go | 6 ++++++ internal/wrphandlers/qos/qos_test.go | 13 ++++++++++++- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/internal/wrphandlers/qos/options.go b/internal/wrphandlers/qos/options.go index 4904f5b..bc3ce2a 100644 --- a/internal/wrphandlers/qos/options.go +++ b/internal/wrphandlers/qos/options.go @@ -37,8 +37,6 @@ func MaxMessageBytes(s int) Option { func(h *Handler) error { if s < 0 { return fmt.Errorf("%w: negative MaxMessageBytes", ErrMisconfiguredQOS) - } else if s == 0 { - s = DefaultMaxMessageBytes } h.maxMessageBytes = s diff --git a/internal/wrphandlers/qos/priority_queue.go b/internal/wrphandlers/qos/priority_queue.go index 39a59cb..9397306 100644 --- a/internal/wrphandlers/qos/priority_queue.go +++ b/internal/wrphandlers/qos/priority_queue.go @@ -21,7 +21,8 @@ type priorityQueue struct { queue []item // tieBreaker breaks any QualityOfService ties. tieBreaker tieBreaker - // maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads + // maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads. + // Zero value will disable individual message size validation. maxQueueBytes int64 // MaxMessageBytes is the largest allowable wrp message payload. maxMessageBytes int @@ -52,7 +53,8 @@ func (pq *priorityQueue) Dequeue() (wrp.Message, bool) { // Enqueue queues the given message. func (pq *priorityQueue) Enqueue(msg wrp.Message) error { // Check whether msg violates maxMessageBytes. - if len(msg.Payload) > pq.maxMessageBytes { + // The zero value of `pq.maxMessageBytes` will disable individual message size validation. + if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes { return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes) } diff --git a/internal/wrphandlers/qos/priority_queue_test.go b/internal/wrphandlers/qos/priority_queue_test.go index e6ca034..817b149 100644 --- a/internal/wrphandlers/qos/priority_queue_test.go +++ b/internal/wrphandlers/qos/priority_queue_test.go @@ -166,6 +166,12 @@ func testEnqueueDequeue(t *testing.T) { maxMessageBytes: len(largeCriticalQOSMsg.Payload) - 1, expectedQueueSize: 0, }, + { + description: "allow any message size", + messages: []wrp.Message{largeCriticalQOSMsg}, + maxQueueBytes: len(largeCriticalQOSMsg.Payload), + expectedQueueSize: 1, + }, { description: "message too large with a nonempty queue", messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg}, diff --git a/internal/wrphandlers/qos/qos_test.go b/internal/wrphandlers/qos/qos_test.go index c643718..38a0069 100644 --- a/internal/wrphandlers/qos/qos_test.go +++ b/internal/wrphandlers/qos/qos_test.go @@ -45,6 +45,17 @@ func TestHandler_HandleWrp(t *testing.T) { expectedHandleWRPErr error }{ // success cases + { + description: "enqueued and delivered message prioritizing newer messages with no message size restriction", + maxQueueBytes: 100, + priority: qos.NewestType, + nextCallCount: 1, + next: wrpkit.HandlerFunc(func(wrp.Message) error { + nextCallCount.Add(1) + + return nil + }), + }, { description: "enqueued and delivered message prioritizing newer messages", maxQueueBytes: 100, @@ -103,7 +114,7 @@ func TestHandler_HandleWrp(t *testing.T) { { description: "zero MaxQueueBytes option value", maxQueueBytes: 0, - maxMessageBytes: 50, + maxMessageBytes: qos.DefaultMaxMessageBytes, priority: qos.NewestType, nextCallCount: 1, next: wrpkit.HandlerFunc(func(wrp.Message) error {