From 1961d9894a68dded99840c4dd398578be503f2e4 Mon Sep 17 00:00:00 2001 From: Aris Tzoumas Date: Wed, 18 Sep 2024 13:57:04 +0300 Subject: [PATCH] chore: cachettl configuration option for not refreshing ttl (#643) --- cachettl/cachettl.go | 38 +++++++++++++++++----------- cachettl/cachettl_config.go | 22 ++++++++++++++++ cachettl/cachettl_test.go | 50 +++++++++++++++++++++++++++++-------- resourcettl/cache.go | 5 ++-- resourcettl/cache_test.go | 33 ++++++++++++++++++++++++ 5 files changed, 121 insertions(+), 27 deletions(-) create mode 100644 cachettl/cachettl_config.go diff --git a/cachettl/cachettl.go b/cachettl/cachettl.go index 4da67293..ae3ae057 100644 --- a/cachettl/cachettl.go +++ b/cachettl/cachettl.go @@ -10,10 +10,11 @@ import ( // the tail node (end) is the node with the highest expiration time // Cleanups are done on Get() calls so if Get() is never invoked then Nodes stay in-memory. type Cache[K comparable, V any] struct { - root *node[K, V] - mu sync.Mutex - m map[K]*node[K, V] - now func() time.Time + root *node[K, V] + mu sync.Mutex + m map[K]*node[K, V] + + config cacheConfig onEvicted func(key K, value V) } @@ -32,16 +33,23 @@ func (n *node[K, V]) remove() { } // New returns a new Cache. -func New[K comparable, V any]() *Cache[K, V] { - return &Cache[K, V]{ - now: time.Now, +func New[K comparable, V any](opts ...Opt) *Cache[K, V] { + c := &Cache[K, V]{ + config: cacheConfig{ + now: time.Now, + refreshTTL: true, + }, root: &node[K, V]{}, m: make(map[K]*node[K, V]), } + for _, opt := range opts { + opt(&c.config) + } + return c } // Get returns the value associated with the key or nil otherwise. -// Additionally, Get() will refresh the TTL and cleanup expired nodes. +// Additionally, Get() will refresh the TTL by default and cleanup expired nodes. func (c *Cache[K, V]) Get(key K) (zero V) { c.mu.Lock() defer c.mu.Unlock() @@ -49,7 +57,7 @@ func (c *Cache[K, V]) Get(key K) (zero V) { defer func() { // remove expired nodes cn := c.root.next // start from head since we're sorting by expiration with the highest expiration at the tail for cn != nil && cn != c.root { - if c.now().After(cn.expiration) { + if c.config.now().After(cn.expiration) { cn.remove() // removes a node from the linked list (leaves the map untouched) delete(c.m, cn.key) // remove node from map too if c.onEvicted != nil { // call the OnEvicted callback if it's set @@ -62,10 +70,12 @@ func (c *Cache[K, V]) Get(key K) (zero V) { } }() - if n, ok := c.m[key]; ok && n.expiration.After(c.now()) { - n.remove() - n.expiration = c.now().Add(n.ttl) // refresh TTL - c.add(n) + if n, ok := c.m[key]; ok && n.expiration.After(c.config.now()) { + if c.config.refreshTTL { + n.remove() + n.expiration = c.config.now().Add(n.ttl) // refresh TTL + c.add(n) + } return n.value } return zero @@ -77,7 +87,7 @@ func (c *Cache[K, V]) Put(key K, value V, ttl time.Duration) { c.mu.Lock() defer c.mu.Unlock() - now := c.now() + now := c.config.now() n, ok := c.m[key] if !ok { diff --git a/cachettl/cachettl_config.go b/cachettl/cachettl_config.go new file mode 100644 index 00000000..b5377208 --- /dev/null +++ b/cachettl/cachettl_config.go @@ -0,0 +1,22 @@ +package cachettl + +import "time" + +type Opt func(*cacheConfig) + +// WithNoRefreshTTL disables the refresh of the TTL when the cache is accessed. +var WithNoRefreshTTL = func(c *cacheConfig) { + c.refreshTTL = false +} + +// WithNow sets the function to use to get the current time. +var WithNow = func(now func() time.Time) Opt { + return func(c *cacheConfig) { + c.now = now + } +} + +type cacheConfig struct { + now func() time.Time + refreshTTL bool +} diff --git a/cachettl/cachettl_test.go b/cachettl/cachettl_test.go index 53967978..9b60a1c7 100644 --- a/cachettl/cachettl_test.go +++ b/cachettl/cachettl_test.go @@ -10,8 +10,7 @@ import ( func TestCacheTTL(t *testing.T) { now := time.Now() - c := New[string, string]() - c.now = func() time.Time { return now } + c := New[string, string](WithNow(func() time.Time { return now })) // nothing done so far, we expect the cache to be empty require.Nil(t, c.slice()) @@ -37,30 +36,30 @@ func TestCacheTTL(t *testing.T) { require.Equal(t, []string{"222", "111", "333"}, c.slice()) // move time forward to expire "222" - c.now = func() time.Time { return now.Add(1) } // "222" should still be there - require.Empty(t, c.Get("whatever")) // trigger the cleanup + c.config.now = func() time.Time { return now.Add(1) } // "222" should still be there + require.Empty(t, c.Get("whatever")) // trigger the cleanup require.Equal(t, []string{"222", "111", "333"}, c.slice()) - c.now = func() time.Time { return now.Add(2) } // "222" should still be there - require.Empty(t, c.Get("whatever")) // trigger the cleanup + c.config.now = func() time.Time { return now.Add(2) } // "222" should still be there + require.Empty(t, c.Get("whatever")) // trigger the cleanup require.Equal(t, []string{"222", "111", "333"}, c.slice()) - c.now = func() time.Time { return now.Add(3) } // "222" should be expired! - require.Empty(t, c.Get("whatever")) // trigger the cleanup + c.config.now = func() time.Time { return now.Add(3) } // "222" should be expired! + require.Empty(t, c.Get("whatever")) // trigger the cleanup require.Equal(t, []string{"111", "333"}, c.slice()) // let's move a lot forward to expire everything - c.now = func() time.Time { return now.Add(6) } + c.config.now = func() time.Time { return now.Add(6) } require.Empty(t, c.Get("whatever")) // trigger the cleanup require.Nil(t, c.slice()) require.Len(t, c.m, 0) // now let's set a key, then move forward and get it directly without triggering with a different key - c.now = func() time.Time { return now } + c.config.now = func() time.Time { return now } c.Put("last", "999", 1) require.Equal(t, "999", c.Get("last")) require.Equal(t, []string{"999"}, c.slice()) - c.now = func() time.Time { return now.Add(2) } + c.config.now = func() time.Time { return now.Add(2) } require.Empty(t, c.Get("last")) // trigger the cleanup require.Nil(t, c.slice()) require.Len(t, c.m, 0) @@ -86,3 +85,32 @@ func TestRefreshTTL(t *testing.T) { require.Equal(t, "333", c.Get("three")) require.Equal(t, []string{"111", "222", "333"}, c.slice()) } + +func TestNoRefreshTTL(t *testing.T) { + now := time.Now() + c := New[string, string](WithNoRefreshTTL, WithNow(func() time.Time { return now })) + + // nothing done so far, we expect the cache to be empty + require.Nil(t, c.slice()) + + c.Put("one", "111", time.Second) + c.Put("two", "222", time.Second) + c.Put("three", "333", time.Second) + require.Equal(t, []string{"111", "222", "333"}, c.slice()) + + now = now.Add(500 * time.Millisecond) + require.Equal(t, "111", c.Get("one")) + require.Equal(t, []string{"111", "222", "333"}, c.slice()) + + require.Equal(t, "222", c.Get("two")) + require.Equal(t, []string{"111", "222", "333"}, c.slice()) + + require.Equal(t, "333", c.Get("three")) + require.Equal(t, []string{"111", "222", "333"}, c.slice()) + + now = now.Add(500 * time.Millisecond) + + require.Empty(t, c.Get("one")) + require.Empty(t, c.Get("two")) + require.Empty(t, c.Get("three")) +} diff --git a/resourcettl/cache.go b/resourcettl/cache.go index 737447bf..47658d86 100644 --- a/resourcettl/cache.go +++ b/resourcettl/cache.go @@ -14,6 +14,7 @@ import ( // NewCache creates a new resource cache. // // - ttl - is the time after which the resource is considered expired and cleaned up. +// - opts - options for the cache. // // A resource's ttl is extended every time it is checked out. // @@ -27,14 +28,14 @@ import ( // - Close() error // - Stop() // - Stop() error -func NewCache[K comparable, R any](ttl time.Duration) *Cache[K, R] { +func NewCache[K comparable, R any](ttl time.Duration, opts ...cachettl.Opt) *Cache[K, R] { c := &Cache[K, R]{ keyMu: kitsync.NewPartitionLocker(), resources: make(map[string]R), checkouts: make(map[string]int), expiries: make(map[string]struct{}), ttl: ttl, - ttlcache: cachettl.New[K, string](), + ttlcache: cachettl.New[K, string](opts...), } c.ttlcache.OnEvicted(c.onEvicted) return c diff --git a/resourcettl/cache_test.go b/resourcettl/cache_test.go index e83ba3d6..e3b975c2 100644 --- a/resourcettl/cache_test.go +++ b/resourcettl/cache_test.go @@ -8,6 +8,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/require" + "github.com/rudderlabs/rudder-go-kit/cachettl" "github.com/rudderlabs/rudder-go-kit/resourcettl" ) @@ -138,6 +139,38 @@ func TestCache(t *testing.T) { require.EqualValues(t, 1, r1.cleanups.Load(), "it should cleanup the expired resource") checkin3() }) + + t.Run("no ttl refresh", func(t *testing.T) { + now := time.Now() + ttl = 1 * time.Second + producer := &MockProducer{} + c := resourcettl.NewCache[string, *cleanuper](ttl, cachettl.WithNoRefreshTTL, cachettl.WithNow(func() time.Time { return now })) + + r1, checkin1, err1 := c.Checkout(key, producer.NewCleanuper) + require.NoError(t, err1, "it should be able to create a new resource") + require.NotNil(t, r1, "it should return a resource") + require.EqualValues(t, 1, producer.instances.Load(), "it should create a new resource") + + now = now.Add(ttl / 2) // wait for some time less than ttl + checkin1() + + r2, checkin2, err2 := c.Checkout(key, producer.NewCleanuper) + require.NoError(t, err2, "it should be able to checkout the same resource") + require.NotNil(t, r2, "it should return a resource") + require.EqualValues(t, 1, producer.instances.Load(), "it shouldn't create a new resource") + require.Equal(t, r1.id, r2.id, "it should return the same resource") + + now = now.Add(ttl / 2) // wait for some time less than ttl + checkin2() + + r3, checkin3, err3 := c.Checkout(key, producer.NewCleanuper) + require.NoError(t, err3, "it should be able to create a new resource") + require.NotNil(t, r3, "it should return a resource") + require.EqualValues(t, 2, producer.instances.Load(), "it should create a new resource since the previous one expired") + require.NotEqual(t, r1.id, r3.id, "it should return a different resource") + time.Sleep(time.Millisecond) + checkin3() + }) } type MockProducer struct {