From b8ae2fa979932e1a9787f471bf5d3b0998c06341 Mon Sep 17 00:00:00 2001 From: James Pickett Date: Fri, 31 Jan 2025 12:32:22 -0800 Subject: [PATCH 1/6] test and fix panic --- pkg/sendbuffer/sendbuffer.go | 12 ++++++++++++ pkg/sendbuffer/sendbuffer_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/pkg/sendbuffer/sendbuffer.go b/pkg/sendbuffer/sendbuffer.go index 19db5762a..3ea2f6af6 100644 --- a/pkg/sendbuffer/sendbuffer.go +++ b/pkg/sendbuffer/sendbuffer.go @@ -280,6 +280,12 @@ func (sb *SendBuffer) sendAndPurge() error { sb.logger.Log("msg", "failed to send, will retry", "err", err) return nil } + + // There is a small possibility that the buffer gets full while were in the middle of sending + // and gets deleted. However, we don't want to block writes while were waiting on a network call + // to send the logs. To live with this, we just have the deleteLogs func make sure that the buffer + // is not empty (ie) got purged while waiting on network call. + // testing on a new enrollment in debug mode, log size hit 130K bytes // before enrollment completed and was able to ship logs // 2023-11-16 @@ -316,7 +322,13 @@ func (sb *SendBuffer) copyLogs(w io.Writer, maxSizeBytes int) (int, error) { return lastLogIndex, nil } +// deleteLogs deletes the logs up to the provided index +// it's up to the caller to lock the write mutex func (sb *SendBuffer) deleteLogs(toIndex int) { + if len(sb.logs) == 0 { + return + } + sizeDeleted := 0 for i := 0; i < toIndex; i++ { sizeDeleted += len(sb.logs[i]) diff --git a/pkg/sendbuffer/sendbuffer_test.go b/pkg/sendbuffer/sendbuffer_test.go index a29c81d4c..811865fba 100644 --- a/pkg/sendbuffer/sendbuffer_test.go +++ b/pkg/sendbuffer/sendbuffer_test.go @@ -107,6 +107,36 @@ func TestSendBuffer(t *testing.T) { } } +// TestBufferFullPurgeWaitsForSendAndPurge resulted from a panic found in production where +// if the buffer was full while sendAndPurge was running, the buffer get wiped and then +// sendAndPurge would try to delete the portion of the buffer it just sent, causing a panic +func TestBufferFullPurgeWaitsForSendAndPurge(t *testing.T) { + t.Parallel() + + sb := New( + &testSender{lastReceived: &bytes.Buffer{}, t: t}, + WithMaxStorageSizeBytes(1), + WithMaxSendSizeBytes(1), + ) + + // kind of an ugly test, but it was the simplest way to reproduce the issue + // if the issue is present, we'll get a panic: runtime error: index out of range [0] with length 0 + + go func() { + for { + sb.Write([]byte("1")) + } + }() + + go func() { + for { + sb.sendAndPurge() + } + }() + + time.Sleep(1 * time.Second) +} + func testStringArray(size int) []string { arr := make([]string, size) From 2cb144e11b263bbdaba78039ba04ab3673042ae5 Mon Sep 17 00:00:00 2001 From: James Pickett Date: Fri, 31 Jan 2025 12:33:29 -0800 Subject: [PATCH 2/6] typo fix --- pkg/sendbuffer/sendbuffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sendbuffer/sendbuffer.go b/pkg/sendbuffer/sendbuffer.go index 3ea2f6af6..5dc8fb087 100644 --- a/pkg/sendbuffer/sendbuffer.go +++ b/pkg/sendbuffer/sendbuffer.go @@ -284,7 +284,7 @@ func (sb *SendBuffer) sendAndPurge() error { // There is a small possibility that the buffer gets full while were in the middle of sending // and gets deleted. However, we don't want to block writes while were waiting on a network call // to send the logs. To live with this, we just have the deleteLogs func make sure that the buffer - // is not empty (ie) got purged while waiting on network call. + // is not empty (ie: got purged while waiting on network call). // testing on a new enrollment in debug mode, log size hit 130K bytes // before enrollment completed and was able to ship logs From a3384cd3dba29a9818be855dcf5aaad4d56e72fc Mon Sep 17 00:00:00 2001 From: James Pickett Date: Fri, 31 Jan 2025 12:35:41 -0800 Subject: [PATCH 3/6] grammer --- pkg/sendbuffer/sendbuffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sendbuffer/sendbuffer.go b/pkg/sendbuffer/sendbuffer.go index 5dc8fb087..8c8633209 100644 --- a/pkg/sendbuffer/sendbuffer.go +++ b/pkg/sendbuffer/sendbuffer.go @@ -284,7 +284,7 @@ func (sb *SendBuffer) sendAndPurge() error { // There is a small possibility that the buffer gets full while were in the middle of sending // and gets deleted. However, we don't want to block writes while were waiting on a network call // to send the logs. To live with this, we just have the deleteLogs func make sure that the buffer - // is not empty (ie: got purged while waiting on network call). + // is not empty (ie: did not get purged while waiting on network call). // testing on a new enrollment in debug mode, log size hit 130K bytes // before enrollment completed and was able to ship logs From 769c1894d5eebbf91327527b84058a1e4794e915 Mon Sep 17 00:00:00 2001 From: James Pickett Date: Fri, 31 Jan 2025 14:42:06 -0800 Subject: [PATCH 4/6] use flag for tracking when logs got purged --- pkg/sendbuffer/sendbuffer.go | 24 ++++++++++++++++-------- pkg/sendbuffer/sendbuffer_test.go | 6 ++++-- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/pkg/sendbuffer/sendbuffer.go b/pkg/sendbuffer/sendbuffer.go index 8c8633209..246ccd278 100644 --- a/pkg/sendbuffer/sendbuffer.go +++ b/pkg/sendbuffer/sendbuffer.go @@ -30,6 +30,9 @@ type SendBuffer struct { sender sender sendTicker *time.Ticker isSending bool + + // logsJustPurged is used to prevent attempting to delete logs that were just purged + logsJustPurged bool } type option func(*SendBuffer) @@ -108,6 +111,10 @@ func (sb *SendBuffer) Write(in []byte) (int, error) { if len(in)+sb.size > sb.maxStorageSizeBytes { sb.deleteLogs(len(sb.logs)) + // mark that we have just purged the logs so that any waiting deletes + // will not try to delete what was purged + sb.logsJustPurged = true + sb.logger.Log( "msg", "reached capacity, dropping all data and starting over", "method", "Write", @@ -281,16 +288,20 @@ func (sb *SendBuffer) sendAndPurge() error { return nil } - // There is a small possibility that the buffer gets full while were in the middle of sending - // and gets deleted. However, we don't want to block writes while were waiting on a network call - // to send the logs. To live with this, we just have the deleteLogs func make sure that the buffer - // is not empty (ie: did not get purged while waiting on network call). - // testing on a new enrollment in debug mode, log size hit 130K bytes // before enrollment completed and was able to ship logs // 2023-11-16 sb.writeMutex.Lock() defer sb.writeMutex.Unlock() + + // There is a possibility that the log buffer gets full while were in the middle of sending + // and gets deleted. However, we don't want to block writes while were waiting on a network call + // to send the logs. To live with this, we just verify that the logs didn't just get purged. + if sb.logsJustPurged { + sb.logsJustPurged = false + return nil + } + sb.deleteLogs(lastKey) return nil @@ -325,9 +336,6 @@ func (sb *SendBuffer) copyLogs(w io.Writer, maxSizeBytes int) (int, error) { // deleteLogs deletes the logs up to the provided index // it's up to the caller to lock the write mutex func (sb *SendBuffer) deleteLogs(toIndex int) { - if len(sb.logs) == 0 { - return - } sizeDeleted := 0 for i := 0; i < toIndex; i++ { diff --git a/pkg/sendbuffer/sendbuffer_test.go b/pkg/sendbuffer/sendbuffer_test.go index 811865fba..fb77164d8 100644 --- a/pkg/sendbuffer/sendbuffer_test.go +++ b/pkg/sendbuffer/sendbuffer_test.go @@ -115,8 +115,9 @@ func TestBufferFullPurgeWaitsForSendAndPurge(t *testing.T) { sb := New( &testSender{lastReceived: &bytes.Buffer{}, t: t}, - WithMaxStorageSizeBytes(1), - WithMaxSendSizeBytes(1), + WithMaxStorageSizeBytes(11), + WithMaxSendSizeBytes(5), + WithSendInterval(100*time.Millisecond), ) // kind of an ugly test, but it was the simplest way to reproduce the issue @@ -130,6 +131,7 @@ func TestBufferFullPurgeWaitsForSendAndPurge(t *testing.T) { go func() { for { + time.Sleep(50 * time.Millisecond) sb.sendAndPurge() } }() From 6899346e2a98fa89f4a979664884a47f92664f44 Mon Sep 17 00:00:00 2001 From: James Pickett Date: Fri, 31 Jan 2025 14:43:02 -0800 Subject: [PATCH 5/6] fix comment --- pkg/sendbuffer/sendbuffer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sendbuffer/sendbuffer_test.go b/pkg/sendbuffer/sendbuffer_test.go index fb77164d8..2c8271ce8 100644 --- a/pkg/sendbuffer/sendbuffer_test.go +++ b/pkg/sendbuffer/sendbuffer_test.go @@ -121,7 +121,7 @@ func TestBufferFullPurgeWaitsForSendAndPurge(t *testing.T) { ) // kind of an ugly test, but it was the simplest way to reproduce the issue - // if the issue is present, we'll get a panic: runtime error: index out of range [0] with length 0 + // if the issue is present, we'll get a panic: runtime error: index out of range [x] with length x go func() { for { From 690594b24110d00f4d5002c4e62ed6f62cc2fbed Mon Sep 17 00:00:00 2001 From: James Pickett Date: Fri, 31 Jan 2025 16:33:57 -0800 Subject: [PATCH 6/6] better test name --- pkg/sendbuffer/sendbuffer.go | 1 - pkg/sendbuffer/sendbuffer_test.go | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/sendbuffer/sendbuffer.go b/pkg/sendbuffer/sendbuffer.go index 246ccd278..44ada8f50 100644 --- a/pkg/sendbuffer/sendbuffer.go +++ b/pkg/sendbuffer/sendbuffer.go @@ -336,7 +336,6 @@ func (sb *SendBuffer) copyLogs(w io.Writer, maxSizeBytes int) (int, error) { // deleteLogs deletes the logs up to the provided index // it's up to the caller to lock the write mutex func (sb *SendBuffer) deleteLogs(toIndex int) { - sizeDeleted := 0 for i := 0; i < toIndex; i++ { sizeDeleted += len(sb.logs[i]) diff --git a/pkg/sendbuffer/sendbuffer_test.go b/pkg/sendbuffer/sendbuffer_test.go index 2c8271ce8..5be4ec816 100644 --- a/pkg/sendbuffer/sendbuffer_test.go +++ b/pkg/sendbuffer/sendbuffer_test.go @@ -107,10 +107,10 @@ func TestSendBuffer(t *testing.T) { } } -// TestBufferFullPurgeWaitsForSendAndPurge resulted from a panic found in production where +// TestSendAndPurgeHandlesLogBufferFullPurge resulted from a panic found in production where // if the buffer was full while sendAndPurge was running, the buffer get wiped and then // sendAndPurge would try to delete the portion of the buffer it just sent, causing a panic -func TestBufferFullPurgeWaitsForSendAndPurge(t *testing.T) { +func TestSendAndPurgeHandlesLogBufferFullPurge(t *testing.T) { t.Parallel() sb := New(