diff --git a/pkg/sendbuffer/sendbuffer.go b/pkg/sendbuffer/sendbuffer.go index 8c8633209..246ccd278 100644 --- a/pkg/sendbuffer/sendbuffer.go +++ b/pkg/sendbuffer/sendbuffer.go @@ -30,6 +30,9 @@ type SendBuffer struct { sender sender sendTicker *time.Ticker isSending bool + + // logsJustPurged is used to prevent attempting to delete logs that were just purged + logsJustPurged bool } type option func(*SendBuffer) @@ -108,6 +111,10 @@ func (sb *SendBuffer) Write(in []byte) (int, error) { if len(in)+sb.size > sb.maxStorageSizeBytes { sb.deleteLogs(len(sb.logs)) + // mark that we have just purged the logs so that any waiting deletes + // will not try to delete what was purged + sb.logsJustPurged = true + sb.logger.Log( "msg", "reached capacity, dropping all data and starting over", "method", "Write", @@ -281,16 +288,20 @@ func (sb *SendBuffer) sendAndPurge() error { return nil } - // There is a small possibility that the buffer gets full while were in the middle of sending - // and gets deleted. However, we don't want to block writes while were waiting on a network call - // to send the logs. To live with this, we just have the deleteLogs func make sure that the buffer - // is not empty (ie: did not get purged while waiting on network call). - // testing on a new enrollment in debug mode, log size hit 130K bytes // before enrollment completed and was able to ship logs // 2023-11-16 sb.writeMutex.Lock() defer sb.writeMutex.Unlock() + + // There is a possibility that the log buffer gets full while were in the middle of sending + // and gets deleted. However, we don't want to block writes while were waiting on a network call + // to send the logs. To live with this, we just verify that the logs didn't just get purged. + if sb.logsJustPurged { + sb.logsJustPurged = false + return nil + } + sb.deleteLogs(lastKey) return nil @@ -325,9 +336,6 @@ func (sb *SendBuffer) copyLogs(w io.Writer, maxSizeBytes int) (int, error) { // deleteLogs deletes the logs up to the provided index // it's up to the caller to lock the write mutex func (sb *SendBuffer) deleteLogs(toIndex int) { - if len(sb.logs) == 0 { - return - } sizeDeleted := 0 for i := 0; i < toIndex; i++ { diff --git a/pkg/sendbuffer/sendbuffer_test.go b/pkg/sendbuffer/sendbuffer_test.go index 811865fba..fb77164d8 100644 --- a/pkg/sendbuffer/sendbuffer_test.go +++ b/pkg/sendbuffer/sendbuffer_test.go @@ -115,8 +115,9 @@ func TestBufferFullPurgeWaitsForSendAndPurge(t *testing.T) { sb := New( &testSender{lastReceived: &bytes.Buffer{}, t: t}, - WithMaxStorageSizeBytes(1), - WithMaxSendSizeBytes(1), + WithMaxStorageSizeBytes(11), + WithMaxSendSizeBytes(5), + WithSendInterval(100*time.Millisecond), ) // kind of an ugly test, but it was the simplest way to reproduce the issue @@ -130,6 +131,7 @@ func TestBufferFullPurgeWaitsForSendAndPurge(t *testing.T) { go func() { for { + time.Sleep(50 * time.Millisecond) sb.sendAndPurge() } }()