Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add leader election to allow for multiple replicas. #27

Merged
merged 6 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/golang/protobuf v1.4.3 // indirect
github.com/google/go-cmp v0.5.2 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
Expand Down Expand Up @@ -202,6 +203,7 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
107 changes: 100 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/StatCan/ingress-istio-controller/pkg/controller"
"github.com/StatCan/ingress-istio-controller/pkg/signals"
istio "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/transport"
"k8s.io/klog"
)

Expand All @@ -23,14 +33,15 @@ var (
scopedGateways bool
ingressClass string
defaultWeight int
lockName string
lockNamespace string
lockIdentity string
)

func main() {
klog.InitFlags(nil)
flag.Parse()

stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
klog.Fatalf("error building kubeconfig: %v", err)
Expand Down Expand Up @@ -63,12 +74,71 @@ func main() {
istioInformerFactory.Networking().V1beta1().VirtualServices(),
istioInformerFactory.Networking().V1beta1().Gateways())

kubeInformerFactory.Start(stopCh)
istioInformerFactory.Start(stopCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-wait
klog.Info("received signal, shutting down")
cancel()
}()

kubeInformerFactory.Start(ctx.Done())
istioInformerFactory.Start(ctx.Done())

runWithLeaderElection(ctlr, cfg, kubeclient, ctx)
}

if err = ctlr.Run(2, stopCh); err != nil {
klog.Fatalf("error running controller: %v", err)
// runWithLeaderElection takes part in a Lease-based leader election and runs
// the controller only while this replica holds the lease. The call blocks
// until leaderelection.RunOrDie returns.
//
// The lease name, namespace and this replica's identity are read from the
// package-level lockName, lockNamespace and lockIdentity flag variables.
func runWithLeaderElection(ctlr *controller.Controller, cfg *rest.Config, kubeclient *kubernetes.Clientset, ctx context.Context) {
	// The identity is what tells competing replicas of this controller apart.
	klog.Infof("leader identity id: %s", lockIdentity)

	// The lock is backed by a coordination.k8s.io Lease object.
	leaseLock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: lockName, Namespace: lockNamespace},
		Client:     kubeclient.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: lockIdentity},
	}

	// Register a transport wrapper on cfg that is tied to ctx and carries the
	// shutdown error below.
	// NOTE(review): kubeclient is constructed by the caller before this point;
	// confirm whether a wrapper added to cfg here reaches that existing client.
	cfg.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))

	callbacks := leaderelection.LeaderCallbacks{
		// Invoked once this replica becomes leader; leaderCtx is the context
		// supplied by the leader election machinery for the leadership term.
		OnStartedLeading: func(leaderCtx context.Context) {
			err := ctlr.Run(2, leaderCtx)
			if err != nil && err != context.Canceled {
				klog.Errorf("error running controller: %v", err)
			}
		},
		OnStoppedLeading: func() {
			klog.Info("stopped leading")
		},
		OnNewLeader: func(leader string) {
			// Only report leaders other than ourselves; when the identity
			// matches, this replica is the one that just acquired the lock.
			if leader != lockIdentity {
				klog.Infof("new leader elected: %v", leader)
			}
		},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            leaseLock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks:       callbacks,
	})
}

func init() {
Expand All @@ -79,4 +149,27 @@ func init() {
flag.BoolVar(&scopedGateways, "scoped-gateways", false, "Gateways are scoped to the same namespace they exist within. This will limit the Service search for Load Balancer status. In istiod, this is controlled via the PILOT_SCOPE_GATEWAY_TO_NAMESPACE environment variable.")
flag.StringVar(&ingressClass, "ingress-class", "", "The ingress class annotation to monitor (empty string to skip checking annotation)")
flag.IntVar(&defaultWeight, "virtual-service-weight", 100, "The weight of the Virtual Service destination.")
flag.StringVar(&lockName, "lock-name", getEnvVarOrDefault("LOCK_NAME", "ingress-istio-controller"), "The name of the leader lock.")
flag.StringVar(&lockNamespace, "lock-namespace", getEnvVarOrDefault("LOCK_NAMESPACE", "ingress-istio-controller-system"), "The namespace where the leader lock resides.")
flag.StringVar(&lockIdentity, "lock-identity", getEnvVarOrDefault("LOCK_IDENTITY", createIdentity()), "The unique identity of the replica. (Pod name is best)")
}

// getEnvVarOrDefault returns the value of the environment variable envVar
// when it is set (including when it is set to the empty string); otherwise
// it returns dflt.
func getEnvVarOrDefault(envVar, dflt string) string {
	if val, ok := os.LookupEnv(envVar); ok {
		return val
	}
	return dflt
}

// createIdentity builds an identity string for this process in the form
// "<hostname>_<uuid>". If the hostname cannot be determined the error is
// passed to klog.Fatal.
func createIdentity() string {
	host, err := os.Hostname()
	if err != nil {
		klog.Fatal(err)
	}

	// The UUID suffix is a uniquifier so that two processes on the same host
	// don't accidentally both become active.
	suffix := string(uuid.NewUUID())
	return host + "_" + suffix
}
9 changes: 5 additions & 4 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -131,24 +132,24 @@ func NewController(
}

// Run runs the controller.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
func (c *Controller) Run(threadiness int, ctx context.Context) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()

klog.Info("starting controller")

klog.Info("waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.ingressesSynched, c.ingressClassesSynched, c.servicesSynched, c.virtualServicesSynched, c.gatewaysSynched); !ok {
if ok := cache.WaitForCacheSync(ctx.Done(), c.ingressesSynched, c.ingressClassesSynched, c.servicesSynched, c.virtualServicesSynched, c.gatewaysSynched); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

klog.Info("starting workers")
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
}

klog.Info("started workers")
<-stopCh
<-ctx.Done()
klog.Info("shutting down workers")

return nil
Expand Down
43 changes: 0 additions & 43 deletions pkg/signals/signal.go

This file was deleted.

23 changes: 0 additions & 23 deletions pkg/signals/signal_posix.go

This file was deleted.

20 changes: 0 additions & 20 deletions pkg/signals/signal_windows.go

This file was deleted.

Loading