Skip to content
This repository has been archived by the owner on Aug 31, 2022. It is now read-only.

Commit

Permalink
Merge pull request #58 from opsgenie/multiple-fixes-for-issues
Browse files Browse the repository at this point in the history
Multiple fixes for issues
  • Loading branch information
Fahri YARDIMCI authored Jun 22, 2020
2 parents ae8bfb5 + 077bb66 commit f5e174c
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 111 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM golang:1.14 AS builder

ADD . /app
WORKDIR /app
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO11MODULE=on go build -mod=vendor -v -a -o /main .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO11MODULE=on go build -mod=vendor -a -o /main .

FROM gcr.io/distroless/base
COPY --from=builder /main /kubernetes-event-exporter
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ SQS is an AWS service for message queuing that allows high throughput messaging.
```yaml
# ...
receivers:
- name: "file"
- name: "sqs"
sqs:
queueName: "/tmp/dump"
region: us-west-2
Expand Down
88 changes: 88 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
logLevel: debug
logFormat: json
# namespace: my-namespace-only # Omitting it defaults to all namespaces.
route:
# Main route
routes:
# This route allows dumping all events because it has no fields to match and no drop rules.
- match:
- receiver: "dump"
# This starts another route, drops all the events in *test* namespaces and Normal events
# for capturing critical events
- drop:
- namespace: "*test*"
- type: "Normal"
match:
- receiver: "alert"
- receiver: "pipe"
# This a final route for user messages
- match:
- kind: "Pod|Deployment|ReplicaSet"
labels:
version: "dev"
receiver: "slack"
receivers:
- name: "dump"
elasticsearch:
hosts:
- "http://localhost:9200"
indexFormat: "kube-events-{2006-01-02}"
- name: "alert"
opsgenie:
apiKey: ""
priority: "P3"
message: "Event {{ .Reason }} for {{ .InvolvedObject.Namespace }}/{{ .InvolvedObject.Name }} on K8s cluster"
alias: "{{ .UID }}"
description: "<pre>{{ toPrettyJson . }}</pre>"
tags:
- "event"
- "{{ .Reason }}"
- "{{ .InvolvedObject.Kind }}"
- "{{ .InvolvedObject.Name }}"
- name: "slack"
slack:
token: ""
channel: "#mustafa-test"
message: "Received a Kubernetes Event {{ .Message}}"
fields:
message: "{{ .Message }}"
namespace: "{{ .Namespace }}"
reason: "{{ .Reason }}"
object: "{{ .Namespace }}"
- name: "pipe"
webhook:
endpoint: "http://localhost:3000"
headers:
X-API-KEY: "123-456-OPSGENIE-789-ABC"
User-Agent: "kube-event-exporter 1.0"
streamName: "applicationMetric"
layout:
endpoint: "localhost2"
eventType: "kube-event"
createdAt: "{{ .GetTimestampMs }}"
details:
message: "{{ .Message }}"
reason: "{{ .Reason }}"
tip: "{{ .Type }}"
count: "{{ .Count }}"
kind: "{{ .InvolvedObject.Kind }}"
name: "{{ .InvolvedObject.Name }}"
namespace: "{{ .Namespace }}"
component: "{{ .Source.Component }}"
host: "{{ .Source.Host }}"
labels: "{{ toJson .InvolvedObject.Labels}}"
- name: "kafka"
kafka:
topic: "kube-event"
brokers:
- "localhost:9092"
tls:
enable: false
certFile: "kafka-client.crt"
keyFile: "kafka-client.key"
caFile: "kafka-ca.crt"
- name: "pubsub"
pubsub:
gcloud_project_id: "my-project"
topic: "kube-event"
create_topic: False
85 changes: 5 additions & 80 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,86 +1,11 @@
logLevel: debug
logLevel: error
logFormat: json
# namespace: tekton-pipelines
route:
# Main route
routes:
# This route allows dumping all events because it has no fields to match and no drop rules.
- match:
- receiver: "dump"
# This starts another route, drops all the events in *test* namespaces and Normal events
# for capturing critical events
- drop:
- namespace: "*test*"
- type: "Normal"
match:
- receiver: "alert"
- receiver: "pipe"
# This a final route for user messages
- match:
- kind: "Pod|Deployment|ReplicaSet"
labels:
version: "dev"
receiver: "slack"
receivers:
- name: "dump"
elasticsearch:
hosts:
- "http://localhost:9200"
indexFormat: "kube-events-{2006-01-02}"
- name: "alert"
opsgenie:
apiKey: ""
priority: "P3"
message: "Event {{ .Reason }} for {{ .InvolvedObject.Namespace }}/{{ .InvolvedObject.Name }} on K8s cluster"
alias: "{{ .UID }}"
description: "<pre>{{ toPrettyJson . }}</pre>"
tags:
- "event"
- "{{ .Reason }}"
- "{{ .InvolvedObject.Kind }}"
- "{{ .InvolvedObject.Name }}"
- name: "slack"
slack:
token: ""
channel: "#mustafa-test"
message: "Received a Kubernetes Event {{ .Message}}"
fields:
message: "{{ .Message }}"
namespace: "{{ .Namespace }}"
reason: "{{ .Reason }}"
object: "{{ .Namespace }}"
- name: "pipe"
webhook:
endpoint: "http://localhost:3000"
headers:
X-API-KEY: "123-456-OPSGENIE-789-ABC"
User-Agent: "kube-event-exporter 1.0"
streamName: "applicationMetric"
layout:
endpoint: "localhost2"
eventType: "kube-event"
createdAt: "{{ .GetTimestampMs }}"
details:
message: "{{ .Message }}"
reason: "{{ .Reason }}"
tip: "{{ .Type }}"
count: "{{ .Count }}"
kind: "{{ .InvolvedObject.Kind }}"
name: "{{ .InvolvedObject.Name }}"
namespace: "{{ .Namespace }}"
component: "{{ .Source.Component }}"
host: "{{ .Source.Host }}"
labels: "{{ toJson .InvolvedObject.Labels}}"
- name: "kafka"
kafka:
topic: "kube-event"
brokers:
- "localhost:9092"
tls:
enable: false
certFile: "kafka-client.crt"
keyFile: "kafka-client.key"
caFile: "kafka-ca.crt"
- name: "pubsub"
pubsub:
gcloud_project_id: "my-project"
topic: "kube-event"
create_topic: False
file:
path: "/dev/stdout"
13 changes: 7 additions & 6 deletions deploy/01-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ metadata:
namespace: monitoring
data:
config.yaml: |
logLevel: error
logFormat: json
route:
match:
- receiver: "dump"
routes:
- match:
- receiver: "dump"
receivers:
- name: "dump"
elasticsearch:
hosts:
- http://elasticsearch.monitoring.svc.cluster.local:9200
index: kube-events
file:
path: "/dev/stdout"
2 changes: 1 addition & 1 deletion deploy/02-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ spec:
serviceAccountName: event-exporter
containers:
- name: event-exporter
image: opsgenie/kubernetes-event-exporter:0.7
image: opsgenie/kubernetes-event-exporter:0.8
imagePullPolicy: IfNotPresent
args:
- -conf=/data/config.yaml
Expand Down
30 changes: 19 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ package main
import (
"context"
"flag"
"io/ioutil"
"os"
"os/signal"
"syscall"
"time"

"github.com/opsgenie/kubernetes-event-exporter/pkg/exporter"
"github.com/opsgenie/kubernetes-event-exporter/pkg/kube"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v2"
"io/ioutil"
"os"
"os/signal"
"syscall"
"time"
)

var (
Expand All @@ -36,10 +35,7 @@ func main() {
log.Fatal().Err(err).Msg("cannot parse config to YAML")
}

log.Logger = log.With().Caller().Logger().Output(zerolog.ConsoleWriter{
Out: os.Stdout,
TimeFormat: time.RFC3339,
}).Level(zerolog.DebugLevel)
log.Logger = log.With().Caller().Logger().Level(zerolog.DebugLevel)

if cfg.LogLevel != "" {
level, err := zerolog.ParseLevel(cfg.LogLevel)
Expand All @@ -49,13 +45,25 @@ func main() {
log.Logger = log.Logger.Level(level)
}

if cfg.LogFormat == "json" {
// Defaults to JSON already nothing to do
} else if cfg.LogFormat == "" || cfg.LogFormat == "pretty" {
log.Logger = log.Logger.Output(zerolog.ConsoleWriter{
Out: os.Stdout,
NoColor: false,
TimeFormat: time.RFC3339,
})
} else {
log.Fatal().Str("log_format", cfg.LogFormat).Msg("Unknown log format")
}

kubeconfig, err := kube.GetKubernetesConfig()
if err != nil {
log.Fatal().Err(err).Msg("cannot get kubeconfig")
}

engine := exporter.NewEngine(&cfg, &exporter.ChannelBasedReceiverRegistry{})
w := kube.NewEventWatcher(kubeconfig, engine.OnEvent)
w := kube.NewEventWatcher(kubeconfig, cfg.Namespace, engine.OnEvent)

ctx, cancel := context.WithCancel(context.Background())
leaderLost := make(chan bool)
Expand Down
2 changes: 2 additions & 0 deletions pkg/exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type Config struct {
// TODO: There is currently a tight coupling with route and config, but not with receiver config and sink so
// TODO: I am not sure what to do here.
LogLevel string `yaml:"logLevel"`
LogFormat string `yaml:"logFormat"`
Namespace string `yaml:"namespace"`
LeaderElection kube.LeaderElectionConfig `yaml:"leaderElection"`
Route Route `yaml:"route"`
Receivers []sinks.ReceiverConfig `yaml:"receivers"`
Expand Down
29 changes: 29 additions & 0 deletions pkg/exporter/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,32 @@ func TestSubSubRouteWithDrop(t *testing.T) {
assert.True(t, reg.isEventRcvd("osman", &ev))
assert.False(t, reg.isEventRcvd("any", &ev))
}

// Test for issue: https://github.com/opsgenie/kubernetes-event-exporter/issues/51
func Test_GHIssue51(t *testing.T) {
ev1 := kube.EnhancedEvent{}
ev1.Type = "Warning"
ev1.Reason = "FailedCreatePodContainer"

ev2 := kube.EnhancedEvent{}
ev2.Type = "Warning"
ev2.Reason = "FailedCreate"

reg := testReceiverRegistry{}

r := Route{
Drop: []Rule{{
Type: "Normal",
}},
Match: []Rule{{
Reason: "FailedCreatePodContainer",
Receiver: "elastic",
}},
}

r.ProcessEvent(&ev1, &reg)
r.ProcessEvent(&ev2, &reg)

assert.True(t, reg.isEventRcvd("elastic", &ev1))
assert.False(t, reg.isEventRcvd("elastic", &ev2))
}
7 changes: 4 additions & 3 deletions pkg/kube/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ type EventWatcher struct {
fn EventHandler
}

func NewEventWatcher(config *rest.Config, fn EventHandler) *EventWatcher {
func NewEventWatcher(config *rest.Config, namespace string, fn EventHandler) *EventWatcher {
clientset := kubernetes.NewForConfigOrDie(config)
factory := informers.NewSharedInformerFactory(clientset, 0)
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace))
informer := factory.Core().V1().Events().Informer()

watcher := &EventWatcher{
Expand Down Expand Up @@ -59,6 +59,7 @@ func (e *EventWatcher) onEvent(event *corev1.Event) {
Str("msg", event.Message).
Str("namespace", event.Namespace).
Str("reason", event.Reason).
Str("involvedObject", event.InvolvedObject.Name).
Msg("Received event")

ev := &EnhancedEvent{
Expand All @@ -77,7 +78,7 @@ func (e *EventWatcher) onEvent(event *corev1.Event) {
annotations, err := e.annotationCache.GetAnnotationsWithCache(&event.InvolvedObject)
if err != nil {
log.Error().Err(err).Msg("Cannot list annotations of the object")
}else {
} else {
ev.InvolvedObject.Annotations = annotations
ev.InvolvedObject.ObjectReference = *event.InvolvedObject.DeepCopy()
}
Expand Down
Loading

0 comments on commit f5e174c

Please sign in to comment.