Improve test and fix bug causing overwriting existing logs
banks committed Oct 7, 2024
1 parent f15d4bf commit f4390ec
Showing 3 changed files with 140 additions and 25 deletions.
33 changes: 29 additions & 4 deletions inmem_store.go
@@ -5,6 +5,7 @@ package raft

import (
"errors"
"fmt"
"sync"
"sync/atomic"
)
@@ -13,7 +14,7 @@ import (
// It should NOT EVER be used for production. It is used only for
// unit tests. Use the MDBStore implementation instead.
type InmemStore struct {
storeFail uint32 // accessed atomically as a bool 0/1
storeFail atomic.Bool

// storeSem lets the test control exactly when a StoreLog(s) call takes
// effect.
@@ -25,8 +26,14 @@ type InmemStore struct {
logs map[uint64]*Log
kv map[string][]byte
kvInt map[string]uint64
monotonic bool
}

var (
_ LogStore = (*InmemStore)(nil)
_ MonotonicLogStore = (*InmemStore)(nil)
)

// NewInmemStore returns a new in-memory backend. Do not ever
// use for production. Only for testing.
func NewInmemStore() *InmemStore {
@@ -57,7 +64,7 @@ func (i *InmemStore) BlockStore() func() {
// FailNext signals that the next call to StoreLog(s) should return an error
// without modifying the log contents. Subsequent calls will succeed again.
func (i *InmemStore) FailNext() {
atomic.StoreUint32(&i.storeFail, 1)
i.storeFail.Store(true)
}

// FirstIndex implements the LogStore interface.
@@ -102,15 +109,19 @@ func (i *InmemStore) StoreLogs(logs []*Log) error {
}()

// Switch out fail if it is set so we only fail once
shouldFail := atomic.SwapUint32(&i.storeFail, 0)
if shouldFail == 1 {
shouldFail := i.storeFail.Swap(false)
if shouldFail {
return errors.New("IO error")
}

i.l.Lock()
defer i.l.Unlock()

for _, l := range logs {
if i.monotonic && l.Index != i.highIndex+1 {
return fmt.Errorf("non-monotonic write, log index: %d, last index %d, batch range: [%d, %d]",
l.Index, i.highIndex, logs[0].Index, logs[len(logs)-1].Index)
}
i.logs[l.Index] = l
if i.lowIndex == 0 {
i.lowIndex = l.Index
Expand Down Expand Up @@ -175,3 +186,17 @@ func (i *InmemStore) GetUint64(key []byte) (uint64, error) {
defer i.l.RUnlock()
return i.kvInt[string(key)], nil
}

// IsMonotonic implements MonotonicLogStore
func (i *InmemStore) IsMonotonic() bool {
return i.monotonic
}

// SetMonotonic allows the test to choose if the store should enforce monotonic
// writes. This is useful for testing the leader loop's handling of
// non-monotonic log stores.
func (i *InmemStore) SetMonotonic(v bool) {
i.l.Lock()
defer i.l.Unlock()
i.monotonic = v
}
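
To illustrate the new behavior, here is a minimal usage sketch (not part of the commit; it assumes the raft package's test context, and the function name is illustrative) of how a store switched into monotonic mode rejects out-of-order writes:

```go
// Minimal sketch, assuming the raft package's test context; the function name
// is illustrative and not part of this commit.
func exampleMonotonicInmemStore() error {
	store := NewInmemStore()
	store.SetMonotonic(true)

	// In-order appends succeed exactly as before.
	if err := store.StoreLogs([]*Log{{Index: 1}, {Index: 2}, {Index: 3}}); err != nil {
		return err
	}

	// Re-writing an existing index (or skipping ahead) is now rejected:
	//   non-monotonic write, log index: 2, last index 3, batch range: [2, 2]
	return store.StoreLogs([]*Log{{Index: 2}})
}
```

This is the property the updated test relies on to catch the flusher overwriting logs that were already persisted.
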
54 changes: 44 additions & 10 deletions log_cache_async.go
@@ -245,16 +245,36 @@ func (c *LogCacheAsync) EnableAsync(cc chan<- LogWriteCompletion) {
}

func (c *LogCacheAsync) runFlusher() {
var batch []*Log
batch := make([]*Log, 0, 128)
consecutiveErrors := 0
var lastErrTime time.Time

for {
syncReq := <-c.state.triggerChan

// If we've had more than one error in a row, back off a little to avoid a
// hot loop. Note that we specifically don't wait after the first error, in
// case it was a one-off, but we start backing off after two failures. The
// backoff is not exponential because storage should be fast and we are only
// protecting against spinning on a software bug or similar.
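// For example, with this scheme two consecutive errors wait ~20ms, five wait
// ~50ms, and anything beyond ten is capped at 100ms.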
if consecutiveErrors > 1 {
waitTime := time.Duration(consecutiveErrors) * 10 * time.Millisecond
if waitTime > 100*time.Millisecond {
waitTime = 100 * time.Millisecond
}
// Only wait if we're being triggered faster than the backoff time anyway.
if time.Since(lastErrTime) < waitTime {
time.Sleep(waitTime - time.Since(lastErrTime))
}
}

// Load the state under lock
c.state.Lock()
persistedIdx := atomic.LoadUint64(&c.persistentIndex)
lastIdx := atomic.LoadUint64(&c.lastIndex)

// Make sure to reset batch!
batch = batch[:0]
for idx := persistedIdx + 1; idx <= lastIdx; idx++ {
batch = append(batch, c.state.cache[idx&c.sizeMask])
}
@@ -283,17 +303,31 @@
// Might be a no-op if batch is empty
lwc := c.doFlush(batch, syncReq.startTime)

// Note: if the flush failed we might retry it on the next loop. This is
// safe assuming that the LogStore is atomic and not left in an invalid
// state (which Raft assumes in general already). It might loop and retry
// the write of the same logs next time which may fail again or may even
// succeed before the leaderloop notices the error and steps down. But
// either way it's fine because we don't advance the persistedIndex if it
// fails so we'll keep trying to write the same logs at least not leave a
// gap. Actually if we do error, even if there is no immediate sync trigger
// Note: if the flush failed we will retry it on the next loop. This is safe
// assuming that the LogStore is atomic and not left in an invalid state
// (which Raft assumes in general already). It might loop and retry the
// write of the same logs next time which may fail again or may even succeed
// before the leader loop notices the error and steps down. But either way
// it's fine because we don't advance the persistedIndex if it fails, so we'll
// keep trying to write the same logs and at least never leave a gap.
// Actually if we do error, even if there is no immediate sync trigger
// waiting, the leader will step down and disable async which will mean we
// attempt to flush again anyway. If that fails though (in the stop case
// above) we won't keep retrying and will just re-report the error.
// above) we won't keep retrying and will just re-report the error. If the
// error is coming back hard and we have new logs piling up we could end up
// in a hot loop until the leader steps down. That's probably not a big deal
// since that shouldn't take long but we'll be defensive and back off a
// little too just in case.
if lwc != nil {
if lwc.Error != nil {
consecutiveErrors++
lastErrTime = time.Now()
} else {
// Note we only reset this if we actually just flushed something, not
// when lwc is nil.
consecutiveErrors = 0
}
}

// Need a lock to deliver the completion and update persistent index
c.state.Lock()
78 changes: 67 additions & 11 deletions log_cache_async_test.go
@@ -11,6 +11,10 @@ import (
func TestLogCacheAsyncBasics(t *testing.T) {
underlying := NewInmemStore()

// Set it to behave monotonically to ensure we only ever write the necessary
// logs in the correct sequence and never re-write old ones.
underlying.SetMonotonic(true)

c, err := NewLogCacheAsync(32, underlying)
require.NoError(t, err)

@@ -53,15 +57,9 @@ func TestLogCacheAsyncBasics(t *testing.T) {
// Unblock the write on the underlying log
blockCancel()

// Wait for the completion event
select {
case lwc := <-compCh:
require.NoError(t, lwc.Error)
// Should get a single completion for all logs up to 10
require.Equal(t, 10, int(lwc.PersistentIndex))
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for IO completion")
}
// Wait for the completion event (note that we might get one completion for
// both writes or separate ones depending on timing).
assertOKCompletions(t, compCh, 8, 10)

// Now the underlying should have all the logs
assertLogContents(t, underlying, 1, 10)
@@ -84,10 +82,60 @@ func TestLogCacheAsyncBasics(t *testing.T) {
// Fail the underlying write
underlying.FailNext()
blockCancel()

// We should get the write error reported on the next completion.
lwc := expectCompletion(t, compCh)
require.ErrorContains(t, lwc.Error, "IO error")
// Persistent index should be unchanged since the flush failed
require.Equal(t, 10, int(lwc.PersistentIndex))

// But then eventually the write should succeed even if no more writes happen.
// We might see the two writes flush separately or together.
assertOKCompletions(t, compCh, 12, 15)

assertLogContents(t, underlying, 1, 15)
}

func expectCompletion(t *testing.T, compCh <-chan LogWriteCompletion) LogWriteCompletion {
t.Helper()
select {
case lwc := <-compCh:
return lwc
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for IO completion")
}
return LogWriteCompletion{}
}

// assertOKCompletions helps to assert that one or more non-error completions
// are received. The max index written in each call to StoreLogsAsync should be
// passed, since it's non-deterministic which ones are persisted together. They
// must be in ascending order. We will assert that a completion arrives in a
// timely manner, and that it's for a valid prefix of the batches written. If
// it's not all of them, we keep waiting for the rest recursively.
func assertOKCompletions(t *testing.T, compCh <-chan LogWriteCompletion, maxBatchIndexes ...int) {
t.Helper()
lwc := expectCompletion(t, compCh)
require.NoError(t, lwc.Error)

foundBatchIdx := -1
for i, idx := range maxBatchIndexes {
if int(lwc.PersistentIndex) == idx {
foundBatchIdx = i
break
}
}
require.GreaterOrEqual(t, foundBatchIdx, 0,
"unexpected persistent index in completion: %d, wanted one of %v",
lwc.PersistentIndex,
maxBatchIndexes,
)

if foundBatchIdx < len(maxBatchIndexes)-1 {
// We didn't get all the batches acknowledged yet, keep waiting.
assertOKCompletions(t, compCh, maxBatchIndexes[foundBatchIdx+1:]...)
}
}

func assertLogContents(t *testing.T, s LogStore, min, max int) {
t.Helper()
@@ -97,6 +145,14 @@ func assertLogContents(t *testing.T, s LogStore, min, max int) {
// check it matches expectations.
var expected, got []string

// Ensure that the min and max are the actual range the log contains!
first, err := s.FirstIndex()
require.NoError(t, err)
require.Equal(t, min, int(first))
last, err := s.LastIndex()
require.NoError(t, err)
require.Equal(t, max, int(last))

var log Log
for idx := min; idx <= max; idx++ {
expected = append(expected, fmt.Sprintf("%d => op-%d", idx, idx))
