Skip to content

Commit

Permalink
CDPCP-13131 - Implement an exponential backoff in the Cloudera Terraf…
Browse files Browse the repository at this point in the history
…orm provider
  • Loading branch information
gregito committed Oct 24, 2024
1 parent 18de6c0 commit 76f7bdb
Show file tree
Hide file tree
Showing 10 changed files with 649 additions and 23 deletions.
51 changes: 51 additions & 0 deletions cdp-sdk-go/cdp/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 Cloudera. All Rights Reserved.
//
// This file is licensed under the Apache License Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
//
// This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
// OF ANY KIND, either express or implied. Refer to the License for the specific
// permissions and limitations governing your use of the file.

package cdp

import (
"log"
"math"
"math/rand"
"os"
"time"
)

const (
expDeltaMin = 0.75
expDeltaMax = 1.0
)

func backoff(retries int) time.Duration {
switch os.Getenv("CDP_TF_BACKOFF_STRATEGY") {
case "linear":
{
step := intFromEnvOrDefault("CDP_TF_BACKOFF_STEP", defaultLinearBackoffStep)
log.Default().Println("Using linear backoff strategy with step: ", step)
return linearBackoff(retries, step)
}
default:
{
log.Default().Println("Using exponential backoff strategy")
return exponentialBackoff(retries)
}
}
}

func exponentialBackoff(retries int) time.Duration {
rndSrc := rand.NewSource(time.Now().UnixNano())
delta := expDeltaMax - expDeltaMin
jitter := expDeltaMin + rand.New(rndSrc).Float64()*(delta)
return time.Duration((math.Pow(2, float64(retries))*jitter)*float64(time.Millisecond)) * 1000
}

func linearBackoff(retries int, step int) time.Duration {
return time.Duration((retries+1)*step) * time.Second
}
144 changes: 144 additions & 0 deletions cdp-sdk-go/cdp/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2024 Cloudera. All Rights Reserved.
//
// This file is licensed under the Apache License Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
//
// This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
// OF ANY KIND, either express or implied. Refer to the License for the specific
// permissions and limitations governing your use of the file.

package cdp

import (
"os"
"testing"
"time"
)

func TestLinearBackoffWithPositiveRetries(t *testing.T) {
retries := 3
step := 2
expected := 8 * time.Second

result := linearBackoff(retries, step)
if result != expected {
t.Fatalf("Expected %v, got %v", expected, result)
}
}

func TestLinearBackoffWithZeroRetries(t *testing.T) {
retries := 0
step := 2
expected := 2 * time.Second

result := linearBackoff(retries, step)
if result != expected {
t.Fatalf("Expected %v, got %v", expected, result)
}
}

func TestLinearBackoffWithNegativeRetries(t *testing.T) {
retries := -1
step := 2
expected := 0 * time.Second

result := linearBackoff(retries, step)
if result != expected {
t.Fatalf("Expected %v, got %v", expected, result)
}
}

func TestLinearBackoffWithLargeStep(t *testing.T) {
retries := 2
step := 1000
expected := 3000 * time.Second

result := linearBackoff(retries, step)
if result != expected {
t.Fatalf("Expected %v, got %v", expected, result)
}
}

func TestExponentialBackoffWithPositiveRetries(t *testing.T) {
retries := 3
expectedMin := 6 * time.Second
expectedMax := 8 * time.Second

result := exponentialBackoff(retries)
if result < expectedMin || result > expectedMax {
t.Fatalf("Expected between %v and %v, got %v", expectedMin, expectedMax, result)
}
}

func TestExponentialBackoffWithZeroRetries(t *testing.T) {
retries := 0
expectedMin := 0.75 * float64(time.Second)
expectedMax := 1 * time.Second

result := exponentialBackoff(retries)
if result < time.Duration(int(expectedMin)) || result > expectedMax {
t.Fatalf("Expected between %v and %v, got %v", time.Duration(int(expectedMin)), expectedMax, result)
}
}

func TestExponentialBackoffWithHighRetries(t *testing.T) {
retries := 10
expectedMin := 768 * time.Second
expectedMax := 1024 * time.Second

result := exponentialBackoff(retries)
if result < expectedMin || result > expectedMax {
t.Fatalf("Expected between %v and %v, got %v", expectedMin, expectedMax, result)
}
}

func TestLinearBackoffStrategyWithDefaultStep(t *testing.T) {
_ = os.Setenv("CDP_TF_BACKOFF_STRATEGY", "linear")
defer func() {
_ = os.Unsetenv("CDP_TF_BACKOFF_STRATEGY")
}()

result := backoff(2)
expected := defaultLinearBackoffStep * 3 * time.Second
if result != expected {
t.Fatalf("Expected %v, got %v", expected, result)
}
}

func TestExponentialBackoffStrategyWithDefault(t *testing.T) {
_ = os.Unsetenv("CDP_TF_BACKOFF_STRATEGY")

result := backoff(3)
expectedMin := 6 * time.Second
expectedMax := 8 * time.Second
if result < expectedMin || result > expectedMax {
t.Fatalf("Expected between %v and %v, got %v", expectedMin, expectedMax, result)
}
}

func TestLinearBackoffStrategyWithCustomStep(t *testing.T) {
_ = os.Setenv("CDP_TF_BACKOFF_STRATEGY", "linear")
_ = os.Setenv("CDP_TF_BACKOFF_STEP", "5")
defer func() {
_ = os.Unsetenv("CDP_TF_BACKOFF_STRATEGY")
_ = os.Unsetenv("CDP_TF_BACKOFF_STEP")
}()

result := backoff(1)
expected := 10 * time.Second
if result != expected {
t.Fatalf("Expected %v, got %v", expected, result)
}
}

func TestExponentialBackoffStrategyWithHighRetries(t *testing.T) {
_ = os.Unsetenv("CDP_TF_BACKOFF_STRATEGY")

result := backoff(10)
expectedMin := 768 * time.Second
expectedMax := 1024 * time.Second
if result < expectedMin || result > expectedMax {
t.Fatalf("Expected between %v and %v, got %v", expectedMin, expectedMax, result)
}
}
68 changes: 68 additions & 0 deletions cdp-sdk-go/cdp/retryable_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2024 Cloudera. All Rights Reserved.
//
// This file is licensed under the Apache License Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
//
// This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
// OF ANY KIND, either express or implied. Refer to the License for the specific
// permissions and limitations governing your use of the file.

package cdp

import (
"bytes"
"fmt"
"io"
"log"
"net/http"
"time"
)

type RetryableTransport struct {
transport http.RoundTripper
}

func shouldRetry(err error, resp *http.Response) bool {
if err != nil {
return true
} else if resp == nil {
return false
}
return sliceContains(retryableStatusCodes, resp.StatusCode)
}

func drainBody(resp *http.Response) {
if resp != nil && resp.Body != nil {
_, err := io.Copy(io.Discard, resp.Body)
if err != nil {
log.Default().Println("Error while draining body: ", err)
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
}
}

func (t *RetryableTransport) RoundTrip(req *http.Request) (*http.Response, error) {
var bodyBytes []byte
if req.Body != nil {
bodyBytes, _ = io.ReadAll(req.Body)
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
}
resp, err := t.transport.RoundTrip(req)
retries := 0
retryCount := intFromEnvOrDefault("CDP_TF_CALL_RETRY_COUNT", 10)
for shouldRetry(err, resp) && retries < retryCount {
log.Default().Printf("Retrying request (caused by: %+v;%+v)\n", err, resp)
time.Sleep(backoff(retries))
drainBody(resp)
if req.Body != nil {
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
}
resp, err = t.transport.RoundTrip(req)
fmt.Printf("%v retry out of %v\n", retries+1, retryCount)
retries++
}
return resp, err
}
Loading

0 comments on commit 76f7bdb

Please sign in to comment.