Skip to content

Commit

Permalink
Hide channel inside future
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Jul 9, 2024
1 parent 672d0a1 commit fafed1f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 26 deletions.
68 changes: 43 additions & 25 deletions storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Queue struct {
flush FlushFunc

inFlightMu sync.Mutex
inFlight map[string][]chan IndexFunc
inFlight map[string]*entry
}

// IndexFunc is a function which returns an assigned log index, or an error.
Expand All @@ -49,7 +49,7 @@ type FlushFunc func(ctx context.Context, entries [][]byte) (index uint64, err er
func NewQueue(maxAge time.Duration, maxSize uint, f FlushFunc) *Queue {
q := &Queue{
flush: f,
inFlight: make(map[string][]chan IndexFunc, maxSize),
inFlight: make(map[string]*entry, maxSize),
}
q.buf = buffer.New(
buffer.WithSize(maxSize),
Expand All @@ -60,43 +60,43 @@ func NewQueue(maxAge time.Duration, maxSize uint, f FlushFunc) *Queue {
}

// squashDupes keeps track of all in-flight requests, enabling dupe squashing for entries currently in the queue.
// Returns true if the provided entry is a dupe and should NOT be added to the queue.
func (q *Queue) squashDupes(e entry) bool {
// Returns an entry struct, and a bool which is true if the provided entry is a dupe and should NOT be added to the queue.
func (q *Queue) squashDupes(e []byte) (*entry, bool) {
q.inFlightMu.Lock()
defer q.inFlightMu.Unlock()

k := string(e.data)
l, isKnown := q.inFlight[k]
q.inFlight[k] = append(l, e.c)
return isKnown
k := string(e)
entry, isKnown := q.inFlight[k]
if !isKnown {
entry = newEntry(e)
q.inFlight[k] = entry
}
return entry, isKnown
}

// Add places e into the queue, and returns a func which may be called to retrieve the assigned index.
func (q *Queue) Add(ctx context.Context, e []byte) IndexFunc {
entry := entry{
data: e,
c: make(chan IndexFunc, 1),
}
if q.squashDupes(entry) {
entry, isDupe := q.squashDupes(e)
if isDupe {
// This entry is already in the queue, so no need to add it again.
return entry.index()
return entry.index
}
if err := q.buf.Push(entry); err != nil {
entry.c <- func() (uint64, error) { return 0, err }
entry.assign(0, err)
close(entry.c)
}
return entry.index()
return entry.index
}

// doFlush handles the queue flush, and sending notifications of assigned log indices.
//
// To prevent blockin the queue longer than necessary, the notifications happen in a
// separate goroutine.
func (q *Queue) doFlush(items []interface{}) {
entries := make([]entry, len(items))
entries := make([]*entry, len(items))
entriesData := make([][]byte, len(items))
for i, t := range items {
entries[i] = t.(entry)
entries[i] = t.(*entry)
entriesData[i] = entries[i].data
}

Expand All @@ -108,25 +108,43 @@ func (q *Queue) doFlush(items []interface{}) {
defer q.inFlightMu.Unlock()

for i, e := range entries {
e.assign(s+uint64(i), err)
k := string(e.data)
for _, dd := range q.inFlight[k] {
dd <- func() (uint64, error) { return s + uint64(i), err }
close(dd)
}
delete(q.inFlight, k)
}
}()

}

// entry represents an in-flight entry in the queue.
//
// The index field acts as a Future for the entry's assigned index/error, and will
// hang until assign is called.
type entry struct {
data []byte
entryCtx context.Context
c chan IndexFunc
index IndexFunc
}

func (e *entry) index() IndexFunc {
return sync.OnceValues(func() (uint64, error) {
// newEntry creates a new entry for the provided data.
func newEntry(data []byte) *entry {
e := &entry{
data: data,
c: make(chan IndexFunc, 1),
}
e.index = sync.OnceValues(func() (uint64, error) {
return (<-e.c)()
})
return e
}

// assign sets the assigned log index (or an error) to the entry.
//
// This func must only be called once, and will cause any current or future callers of index()
// to be given the values provided here.
func (e *entry) assign(idx uint64, err error) {
e.c <- func() (uint64, error) {
return idx, err
}
close(e.c)
}
1 change: 0 additions & 1 deletion storage/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,4 @@ func TestDedup(t *testing.T) {
t.Errorf("[%d] got seq %d, want %d", i, N, firstN)
}
}

}

0 comments on commit fafed1f

Please sign in to comment.