Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not allow writing events after fluent.Close() #105

Merged
merged 1 commit into from
Nov 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions fluent/fluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type Fluent struct {
cancelDialings context.CancelFunc
pending chan *msgToSend
pendingMutex sync.RWMutex
chanClosed bool
closed bool
wg sync.WaitGroup

muconn sync.RWMutex
Expand Down Expand Up @@ -274,7 +274,11 @@ func (f *Fluent) postRawData(msg *msgToSend) error {
if f.Config.Async {
return f.appendBuffer(msg)
}

// Synchronous write
if f.closed {
return fmt.Errorf("fluent#postRawData: Logger already closed")
}
return f.writeWithRetry(context.Background(), msg)
}

Expand Down Expand Up @@ -350,11 +354,11 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg
func (f *Fluent) Close() (err error) {
if f.Config.Async {
f.pendingMutex.Lock()
if f.chanClosed {
if f.closed {
f.pendingMutex.Unlock()
return nil
}
f.chanClosed = true
f.closed = true
f.pendingMutex.Unlock()

if f.Config.ForceStopAsyncSend {
Expand All @@ -364,7 +368,7 @@ func (f *Fluent) Close() (err error) {

close(f.pending)
// If ForceStopAsyncSend is false, all logs in the channel have to be sent
// before closing the connection. At this point chanClosed is true so no more
// before closing the connection. At this point closed is true so no more
// logs are written to the channel and f.pending has been closed, so run()
// goroutine will exit as soon as all logs in the channel are sent.
if !f.Config.ForceStopAsyncSend {
Expand All @@ -374,6 +378,7 @@ func (f *Fluent) Close() (err error) {

f.muconn.Lock()
f.close()
f.closed = true
f.muconn.Unlock()

// If ForceStopAsyncSend is true, we shall close the connection before waiting for
Expand All @@ -383,15 +388,14 @@ func (f *Fluent) Close() (err error) {
if f.Config.ForceStopAsyncSend {
f.wg.Wait()
}

return
}

// appendBuffer appends data to buffer with lock.
func (f *Fluent) appendBuffer(msg *msgToSend) error {
f.pendingMutex.RLock()
defer f.pendingMutex.RUnlock()
if f.chanClosed {
if f.closed {
return fmt.Errorf("fluent#appendBuffer: Logger already closed")
}
select {
Expand Down
33 changes: 33 additions & 0 deletions fluent/fluent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,39 @@ func TestCloseWhileWaitingForAckResponse(t *testing.T) {
}, "failed to close the logger")
}

func TestSyncWriteAfterCloseFails(t *testing.T) {
d := newTestDialer()

go func() {
f, err := newWithDialer(Config{Async: false}, d)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

err = f.PostWithTime("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

err = f.Close()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

// Now let's post some event after Fluent.Close().
err = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"foo": "buzz"})

// The event submission must fail,
assert.NotEqual(t, err, nil);

// and also must keep Fluentd closed.
assert.NotEqual(t, f.closed, false);
}()

conn := d.waitForNextDialing(true, false)
conn.waitForNextWrite(true, "")
}

func Benchmark_PostWithShortMessage(b *testing.B) {
b.StopTimer()
d := newTestDialer()
Expand Down