Skip to content

Commit

Permalink
Add metrics to measure dropped logs
Browse files Browse the repository at this point in the history
  • Loading branch information
alexlry committed Sep 11, 2023
1 parent 488d881 commit 2f8fc86
Showing 1 changed file with 29 additions and 22 deletions.
51 changes: 29 additions & 22 deletions fluent/fluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"

"bytes"
Expand Down Expand Up @@ -109,6 +110,7 @@ type Fluent struct {
// cancelDialings is used by Close() to stop any in-progress dialing.
cancelDialings context.CancelFunc
pending chan *msgToSend
dropped atomic.Uint64

Check failure on line 113 in fluent/fluent.go

View workflow job for this annotation

GitHub Actions / CI golang 1.16 on ubuntu

undefined: "sync/atomic".Uint64
pendingMutex sync.RWMutex
closed bool
wg sync.WaitGroup
Expand All @@ -126,7 +128,10 @@ type Metrics struct {
BufferLimit int

// number of logs waiting to be sent.
PendingLogCount int
PendingLogCount uint64

// number of logs dropped
DroppedLogCount uint64
}

type dialer interface {
Expand Down Expand Up @@ -209,27 +214,26 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
//
// Examples:
//
// // send map[string]
// mapStringData := map[string]string{
// "foo": "bar",
// }
// f.Post("tag_name", mapStringData)
//
// // send message with specified time
// mapStringData := map[string]string{
// "foo": "bar",
// }
// tm := time.Now()
// f.PostWithTime("tag_name", tm, mapStringData)
// // send map[string]
// mapStringData := map[string]string{
// "foo": "bar",
// }
// f.Post("tag_name", mapStringData)
//
// // send struct
// structData := struct {
// Name string `msg:"name"`
// } {
// "john smith",
// }
// f.Post("tag_name", structData)
// // send message with specified time
// mapStringData := map[string]string{
// "foo": "bar",
// }
// tm := time.Now()
// f.PostWithTime("tag_name", tm, mapStringData)
//
// // send struct
// structData := struct {
// Name string `msg:"name"`
// } {
// "john smith",
// }
// f.Post("tag_name", structData)
func (f *Fluent) Post(tag string, message interface{}) error {
timeNow := time.Now()
return f.PostWithTime(tag, timeNow, message)
Expand Down Expand Up @@ -419,17 +423,19 @@ func (f *Fluent) Close() (err error) {
}

// get current metrics about Fluent client.
func (f *Fluent) GetMetrics() interface{} {
func (f *Fluent) GetMetrics() *Metrics {
if f.Config.Async {
return &Metrics{
BufferLimit: f.Config.BufferLimit,
PendingLogCount: len(f.pending),
PendingLogCount: uint64(len(f.pending)),
DroppedLogCount: f.dropped.Load(),
}
}

return &Metrics{
BufferLimit: 0,
PendingLogCount: 0,
DroppedLogCount: 0,
}
}

Expand All @@ -443,6 +449,7 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error {
select {
case f.pending <- msg:
default:
f.dropped.Add(1)
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
}
return nil
Expand Down

0 comments on commit 2f8fc86

Please sign in to comment.