fix/concurrency-issue (#139)
* use k8s lease to solve concurrent assignment issue

* increase lease duration

* Refactor lease duration from int32 to int and update lease acquisition logic
alexei-led authored Mar 29, 2024
1 parent 376454b commit 1728875
Showing 8 changed files with 327 additions and 13 deletions.
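
In short: every kubeip agent now acquires a cluster-wide coordination.k8s.io Lease named kubeip-lock (in the default namespace) before assigning a static IP, so two nodes can no longer race for the same reserved address. Distilled from the cmd/main.go change below into a hypothetical helper (withKubeipLock is illustrative, not part of the commit), the pattern looks roughly like this:

package main

import (
	"context"

	"github.com/doitintl/kubeip/internal/lease"
	"github.com/pkg/errors"
	"k8s.io/client-go/kubernetes"
)

// withKubeipLock sketches the serialization pattern introduced by this commit:
// acquire the cluster-wide "kubeip-lock" Lease, run the critical section (the
// address assignment), then release the Lease so the next node can proceed.
func withKubeipLock(ctx context.Context, client kubernetes.Interface, holder string,
	leaseDurationSeconds int, criticalSection func(context.Context) error) error {
	lock := lease.NewKubeLeaseLock(client, "kubeip-lock", "default", holder, leaseDurationSeconds)
	if err := lock.Lock(ctx); err != nil {
		return errors.Wrap(err, "failed to acquire lock")
	}
	defer lock.Unlock(ctx) //nolint:errcheck // release the Lease and stop renewal
	return criticalSection(ctx)
}

The real implementation in internal/lease/lock.go (added below) layers exponential-backoff retries, expired-lease takeover, and periodic renewal on top of this.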
36 changes: 31 additions & 5 deletions cmd/main.go
@@ -9,6 +9,7 @@ import (

"github.com/doitintl/kubeip/internal/address"
"github.com/doitintl/kubeip/internal/config"
"github.com/doitintl/kubeip/internal/lease"
nd "github.com/doitintl/kubeip/internal/node"
"github.com/doitintl/kubeip/internal/types"
"github.com/pkg/errors"
@@ -23,8 +24,10 @@ import (
type contextKey string

const (
developModeKey contextKey = "develop-mode"
unassignTimeout = 5 * time.Minute
developModeKey contextKey = "develop-mode"
unassignTimeout = 5 * time.Minute
kubeipLockName = "kubeip-lock"
defaultLeaseDuration = 5
)

var (
@@ -79,14 +82,17 @@ func prepareLogger(level string, json bool) *logrus.Entry {
return log
}

func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Interface, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
ctx, cancel := context.WithCancel(c)
defer cancel()

// ticker for retry interval
ticker := time.NewTicker(cfg.RetryInterval)
defer ticker.Stop()

// create new cluster wide lock
lock := lease.NewKubeLeaseLock(client, kubeipLockName, "default", node.Instance, cfg.LeaseDuration)

for retryCounter := 0; retryCounter <= cfg.RetryAttempts; retryCounter++ {
log.WithFields(logrus.Fields{
"node": node.Name,
@@ -95,7 +101,20 @@ func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assign
"retry-counter": retryCounter,
"retry-attempts": cfg.RetryAttempts,
}).Debug("assigning static public IP address to node")
err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy)
err := func(ctx context.Context) error {
if err := lock.Lock(ctx); err != nil {
return errors.Wrap(err, "failed to acquire lock")
}
log.Debug("lock acquired")
defer func() {
lock.Unlock(ctx) //nolint:errcheck
log.Debug("lock released")
}()
if err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy); err != nil {
return err //nolint:wrapcheck
}
return nil
}(c)
if err == nil || errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
return nil
}
@@ -152,7 +171,7 @@ func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
errorCh := make(chan error, 1) // buffered channel to avoid goroutine leak
go func() {
defer close(errorCh) // close the channel when the goroutine exits to avoid goroutine leak
e := assignAddress(ctx, log, assigner, n, cfg)
e := assignAddress(ctx, log, clientset, assigner, n, cfg)
if e != nil {
errorCh <- e
}
@@ -267,6 +286,13 @@ func main() {
EnvVars: []string{"RETRY_ATTEMPTS"},
Category: "Configuration",
},
&cli.IntFlag{
Name: "lease-duration",
Usage: "duration of the kubernetes lease",
Value: defaultLeaseDuration,
EnvVars: []string{"LEASE_DURATION"},
Category: "Configuration",
},
&cli.BoolFlag{
Name: "release-on-exit",
Usage: "release the static public IP address on exit",
9 changes: 8 additions & 1 deletion cmd/main_test.go
@@ -11,6 +11,7 @@ import (
mocks "github.com/doitintl/kubeip/mocks/address"
"github.com/pkg/errors"
tmock "github.com/stretchr/testify/mock"
"k8s.io/client-go/kubernetes/fake"
)

func Test_assignAddress(t *testing.T) {
@@ -45,6 +46,7 @@
OrderBy: "test-order-by",
RetryAttempts: 3,
RetryInterval: time.Millisecond,
LeaseDuration: 1,
},
},
},
@@ -70,6 +72,7 @@
OrderBy: "test-order-by",
RetryAttempts: 3,
RetryInterval: time.Millisecond,
LeaseDuration: 1,
},
},
},
@@ -93,6 +96,7 @@
OrderBy: "test-order-by",
RetryAttempts: 3,
RetryInterval: time.Millisecond,
LeaseDuration: 1,
},
},
wantErr: true,
@@ -125,6 +129,7 @@
OrderBy: "test-order-by",
RetryAttempts: 10,
RetryInterval: 5 * time.Millisecond,
LeaseDuration: 1,
},
},
wantErr: true,
@@ -152,6 +157,7 @@
OrderBy: "test-order-by",
RetryAttempts: 3,
RetryInterval: 15 * time.Millisecond,
LeaseDuration: 1,
},
},
wantErr: true,
@@ -161,7 +167,8 @@
t.Run(tt.name, func(t *testing.T) {
log := prepareLogger("debug", false)
assigner := tt.args.assignerFn(t)
if err := assignAddress(tt.args.c, log, assigner, tt.args.node, tt.args.cfg); (err != nil) != tt.wantErr {
client := fake.NewSimpleClientset()
if err := assignAddress(tt.args.c, log, client, assigner, tt.args.node, tt.args.cfg); (err != nil) != tt.wantErr {
t.Errorf("assignAddress() error = %v, wantErr %v", err, tt.wantErr)
}
})
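
Because the lock only talks to the coordination.k8s.io API, the fake clientset introduced above is enough to drive it end to end. A minimal round-trip check, assuming it lives alongside cmd/main_test.go inside the kubeip module (illustrative only, not part of the commit):

package main

import (
	"context"
	"testing"

	"github.com/doitintl/kubeip/internal/lease"
	"k8s.io/client-go/kubernetes/fake"
)

// Test_leaseLock_roundTrip acquires and releases the kubeip lock against a
// fake clientset; a second Lock by the same holder would succeed immediately.
func Test_leaseLock_roundTrip(t *testing.T) {
	client := fake.NewSimpleClientset()
	lock := lease.NewKubeLeaseLock(client, "kubeip-lock", "default", "test-instance-id", 1)

	if err := lock.Lock(context.Background()); err != nil {
		t.Fatalf("Lock() error = %v", err)
	}
	if err := lock.Unlock(context.Background()); err != nil {
		t.Fatalf("Unlock() error = %v", err)
	}
}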
5 changes: 5 additions & 0 deletions examples/aws/eks.tf
@@ -167,6 +167,11 @@ resource "kubernetes_cluster_role" "kubeip_cluster_role" {
resources = ["nodes"]
verbs = ["get"]
}
rule {
api_groups = ["coordination.k8s.io"]
resources = ["leases"]
verbs = ["create", "delete", "get", "list", "update"]
}
depends_on = [
kubernetes_service_account.kubeip_service_account,
module.eks
9 changes: 9 additions & 0 deletions examples/gcp/gke.tf
@@ -227,6 +227,11 @@ resource "kubernetes_cluster_role" "kubeip_cluster_role" {
resources = ["nodes"]
verbs = ["get"]
}
rule {
api_groups = ["coordination.k8s.io"]
resources = ["leases"]
verbs = ["create", "delete", "get", "list", "update"]
}
depends_on = [
kubernetes_service_account.kubeip_service_account,
google_container_cluster.kubeip_cluster
@@ -303,6 +308,10 @@ resource "kubernetes_daemonset" "kubeip_daemonset" {
name = "LOG_JSON"
value = "true"
}
env {
name = "LEASE_DURATION"
value = "20"
}
resources {
requests = {
cpu = "100m"
7 changes: 0 additions & 7 deletions internal/address/gcp.go
@@ -3,7 +3,6 @@ package address
import (
"context"
"fmt"
"math/rand"
"strings"
"time"

@@ -27,7 +26,6 @@ const (
accessConfigKind = "compute#accessConfig"
defaultPrefixLength = 96
maxRetries = 10 // number of retries for assigning ephemeral public IP address
maxWaitListTime = 10 // max time to wait before listing addresses
)

var (
@@ -223,11 +221,6 @@ func (a *gcpAssigner) Assign(ctx context.Context, instanceID, zone string, filte
return errors.Wrapf(err, "check if static public IP is already assigned to instance %s", instanceID)
}

// add random sleep to reduce the chance of multiple kubeip instances getting the same address list
waitTime := time.Duration(rand.Intn(maxWaitListTime)) * time.Second //nolint:gosec
a.logger.WithField("waitTime", waitTime).Debug("waiting before listing addresses")
time.Sleep(waitTime)

// get available reserved public IP addresses
addresses, err := a.listAddresses(filter, orderBy, reservedStatus)
if err != nil {
3 changes: 3 additions & 0 deletions internal/config/config.go
@@ -29,6 +29,8 @@ type Config struct {
RetryAttempts int `json:"retry-attempts"`
// ReleaseOnExit releases the IP address on exit
ReleaseOnExit bool `json:"release-on-exit"`
// LeaseDuration is the duration of the kubernetes lease
LeaseDuration int `json:"lease-duration"`
}

func NewConfig(c *cli.Context) *Config {
@@ -44,5 +46,6 @@ func NewConfig(c *cli.Context) *Config {
cfg.Region = c.String("region")
cfg.IPv6 = c.Bool("ipv6")
cfg.ReleaseOnExit = c.Bool("release-on-exit")
cfg.LeaseDuration = c.Int("lease-duration")
return &cfg
}
135 changes: 135 additions & 0 deletions internal/lease/lock.go
@@ -0,0 +1,135 @@
package lease

import (
"context"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/ptr"
)

type KubeLock interface {
Lock(ctx context.Context) error
Unlock(ctx context.Context) error
}

type kubeLeaseLock struct {
client kubernetes.Interface
leaseName string
namespace string
holderIdentity string
leaseDuration int // seconds
cancelFunc context.CancelFunc
}

func NewKubeLeaseLock(client kubernetes.Interface, leaseName, namespace, holderIdentity string, leaseDurationSeconds int) KubeLock {
return &kubeLeaseLock{
client: client,
leaseName: leaseName,
namespace: namespace,
holderIdentity: holderIdentity,
leaseDuration: leaseDurationSeconds,
}
}

func (k *kubeLeaseLock) Lock(ctx context.Context) error {
backoff := wait.Backoff{
Duration: time.Second, // start with 1 second
Factor: 1.5, //nolint:gomnd // multiply by 1.5 on each retry
Jitter: 0.5, //nolint:gomnd // add 50% jitter to wait time on each retry
Steps: 100, //nolint:gomnd // retry 100 times
Cap: time.Hour, // but never wait more than 1 hour
}

return wait.ExponentialBackoff(backoff, func() (bool, error) { //nolint:wrapcheck
timestamp := metav1.MicroTime{Time: time.Now()}
lease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: k.leaseName,
Namespace: k.namespace,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: &k.holderIdentity,
LeaseDurationSeconds: ptr.To(int32(k.leaseDuration)),
AcquireTime: &timestamp,
RenewTime: &timestamp,
},
}

_, err := k.client.CoordinationV1().Leases(k.namespace).Create(ctx, lease, metav1.CreateOptions{})
if err != nil {
if errors.IsAlreadyExists(err) {
// If the lease already exists, check if it's held by another holder
existingLease, getErr := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
if getErr != nil {
return false, getErr //nolint:wrapcheck
}
// check if the lease is expired
if existingLease.Spec.RenewTime != nil && time.Since(existingLease.Spec.RenewTime.Time) > time.Duration(k.leaseDuration)*time.Second {
// If the lease is expired, delete it and retry
delErr := k.client.CoordinationV1().Leases(k.namespace).Delete(ctx, k.leaseName, metav1.DeleteOptions{})
if delErr != nil {
return false, delErr //nolint:wrapcheck
}
return false, nil
}
// check if the lease is held by another holder
if existingLease.Spec.HolderIdentity != nil && *existingLease.Spec.HolderIdentity != k.holderIdentity {
// If the lease is held by another holder, return false to retry
return false, nil
}
return true, nil
}
return false, err //nolint:wrapcheck
}

// Create a child context with cancellation
ctx, k.cancelFunc = context.WithCancel(ctx)
go k.renewLeasePeriodically(ctx)

return true, nil
})
}

func (k *kubeLeaseLock) renewLeasePeriodically(ctx context.Context) {
// let's renew the lease every 1/2 of the lease duration; use milliseconds for ticker
ticker := time.NewTicker(time.Duration(k.leaseDuration*500) * time.Millisecond) //nolint:gomnd
defer ticker.Stop()

for {
select {
case <-ticker.C:
lease, err := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
if err != nil || lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != k.holderIdentity {
return
}

lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
k.client.CoordinationV1().Leases(k.namespace).Update(ctx, lease, metav1.UpdateOptions{}) //nolint:errcheck
case <-ctx.Done():
// Exit the goroutine when the context is cancelled
return
}
}
}

func (k *kubeLeaseLock) Unlock(ctx context.Context) error {
// Call the cancel function to stop the lease renewal process
if k.cancelFunc != nil {
k.cancelFunc()
}
lease, err := k.client.CoordinationV1().Leases(k.namespace).Get(ctx, k.leaseName, metav1.GetOptions{})
if err != nil {
return err //nolint:wrapcheck
}

if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != k.holderIdentity {
return nil
}

return k.client.CoordinationV1().Leases(k.namespace).Delete(ctx, k.leaseName, metav1.DeleteOptions{}) //nolint:wrapcheck
}
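
To see the mutual exclusion at work, a small illustrative program (assumed to be built inside the kubeip module, since internal/lease is not importable from outside it) can run two holders against a fake clientset; the second Lock keeps retrying with backoff until the first holder releases the Lease:

package main

import (
	"context"
	"fmt"

	"github.com/doitintl/kubeip/internal/lease"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()
	client := fake.NewSimpleClientset()

	nodeA := lease.NewKubeLeaseLock(client, "kubeip-lock", "default", "node-a", 5)
	nodeB := lease.NewKubeLeaseLock(client, "kubeip-lock", "default", "node-b", 5)

	_ = nodeA.Lock(ctx) // node-a creates the Lease and starts renewing it
	fmt.Println("node-a holds the lock")

	done := make(chan struct{})
	go func() {
		defer close(done)
		_ = nodeB.Lock(ctx) // retries with backoff while node-a holds the Lease
		fmt.Println("node-b acquired the lock")
		_ = nodeB.Unlock(ctx)
	}()

	_ = nodeA.Unlock(ctx) // deleting the Lease lets node-b's next retry succeed
	<-done
}

Unlock deletes the Lease only when the caller is still the holder, and the renewal goroutine keeps RenewTime fresh so a live holder is never treated as expired by a competing node.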