Skip to content

Commit

Permalink
Use non-expiring cache's PeekOrAdd
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Dec 3, 2024
1 parent 7286a62 commit 8ba8b4d
Showing 1 changed file with 24 additions and 11 deletions.
35 changes: 24 additions & 11 deletions dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package tessera

import (
"context"
"fmt"
"sync"

"github.com/hashicorp/golang-lru/v2/expirable"
lru "github.com/hashicorp/golang-lru/v2"
)

// InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying
Expand All @@ -34,31 +36,42 @@ import (
// InMemoryDedupe. This allows recent duplicates to be deduplicated in memory, reducing the need to
// make calls to a persistent storage.
func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, size uint) func(context.Context, *Entry) IndexFuture {
c, err := lru.New[string, IndexFuture](int(size))
if err != nil {
panic(fmt.Errorf("lru.New(%d): %v", size, err))
}
dedupe := &inMemoryDedupe{
delegate: delegate,
cache: expirable.NewLRU[string, IndexFuture](int(size), nil, 0),
cache: c,
}
return dedupe.add
}

type inMemoryDedupe struct {
delegate func(ctx context.Context, e *Entry) IndexFuture
// mu sync.Mutex // cache is thread safe, but this mutex allows us to do conditional writes
cache *expirable.LRU[string, IndexFuture]
cache *lru.Cache[string, IndexFuture]
}

// Add adds the entry to the underlying delegate only if e hasn't been recently seen. In either case,
// an IndexFuture will be returned that the client can use to get the sequence number of this entry.
func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture {
id := string(e.Identity())
// XXX: This can't lock like this, or it won't compose well with other dedupe "layers" which also block.
// d.mu.Lock()
// defer d.mu.Unlock()

f, ok := d.cache.Get(id)
if !ok {
f = d.delegate(ctx, e)
d.cache.Add(id, f)
// However many calls with the same entry come in and are deduped, we should only call delegate
// once for each unique entry:
f := sync.OnceValues(func() (uint64, error) {
return d.delegate(ctx, e)()
})

// if we've seen this entry before, discard our f and replace
// with the one we created last time, otherwise store f against id.
if prev, ok, _ := d.cache.PeekOrAdd(id, f); ok {
f = prev
}

// Someone MUST resolve the future or the entry will never actually get assigned a sequence number.
// We may as well do it here for now to avoid anyone getting confused if they forget to do so in the
// personality.
_, _ = f()
return f
}

0 comments on commit 8ba8b4d

Please sign in to comment.