Skip to content

Commit

Permalink
Merge branch 'main' into denopink/feat/optimize-qos-trim-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
denopink committed Aug 5, 2024
2 parents 1d2bbb6 + a5309a6 commit 84bbd43
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 11 deletions.
3 changes: 1 addition & 2 deletions cmd/xmidt-agent/default-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ mock_tr_181:
xmidt_agent_crud:
service_name: xmidt_agent
qos:
max_queue_bytes: 1048576 # 1 * 1024 * 1024 // 1MB max/queue,
max_message_bytes: 262144 # 256 * 1024 // 256 KB
max_queue_bytes: 104857600 # 100 * 1024 * 1024 // 100MB max/queue,
priority: newest
metadata:
fields:
Expand Down
2 changes: 1 addition & 1 deletion internal/wrphandlers/mocktr181/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (h Handler) HandleWrp(msg wrp.Message) error {
default:
// currently only get and set are implemented for existing mocktr181
statusCode = 520
payloadResponse = []byte(fmt.Sprintf(`{"message": "command %s is not support", "statusCode": %d}`, command, statusCode))
payloadResponse = []byte(fmt.Sprintf(`{"message": "command '%s' is not supported", "statusCode": %d}`, command, statusCode))
}

response := msg
Expand Down
2 changes: 0 additions & 2 deletions internal/wrphandlers/qos/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func MaxMessageBytes(s int) Option {
func(h *Handler) error {
if s < 0 {
return fmt.Errorf("%w: negative MaxMessageBytes", ErrMisconfiguredQOS)
} else if s == 0 {
s = DefaultMaxMessageBytes
}

h.maxMessageBytes = s
Expand Down
6 changes: 4 additions & 2 deletions internal/wrphandlers/qos/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type priorityQueue struct {
priority PriorityType
// tieBreaker breaks any QualityOfService ties.
tieBreaker tieBreaker
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads
// maxQueueBytes is the allowable max size of the queue based on the sum of all queued wrp message's payloads.
// Zero value will disable individual message size validation.
maxQueueBytes int64
// MaxMessageBytes is the largest allowable wrp message payload.
maxMessageBytes int
Expand Down Expand Up @@ -81,7 +82,8 @@ func (pq *priorityQueue) Dequeue() (wrp.Message, bool) {
// Enqueue queues the given message.
func (pq *priorityQueue) Enqueue(msg wrp.Message) error {
// Check whether msg violates maxMessageBytes.
if len(msg.Payload) > pq.maxMessageBytes {
// The zero value of `pq.maxMessageBytes` will disable individual message size validation.
if pq.maxMessageBytes != 0 && len(msg.Payload) > pq.maxMessageBytes {
return fmt.Errorf("%w: %v", ErrMaxMessageBytes, pq.maxMessageBytes)
}

Expand Down
6 changes: 6 additions & 0 deletions internal/wrphandlers/qos/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ func testEnqueueDequeue(t *testing.T) {
maxMessageBytes: len(largeCriticalQOSMsg.Payload) - 1,
expectedQueueSize: 0,
},
{
description: "allow any message size",
messages: []wrp.Message{largeCriticalQOSMsg},
maxQueueBytes: len(largeCriticalQOSMsg.Payload),
expectedQueueSize: 1,
},
{
description: "message too large with a nonempty queue",
messages: []wrp.Message{largeCriticalQOSMsg, largeCriticalQOSMsg},
Expand Down
16 changes: 12 additions & 4 deletions internal/wrphandlers/qos/qos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestHandler_HandleWrp(t *testing.T) {
tests := []struct {
description string
options []qos.Option
priority qos.PriorityType
// int64 required for nextCallCount atomic.Int64 comparison
nextCallCount int64
next wrpkit.Handler
Expand All @@ -54,6 +53,16 @@ func TestHandler_HandleWrp(t *testing.T) {
return nil
}),
},
{
description: "enqueued and delivered message prioritizing newer messages with no message size restriction",
options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.Priority(qos.NewestType)},
nextCallCount: 1,
next: wrpkit.HandlerFunc(func(wrp.Message) error {
nextCallCount.Add(1)

return nil
}),
},
{
description: "enqueued and delivered message prioritizing older messages",
options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.OldestType)},
Expand Down Expand Up @@ -93,9 +102,8 @@ func TestHandler_HandleWrp(t *testing.T) {
shouldHalt: true,
},
{
description: "zero MaxQueueBytes option value",
options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)},

description: "zero MaxQueueBytes option value",
options: []qos.Option{qos.MaxQueueBytes(int64(100)), qos.MaxMessageBytes(50), qos.Priority(qos.NewestType)},
nextCallCount: 1,
next: wrpkit.HandlerFunc(func(wrp.Message) error {
nextCallCount.Add(1)
Expand Down

0 comments on commit 84bbd43

Please sign in to comment.