Skip to content

Commit

Permalink
Merge pull request #251 from practo/fix-intn-args
Browse files Browse the repository at this point in the history
Fixes panic for Intn() and the random maxWait bug
  • Loading branch information
kishore-practo authored Sep 13, 2021
2 parents 532d0ee + 8e96a1a commit 2fce7ed
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
23 changes: 12 additions & 11 deletions pkg/redshiftloader/loader_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (h *loaderHandler) throttle(topic string, metric metricSetter, sinkGroup st

// randomMaxWait helps to keep the maxWait +- 20% of the specified value
// this is required to spread the load in Redshift
func (h *loaderHandler) randomMaxWait(topic string) *int {
func (h *loaderHandler) randomMaxWait(topic string) int {
var maxAllowed, minAllowed *int
_, _, table := transformer.ParseTopic(topic)
queries, err := h.prometheusClient.FilterVector(
Expand All @@ -254,7 +254,7 @@ func (h *loaderHandler) randomMaxWait(topic string) *int {
)
if err != nil {
klog.Warningf("Can't use prometheus to decide maxWait, err: %v", err)
return h.maxWaitSeconds
return *h.maxWaitSeconds
}
if queries != nil && float64(*queries) > 0.0 {
klog.V(2).Infof("%s: queries:%+v", topic, *queries)
Expand All @@ -263,9 +263,8 @@ func (h *loaderHandler) randomMaxWait(topic string) *int {
klog.V(2).Infof("%s: queries:0", topic)
minAllowed = h.maxWaitSeconds
}
newMaxWait := util.Randomize(*h.maxWaitSeconds, 0.20, maxAllowed, minAllowed)

return &newMaxWait
return util.Randomize(*h.maxWaitSeconds, 0.20, maxAllowed, minAllowed)
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
Expand Down Expand Up @@ -299,11 +298,12 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
metric,
)

maxWaitSeconds := *h.maxWaitSeconds
// randomize maxWait if prometheus and redshift metrics are available
if h.prometheusClient != nil && h.redshiftMetrics {
h.maxWaitSeconds = h.randomMaxWait(claim.Topic())
maxWaitSeconds = h.randomMaxWait(claim.Topic())
}
klog.V(2).Infof("%s: maxWaitSeconds=%vs", claim.Topic(), *h.maxWaitSeconds)
klog.V(2).Infof("%s: maxWaitSeconds=%vs", claim.Topic(), maxWaitSeconds)

if err != nil {
return fmt.Errorf(
Expand All @@ -323,7 +323,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
processor,
)
maxWaitTicker := time.NewTicker(
time.Duration(*h.maxWaitSeconds) * time.Second,
time.Duration(maxWaitSeconds) * time.Second,
)

klog.V(4).Infof("%s: read msgs", claim.Topic())
Expand Down Expand Up @@ -395,7 +395,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
)
maxWaitTicker.Stop()
err = msgBatch.Process(session)
maxWaitTicker.Reset(time.Duration(*h.maxWaitSeconds) * time.Second)
maxWaitTicker.Reset(time.Duration(maxWaitSeconds) * time.Second)
if err != nil {
return err
}
Expand All @@ -415,7 +415,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
}
h.loadRunning.Store(claim.Topic(), true)
err = msgBatch.Process(session)
maxWaitTicker.Reset(time.Duration(*h.maxWaitSeconds) * time.Second)
maxWaitTicker.Reset(time.Duration(maxWaitSeconds) * time.Second)
if err != nil {
h.loadRunning.Store(claim.Topic(), false)
return err
Expand All @@ -426,8 +426,9 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
case <-maxWaitTicker.C:
// Process the batch by time
klog.V(2).Infof(
"%s: maxWaitSeconds hit",
"%s: maxWaitSeconds: %vs hit",
claim.Topic(),
maxWaitSeconds,
)
maxWaitTicker.Stop()
if msgBatch.Size() > 0 {
Expand All @@ -438,7 +439,7 @@ func (h *loaderHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
}
h.loadRunning.Store(claim.Topic(), true)
err = msgBatch.Process(session)
maxWaitTicker.Reset(time.Duration(*h.maxWaitSeconds) * time.Second)
maxWaitTicker.Reset(time.Duration(maxWaitSeconds) * time.Second)
if err != nil {
h.loadRunning.Store(claim.Topic(), false)
return err
Expand Down
7 changes: 6 additions & 1 deletion pkg/util/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,10 @@ func Randomize(
min = value - diff
}

return min + rand.Intn(max-min)
gap := max - min
if gap <= 0 {
return min
}

return min + rand.Intn(gap)
}
12 changes: 10 additions & 2 deletions pkg/util/random_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package util

import (
"fmt"
"math/rand"
"testing"
"time"
Expand All @@ -13,6 +12,7 @@ func TestRandomize(t *testing.T) {

maxAllowed := 1800
minAllowed := 1800
max4 := 4

tests := []struct {
name string
Expand Down Expand Up @@ -50,13 +50,21 @@ func TestRandomize(t *testing.T) {
maxAllowed: nil,
minAllowed: &minAllowed,
},
{
name: "with max-min ==0",
value: 4,
diffPercent: 0.2,
maxAllowed: &max4,
minAllowed: nil,
min: 4,
max: 4,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
v := Randomize(tc.value, tc.diffPercent, tc.maxAllowed, tc.minAllowed)
fmt.Println(v)
if v < tc.min || v > tc.max {
t.Errorf("expected in range: >%v <%v, got: %v\n", tc.min, tc.max, v)
}
Expand Down

0 comments on commit 2fce7ed

Please sign in to comment.