diff --git a/fluent/fluent.go b/fluent/fluent.go index 7963a7a..29f901e 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -13,6 +13,7 @@ import ( "reflect" "strconv" "sync" + "sync/atomic" "time" "bytes" @@ -109,6 +110,7 @@ type Fluent struct { // cancelDialings is used by Close() to stop any in-progress dialing. cancelDialings context.CancelFunc pending chan *msgToSend + dropped atomic.Uint64 pendingMutex sync.RWMutex closed bool wg sync.WaitGroup @@ -126,7 +128,10 @@ type Metrics struct { BufferLimit int // number of logs waiting to be sent. - PendingLogCount int + PendingLogCount uint64 + + // number of logs dropped + DroppedLogCount uint64 } type dialer interface { @@ -209,27 +214,26 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { // // Examples: // -// // send map[string] -// mapStringData := map[string]string{ -// "foo": "bar", -// } -// f.Post("tag_name", mapStringData) -// -// // send message with specified time -// mapStringData := map[string]string{ -// "foo": "bar", -// } -// tm := time.Now() -// f.PostWithTime("tag_name", tm, mapStringData) +// // send map[string] +// mapStringData := map[string]string{ +// "foo": "bar", +// } +// f.Post("tag_name", mapStringData) // -// // send struct -// structData := struct { -// Name string `msg:"name"` -// } { -// "john smith", -// } -// f.Post("tag_name", structData) +// // send message with specified time +// mapStringData := map[string]string{ +// "foo": "bar", +// } +// tm := time.Now() +// f.PostWithTime("tag_name", tm, mapStringData) // +// // send struct +// structData := struct { +// Name string `msg:"name"` +// } { +// "john smith", +// } +// f.Post("tag_name", structData) func (f *Fluent) Post(tag string, message interface{}) error { timeNow := time.Now() return f.PostWithTime(tag, timeNow, message) @@ -419,17 +423,19 @@ func (f *Fluent) Close() (err error) { } // get current metrics about Fluent client. -func (f *Fluent) GetMetrics() interface{} { +func (f *Fluent) GetMetrics() *Metrics { if f.Config.Async { return &Metrics{ BufferLimit: f.Config.BufferLimit, - PendingLogCount: len(f.pending), + PendingLogCount: uint64(len(f.pending)), + DroppedLogCount: f.dropped.Load(), } } return &Metrics{ BufferLimit: 0, PendingLogCount: 0, + DroppedLogCount: 0, } } @@ -443,6 +449,7 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error { select { case f.pending <- msg: default: + f.dropped.Add(1) return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit) } return nil