Skip to content

Commit

Permalink
Handle throttling
Browse files Browse the repository at this point in the history
  • Loading branch information
vincentBaer authored and Vincent Baer committed Dec 15, 2022
1 parent d4c3f77 commit cf90fb5
Show file tree
Hide file tree
Showing 17 changed files with 1,478 additions and 366 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ cloud-init-secret:

.PHONY: testenv
testenv: cloud-init-secret
USE_EXISTING_CLUSTER=true OSC_REGION=${OSC_REGION} IMG_UPGRADE_FROM=${IMG_UPGRADE_FROM} go test -v -coverprofile=covers.out ./testenv/ -ginkgo.v -ginkgo.progress -test.v -test.timeout 30m
USE_EXISTING_CLUSTER=true OSC_REGION=${OSC_REGION} IMG_UPGRADE_FROM=${IMG_UPGRADE_FROM} go test -v -coverprofile=covers.out ./testenv/ -ginkgo.v -ginkgo.progress -test.v -test.timeout 30m

.PHONY: testclean
testclean:
Expand Down
105 changes: 79 additions & 26 deletions cloud/services/compute/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package compute

import (
"fmt"
_nethttp "net/http"

"github.com/outscale-dev/cluster-api-provider-outscale.git/util/reconciler"
osc "github.com/outscale/osc-sdk-go/v2"
"k8s.io/apimachinery/pkg/util/wait"
)

//go:generate ../../../bin/mockgen -destination mock_compute/image_mock.go -package mock_compute -source ./image.go
Expand All @@ -36,18 +39,36 @@ func (s *Service) GetImage(imageId string) (*osc.Image, error) {
}
oscApiClient := s.scope.GetApi()
oscAuthClient := s.scope.GetAuth()
readImageResponse, httpRes, err := oscApiClient.ImageApi.ReadImages(oscAuthClient).ReadImagesRequest(readImageRequest).Execute()
if err != nil {
if httpRes != nil {
return nil, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
} else {
return nil, err

var readImagesResponse osc.ReadImagesResponse
readImageCallBack := func() (bool, error) {
var httpRes *_nethttp.Response
var err error
readImagesResponse, httpRes, err = oscApiClient.ImageApi.ReadImages(oscAuthClient).ReadImagesRequest(readImageRequest).Execute()
if err != nil {
if httpRes != nil {
return false, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
}
requestStr := fmt.Sprintf("%v", readImageRequest)
if reconciler.KeepRetryWithError(
requestStr,
httpRes.StatusCode,
reconciler.ThrottlingErrors) {
return false, nil
}
return false, err
}
return true, err
}
backoff := reconciler.EnvBackoff()
waitErr := wait.ExponentialBackoff(backoff, readImageCallBack)
if waitErr != nil {
return nil, waitErr
}
if len(readImageResponse.GetImages()) == 0 {
if len(readImagesResponse.GetImages()) == 0 {
return nil, nil
}
image := readImageResponse.GetImages()[0]
image := readImagesResponse.GetImages()[0]

return &image, nil
}
Expand All @@ -59,19 +80,35 @@ func (s *Service) GetImageId(imageName string) (string, error) {
}
oscApiClient := s.scope.GetApi()
oscAuthClient := s.scope.GetAuth()
readImageResponse, httpRes, err := oscApiClient.ImageApi.ReadImages(oscAuthClient).ReadImagesRequest(readImageRequest).Execute()
if err != nil {
if httpRes != nil {
return "", fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
} else {
return "", err
var readImagesResponse osc.ReadImagesResponse
readImageCallBack := func() (bool, error) {
var httpRes *_nethttp.Response
var err error
readImagesResponse, httpRes, err = oscApiClient.ImageApi.ReadImages(oscAuthClient).ReadImagesRequest(readImageRequest).Execute()
if err != nil {
if httpRes != nil {
return false, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
}
requestStr := fmt.Sprintf("%v", readImageRequest)
if reconciler.KeepRetryWithError(
requestStr,
httpRes.StatusCode,
reconciler.ThrottlingErrors) {
return false, nil
}
return false, err
}

return true, err
}
if len(readImageResponse.GetImages()) == 0 {
backoff := reconciler.EnvBackoff()
waitErr := wait.ExponentialBackoff(backoff, readImageCallBack)
if waitErr != nil {
return "", waitErr
}
if len(readImagesResponse.GetImages()) == 0 {
return "", nil
}
imageId := readImageResponse.GetImages()[0].ImageId
imageId := readImagesResponse.GetImages()[0].ImageId

return *imageId, nil
}
Expand All @@ -83,18 +120,34 @@ func (s *Service) GetImageName(imageId string) (string, error) {
}
oscApiClient := s.scope.GetApi()
oscAuthClient := s.scope.GetAuth()
readImageResponse, httpRes, err := oscApiClient.ImageApi.ReadImages(oscAuthClient).ReadImagesRequest(readImageRequest).Execute()
if err != nil {
if httpRes != nil {
return "", fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
} else {
return "", err
var readImagesResponse osc.ReadImagesResponse
readImageCallBack := func() (bool, error) {
var httpRes *_nethttp.Response
var err error
readImagesResponse, httpRes, err = oscApiClient.ImageApi.ReadImages(oscAuthClient).ReadImagesRequest(readImageRequest).Execute()
if err != nil {
if httpRes != nil {
return false, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
}
requestStr := fmt.Sprintf("%v", readImageRequest)
if reconciler.KeepRetryWithError(
requestStr,
httpRes.StatusCode,
reconciler.ThrottlingErrors) {
return false, nil
}
return false, err
}
return true, err
}
backoff := reconciler.EnvBackoff()
waitErr := wait.ExponentialBackoff(backoff, readImageCallBack)
if waitErr != nil {
return "", waitErr
}
if len(readImageResponse.GetImages()) == 0 {
if len(readImagesResponse.GetImages()) == 0 {
return "", nil
}
imageName := readImageResponse.GetImages()[0].ImageName

imageName := readImagesResponse.GetImages()[0].ImageName
return *imageName, nil
}
161 changes: 131 additions & 30 deletions cloud/services/compute/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ import (
"strings"
"time"

_nethttp "net/http"

"github.com/benbjohnson/clock"
infrastructurev1beta1 "github.com/outscale-dev/cluster-api-provider-outscale.git/api/v1beta1"
"github.com/outscale-dev/cluster-api-provider-outscale.git/cloud/scope"
tag "github.com/outscale-dev/cluster-api-provider-outscale.git/cloud/tag"
"github.com/outscale-dev/cluster-api-provider-outscale.git/util/reconciler"
osc "github.com/outscale/osc-sdk-go/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/wait"
)

//go:generate ../../../bin/mockgen -destination mock_compute/vm_mock.go -package mock_compute -source ./vm.go
Expand Down Expand Up @@ -105,21 +109,46 @@ func (s *Service) CreateVm(machineScope *scope.MachineScope, spec *infrastructur

oscApiClient := s.scope.GetApi()
oscAuthClient := s.scope.GetAuth()
vmResponse, httpRes, err := oscApiClient.VmApi.CreateVms(oscAuthClient).CreateVmsRequest(vmOpt).Execute()
if err != nil {
if httpRes != nil {
return nil, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
} else {
return nil, err
var vmResponse osc.CreateVmsResponse
createVmCallBack := func() (bool, error) {
var httpRes *_nethttp.Response
var err error
vmResponse, httpRes, err = oscApiClient.VmApi.CreateVms(oscAuthClient).CreateVmsRequest(vmOpt).Execute()
if err != nil {
if httpRes != nil {
return false, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
}
requestStr := fmt.Sprintf("%v", vmOpt)
if reconciler.KeepRetryWithError(
requestStr,
httpRes.StatusCode,
reconciler.ThrottlingErrors) {
return false, nil
}
return false, err
}
return true, err
}
backoff := reconciler.EnvBackoff()
waitErr := wait.ExponentialBackoff(backoff, createVmCallBack)
if waitErr != nil {
return nil, waitErr
}
vms, ok := vmResponse.GetVmsOk()
if !ok {
return nil, errors.New("Can not get vm")
}
vmID := *(*vmResponse.Vms)[0].VmId
resourceIds := []string{vmID}
err = tag.AddTag("Name", vmName, resourceIds, oscApiClient, oscAuthClient)
vmTag := osc.ResourceTag{
Key: "Name",
Value: vmName,
}
vmTagRequest := osc.CreateTagsRequest{
ResourceIds: resourceIds,
Tags: []osc.ResourceTag{vmTag},
}
err, httpRes := tag.AddTag(vmTagRequest, resourceIds, oscApiClient, oscAuthClient)
if err != nil {
if httpRes != nil {
return nil, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
Expand All @@ -140,13 +169,29 @@ func (s *Service) DeleteVm(vmId string) error {
deleteVmsRequest := osc.DeleteVmsRequest{VmIds: []string{vmId}}
oscApiClient := s.scope.GetApi()
oscAuthClient := s.scope.GetAuth()
_, httpRes, err := oscApiClient.VmApi.DeleteVms(oscAuthClient).DeleteVmsRequest(deleteVmsRequest).Execute()
if err != nil {
if httpRes != nil {
return fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
} else {
return err
deleteVmsCallBack := func() (bool, error) {
var httpRes *_nethttp.Response
var err error
_, httpRes, err = oscApiClient.VmApi.DeleteVms(oscAuthClient).DeleteVmsRequest(deleteVmsRequest).Execute()
if err != nil {
if httpRes != nil {
return false, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
}
requestStr := fmt.Sprintf("%v", deleteVmsRequest)
if reconciler.KeepRetryWithError(
requestStr,
httpRes.StatusCode,
reconciler.ThrottlingErrors) {
return false, nil
}
return false, err
}
return true, err
}
backoff := reconciler.EnvBackoff()
waitErr := wait.ExponentialBackoff(backoff, deleteVmsCallBack)
if waitErr != nil {
return waitErr
}
return nil
}
Expand All @@ -160,14 +205,32 @@ func (s *Service) GetVm(vmId string) (*osc.Vm, error) {
}
oscApiClient := s.scope.GetApi()
oscAuthClient := s.scope.GetAuth()
readVmsResponse, httpRes, err := oscApiClient.VmApi.ReadVms(oscAuthClient).ReadVmsRequest(readVmsRequest).Execute()
if err != nil {
if httpRes != nil {
return nil, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
} else {
return nil, err
var readVmsResponse osc.ReadVmsResponse
readVmsCallBack := func() (bool, error) {
var httpRes *_nethttp.Response
var err error
readVmsResponse, httpRes, err = oscApiClient.VmApi.ReadVms(oscAuthClient).ReadVmsRequest(readVmsRequest).Execute()
if err != nil {
if httpRes != nil {
return false, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
}
requestStr := fmt.Sprintf("%v", readVmsRequest)
if reconciler.KeepRetryWithError(
requestStr,
httpRes.StatusCode,
reconciler.ThrottlingErrors) {
return false, nil
}
return false, err
}
return true, err
}
backoff := reconciler.EnvBackoff()
waitErr := wait.ExponentialBackoff(backoff, readVmsCallBack)
if waitErr != nil {
return nil, waitErr
}

vms, ok := readVmsResponse.GetVmsOk()
if !ok {
return nil, errors.New("Can not get vm")
Expand All @@ -190,14 +253,33 @@ func (s *Service) GetVmListFromTag(tagKey string, tagValue string) ([]osc.Vm, er
}
oscApiClient := s.scope.GetApi()
oscAuthClient := s.scope.GetAuth()
readVmsResponse, httpRes, err := oscApiClient.VmApi.ReadVms(oscAuthClient).ReadVmsRequest(readVmsRequest).Execute()
if err != nil {
if httpRes != nil {
return nil, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
} else {
return nil, err
var readVmsResponse osc.ReadVmsResponse
readVmsCallBack := func() (bool, error) {
var httpRes *_nethttp.Response
var err error
readVmsResponse, httpRes, err = oscApiClient.VmApi.ReadVms(oscAuthClient).ReadVmsRequest(readVmsRequest).Execute()
if err != nil {
if httpRes != nil {
return false, fmt.Errorf("error %w httpRes %s", err, httpRes.Status)
}
requestStr := fmt.Sprintf("%v", readVmsRequest)
if reconciler.KeepRetryWithError(
requestStr,
httpRes.StatusCode,
reconciler.ThrottlingErrors) {
return false, nil
}
return false, err

}
return true, err
}
backoff := reconciler.EnvBackoff()
waitErr := wait.ExponentialBackoff(backoff, readVmsCallBack)
if waitErr != nil {
return nil, waitErr
}

vms, ok := readVmsResponse.GetVmsOk()
if !ok {
return nil, errors.New("Can not get vm")
Expand Down Expand Up @@ -293,16 +375,35 @@ func (s *Service) AddCcmTag(clusterName string, hostname string, vmId string) er
resourceIds := []string{vmId}
oscApiClient := s.scope.GetApi()
oscAuthClient := s.scope.GetAuth()

err := tag.AddTag("OscK8SNodeName", hostname, resourceIds, oscApiClient, oscAuthClient)
nodeTag := osc.ResourceTag{
Key: "OscK8SNodeName",
Value: hostname,
}
nodeTagRequest := osc.CreateTagsRequest{
ResourceIds: resourceIds,
Tags: []osc.ResourceTag{nodeTag},
}
err, httpRes := tag.AddTag(nodeTagRequest, resourceIds, oscApiClient, oscAuthClient)
if err != nil {
return fmt.Errorf("%w failed to add OscK8sNodeName tag", err)
fmt.Printf("Error with http result %s", httpRes.Status)
return err
}
clusterTag := osc.ResourceTag{
Key: "OscK8sClusterID/" + clusterName,
Value: "owned",
}
err = tag.AddTag("OscK8sClusterID/"+clusterName, "owned", resourceIds, oscApiClient, oscAuthClient)
clusterTagRequest := osc.CreateTagsRequest{
ResourceIds: resourceIds,
Tags: []osc.ResourceTag{clusterTag},
}

err, httpRes = tag.AddTag(clusterTagRequest, resourceIds, oscApiClient, oscAuthClient)
if err != nil {
return fmt.Errorf("%w failed to add OscK8sClusterId tag", err)
fmt.Printf("Error with http result %s", httpRes.Status)
return err
}
return nil

}

func GetCPUQuantityFromInt(cores int) (resource.Quantity, error) {
Expand Down
Loading

0 comments on commit cf90fb5

Please sign in to comment.