Skip to content
This repository has been archived by the owner on Oct 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #37 from everpeace/integration-test
Browse files Browse the repository at this point in the history
implement simple e2e test
  • Loading branch information
everpeace authored Oct 15, 2021
2 parents 19ef0c2 + 899b280 commit 9513fa8
Show file tree
Hide file tree
Showing 14 changed files with 1,207 additions and 51 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ jobs:
run: make
- name: Test
run: make test
- name: E2E Test
run: make e2e
- name: Validate .goreleaser.yml
uses: goreleaser/goreleaser-action@v2
with:
Expand Down
37 changes: 36 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ build-only:

.PHONY: test
test: fmt lint
go test ./...
go test $$(go list ./... | grep -v "test/integration")

.PHONY: clean
clean:
Expand Down Expand Up @@ -97,11 +97,14 @@ GOLANGCI_LINT = $(DEV_TOOL_PREFIX)/bin/golangci-lint
GO_LICENSER = $(DEV_TOOL_PREFIX)/bin/go-licenser
GO_IMPORTS = $(DEV_TOOL_PREFIX)/bin/goimports
CONTROLLER_GEN = $(DEV_TOOL_PREFIX)/bin/controller-gen
KIND = $(DEV_TOOL_PREFIX)/bin/kind
KIND_KUBECNOFIG = $(DEV_TOOL_PREFIX)/.kubeconfig
setup:
$(call go-get-tool,$(GO_IMPORTS),golang.org/x/tools/cmd/goimports)
$(call go-get-tool,$(GO_LICENSER),github.com/elastic/go-licenser)
$(call go-get-tool,$(GIT_SEMV),github.com/linyows/git-semv/cmd/git-semv)
$(call go-get-tool,$(CONTROLLER_GEN),sigs.k8s.io/controller-tools/cmd/[email protected])
$(call go-get-tool,$(KIND),sigs.k8s.io/kind)
cd $(shell go env GOPATH) && \
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(DEV_TOOL_PREFIX)/bin v1.27.0

Expand Down Expand Up @@ -146,3 +149,35 @@ dev-run-debug: dev-scheduler-conf
--config=./hack/dev/scheduler-config.yaml \
--kubeconfig=$(HOME)/.kube/config \
--v=3

#
# E2E test
#
export E2E_GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT=10s
export E2E_GOMEGA_DEFAULT_CONSISTENTLY_DURATION=2s
E2E_PAUSE_IMAGE=k8s.gcr.io/pause:3.2
E2E_KIND_KUBECNOFIG = $(DEV_TOOL_PREFIX)/.kubeconfig
e2e-setup:
$(KIND) get clusters | grep kube-throttler-e2e 2>&1 >/dev/null \
|| $(KIND) create cluster --name=kube-throttler-e2e --kubeconfig=$(E2E_KIND_KUBECNOFIG)
kubectl --kubeconfig=$(E2E_KIND_KUBECNOFIG) apply -f ./deploy/crd.yaml
docker pull $(E2E_PAUSE_IMAGE)
$(KIND) load docker-image $(E2E_PAUSE_IMAGE) --name=kube-throttler-e2e
kubectl --kubeconfig=$(E2E_KIND_KUBECNOFIG) wait --timeout=120s \
--for=condition=Ready -n kube-system \
node/kube-throttler-e2e-control-plane \
pod/kube-apiserver-kube-throttler-e2e-control-plane

e2e-teardown:
$(KIND) get clusters | grep kube-throttler-e2e 2>&1 >/dev/null \
&& $(KIND) delete cluster --name=kube-throttler-e2e

e2e: fmt lint e2e-setup
GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT=$(E2E_GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT) \
GOMEGA_DEFAULT_CONSISTENTLY_DURATION=$(E2E_GOMEGA_DEFAULT_CONSISTENTLY_DURATION) \
go test ./test/integration --kubeconfig=$(E2E_KIND_KUBECNOFIG) --pause-image=$(E2E_PAUSE_IMAGE)

e2e-debug: fmt lint e2e-setup
GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT=$(E2E_GOMEGA_DEFAULT_EVENTUALLY_TIMEOUT) \
GOMEGA_DEFAULT_CONSISTENTLY_DURATION=$(E2E_GOMEGA_DEFAULT_CONSISTENTLY_DURATION) \
dlv test --headless --listen=0.0.0.0:2345 --api-version=2 --log ./test/integration -- --kubeconfig=$(E2E_KIND_KUBECNOFIG)
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ module github.com/everpeace/kube-throttler
go 1.16

require (
github.com/MakeNowJust/heredoc v1.0.0
github.com/google/go-cmp v0.5.5
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.14.0
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.16.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/spf13/cobra v1.2.1
Expand Down
9 changes: 6 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/GoogleCloudPlatform/k8s-cloud-provider v0.0.0-20200415212048-7901bc822317/go.mod h1:DF8FZRxMHMGv/vP2lQP6h+dYzzjpuRn24VeRiYn3qjQ=
github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab/go.mod h1:3VYc5hodBMJ5+l/7J4xAyMeuM2PNuepvHlGs8yilUCA=
github.com/MakeNowJust/heredoc v0.0.0-20170808103936-bb23615498cd/go.mod h1:64YHyfSL2R96J44Nlwm39UHepQbyR5q10x7iYa1ks2E=
github.com/MakeNowJust/heredoc v1.0.0 h1:cXCdzVdstXyiTqTvfqk9SDHpKNjxuom+DOlyEeQ4pzQ=
github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6lCpQ4fNJ8LE=
github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw=
github.com/Microsoft/go-winio v0.4.15/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw=
github.com/Microsoft/hcsshim v0.8.10-0.20200715222032-5eafd1556990/go.mod h1:ay/0dTb7NsG8QMDfsRfLHgZo/6xAJShLe1+ePPflihk=
Expand Down Expand Up @@ -497,15 +499,16 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.14.0 h1:ep6kpPVwmr/nTbklSx2nrLNSIO62DoYAhnPNIMhK8gI=
github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
Expand Down
77 changes: 56 additions & 21 deletions pkg/controllers/clusterthrottle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corev1informer "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -136,35 +137,69 @@ func (c *ClusterThrottleController) reconcile(key string) error {
}
newStatus.Throttled = newStatus.CalculatedThreshold.Threshold.IsThrottled(newStatus.Used, true)

unreserveAffectedPods := func() (schedulev1alpha1.ResourceAmount, sets.String) {
// Once status is updated, affected pods is safe to un-reserve from reserved resoruce amount cache
// We make sure to un-reserve terminated pods too here because it misses to unreserve terminated pods
// when reconcile is rate-limitted
unreservedPods := []string{}
for _, p := range append(affectedNonTerminatedPods, affectedTerminatedPods...) {
unreserved := c.UnReserveOnClusterThrottle(p, thr)
if unreserved {
unreservedPods = append(unreservedPods, p.Namespace+"/"+p.Name)
}
}
if len(unreservedPods) > 0 {
klog.V(2).InfoS(
"Pods are un-reserved for ClusterThrottle",
"ClusterThrottle", thr.Namespace+"/"+thr.Name,
"#Pods", len(unreservedPods),
"Pods", strings.Join(unreservedPods, ","),
)
}
return c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})
}

if !apiequality.Semantic.DeepEqual(thr.Status, *newStatus) {
klog.V(2).InfoS("Updating status", "ClusterThrottle", thr.Namespace+"/"+thr.Name)
thr.Status = *newStatus
c.metricsRecorder.recordClusterThrottleMetrics(thr)

klog.V(2).InfoS("Updating status",
"ClusterThrottle", thr.Namespace+"/"+thr.Name,
"Used", thr.Status.Used,
"Throttled", thr.Status.Throttled,
"Threshold", thr.Status.CalculatedThreshold.Threshold,
"CalculatedAt", thr.Status.CalculatedThreshold.CalculatedAt,
"Message", strings.Join(thr.Status.CalculatedThreshold.Messages, ","),
)

if thr, err = c.scheduleClientset.ScheduleV1alpha1().ClusterThrottles().UpdateStatus(ctx, thr, metav1.UpdateOptions{}); err != nil {
utilruntime.HandleError(errors.Wrapf(err, "failed to update ClusterThrottle '%s' status", key))
return err
}

reservedAmt, reservedPodNNs := unreserveAffectedPods()
klog.V(2).InfoS("Status updated successfully",
"ClusterThrottle", thr.Namespace+"/"+thr.Name,
"Used", thr.Status.Used,
"Throttled", thr.Status.Throttled,
"CalculatedAt", thr.Status.CalculatedThreshold.CalculatedAt,
"Threshold", thr.Status.CalculatedThreshold.Threshold,
"Message", strings.Join(thr.Status.CalculatedThreshold.Messages, ","),
"ReservedAmountInScheduler", reservedAmt,
"ReservedPodsInScheduler", strings.Join(reservedPodNNs.List(), ","),
)
} else {
c.metricsRecorder.recordClusterThrottleMetrics(thr)
klog.V(2).InfoS("No need to update status", "ClusterThrottle", thr.Namespace+"/"+thr.Name)
}

// Once status is updated, affected pods is safe to un-reserve from reserved resoruce amount cache
// We make sure to un-reserve terminated pods too here because it misses to unreserve terminated pods
// when reconcile is rate-limitted
unreservedPods := []string{}
for _, p := range append(affectedNonTerminatedPods, affectedTerminatedPods...) {
unreserved := c.UnReserveOnClusterThrottle(p, thr)
if unreserved {
unreservedPods = append(unreservedPods, p.Namespace+"/"+p.Name)
}
}
if len(unreservedPods) > 0 {
klog.V(2).InfoS(
"Pods are un-reserved for ClusterThrottle",
reservedAmt, reservedPodNNs := unreserveAffectedPods()
klog.V(2).InfoS("No need to update status",
"ClusterThrottle", thr.Namespace+"/"+thr.Name,
"#Pods", len(unreservedPods),
"Pods", strings.Join(unreservedPods, ","),
"Threshold", thr.Status.CalculatedThreshold.Threshold,
"CalculatedAt", thr.Status.CalculatedThreshold.CalculatedAt,
"Message", strings.Join(thr.Status.CalculatedThreshold.Messages, ","),
"Used", thr.Status.Used,
"Throttled", thr.Status.Throttled,
"ReservedAmountInScheduler", reservedAmt,
"ReservedPodsInScheduler", strings.Join(reservedPodNNs.List(), ","),
)
}

Expand Down Expand Up @@ -404,7 +439,7 @@ func (c *ClusterThrottleController) setupEventHandler() {
if !c.isResponsibleFor(thr) {
return
}
klog.V(4).InfoS("Add event", "ClusterThrottle", thr.Name)
klog.V(4).InfoS("Delete event", "ClusterThrottle", thr.Name)
c.enqueue(thr)
},
})
Expand Down Expand Up @@ -482,8 +517,8 @@ func (c *ClusterThrottleController) setupEventHandler() {
}

// reconcile
klog.V(4).InfoS("Reconciling ClusterThrottles", "Pod", newPod.Namespace+"/"+newPod.Name, "#ClusterThrottles", len(throttleNNs))
for nn := range throttleNNs {
klog.V(4).InfoS("Enqueue clusterthrottle for pod update", "Throttle", nn.String(), "Pod", newPod.Namespace+"/"+newPod.Name)
c.enqueue(cache.ExplicitKey(nn.String()))
}
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/reserved_resource_ammounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ func (c *reservedResourceAmounts) addPod(nn types.NamespacedName, pod *corev1.Po
c.cache[nn] = podResourceAmountMap{}
}

added := c.cache[nn].add(pod)
klog.V(5).InfoS("reservedResourceAmounts.addPod", "Pod", pod.Namespace+"/"+pod.Name, "NamespacedName", nn.String(), "Cache", c.cache)
return c.cache[nn].add(pod)
return added
}

func (c *reservedResourceAmounts) removePod(nn types.NamespacedName, pod *corev1.Pod) bool {
Expand Down
79 changes: 56 additions & 23 deletions pkg/controllers/throttle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,35 +134,69 @@ func (c *ThrottleController) reconcile(key string) error {
}
newStatus.Throttled = newStatus.CalculatedThreshold.Threshold.IsThrottled(newStatus.Used, true)

unreserveAffectedPods := func() (schedulev1alpha1.ResourceAmount, sets.String) {
// Once status is updated, affected pods is safe to un-reserve from reserved resoruce amount cache
// We make sure to un-reserve terminated pods too here because it misses to unreserve terminated pods
// when reconcile is rate-limitted
unreservedPods := []string{}
for _, p := range append(affectedNonTerminatedPods, affectedTerminatedPods...) {
unreserved := c.UnReserveOnThrottle(p, thr)
if unreserved {
unreservedPods = append(unreservedPods, p.Namespace+"/"+p.Name)
}
}
if len(unreservedPods) > 0 {
klog.V(2).InfoS(
"Pods are un-reserved for Throttle",
"Throttle", thr.Namespace+"/"+thr.Name,
"#Pods", len(unreservedPods),
"Pods", strings.Join(unreservedPods, ","),
)
}
return c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})
}

if !apiequality.Semantic.DeepEqual(thr.Status, *newStatus) {
klog.V(2).InfoS("Updating status", "Throttle", thr.Namespace+"/"+thr.Name)
thr.Status = *newStatus
c.metricsRecorder.recordThrottleMetrics(thr)

klog.V(2).InfoS("Updating status",
"Throttle", thr.Namespace+"/"+thr.Name,
"Used", thr.Status.Used,
"Throttled", thr.Status.Throttled,
"Threshold", thr.Status.CalculatedThreshold.Threshold,
"CalculatedAt", thr.Status.CalculatedThreshold.CalculatedAt,
"Message", strings.Join(thr.Status.CalculatedThreshold.Messages, ","),
)

if thr, err = c.scheduleClientset.ScheduleV1alpha1().Throttles(namespace).UpdateStatus(ctx, thr, metav1.UpdateOptions{}); err != nil {
utilruntime.HandleError(errors.Wrapf(err, "failed to update Throttle '%s' status", key))
return err
}

reservedAmt, reservedPodNNs := unreserveAffectedPods()
klog.V(2).InfoS("Status updated successfully",
"Throttle", thr.Namespace+"/"+thr.Name,
"Used", thr.Status.Used,
"Throttled", thr.Status.Throttled,
"CalculatedAt", thr.Status.CalculatedThreshold.CalculatedAt,
"Threshold", thr.Status.CalculatedThreshold.Threshold,
"Message", strings.Join(thr.Status.CalculatedThreshold.Messages, ","),
"ReservedAmountInScheduler", reservedAmt,
"ReservedPodsInScheduler", strings.Join(reservedPodNNs.List(), ","),
)
} else {
c.metricsRecorder.recordThrottleMetrics(thr)
klog.V(2).InfoS("No need to update status", "Throttle", thr.Namespace+"/"+thr.Name)
}

// Once status is updated, affected pods is safe to un-reserve from reserved resoruce amount cache
// We make sure to un-reserve terminated pods too here because it misses to unreserve terminated pods
// when reconcile is rate-limitted
unreservedPods := []string{}
for _, p := range append(affectedNonTerminatedPods, affectedTerminatedPods...) {
unreserved := c.UnReserveOnThrottle(p, thr)
if unreserved {
unreservedPods = append(unreservedPods, p.Namespace+"/"+p.Name)
}
}
if len(unreservedPods) > 0 {
klog.V(2).InfoS(
"Pods are un-reserved for Throttle",
reservedAmt, reservedPodNNs := unreserveAffectedPods()
klog.V(2).InfoS("No need to update status",
"Throttle", thr.Namespace+"/"+thr.Name,
"#Pods", len(unreservedPods),
"Pods", strings.Join(unreservedPods, ","),
"Threshold", thr.Status.CalculatedThreshold.Threshold,
"CalculatedAt", thr.Status.CalculatedThreshold.CalculatedAt,
"Message", strings.Join(thr.Status.CalculatedThreshold.Messages, ","),
"Used", thr.Status.Used,
"Throttled", thr.Status.Throttled,
"ReservedAmountInScheduler", reservedAmt,
"ReservedPodsInScheduler", strings.Join(reservedPodNNs.List(), ","),
)
}

Expand Down Expand Up @@ -406,7 +440,6 @@ func (c *ThrottleController) setupEventHandler() {
}
klog.V(4).InfoS("Update event", "Pod", newPod.Namespace+"/"+newPod.Name)

throttleNames := sets.NewString()
throttlesForOld, err := c.affectedThrottles(oldPod)
if err != nil {
utilruntime.HandleError(errors.Wrapf(err, "fail to get affected throttles for pod '%s'", oldPod.Namespace+"/"+oldPod.Name))
Expand Down Expand Up @@ -452,9 +485,9 @@ func (c *ThrottleController) setupEventHandler() {
}

// reconcile
klog.V(4).InfoS("Reconciling Throttles", "Pod", newPod.Namespace+"/"+newPod.Name, "Throttles", strings.Join(throttleNames.List(), ","))
for _, key := range throttleNames.List() {
c.enqueue(cache.ExplicitKey(key))
for nn := range throttleNNs {
klog.V(4).InfoS("Enqueue throttle for pod update", "Throttle", nn.String(), "Pod", newPod.Namespace+"/"+newPod.Name)
c.enqueue(cache.ExplicitKey(nn.String()))
}
},
DeleteFunc: func(obj interface{}) {
Expand Down
Loading

0 comments on commit 9513fa8

Please sign in to comment.