diff --git a/pkg/core/event/event.go b/pkg/core/event/event.go index 35890b688..325f9e2e6 100644 --- a/pkg/core/event/event.go +++ b/pkg/core/event/event.go @@ -18,6 +18,8 @@ package event import ( "fmt" + jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" "strings" "sync" @@ -35,6 +37,8 @@ const ( Body = "body" ) +var ErrorDropEvent = errors.New("drop event") + type DefaultMeta struct { Properties map[string]interface{} `json:"properties"` } @@ -123,22 +127,12 @@ func (de *DefaultEvent) Release() { func (de *DefaultEvent) String() string { var sb strings.Builder - sb.WriteString("meta:") - if de.M != nil { - sb.WriteString(de.M.String()) - } - sb.WriteString(";") - sb.WriteString("header:") - sb.WriteString("{") - for k, v := range de.Header() { - sb.WriteString(k) - sb.WriteString(" : ") - sb.WriteString(fmt.Sprintf("%+v", v)) - sb.WriteString(", ") - } - sb.WriteString("}; body:{") + sb.WriteString(`header:`) + header, _ := jsoniter.Marshal(de.Header()) + sb.Write(header) + sb.WriteString(`, body:"`) sb.WriteString(string(de.Body())) - sb.WriteString("}") + sb.WriteString(`"`) return sb.String() } diff --git a/pkg/sink/elasticsearch/client.go b/pkg/sink/elasticsearch/client.go index ca25e8688..1fe4950d8 100644 --- a/pkg/sink/elasticsearch/client.go +++ b/pkg/sink/elasticsearch/client.go @@ -20,6 +20,8 @@ import ( "context" "encoding/json" "fmt" + eventer "github.com/loggie-io/loggie/pkg/core/event" + "github.com/loggie-io/loggie/pkg/core/log" "github.com/loggie-io/loggie/pkg/util/pattern" "strings" @@ -88,9 +90,10 @@ func (c *ClientSet) BulkIndex(ctx context.Context, batch api.Batch) error { headerObj := runtime.NewObject(event.Header()) // select index - idx, err := c.indexPattern.WithObject(headerObj).Render() + idx, err := c.indexPattern.WithObject(headerObj).RenderWithStrict() if err != nil { - return errors.WithMessagef(err, "select index pattern error") + log.Error("render index pattern err: %v; event is: %s", err, event.String()) + continue } data, err := c.codec.Encode(event) @@ -115,6 +118,11 @@ func (c *ClientSet) BulkIndex(ctx context.Context, batch api.Batch) error { req.Add(bulkIndexRequest) } + + if req.NumberOfActions() == 0 { + return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null") + } + ret, err := req.Do(ctx) if err != nil { return err diff --git a/pkg/sink/elasticsearch/elasticsearch.go b/pkg/sink/elasticsearch/elasticsearch.go index ddfed9994..a65d29391 100644 --- a/pkg/sink/elasticsearch/elasticsearch.go +++ b/pkg/sink/elasticsearch/elasticsearch.go @@ -19,6 +19,7 @@ package elasticsearch import ( "context" "fmt" + eventer "github.com/loggie-io/loggie/pkg/core/event" "github.com/pkg/errors" "github.com/loggie-io/loggie/pkg/util/pattern" @@ -103,6 +104,9 @@ func (s *Sink) Consume(batch api.Batch) api.Result { err := s.cli.BulkIndex(context.TODO(), batch) if err != nil { + if errors.Is(err, eventer.ErrorDropEvent) { + return result.DropWith(err) + } return result.Fail(errors.WithMessage(err, "send events to elasticsearch")) } diff --git a/pkg/sink/elasticsearch/pipeline.yml b/pkg/sink/elasticsearch/pipeline.yml new file mode 100644 index 000000000..25d401480 --- /dev/null +++ b/pkg/sink/elasticsearch/pipeline.yml @@ -0,0 +1,4 @@ +sink: + type: elasticsearch + hosts: ["localhost:9200"] + index: "log-${fields.topic}-${+YYYY.MM.DD}" diff --git a/pkg/sink/kafka/pipeline.yml b/pkg/sink/kafka/pipeline.yml new file mode 100644 index 000000000..acf0fd5b8 --- /dev/null +++ b/pkg/sink/kafka/pipeline.yml @@ -0,0 +1,4 @@ +sink: + type: kafka + brokers: ["127.0.0.1:6400"] + topic: "log-${fields.topic}" \ No newline at end of file diff --git a/pkg/sink/kafka/sink.go b/pkg/sink/kafka/sink.go index f7f160a41..33212e532 100644 --- a/pkg/sink/kafka/sink.go +++ b/pkg/sink/kafka/sink.go @@ -19,7 +19,6 @@ package kafka import ( "context" "fmt" - "github.com/loggie-io/loggie/pkg/util/pattern" "github.com/loggie-io/loggie/pkg/util/runtime" "github.com/pkg/errors" @@ -127,8 +126,8 @@ func (s *Sink) Consume(batch api.Batch) api.Result { for _, e := range events { topic, err := s.selectTopic(e) if err != nil { - log.Error("select kafka topic error: %+v", err) - return result.Fail(err) + log.Error("select kafka topic error: %v; event is: %s", err, e.String()) + continue } msg, err := s.cod.Encode(e) @@ -143,6 +142,10 @@ func (s *Sink) Consume(batch api.Batch) api.Result { }) } + if len(km) == 0 { + return result.DropWith(errors.New("send to kafka message batch is null")) + } + if s.writer != nil { err := s.writer.WriteMessages(context.Background(), km...) if err != nil { @@ -156,5 +159,5 @@ func (s *Sink) Consume(batch api.Batch) api.Result { } func (s *Sink) selectTopic(e api.Event) (string, error) { - return s.topicPattern.WithObject(runtime.NewObject(e.Header())).Render() + return s.topicPattern.WithObject(runtime.NewObject(e.Header())).RenderWithStrict() } diff --git a/pkg/util/pattern/pattern.go b/pkg/util/pattern/pattern.go index 5cf045f76..e0ea60ec5 100644 --- a/pkg/util/pattern/pattern.go +++ b/pkg/util/pattern/pattern.go @@ -19,6 +19,7 @@ package pattern import ( "github.com/loggie-io/loggie/pkg/util/runtime" "github.com/loggie-io/loggie/pkg/util/time" + "github.com/pkg/errors" "os" "regexp" "strings" @@ -39,6 +40,8 @@ const ( kindObject = "object" ) +var ErrEmptyMatcher = errors.New("render matcher is empty") + type Pattern struct { Raw string isConstVal bool @@ -141,7 +144,18 @@ func makeMatch(m []string) matcher { return item } +// RenderWithStrict any placeholder rendering empty will return an error +func (p *Pattern) RenderWithStrict() (string, error) { + return p.render(true) +} + func (p *Pattern) Render() (string, error) { + return p.render(false) +} + +// Render to actual results based on placeholders +// If `strict` is set to true, any placeholder rendering empty will return an error. +func (p *Pattern) render(strict bool) (string, error) { if p.isConstVal || len(p.matcher) == 0 { return p.Raw, nil } @@ -164,6 +178,10 @@ func (p *Pattern) Render() (string, error) { alt = p.K8sMatcherRender(m.key) } + if alt == "" && strict { + return "", errors.WithMessagef(ErrEmptyMatcher, "with %s", m.keyWrap) + } + // add old oldNew = append(oldNew, m.keyWrap) // add new diff --git a/pkg/util/pattern/pattern_test.go b/pkg/util/pattern/pattern_test.go index b8a614e00..e14e76126 100644 --- a/pkg/util/pattern/pattern_test.go +++ b/pkg/util/pattern/pattern_test.go @@ -207,6 +207,43 @@ func TestObjectPattern(t *testing.T) { } } +func TestObjectPatternWithStrict(t *testing.T) { + type args struct { + pattern string + obj *runtime.Object + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "got object fields", + args: args{ + pattern: "${a.b}-${a.none}", + obj: runtime.NewObject(map[string]interface{}{ + "a": map[string]interface{}{ + "b": "c", + }}), + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p, err := Init(tt.args.pattern) + if err != nil { + t.Errorf("init pattern error: %v", err) + } + + _, err = p.WithObject(tt.args.obj).RenderWithStrict() + assert.ErrorIs(t, err, ErrEmptyMatcher) + + }) + } +} + func TestK8sPattern(t *testing.T) { testpod := &corev1.Pod{}