Skip to content

Commit

Permalink
Merge pull request #14 from pubg/feature/slack-webhook
Browse files Browse the repository at this point in the history
feature: Send logs to Slack using a Slack webhook #13
  • Loading branch information
bitofsky authored Sep 30, 2021
2 parents a9ab1b3 + dcd7f6e commit 5c16504
Show file tree
Hide file tree
Showing 13 changed files with 388 additions and 75 deletions.
12 changes: 7 additions & 5 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/pubg/kube-image-deployer/interfaces"
"k8s.io/klog/v2"

pkgRuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -24,6 +23,7 @@ type Controller struct {
informer cache.Controller
imageNotifier interfaces.IImageNotifier
applyStrategicMergePatch ApplyStrategicMergePatch
logger interfaces.ILogger

syncedImages map[Image]bool
syncedImagesMutex sync.RWMutex
Expand All @@ -45,6 +45,7 @@ type ControllerOpt struct {
ImageNotifier interfaces.IImageNotifier
ApplyStrategicMergePatch ApplyStrategicMergePatch
ControllerWatchKey string
Logger interfaces.ILogger
}

// NewController creates a new Controller.
Expand All @@ -58,6 +59,7 @@ func NewController(opt ControllerOpt) *Controller {
imageNotifier: opt.ImageNotifier,
applyStrategicMergePatch: opt.ApplyStrategicMergePatch,
watchKey: opt.ControllerWatchKey,
logger: opt.Logger,
syncedImages: make(map[Image]bool),
syncedImagesMutex: sync.RWMutex{},
imageUpdateNotifyList: make([]imageUpdateNotify, 0),
Expand Down Expand Up @@ -95,7 +97,7 @@ func (c *Controller) handleErr(err error, key interface{}) {

// This controller retries 5 times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < 5 {
klog.V(2).Infof("[%s] Error syncing %v: %v", c.resource, key, err)
c.logger.Infof("[%s] Error syncing %v: %v", c.resource, key, err)

// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
Expand All @@ -106,7 +108,7 @@ func (c *Controller) handleErr(err error, key interface{}) {
c.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
runtime.HandleError(err)
klog.V(2).Infof("[%s] Dropping out of the queue: key:%q, err:%v", c.resource, key, err)
c.logger.Infof("[%s] Dropping out of the queue: key:%q, err:%v", c.resource, key, err)
}

// Run begins watching and syncing.
Expand All @@ -115,7 +117,7 @@ func (c *Controller) Run(workers int, stopCh chan struct{}) {

// Let the workers stop when we are done
defer c.queue.ShutDown()
klog.V(2).Infof("[%s] Starting controller", c.resource)
c.logger.Infof("[%s] Starting controller", c.resource)

go c.informer.Run(stopCh)

Expand All @@ -132,7 +134,7 @@ func (c *Controller) Run(workers int, stopCh chan struct{}) {
go wait.Until(c.patchUpdateNotifyList, time.Second, stopCh)

<-stopCh
klog.V(2).Infof("[%s] Stopping controller", c.resource)
c.logger.Infof("[%s] Stopping controller", c.resource)
}

func (c *Controller) GetReresourceName() string {
Expand Down
11 changes: 5 additions & 6 deletions controller/onUpdateImageHash.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/pubg/kube-image-deployer/util"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)

type patch struct {
Expand Down Expand Up @@ -44,7 +43,7 @@ func (c *Controller) getPatchMapByUpdates(updates []imageUpdateNotify) map[strin
patchMap := make(map[string][]patch)

for _, update := range updates {
klog.V(2).Infof("[%s] OnUpdateImageString %s, %s, %s", c.resource, update.url, update.tag, update.imageString)
c.logger.Infof("[%s] OnUpdateImageString %s, %s, %s", c.resource, update.url, update.tag, update.imageString)

c.syncedImagesMutex.RLock()
defer c.syncedImagesMutex.RUnlock()
Expand Down Expand Up @@ -90,7 +89,7 @@ func (c *Controller) applyPatchList(key string, patchList []patch) error {
}

for _, patch := range patchList {
klog.V(2).Infof("[%s] OnUpdateImageString patch %+v", c.resource, patch)
c.logger.Infof("[%s] OnUpdateImageString patch %+v", c.resource, patch)

if !exists { // 삭제된 리소스인 경우 무시
return fmt.Errorf("[%s] OnUpdateImageString patch not exists key=%s", c.resource, key)
Expand Down Expand Up @@ -124,7 +123,7 @@ func (c *Controller) applyPatchList(key string, patchList []patch) error {
}

if len(Containers) == 0 && len(InitContainers) == 0 { // 변경된 이미지가 없는 경우 무시
klog.V(2).Infof("[%s] OnUpdateImageString patch containers not changed %+v", c.resource, patchList)
c.logger.Infof("[%s] OnUpdateImageString patch containers not changed %+v", c.resource, patchList)
return nil
}

Expand All @@ -140,7 +139,7 @@ func (c *Controller) applyPatchList(key string, patchList []patch) error {
return fmt.Errorf("[%s] OnUpdateImageString patch apply error namespace=%s, name=%s, patchString=%s, err=%s", c.resource, namespace, name, patchString, err)
}

klog.Warningf("[%s] OnUpdateImageString patch apply success namespace=%s, name=%s, patchString=%s", c.resource, namespace, name, patchString)
c.logger.Warningf("[%s] OnUpdateImageString patch apply success namespace=%s, name=%s, patchString=%s", c.resource, namespace, name, patchString)
return nil

}
Expand Down Expand Up @@ -176,7 +175,7 @@ func (c *Controller) patchUpdateNotifyList() {

for key, patchList := range patchMap {
if err := c.applyPatchList(key, patchList); err != nil {
klog.Error(err)
c.logger.Errorf(err.Error()) // just logging
}
}

Expand Down
16 changes: 10 additions & 6 deletions controller/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"strings"

"github.com/pubg/kube-image-deployer/util"
"k8s.io/klog"
)

type Image struct {
Expand All @@ -18,7 +17,7 @@ func (c *Controller) syncKey(key string) error {

obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
klog.Errorf("[%s] Fetching object with key %s from store failed with %v", c.resource, key, err)
c.logger.Errorf("[%s] Fetching object with key %s from store failed with %v", c.resource, key, err)
return err
}

Expand Down Expand Up @@ -50,7 +49,12 @@ func (c *Controller) syncKey(key string) error {
func (c *Controller) getImagesFromCurrentWorkload(obj interface{}, key string) (images map[Image]bool) {

images = make(map[Image]bool)
annotations := util.GetAnnotations(obj)
annotations, err := util.GetAnnotations(obj)

if err != nil {
c.logger.Errorf("[%s] GetAnnotations error : %v", c.resource, err)
return
}

for annotationKey, annotationValue := range annotations {

Expand Down Expand Up @@ -79,7 +83,7 @@ func (c *Controller) getImagesFromCurrentWorkload(obj interface{}, key string) (
}

if len(images) == 0 {
klog.Warningf("[%s] getImagesFromCurrentWorkload undefined or invalid annotation key=%s\n", c.resource, key)
c.logger.Warningf("[%s] getImagesFromCurrentWorkload undefined or invalid annotation key=%s\n", c.resource, key)
}

return
Expand Down Expand Up @@ -112,7 +116,7 @@ func (c *Controller) registImage(image Image) {
c.syncedImagesMutex.Unlock()
return
} else {
klog.V(2).Infof("[%s] registImage image=%+v\n", c.resource, image)
c.logger.Infof("[%s] registImage image=%+v\n", c.resource, image)
c.syncedImages[image] = true
c.syncedImagesMutex.Unlock()
}
Expand All @@ -127,7 +131,7 @@ func (c *Controller) unregistImage(image Image) {
return
}

klog.V(2).Infof("[%s] unregistImage image=%+v\n", c.resource, image)
c.logger.Infof("[%s] unregistImage image=%+v\n", c.resource, image)

c.syncedImagesMutex.Lock()
delete(c.syncedImages, image)
Expand Down
23 changes: 15 additions & 8 deletions imageNotifier/imageNotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"time"

"github.com/pubg/kube-image-deployer/interfaces"
l "github.com/pubg/kube-image-deployer/logger"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)

type ImageNotifierId struct {
Expand All @@ -22,19 +22,26 @@ type ImageNotifier struct {
stopCh chan struct{}

remoteRegistry interfaces.IRemoteRegistry
logger interfaces.ILogger
}

func NewImageNotifier(stopCh chan struct{}, remoteRegistry interfaces.IRemoteRegistry, imageCheckIntervalSec uint) *ImageNotifier {
imageNotifier := &ImageNotifier{
r := &ImageNotifier{
list: make(map[ImageNotifierId]*ImageUpdateNotify),
mutex: sync.RWMutex{},
stopCh: stopCh,
remoteRegistry: remoteRegistry,
logger: l.NewLogger(),
}

go wait.Until(imageNotifier.checkAllImageNotifyList, time.Second*time.Duration(imageCheckIntervalSec), stopCh)
go wait.Until(r.checkAllImageNotifyList, time.Second*time.Duration(imageCheckIntervalSec), stopCh)

return imageNotifier
return r
}

func (r *ImageNotifier) WithLogger(logger interfaces.ILogger) *ImageNotifier {
r.logger = logger
return r
}

// RegistImage regist to imageNotifier
Expand All @@ -53,7 +60,7 @@ func (r *ImageNotifier) RegistImage(controller interfaces.IController, url, tag,
}

// 신규
klog.Infof("[%s] RegistImage %s:%s\n", controller.GetReresourceName(), url, tag)
r.logger.Infof("[%s] RegistImage %s:%s\n", controller.GetReresourceName(), url, tag)

imageUpdateNotify := NewImageUpdateNotify(url, tag, "", controller)

Expand All @@ -71,23 +78,23 @@ func (r *ImageNotifier) UnregistImage(controller interfaces.IController, url, ta
existsImageUpdateNotify, ok := r.list[notifyId]

if !ok { // ??
klog.Errorf("[%s] UnregistImage existsImageUpdateNotify notfound url=%s, tag=%s", controller.GetReresourceName(), url, tag)
r.logger.Errorf("[%s] UnregistImage existsImageUpdateNotify notfound url=%s, tag=%s", controller.GetReresourceName(), url, tag)
return
}

referenceCount := existsImageUpdateNotify.subReferenceCount()

if referenceCount <= 0 { // 이미지를 참조하는 대상이 더이상 없으면 삭제
delete(r.list, notifyId)
klog.Infof("[%s] UnregistImage %s:%s\n", controller.GetReresourceName(), url, tag)
r.logger.Infof("[%s] UnregistImage %s:%s\n", controller.GetReresourceName(), url, tag)
}

}

func (r *ImageNotifier) checkImageUpdate(image checkImage) {
imageString, err := r.remoteRegistry.GetImageString(image.url, image.tag, "")
if err != nil {
klog.Errorf("[%s] checkImageUpdate %s:%s err=%s\n", image.controller.GetReresourceName(), image.url, image.tag, err)
r.logger.Errorf("[%s] checkImageUpdate %s:%s err=%s\n", image.controller.GetReresourceName(), image.url, image.tag, err)
return
}

Expand Down
6 changes: 6 additions & 0 deletions interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@ type IImageNotifier interface {
type IRemoteRegistry interface {
GetImageString(url, tag, platformString string) (string, error)
}

type ILogger interface {
Infof(format string, args ...interface{})
Errorf(format string, args ...interface{})
Warningf(format string, args ...interface{})
}
56 changes: 56 additions & 0 deletions logger/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package logger

import (
"fmt"

"k8s.io/klog"
)

type Logger struct {
slack *Slack
depth int
}

func NewLogger() *Logger {
return &Logger{
depth: 1,
slack: nil, // default=disabled
}
}

func (l *Logger) SetDepth(depth int) *Logger {
l.depth = depth
return l
}

func (l *Logger) WithSlack(stopCh chan struct{}, webhookUrl, msgPrefix string) *Logger {
l.slack = NewSlack(stopCh, webhookUrl, msgPrefix)
return l
}

func (l *Logger) Infof(format string, args ...interface{}) {
if !klog.V(2) {
return
}
msg := fmt.Sprintf(format, args...)
klog.InfoDepth(l.depth, msg)
if l.slack != nil {
l.slack.InfoDepth(l.depth+1, msg)
}
}

func (l *Logger) Errorf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
klog.ErrorDepth(l.depth, msg)
if l.slack != nil {
l.slack.ErrorDepth(l.depth+1, msg)
}
}

func (l *Logger) Warningf(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
klog.WarningDepth(l.depth, msg)
if l.slack != nil {
l.slack.WarningDepth(l.depth+1, msg)
}
}
Loading

0 comments on commit 5c16504

Please sign in to comment.