Skip to content

Commit

Permalink
use flag for tracking when logs got purged
Browse files Browse the repository at this point in the history
  • Loading branch information
James-Pickett committed Jan 31, 2025
1 parent a3384cd commit 769c189
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
24 changes: 16 additions & 8 deletions pkg/sendbuffer/sendbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type SendBuffer struct {
sender sender
sendTicker *time.Ticker
isSending bool

// logsJustPurged is used to prevent attempting to delete logs that were just purged
logsJustPurged bool
}

type option func(*SendBuffer)
Expand Down Expand Up @@ -108,6 +111,10 @@ func (sb *SendBuffer) Write(in []byte) (int, error) {
if len(in)+sb.size > sb.maxStorageSizeBytes {
sb.deleteLogs(len(sb.logs))

// mark that we have just purged the logs so that any waiting deletes
// will not try to delete what was purged
sb.logsJustPurged = true

sb.logger.Log(
"msg", "reached capacity, dropping all data and starting over",
"method", "Write",
Expand Down Expand Up @@ -281,16 +288,20 @@ func (sb *SendBuffer) sendAndPurge() error {
return nil
}

// There is a small possibility that the buffer gets full while were in the middle of sending
// and gets deleted. However, we don't want to block writes while were waiting on a network call
// to send the logs. To live with this, we just have the deleteLogs func make sure that the buffer
// is not empty (ie: did not get purged while waiting on network call).

// testing on a new enrollment in debug mode, log size hit 130K bytes
// before enrollment completed and was able to ship logs
// 2023-11-16
sb.writeMutex.Lock()
defer sb.writeMutex.Unlock()

// There is a possibility that the log buffer gets full while were in the middle of sending
// and gets deleted. However, we don't want to block writes while were waiting on a network call
// to send the logs. To live with this, we just verify that the logs didn't just get purged.
if sb.logsJustPurged {
sb.logsJustPurged = false
return nil
}

sb.deleteLogs(lastKey)

return nil
Expand Down Expand Up @@ -325,9 +336,6 @@ func (sb *SendBuffer) copyLogs(w io.Writer, maxSizeBytes int) (int, error) {
// deleteLogs deletes the logs up to the provided index
// it's up to the caller to lock the write mutex
func (sb *SendBuffer) deleteLogs(toIndex int) {
if len(sb.logs) == 0 {
return
}

sizeDeleted := 0
for i := 0; i < toIndex; i++ {
Expand Down
6 changes: 4 additions & 2 deletions pkg/sendbuffer/sendbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ func TestBufferFullPurgeWaitsForSendAndPurge(t *testing.T) {

sb := New(
&testSender{lastReceived: &bytes.Buffer{}, t: t},
WithMaxStorageSizeBytes(1),
WithMaxSendSizeBytes(1),
WithMaxStorageSizeBytes(11),
WithMaxSendSizeBytes(5),
WithSendInterval(100*time.Millisecond),
)

// kind of an ugly test, but it was the simplest way to reproduce the issue
Expand All @@ -130,6 +131,7 @@ func TestBufferFullPurgeWaitsForSendAndPurge(t *testing.T) {

go func() {
for {
time.Sleep(50 * time.Millisecond)
sb.sendAndPurge()
}
}()
Expand Down

0 comments on commit 769c189

Please sign in to comment.