diff --git a/dedupe.go b/dedupe.go index c81ff069..8e6ecd42 100644 --- a/dedupe.go +++ b/dedupe.go @@ -16,8 +16,10 @@ package tessera import ( "context" + "fmt" + "sync" - "github.com/hashicorp/golang-lru/v2/expirable" + lru "github.com/hashicorp/golang-lru/v2" ) // InMemoryDedupe wraps an Add function to prevent duplicate entries being written to the underlying @@ -34,31 +36,42 @@ import ( // InMemoryDedupe. This allows recent duplicates to be deduplicated in memory, reducing the need to // make calls to a persistent storage. func InMemoryDedupe(delegate func(ctx context.Context, e *Entry) IndexFuture, size uint) func(context.Context, *Entry) IndexFuture { + c, err := lru.New[string, IndexFuture](int(size)) + if err != nil { + panic(fmt.Errorf("lru.New(%d): %v", size, err)) + } dedupe := &inMemoryDedupe{ delegate: delegate, - cache: expirable.NewLRU[string, IndexFuture](int(size), nil, 0), + cache: c, } return dedupe.add } type inMemoryDedupe struct { delegate func(ctx context.Context, e *Entry) IndexFuture - // mu sync.Mutex // cache is thread safe, but this mutex allows us to do conditional writes - cache *expirable.LRU[string, IndexFuture] + cache *lru.Cache[string, IndexFuture] } // Add adds the entry to the underlying delegate only if e hasn't been recently seen. In either case, // an IndexFuture will be returned that the client can use to get the sequence number of this entry. func (d *inMemoryDedupe) add(ctx context.Context, e *Entry) IndexFuture { id := string(e.Identity()) - // XXX: This can't lock like this, or it won't compose well with other dedupe "layers" which also block. - // d.mu.Lock() - // defer d.mu.Unlock() - f, ok := d.cache.Get(id) - if !ok { - f = d.delegate(ctx, e) - d.cache.Add(id, f) + // However many calls with the same entry come in and are deduped, we should only call delegate + // once for each unique entry: + f := sync.OnceValues(func() (uint64, error) { + return d.delegate(ctx, e)() + }) + + // if we've seen this entry before, discard our f and replace + // with the one we created last time, otherwise store f against id. + if prev, ok, _ := d.cache.PeekOrAdd(id, f); ok { + f = prev } + + // Someone MUST resolve the future or the entry will never actually get assigned a sequence number. + // We may as well do it here for now to avoid anyone getting confused if they forget to do so in the + // personality. + _, _ = f() return f }