From a1ee3c536cf92ecf134581d0ce138c1058a3225c Mon Sep 17 00:00:00 2001 From: Bum-Seok Hwang Date: Wed, 29 Sep 2021 06:17:00 +0000 Subject: [PATCH 1/3] feature: Send logs to Slack using a Slack webhook #13 --- controller/controller.go | 12 +-- controller/onUpdateImageHash.go | 11 ++- controller/sync.go | 16 ++-- imageNotifier/imageNotifier.go | 23 ++++-- interfaces/interfaces.go | 6 ++ logger/logger.go | 56 +++++++++++++ logger/slack.go | 135 ++++++++++++++++++++++++++++++++ main.go | 80 ++++++++++++++++--- readme.md | 20 ++++- remoteRegistry/docker/docker.go | 11 ++- remoteRegistry/docker/ecr.go | 22 +++--- util/kubernetes.go | 54 +++++++------ watcher/watcher.go | 17 ++-- 13 files changed, 388 insertions(+), 75 deletions(-) create mode 100644 logger/logger.go create mode 100644 logger/slack.go diff --git a/controller/controller.go b/controller/controller.go index 725e2c9..6be84a7 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -6,7 +6,6 @@ import ( "time" "github.com/pubg/kube-image-deployer/interfaces" - "k8s.io/klog/v2" pkgRuntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/runtime" @@ -24,6 +23,7 @@ type Controller struct { informer cache.Controller imageNotifier interfaces.IImageNotifier applyStrategicMergePatch ApplyStrategicMergePatch + logger interfaces.ILogger syncedImages map[Image]bool syncedImagesMutex sync.RWMutex @@ -45,6 +45,7 @@ type ControllerOpt struct { ImageNotifier interfaces.IImageNotifier ApplyStrategicMergePatch ApplyStrategicMergePatch ControllerWatchKey string + Logger interfaces.ILogger } // NewController creates a new Controller. @@ -58,6 +59,7 @@ func NewController(opt ControllerOpt) *Controller { imageNotifier: opt.ImageNotifier, applyStrategicMergePatch: opt.ApplyStrategicMergePatch, watchKey: opt.ControllerWatchKey, + logger: opt.Logger, syncedImages: make(map[Image]bool), syncedImagesMutex: sync.RWMutex{}, imageUpdateNotifyList: make([]imageUpdateNotify, 0), @@ -95,7 +97,7 @@ func (c *Controller) handleErr(err error, key interface{}) { // This controller retries 5 times if something goes wrong. After that, it stops trying. if c.queue.NumRequeues(key) < 5 { - klog.V(2).Infof("[%s] Error syncing %v: %v", c.resource, key, err) + c.logger.Infof("[%s] Error syncing %v: %v", c.resource, key, err) // Re-enqueue the key rate limited. Based on the rate limiter on the // queue and the re-enqueue history, the key will be processed later again. @@ -106,7 +108,7 @@ func (c *Controller) handleErr(err error, key interface{}) { c.queue.Forget(key) // Report to an external entity that, even after several retries, we could not successfully process this key runtime.HandleError(err) - klog.V(2).Infof("[%s] Dropping out of the queue: key:%q, err:%v", c.resource, key, err) + c.logger.Infof("[%s] Dropping out of the queue: key:%q, err:%v", c.resource, key, err) } // Run begins watching and syncing. @@ -115,7 +117,7 @@ func (c *Controller) Run(workers int, stopCh chan struct{}) { // Let the workers stop when we are done defer c.queue.ShutDown() - klog.V(2).Infof("[%s] Starting controller", c.resource) + c.logger.Infof("[%s] Starting controller", c.resource) go c.informer.Run(stopCh) @@ -132,7 +134,7 @@ func (c *Controller) Run(workers int, stopCh chan struct{}) { go wait.Until(c.patchUpdateNotifyList, time.Second, stopCh) <-stopCh - klog.V(2).Infof("[%s] Stopping controller", c.resource) + c.logger.Infof("[%s] Stopping controller", c.resource) } func (c *Controller) GetReresourceName() string { diff --git a/controller/onUpdateImageHash.go b/controller/onUpdateImageHash.go index 83e6cf7..78f3387 100644 --- a/controller/onUpdateImageHash.go +++ b/controller/onUpdateImageHash.go @@ -6,7 +6,6 @@ import ( "github.com/pubg/kube-image-deployer/util" v1 "k8s.io/api/core/v1" - "k8s.io/klog/v2" ) type patch struct { @@ -44,7 +43,7 @@ func (c *Controller) getPatchMapByUpdates(updates []imageUpdateNotify) map[strin patchMap := make(map[string][]patch) for _, update := range updates { - klog.V(2).Infof("[%s] OnUpdateImageString %s, %s, %s", c.resource, update.url, update.tag, update.imageString) + c.logger.Infof("[%s] OnUpdateImageString %s, %s, %s", c.resource, update.url, update.tag, update.imageString) c.syncedImagesMutex.RLock() defer c.syncedImagesMutex.RUnlock() @@ -90,7 +89,7 @@ func (c *Controller) applyPatchList(key string, patchList []patch) error { } for _, patch := range patchList { - klog.V(2).Infof("[%s] OnUpdateImageString patch %+v", c.resource, patch) + c.logger.Infof("[%s] OnUpdateImageString patch %+v", c.resource, patch) if !exists { // 삭제된 리소스인 경우 무시 return fmt.Errorf("[%s] OnUpdateImageString patch not exists key=%s", c.resource, key) @@ -124,7 +123,7 @@ func (c *Controller) applyPatchList(key string, patchList []patch) error { } if len(Containers) == 0 && len(InitContainers) == 0 { // 변경된 이미지가 없는 경우 무시 - klog.V(2).Infof("[%s] OnUpdateImageString patch containers not changed %+v", c.resource, patchList) + c.logger.Infof("[%s] OnUpdateImageString patch containers not changed %+v", c.resource, patchList) return nil } @@ -140,7 +139,7 @@ func (c *Controller) applyPatchList(key string, patchList []patch) error { return fmt.Errorf("[%s] OnUpdateImageString patch apply error namespace=%s, name=%s, patchString=%s, err=%s", c.resource, namespace, name, patchString, err) } - klog.Warningf("[%s] OnUpdateImageString patch apply success namespace=%s, name=%s, patchString=%s", c.resource, namespace, name, patchString) + c.logger.Warningf("[%s] OnUpdateImageString patch apply success namespace=%s, name=%s, patchString=%s", c.resource, namespace, name, patchString) return nil } @@ -176,7 +175,7 @@ func (c *Controller) patchUpdateNotifyList() { for key, patchList := range patchMap { if err := c.applyPatchList(key, patchList); err != nil { - klog.Error(err) + c.logger.Errorf(err.Error()) // just logging } } diff --git a/controller/sync.go b/controller/sync.go index e738950..ebe98a3 100644 --- a/controller/sync.go +++ b/controller/sync.go @@ -4,7 +4,6 @@ import ( "strings" "github.com/pubg/kube-image-deployer/util" - "k8s.io/klog" ) type Image struct { @@ -18,7 +17,7 @@ func (c *Controller) syncKey(key string) error { obj, exists, err := c.indexer.GetByKey(key) if err != nil { - klog.Errorf("[%s] Fetching object with key %s from store failed with %v", c.resource, key, err) + c.logger.Errorf("[%s] Fetching object with key %s from store failed with %v", c.resource, key, err) return err } @@ -50,7 +49,12 @@ func (c *Controller) syncKey(key string) error { func (c *Controller) getImagesFromCurrentWorkload(obj interface{}, key string) (images map[Image]bool) { images = make(map[Image]bool) - annotations := util.GetAnnotations(obj) + annotations, err := util.GetAnnotations(obj) + + if err != nil { + c.logger.Errorf("[%s] GetAnnotations error : %v", c.resource, err) + return + } for annotationKey, annotationValue := range annotations { @@ -79,7 +83,7 @@ func (c *Controller) getImagesFromCurrentWorkload(obj interface{}, key string) ( } if len(images) == 0 { - klog.Warningf("[%s] getImagesFromCurrentWorkload undefined or invalid annotation key=%s\n", c.resource, key) + c.logger.Warningf("[%s] getImagesFromCurrentWorkload undefined or invalid annotation key=%s\n", c.resource, key) } return @@ -112,7 +116,7 @@ func (c *Controller) registImage(image Image) { c.syncedImagesMutex.Unlock() return } else { - klog.V(2).Infof("[%s] registImage image=%+v\n", c.resource, image) + c.logger.Infof("[%s] registImage image=%+v\n", c.resource, image) c.syncedImages[image] = true c.syncedImagesMutex.Unlock() } @@ -127,7 +131,7 @@ func (c *Controller) unregistImage(image Image) { return } - klog.V(2).Infof("[%s] unregistImage image=%+v\n", c.resource, image) + c.logger.Infof("[%s] unregistImage image=%+v\n", c.resource, image) c.syncedImagesMutex.Lock() delete(c.syncedImages, image) diff --git a/imageNotifier/imageNotifier.go b/imageNotifier/imageNotifier.go index e852731..05717b2 100644 --- a/imageNotifier/imageNotifier.go +++ b/imageNotifier/imageNotifier.go @@ -5,8 +5,8 @@ import ( "time" "github.com/pubg/kube-image-deployer/interfaces" + l "github.com/pubg/kube-image-deployer/logger" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog/v2" ) type ImageNotifierId struct { @@ -22,19 +22,26 @@ type ImageNotifier struct { stopCh chan struct{} remoteRegistry interfaces.IRemoteRegistry + logger interfaces.ILogger } func NewImageNotifier(stopCh chan struct{}, remoteRegistry interfaces.IRemoteRegistry, imageCheckIntervalSec uint) *ImageNotifier { - imageNotifier := &ImageNotifier{ + r := &ImageNotifier{ list: make(map[ImageNotifierId]*ImageUpdateNotify), mutex: sync.RWMutex{}, stopCh: stopCh, remoteRegistry: remoteRegistry, + logger: l.NewLogger(), } - go wait.Until(imageNotifier.checkAllImageNotifyList, time.Second*time.Duration(imageCheckIntervalSec), stopCh) + go wait.Until(r.checkAllImageNotifyList, time.Second*time.Duration(imageCheckIntervalSec), stopCh) - return imageNotifier + return r +} + +func (r *ImageNotifier) WithLogger(logger interfaces.ILogger) *ImageNotifier { + r.logger = logger + return r } // RegistImage regist to imageNotifier @@ -53,7 +60,7 @@ func (r *ImageNotifier) RegistImage(controller interfaces.IController, url, tag, } // 신규 - klog.Infof("[%s] RegistImage %s:%s\n", controller.GetReresourceName(), url, tag) + r.logger.Infof("[%s] RegistImage %s:%s\n", controller.GetReresourceName(), url, tag) imageUpdateNotify := NewImageUpdateNotify(url, tag, "", controller) @@ -71,7 +78,7 @@ func (r *ImageNotifier) UnregistImage(controller interfaces.IController, url, ta existsImageUpdateNotify, ok := r.list[notifyId] if !ok { // ?? - klog.Errorf("[%s] UnregistImage existsImageUpdateNotify notfound url=%s, tag=%s", controller.GetReresourceName(), url, tag) + r.logger.Errorf("[%s] UnregistImage existsImageUpdateNotify notfound url=%s, tag=%s", controller.GetReresourceName(), url, tag) return } @@ -79,7 +86,7 @@ func (r *ImageNotifier) UnregistImage(controller interfaces.IController, url, ta if referenceCount <= 0 { // 이미지를 참조하는 대상이 더이상 없으면 삭제 delete(r.list, notifyId) - klog.Infof("[%s] UnregistImage %s:%s\n", controller.GetReresourceName(), url, tag) + r.logger.Infof("[%s] UnregistImage %s:%s\n", controller.GetReresourceName(), url, tag) } } @@ -87,7 +94,7 @@ func (r *ImageNotifier) UnregistImage(controller interfaces.IController, url, ta func (r *ImageNotifier) checkImageUpdate(image checkImage) { imageString, err := r.remoteRegistry.GetImageString(image.url, image.tag, "") if err != nil { - klog.Errorf("[%s] checkImageUpdate %s:%s err=%s\n", image.controller.GetReresourceName(), image.url, image.tag, err) + r.logger.Errorf("[%s] checkImageUpdate %s:%s err=%s\n", image.controller.GetReresourceName(), image.url, image.tag, err) return } diff --git a/interfaces/interfaces.go b/interfaces/interfaces.go index b06db3f..aa71801 100644 --- a/interfaces/interfaces.go +++ b/interfaces/interfaces.go @@ -14,3 +14,9 @@ type IImageNotifier interface { type IRemoteRegistry interface { GetImageString(url, tag, platformString string) (string, error) } + +type ILogger interface { + Infof(format string, args ...interface{}) + Errorf(format string, args ...interface{}) + Warningf(format string, args ...interface{}) +} diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..a976b79 --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,56 @@ +package logger + +import ( + "fmt" + + "k8s.io/klog" +) + +type Logger struct { + slack *Slack + depth int +} + +func NewLogger() *Logger { + return &Logger{ + depth: 1, + slack: nil, // default=disabled + } +} + +func (l *Logger) SetDepth(depth int) *Logger { + l.depth = depth + return l +} + +func (l *Logger) WithSlack(stopCh chan struct{}, webhookUrl, msgPrefix string) *Logger { + l.slack = NewSlack(stopCh, webhookUrl, msgPrefix) + return l +} + +func (l *Logger) Infof(format string, args ...interface{}) { + if !klog.V(2) { + return + } + msg := fmt.Sprintf(format, args...) + klog.InfoDepth(l.depth, msg) + if l.slack != nil { + l.slack.InfoDepth(l.depth+1, msg) + } +} + +func (l *Logger) Errorf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + klog.ErrorDepth(l.depth, msg) + if l.slack != nil { + l.slack.ErrorDepth(l.depth+1, msg) + } +} + +func (l *Logger) Warningf(format string, args ...interface{}) { + msg := fmt.Sprintf(format, args...) + klog.WarningDepth(l.depth, msg) + if l.slack != nil { + l.slack.WarningDepth(l.depth+1, msg) + } +} diff --git a/logger/slack.go b/logger/slack.go new file mode 100644 index 0000000..38c23cc --- /dev/null +++ b/logger/slack.go @@ -0,0 +1,135 @@ +package logger + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "runtime" + "sync" + "time" + + "k8s.io/klog/v2" +) + +type Slack struct { + webhookUrl string + msgPrefix string + httpClient *http.Client + pool []message + mutex sync.RWMutex +} + +type SlackRequestBody struct { + Text string `json:"text"` +} + +type message struct { + level string + msg string + time time.Time + file string + line int +} + +func NewSlack(stopCh chan struct{}, webhookUrl, msgPrefix string) *Slack { + s := &Slack{ + webhookUrl: webhookUrl, + msgPrefix: msgPrefix, + httpClient: &http.Client{Timeout: 10 * time.Second}, + pool: make([]message, 0), + mutex: sync.RWMutex{}, + } + + s.start(stopCh) + + return s +} + +func (s *Slack) InfoDepth(depth int, msg string) { + s.pushMessage(depth+1, "info", msg) + +} + +func (s *Slack) WarningDepth(depth int, msg string) { + s.pushMessage(depth+1, "warn", msg) +} + +func (s *Slack) ErrorDepth(depth int, msg string) { + s.pushMessage(depth+1, "error", msg) +} + +func (s *Slack) pushMessage(depth int, level, msg string) { + _, file, line, ok := runtime.Caller(depth) + + if !ok { + file = "???" + line = 0 + } + + s.mutex.Lock() + s.pool = append(s.pool, message{level: level, msg: msg, time: time.Now(), file: file, line: line}) + s.mutex.Unlock() +} + +func (s *Slack) start(stopCh chan struct{}) { + go func() { + for { + select { + case <-stopCh: // exit goroutine + return + case <-time.After(time.Second): + s.mutex.RLock() + l := len(s.pool) + s.mutex.RUnlock() + + if l == 0 { + break + } + + <-time.After(time.Second) // wait for a second before sending the next message + + var text string + + s.mutex.Lock() // lock the pool + pool := s.pool // copy the pool + s.pool = make([]message, 0) // clear the pool + s.mutex.Unlock() // unlock the pool + + for _, msg := range pool { + text += fmt.Sprintf("%s[%s][%s][%s:%d] %s\n", s.msgPrefix, msg.time.Format(time.RFC3339), msg.level, msg.file, msg.line, msg.msg) + } + + if err := s.send("```" + text + "```"); err != nil { + klog.Errorf("error sending to slack: %s\n%s", err, text) + } + } + } + }() +} + +func (s *Slack) send(msg string) error { + + slackBody, _ := json.Marshal(SlackRequestBody{Text: msg}) + req, err := http.NewRequest(http.MethodPost, s.webhookUrl, bytes.NewBuffer(slackBody)) + + if err != nil { + return err + } + + req.Header.Add("Content-Type", "application/json") + + resp, err := s.httpClient.Do(req) + if err != nil { + return err + } + + buf := new(bytes.Buffer) + buf.ReadFrom(resp.Body) + + if buf.String() != "ok" { + return fmt.Errorf("non-ok response returned from Slack") + } + + return nil +} diff --git a/main.go b/main.go index 3bd1c94..aea4dd7 100644 --- a/main.go +++ b/main.go @@ -5,10 +5,13 @@ import ( "flag" "os" "os/signal" + "strconv" "syscall" "time" + "github.com/joho/godotenv" "github.com/pubg/kube-image-deployer/imageNotifier" + "github.com/pubg/kube-image-deployer/logger" "github.com/pubg/kube-image-deployer/remoteRegistry/docker" "github.com/pubg/kube-image-deployer/watcher" appV1 "k8s.io/api/apps/v1" @@ -33,13 +36,64 @@ var ( controllerWatchKey = *flag.String("controller-watch-key", "kube-image-deployer", "controller watch key") controllerWatchNamespace = *flag.String("controller-watch-namespace", "", "controller watch namespace. If empty, watch all namespaces") imageDefaultPlatform = *flag.String("image-default-platform", "linux/amd64", "default platform for docker images") + slackWebhook = *flag.String("slack-webhook", "", "slack webhook url. If empty, notifications are disabled") + slackMsgPrefix = *flag.String("slack-msg-prefix", "["+getHostname()+"]", "slack message prefix. default=[hostname]") ) +func getHostname() string { + if s, err := os.Hostname(); err == nil { + return s + } + return "unknown" +} + func init() { klog.InitFlags(nil) flag.Parse() - klog.Infof("Starting pid: %d", os.Getpid()) + godotenv.Load(".env") + + if os.Getenv("KUBECONFIG_PATH") != "" { + kubeconfig = os.Getenv("KUBECONFIG_PATH") + } + if os.Getenv("OFF_DEPLOYMENTS") != "" { + offDeployments = true + } + if os.Getenv("OFF_STATEFULSETS") != "" { + offStatefulsets = true + } + if os.Getenv("OFF_DAEMONSETS") != "" { + offDaemonsets = true + } + if os.Getenv("OFF_CRONJOBS") != "" { + offCronjobs = true + } + if os.Getenv("IMAGE_HASH_CACHE_TTL_SEC") != "" { + if v, err := strconv.ParseUint(os.Getenv("IMAGE_HASH_CACHE_TTL_SEC"), 10, 32); err == nil { + imageStringCacheTTLSec = uint(v) + } + } + if os.Getenv("IMAGE_CHECK_INTERVAL_SEC") != "" { + if v, err := strconv.ParseUint(os.Getenv("IMAGE_CHECK_INTERVAL_SEC"), 10, 32); err == nil { + imageCheckIntervalSec = uint(v) + } + } + if os.Getenv("CONTROLLER_WATCH_KEY") != "" { + controllerWatchKey = os.Getenv("CONTROLLER_WATCH_KEY") + } + if os.Getenv("CONTROLLER_WATCH_NAMESPACE") != "" { + controllerWatchNamespace = os.Getenv("CONTROLLER_WATCH_NAMESPACE") + } + if os.Getenv("IMAGE_DEFAULT_PLATFORM") != "" { + imageDefaultPlatform = os.Getenv("IMAGE_DEFAULT_PLATFORM") + } + if os.Getenv("SLACK_WEBHOOK") != "" { + slackWebhook = os.Getenv("SLACK_WEBHOOK") + } + if os.Getenv("SLACK_MSG_PREFIX") != "" { + slackMsgPrefix = os.Getenv("SLACK_MSG_PREFIX") + } + klog.Infof("Config Flags: %v", map[string]interface{}{ "kubeconfig": kubeconfig, "offDeployments": offDeployments, @@ -50,6 +104,8 @@ func init() { "imageCheckIntervalSec": imageCheckIntervalSec, "controllerWatchKey": controllerWatchKey, "controllerWatchNamespace": controllerWatchNamespace, + "slackWebhook": slackWebhook, + "slackMsgPrefix": slackMsgPrefix, }) } @@ -87,10 +143,16 @@ func NewClientset() *kubernetes.Clientset { } func runWatchers(stopCh chan struct{}) { - clientset := NewClientset() // create a clientset - remoteRegistry := docker.NewRemoteRegistry().WithDefaultPlatform(imageDefaultPlatform) // create a docker remote registry - imageNotifier := imageNotifier.NewImageNotifier(stopCh, remoteRegistry, imageCheckIntervalSec) // create a imageNotifier - optionsModifier := func(options *metaV1.ListOptions) { // optionsModifier selector + logger := logger.NewLogger() + + if slackWebhook != "" { + logger.WithSlack(stopCh, slackWebhook, slackMsgPrefix) + } + + clientset := NewClientset() // create a clientset + remoteRegistry := docker.NewRemoteRegistry().WithDefaultPlatform(imageDefaultPlatform).WithLogger(logger) // create a docker remote registry + imageNotifier := imageNotifier.NewImageNotifier(stopCh, remoteRegistry, imageCheckIntervalSec).WithLogger(logger) // create a imageNotifier + optionsModifier := func(options *metaV1.ListOptions) { // optionsModifier selector options.LabelSelector = controllerWatchKey } @@ -99,7 +161,7 @@ func runWatchers(stopCh chan struct{}) { _, err := clientset.AppsV1().Deployments(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metaV1.PatchOptions{}) return err } - go watcher.NewWatcher("deployments", stopCh, cache.NewFilteredListWatchFromClient(clientset.AppsV1().RESTClient(), "deployments", controllerWatchNamespace, optionsModifier), &appV1.Deployment{}, imageNotifier, controllerWatchKey, applyStrategicMergePatch) + go watcher.NewWatcher("deployments", stopCh, logger, cache.NewFilteredListWatchFromClient(clientset.AppsV1().RESTClient(), "deployments", controllerWatchNamespace, optionsModifier), &appV1.Deployment{}, imageNotifier, controllerWatchKey, applyStrategicMergePatch) } if !offStatefulsets { // statefulsets watcher @@ -107,7 +169,7 @@ func runWatchers(stopCh chan struct{}) { _, err := clientset.AppsV1().StatefulSets(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metaV1.PatchOptions{}) return err } - go watcher.NewWatcher("statefulsets", stopCh, cache.NewFilteredListWatchFromClient(clientset.AppsV1().RESTClient(), "statefulsets", controllerWatchNamespace, optionsModifier), &appV1.StatefulSet{}, imageNotifier, controllerWatchKey, applyStrategicMergePatch) + go watcher.NewWatcher("statefulsets", stopCh, logger, cache.NewFilteredListWatchFromClient(clientset.AppsV1().RESTClient(), "statefulsets", controllerWatchNamespace, optionsModifier), &appV1.StatefulSet{}, imageNotifier, controllerWatchKey, applyStrategicMergePatch) } if !offDaemonsets { // daemonsets watcher @@ -115,7 +177,7 @@ func runWatchers(stopCh chan struct{}) { _, err := clientset.AppsV1().DaemonSets(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metaV1.PatchOptions{}) return err } - go watcher.NewWatcher("daemonsets", stopCh, cache.NewFilteredListWatchFromClient(clientset.AppsV1().RESTClient(), "daemonsets", controllerWatchNamespace, optionsModifier), &appV1.DaemonSet{}, imageNotifier, controllerWatchKey, applyStrategicMergePatch) + go watcher.NewWatcher("daemonsets", stopCh, logger, cache.NewFilteredListWatchFromClient(clientset.AppsV1().RESTClient(), "daemonsets", controllerWatchNamespace, optionsModifier), &appV1.DaemonSet{}, imageNotifier, controllerWatchKey, applyStrategicMergePatch) } if !offCronjobs { // cronjobs watcher @@ -123,7 +185,7 @@ func runWatchers(stopCh chan struct{}) { _, err := clientset.BatchV1beta1().CronJobs(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metaV1.PatchOptions{}) return err } - go watcher.NewWatcher("cronjobs", stopCh, cache.NewFilteredListWatchFromClient(clientset.BatchV1beta1().RESTClient(), "cronjobs", controllerWatchNamespace, optionsModifier), &batchV1beta1.CronJob{}, imageNotifier, controllerWatchKey, applyStrategicMergePatch) + go watcher.NewWatcher("cronjobs", stopCh, logger, cache.NewFilteredListWatchFromClient(clientset.BatchV1beta1().RESTClient(), "cronjobs", controllerWatchNamespace, optionsModifier), &batchV1beta1.CronJob{}, imageNotifier, controllerWatchKey, applyStrategicMergePatch) } } diff --git a/readme.md b/readme.md index 273760b..6ca44f6 100644 --- a/readme.md +++ b/readme.md @@ -23,6 +23,25 @@ imageStringCacheTTLSec = *flag.Uint("image-hash-cache-ttl-sec", 60, "image has imageCheckIntervalSec = *flag.Uint("image-check-interval-sec", 10, "image check interval in seconds") controllerWatchKey = *flag.String("controller-watch-key", "kube-image-deployer", "controller watch key") controllerWatchNamespace = *flag.String("controller-watch-namespace", "", "controller watch namespace. If empty, watch all namespaces") +imageDefaultPlatform = *flag.String("image-default-platform", "linux/amd64", "default platform for docker images") +slackWebhook = *flag.String("slack-webhook", "", "slack webhook url. If empty, notifications are disabled") +slackMsgPrefix = *flag.String("slack-msg-prefix", "[$hostname]", "slack message prefix. default=[hostname]") +``` + +# Available Environment Variables +```shell +KUBECONFIG_PATH= +OFF_DEPLOYMENTS= +OFF_STATEFULSETS= +OFF_DAEMONSETS= +OFF_CRONJOBS= +IMAGE_HASH_CACHE_TTL_SEC= +IMAGE_CHECK_INTERVAL_SEC= +CONTROLLER_WATCH_KEY= +CONTROLLER_WATCH_NAMESPACE= +IMAGE_DEFAULT_PLATFORM= +SLACK_WEBHOOK= +SLACK_MSG_PREFIX= ``` # 동작 방식 @@ -99,4 +118,3 @@ ECR 이미지 URL이 감지대상인 경우 kube-image-deloyer는 ECR의 GetAuth # Todo * Add Test Code -* Support ECR Private Registry diff --git a/remoteRegistry/docker/docker.go b/remoteRegistry/docker/docker.go index eb2c4e2..b9846cc 100644 --- a/remoteRegistry/docker/docker.go +++ b/remoteRegistry/docker/docker.go @@ -8,6 +8,8 @@ import ( "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/remote" + "github.com/pubg/kube-image-deployer/interfaces" + "github.com/pubg/kube-image-deployer/logger" "github.com/pubg/kube-image-deployer/util" ) @@ -15,6 +17,7 @@ type RemoteRegistryDocker struct { imageAuthMap map[string]authn.Authenticator defaultPlatform *v1.Platform cache *util.Cache + logger interfaces.ILogger } // NewRemoteRegistry returns a new RemoteRegistryDocker @@ -23,11 +26,17 @@ func NewRemoteRegistry() *RemoteRegistryDocker { imageAuthMap: make(map[string]authn.Authenticator), cache: util.NewCache(60), defaultPlatform: &v1.Platform{OS: "linux", Architecture: "amd64"}, + logger: logger.NewLogger(), } return d } +func (d *RemoteRegistryDocker) WithLogger(logger interfaces.ILogger) *RemoteRegistryDocker { + d.logger = logger + return d +} + func (d *RemoteRegistryDocker) WithImageAuthMap(imageAuthMap map[string]authn.Authenticator) *RemoteRegistryDocker { d.imageAuthMap = imageAuthMap return d @@ -69,7 +78,7 @@ func (d *RemoteRegistryDocker) getAuthenticator(url string) authn.Authenticator } if isECR(url) { // image가 ecr private repository 인 경우 - return NewECRAuthenticator(url) + return NewECRAuthenticator(url, d.logger) } return nil diff --git a/remoteRegistry/docker/ecr.go b/remoteRegistry/docker/ecr.go index 7188a81..8d5fc33 100644 --- a/remoteRegistry/docker/ecr.go +++ b/remoteRegistry/docker/ecr.go @@ -7,17 +7,22 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ecr" "github.com/google/go-containerregistry/pkg/authn" + "github.com/pubg/kube-image-deployer/interfaces" "github.com/pubg/kube-image-deployer/util" - "k8s.io/klog/v2" ) type ECRAuthenticator struct { url string region string + logger interfaces.ILogger } -func getRegionFromECRURL(url string) string { - return isECRRegex.FindStringSubmatch(url)[1] +func NewECRAuthenticator(url string, logger interfaces.ILogger) *ECRAuthenticator { + return &ECRAuthenticator{ + url: url, + region: getRegionFromECRURL(url), + logger: logger, + } } func (e *ECRAuthenticator) Authorization() (*authn.AuthConfig, error) { @@ -27,10 +32,10 @@ func (e *ECRAuthenticator) Authorization() (*authn.AuthConfig, error) { sess := session.Must(session.NewSessionWithOptions(session.Options{})) svc := ecr.New(sess, aws.NewConfig().WithRegion(e.region)) if token, err := svc.GetAuthorizationToken(&ecr.GetAuthorizationTokenInput{}); err != nil { - klog.Errorf("ECRAuthenticator GetAuthorizationToken error url=%s, err=%v", e.url, err) + e.logger.Errorf("ECRAuthenticator GetAuthorizationToken error url=%s, err=%v", e.url, err) return nil, err } else { - klog.V(4).Infof("ECRAuthenticator GetAuthorizationToken success url=%s, token=%v", e.url, token) + e.logger.Infof("ECRAuthenticator GetAuthorizationToken success url=%s, token=%v", e.url, token) return *token.AuthorizationData[0].AuthorizationToken, nil } }) @@ -56,9 +61,6 @@ func isECR(url string) bool { return isECRRegex.Match([]byte(url)) } -func NewECRAuthenticator(url string) *ECRAuthenticator { - return &ECRAuthenticator{ - url: url, - region: getRegionFromECRURL(url), - } +func getRegionFromECRURL(url string) string { + return isECRRegex.FindStringSubmatch(url)[1] } diff --git a/util/kubernetes.go b/util/kubernetes.go index 41b4fdf..4976080 100644 --- a/util/kubernetes.go +++ b/util/kubernetes.go @@ -7,59 +7,60 @@ import ( appV1 "k8s.io/api/apps/v1" batchV1 "k8s.io/api/batch/v1" coreV1 "k8s.io/api/core/v1" - "k8s.io/klog/v2" ) -func GetAnnotations(obj interface{}) map[string]string { +func GetAnnotations(obj interface{}) (map[string]string, error) { switch t := obj.(type) { case *appV1.Deployment: - return obj.(*appV1.Deployment).Annotations + return obj.(*appV1.Deployment).Annotations, nil case *appV1.StatefulSet: - return obj.(*appV1.StatefulSet).Annotations + return obj.(*appV1.StatefulSet).Annotations, nil case *appV1.DaemonSet: - return obj.(*appV1.DaemonSet).Annotations + return obj.(*appV1.DaemonSet).Annotations, nil case *batchV1.CronJob: - return obj.(*batchV1.CronJob).Annotations + return obj.(*batchV1.CronJob).Annotations, nil default: - klog.Errorf("GetAnnotations unknown type %T", t) - return make(map[string]string) + return make(map[string]string), fmt.Errorf("GetAnnotations unknown type %T", t) } } -func GetContainers(obj interface{}) []coreV1.Container { +func GetContainers(obj interface{}) ([]coreV1.Container, error) { switch t := obj.(type) { case *appV1.Deployment: - return obj.(*appV1.Deployment).Spec.Template.Spec.Containers + return obj.(*appV1.Deployment).Spec.Template.Spec.Containers, nil case *appV1.StatefulSet: - return obj.(*appV1.StatefulSet).Spec.Template.Spec.Containers + return obj.(*appV1.StatefulSet).Spec.Template.Spec.Containers, nil case *appV1.DaemonSet: - return obj.(*appV1.DaemonSet).Spec.Template.Spec.Containers + return obj.(*appV1.DaemonSet).Spec.Template.Spec.Containers, nil case *batchV1.CronJob: - return obj.(*batchV1.CronJob).Spec.JobTemplate.Spec.Template.Spec.Containers + return obj.(*batchV1.CronJob).Spec.JobTemplate.Spec.Template.Spec.Containers, nil default: - klog.Errorf("GetContainers unknown type %T", t) - return make([]coreV1.Container, 0) + return make([]coreV1.Container, 0), fmt.Errorf("GetContainers unknown type %T", t) } } -func GetInitContainers(obj interface{}) []coreV1.Container { +func GetInitContainers(obj interface{}) ([]coreV1.Container, error) { switch t := obj.(type) { case *appV1.Deployment: - return obj.(*appV1.Deployment).Spec.Template.Spec.InitContainers + return obj.(*appV1.Deployment).Spec.Template.Spec.InitContainers, nil case *appV1.StatefulSet: - return obj.(*appV1.StatefulSet).Spec.Template.Spec.InitContainers + return obj.(*appV1.StatefulSet).Spec.Template.Spec.InitContainers, nil case *appV1.DaemonSet: - return obj.(*appV1.DaemonSet).Spec.Template.Spec.InitContainers + return obj.(*appV1.DaemonSet).Spec.Template.Spec.InitContainers, nil case *batchV1.CronJob: - return obj.(*batchV1.CronJob).Spec.JobTemplate.Spec.Template.Spec.InitContainers + return obj.(*batchV1.CronJob).Spec.JobTemplate.Spec.Template.Spec.InitContainers, nil default: - klog.Errorf("GetInitContainers unknown type %T", t) - return make([]coreV1.Container, 0) + return make([]coreV1.Container, 0), fmt.Errorf("GetInitContainers unknown type %T", t) } } func GetContainerByName(obj interface{}, name string) (coreV1.Container, error) { - containers := GetContainers(obj) + containers, err := GetContainers(obj) + + if err != nil { + return coreV1.Container{}, err + } + for _, container := range containers { if container.Name == name { return container, nil @@ -69,7 +70,12 @@ func GetContainerByName(obj interface{}, name string) (coreV1.Container, error) } func GetInitContainerByName(obj interface{}, name string) (coreV1.Container, error) { - containers := GetInitContainers(obj) + containers, err := GetInitContainers(obj) + + if err != nil { + return coreV1.Container{}, err + } + for _, container := range containers { if container.Name == name { return container, nil diff --git a/watcher/watcher.go b/watcher/watcher.go index f52ff77..9e0998d 100644 --- a/watcher/watcher.go +++ b/watcher/watcher.go @@ -3,6 +3,7 @@ package watcher import ( "github.com/pubg/kube-image-deployer/controller" "github.com/pubg/kube-image-deployer/interfaces" + l "github.com/pubg/kube-image-deployer/logger" pkgRuntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -13,27 +14,28 @@ type ApplyStrategicMergePatch = controller.ApplyStrategicMergePatch func NewWatcher( name string, stop chan struct{}, + logger interfaces.ILogger, listWatcher cache.ListerWatcher, objType pkgRuntime.Object, imageNotifier interfaces.IImageNotifier, controllerWatchKey string, applyStrategicMergePatch ApplyStrategicMergePatch, ) { - controller := createDefaultController(name, stop, listWatcher, objType, imageNotifier, controllerWatchKey, applyStrategicMergePatch) - NewWatcherWithController(stop, controller) + controller := createDefaultController(name, stop, logger, listWatcher, objType, imageNotifier, controllerWatchKey, applyStrategicMergePatch) + RunController(stop, controller) } -func NewWatcherWithController( +func RunController( stop chan struct{}, controller interfaces.IController, ) { - go controller.Run(1, stop) // Let's start the controller - <-stop + controller.Run(1, stop) // Let's start the controller } func createDefaultController( name string, stop chan struct{}, + logger interfaces.ILogger, listWatcher cache.ListerWatcher, objType pkgRuntime.Object, imageNotifier interfaces.IImageNotifier, @@ -80,6 +82,11 @@ func createDefaultController( Informer: informer, ImageNotifier: imageNotifier, ControllerWatchKey: controllerWatchKey, + Logger: logger, + } + + if controllerOpt.Logger == nil { + controllerOpt.Logger = l.NewLogger() } controller := controller.NewController(controllerOpt) From bb71a94a56af77d693f3a52fbcf472cb9de245a3 Mon Sep 17 00:00:00 2001 From: Bum-Seok Hwang Date: Wed, 29 Sep 2021 08:55:40 +0000 Subject: [PATCH 2/3] fix readme --- readme.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/readme.md b/readme.md index 6ca44f6..04698ef 100644 --- a/readme.md +++ b/readme.md @@ -31,10 +31,10 @@ slackMsgPrefix = *flag.String("slack-msg-prefix", "[$hostname]", "slac # Available Environment Variables ```shell KUBECONFIG_PATH= -OFF_DEPLOYMENTS= -OFF_STATEFULSETS= -OFF_DAEMONSETS= -OFF_CRONJOBS= +OFF_DEPLOYMENTS= +OFF_STATEFULSETS= +OFF_DAEMONSETS= +OFF_CRONJOBS= IMAGE_HASH_CACHE_TTL_SEC= IMAGE_CHECK_INTERVAL_SEC= CONTROLLER_WATCH_KEY= From dcd7f6e09a644aa6088af58fc1ba26f3791654b5 Mon Sep 17 00:00:00 2001 From: Bum-Seok Hwang Date: Wed, 29 Sep 2021 11:55:01 +0000 Subject: [PATCH 3/3] code clean --- util/kubernetes.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/util/kubernetes.go b/util/kubernetes.go index 4976080..73b8486 100644 --- a/util/kubernetes.go +++ b/util/kubernetes.go @@ -12,13 +12,13 @@ import ( func GetAnnotations(obj interface{}) (map[string]string, error) { switch t := obj.(type) { case *appV1.Deployment: - return obj.(*appV1.Deployment).Annotations, nil + return t.Annotations, nil case *appV1.StatefulSet: - return obj.(*appV1.StatefulSet).Annotations, nil + return t.Annotations, nil case *appV1.DaemonSet: - return obj.(*appV1.DaemonSet).Annotations, nil + return t.Annotations, nil case *batchV1.CronJob: - return obj.(*batchV1.CronJob).Annotations, nil + return t.Annotations, nil default: return make(map[string]string), fmt.Errorf("GetAnnotations unknown type %T", t) } @@ -27,13 +27,13 @@ func GetAnnotations(obj interface{}) (map[string]string, error) { func GetContainers(obj interface{}) ([]coreV1.Container, error) { switch t := obj.(type) { case *appV1.Deployment: - return obj.(*appV1.Deployment).Spec.Template.Spec.Containers, nil + return t.Spec.Template.Spec.Containers, nil case *appV1.StatefulSet: - return obj.(*appV1.StatefulSet).Spec.Template.Spec.Containers, nil + return t.Spec.Template.Spec.Containers, nil case *appV1.DaemonSet: - return obj.(*appV1.DaemonSet).Spec.Template.Spec.Containers, nil + return t.Spec.Template.Spec.Containers, nil case *batchV1.CronJob: - return obj.(*batchV1.CronJob).Spec.JobTemplate.Spec.Template.Spec.Containers, nil + return t.Spec.JobTemplate.Spec.Template.Spec.Containers, nil default: return make([]coreV1.Container, 0), fmt.Errorf("GetContainers unknown type %T", t) } @@ -42,13 +42,13 @@ func GetContainers(obj interface{}) ([]coreV1.Container, error) { func GetInitContainers(obj interface{}) ([]coreV1.Container, error) { switch t := obj.(type) { case *appV1.Deployment: - return obj.(*appV1.Deployment).Spec.Template.Spec.InitContainers, nil + return t.Spec.Template.Spec.InitContainers, nil case *appV1.StatefulSet: - return obj.(*appV1.StatefulSet).Spec.Template.Spec.InitContainers, nil + return t.Spec.Template.Spec.InitContainers, nil case *appV1.DaemonSet: - return obj.(*appV1.DaemonSet).Spec.Template.Spec.InitContainers, nil + return t.Spec.Template.Spec.InitContainers, nil case *batchV1.CronJob: - return obj.(*batchV1.CronJob).Spec.JobTemplate.Spec.Template.Spec.InitContainers, nil + return t.Spec.JobTemplate.Spec.Template.Spec.InitContainers, nil default: return make([]coreV1.Container, 0), fmt.Errorf("GetInitContainers unknown type %T", t) }