Skip to content

Commit

Permalink
Fix race conditions in backlog.reset()
Browse files Browse the repository at this point in the history
  • Loading branch information
PlasmaPower committed Dec 23, 2023
1 parent 63bcedc commit fd5edcb
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
26 changes: 14 additions & 12 deletions broadcaster/backlog/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ type Backlog interface {
type backlog struct {
head atomic.Pointer[backlogSegment]
tail atomic.Pointer[backlogSegment]
lookupByIndex *containers.SyncMap[uint64, *backlogSegment]
lookupByIndex atomic.Pointer[containers.SyncMap[uint64, *backlogSegment]]
config ConfigFetcher
messageCount atomic.Uint64
}

// NewBacklog creates a backlog.
func NewBacklog(c ConfigFetcher) Backlog {
lookup := &containers.SyncMap[uint64, *backlogSegment]{}
return &backlog{
lookupByIndex: lookup,
config: c,
b := &backlog{
config: c,
}
b.lookupByIndex.Store(&containers.SyncMap[uint64, *backlogSegment]{})
return b
}

// Head return the head backlogSegment within the backlog.
Expand All @@ -63,6 +63,7 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error {
b.delete(uint64(bm.ConfirmedSequenceNumberMessage.SequenceNumber))
}

lookupByIndex := b.lookupByIndex.Load()
for _, msg := range bm.Messages {
segment := b.tail.Load()
if segment == nil {
Expand Down Expand Up @@ -95,7 +96,7 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error {
} else if err != nil {
return err
}
b.lookupByIndex.Store(uint64(msg.SequenceNumber), segment)
lookupByIndex.Store(uint64(msg.SequenceNumber), segment)
b.messageCount.Add(1)
}

Expand Down Expand Up @@ -160,7 +161,7 @@ func (b *backlog) delete(confirmed uint64) {
}

if confirmed > tail.End() {
log.Error("confirmed sequence number is past the end of stored messages", "confirmed sequence number", confirmed, "last stored sequence number", tail.End())
log.Warn("confirmed sequence number is past the end of stored messages", "confirmed sequence number", confirmed, "last stored sequence number", tail.End())
b.reset()
return
}
Expand Down Expand Up @@ -211,14 +212,15 @@ func (b *backlog) delete(confirmed uint64) {
// removeFromLookup removes all entries from the head segment's start index to
// the given confirmed index.
func (b *backlog) removeFromLookup(start, end uint64) {
lookupByIndex := b.lookupByIndex.Load()
for i := start; i <= end; i++ {
b.lookupByIndex.Delete(i)
lookupByIndex.Delete(i)
}
}

// Lookup attempts to find the backlogSegment storing the given message index.
func (b *backlog) Lookup(i uint64) (BacklogSegment, error) {
segment, ok := b.lookupByIndex.Load(i)
segment, ok := b.lookupByIndex.Load().Load(i)
if !ok {
return nil, fmt.Errorf("error finding backlog segment containing message with SequenceNumber %d", i)
}
Expand All @@ -233,9 +235,9 @@ func (s *backlog) Count() uint64 {

// reset removes all segments from the backlog.
func (b *backlog) reset() {
b.head = atomic.Pointer[backlogSegment]{}
b.tail = atomic.Pointer[backlogSegment]{}
b.lookupByIndex = &containers.SyncMap[uint64, *backlogSegment]{}
b.head.Store(nil)
b.tail.Store(nil)
b.lookupByIndex.Store(&containers.SyncMap[uint64, *backlogSegment]{})
b.messageCount.Store(0)
}

Expand Down
8 changes: 4 additions & 4 deletions broadcaster/backlog/backlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func validateBroadcastMessage(t *testing.T, bm *m.BroadcastMessage, expectedCoun

func createDummyBacklog(indexes []arbutil.MessageIndex) (*backlog, error) {
b := &backlog{
lookupByIndex: &containers.SyncMap[uint64, *backlogSegment]{},
config: func() *Config { return &DefaultTestConfig },
config: func() *Config { return &DefaultTestConfig },
}
b.lookupByIndex.Store(&containers.SyncMap[uint64, *backlogSegment]{})
bm := &m.BroadcastMessage{Messages: m.CreateDummyBroadcastMessages(indexes)}
err := b.Append(bm)
return b, err
Expand Down Expand Up @@ -161,9 +161,9 @@ func TestDeleteInvalidBacklog(t *testing.T) {
lookup := &containers.SyncMap[uint64, *backlogSegment]{}
lookup.Store(40, s)
b := &backlog{
lookupByIndex: lookup,
config: func() *Config { return &DefaultTestConfig },
config: func() *Config { return &DefaultTestConfig },
}
b.lookupByIndex.Store(lookup)
b.messageCount.Store(2)
b.head.Store(s)
b.tail.Store(s)
Expand Down

0 comments on commit fd5edcb

Please sign in to comment.