Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix send buffer (log shipping) out of range panic #2080

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/sendbuffer/sendbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type SendBuffer struct {
sender sender
sendTicker *time.Ticker
isSending bool

// logsJustPurged is used to prevent attempting to delete logs that were just purged
logsJustPurged bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this safe as bool and not atomic.Bool because we think it's adequately protected by the writeMutex? (it seems like it at first glance just wanted to double check)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's protected by the writeMutex... trying to think of a better way to do this, but I think this will suffice

}

type option func(*SendBuffer)
Expand Down Expand Up @@ -108,6 +111,10 @@ func (sb *SendBuffer) Write(in []byte) (int, error) {
if len(in)+sb.size > sb.maxStorageSizeBytes {
sb.deleteLogs(len(sb.logs))

// mark that we have just purged the logs so that any waiting deletes
// will not try to delete what was purged
sb.logsJustPurged = true

sb.logger.Log(
"msg", "reached capacity, dropping all data and starting over",
"method", "Write",
Expand Down Expand Up @@ -280,11 +287,21 @@ func (sb *SendBuffer) sendAndPurge() error {
sb.logger.Log("msg", "failed to send, will retry", "err", err)
return nil
}

// testing on a new enrollment in debug mode, log size hit 130K bytes
// before enrollment completed and was able to ship logs
// 2023-11-16
sb.writeMutex.Lock()
defer sb.writeMutex.Unlock()

// There is a possibility that the log buffer gets full while were in the middle of sending
// and gets deleted. However, we don't want to block writes while were waiting on a network call
// to send the logs. To live with this, we just verify that the logs didn't just get purged.
if sb.logsJustPurged {
sb.logsJustPurged = false
return nil
}

sb.deleteLogs(lastKey)

return nil
Expand Down Expand Up @@ -316,6 +333,8 @@ func (sb *SendBuffer) copyLogs(w io.Writer, maxSizeBytes int) (int, error) {
return lastLogIndex, nil
}

// deleteLogs deletes the logs up to the provided index
// it's up to the caller to lock the write mutex
func (sb *SendBuffer) deleteLogs(toIndex int) {
sizeDeleted := 0
for i := 0; i < toIndex; i++ {
Expand Down
32 changes: 32 additions & 0 deletions pkg/sendbuffer/sendbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,38 @@ func TestSendBuffer(t *testing.T) {
}
}

// TestSendAndPurgeHandlesLogBufferFullPurge resulted from a panic found in production where
// if the buffer was full while sendAndPurge was running, the buffer get wiped and then
// sendAndPurge would try to delete the portion of the buffer it just sent, causing a panic
func TestSendAndPurgeHandlesLogBufferFullPurge(t *testing.T) {
t.Parallel()

sb := New(
&testSender{lastReceived: &bytes.Buffer{}, t: t},
WithMaxStorageSizeBytes(11),
WithMaxSendSizeBytes(5),
WithSendInterval(100*time.Millisecond),
)

// kind of an ugly test, but it was the simplest way to reproduce the issue
// if the issue is present, we'll get a panic: runtime error: index out of range [x] with length x

go func() {
for {
sb.Write([]byte("1"))
}
}()

go func() {
for {
time.Sleep(50 * time.Millisecond)
sb.sendAndPurge()
}
}()

time.Sleep(1 * time.Second)
}

func testStringArray(size int) []string {
arr := make([]string, size)

Expand Down
Loading