From 10ff4bc6542075ceb47db66fc3c373c8deceb531 Mon Sep 17 00:00:00 2001 From: Ming Deng Date: Fri, 13 Oct 2023 21:59:34 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=20SRem=20=E7=9A=84=E8=AF=AD?= =?UTF-8?q?=E4=B9=89=E5=88=B0=20Redis=20(#27)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/errs/errs.go | 7 +- memory/lru/cache.go | 16 ++--- memory/lru/cache_test.go | 26 +++---- memory/priority/priority.go | 4 +- memory/priority/rbtree_priority_cache.go | 55 +++++++-------- memory/priority/rbtree_priority_cache_test.go | 69 +++++++++---------- redis/cache.go | 5 +- redis/cache_e2e_test.go | 6 +- redis/cache_test.go | 6 +- types.go | 16 +++-- 10 files changed, 97 insertions(+), 113 deletions(-) diff --git a/internal/errs/errs.go b/internal/errs/errs.go index a15ec5c..4b44366 100644 --- a/internal/errs/errs.go +++ b/internal/errs/errs.go @@ -16,5 +16,8 @@ package errs import "errors" -var ErrKeyNotExist = errors.New("key 不存在") -var ErrDeleteKeyFailed = errors.New("删除key失败") +var ( + ErrKeyNotExist = errors.New("key 不存在") + ErrDeleteKeyFailed = errors.New("删除key失败") + ErrKeyNeverExpireNotSupported = errors.New("不支持key永不过期") +) diff --git a/memory/lru/cache.go b/memory/lru/cache.go index 15c92d4..7ae9e08 100644 --- a/memory/lru/cache.go +++ b/memory/lru/cache.go @@ -215,32 +215,28 @@ func (c *Cache) SAdd(ctx context.Context, key string, members ...any) (int64, er return int64(len(s.Keys())), nil } -func (c *Cache) SRem(ctx context.Context, key string, members ...any) (val ecache.Value) { +func (c *Cache) SRem(ctx context.Context, key string, members ...any) (int64, error) { c.lock.Lock() defer c.lock.Unlock() result, ok := c.client.Get(key) if !ok { - val.Err = errs.ErrKeyNotExist - return + return 0, errs.ErrKeyNotExist } s, ok := result.(set.Set[any]) if !ok { - val.Err = errors.New("当前key已存在不是set类型") - return + return 0, errors.New("当前key已存在不是set类型") } - var rems = make([]any, 0, cap(members)) + var rems int64 for _, member := range members { if s.Exist(member) { - rems = append(rems, member) s.Delete(member) + rems++ } } - - val.Val = rems - return + return rems, nil } func (c *Cache) IncrBy(ctx context.Context, key string, value int64) (int64, error) { diff --git a/memory/lru/cache_test.go b/memory/lru/cache_test.go index 3e43d36..3fa4ea9 100644 --- a/memory/lru/cache_test.go +++ b/memory/lru/cache_test.go @@ -17,7 +17,6 @@ package lru import ( "context" "errors" - "reflect" "testing" "time" @@ -614,9 +613,10 @@ func TestCache_SRem(t *testing.T) { before func(t *testing.T) after func(t *testing.T) - key string - val []any - wantVal []any + key string + val []any + + wantVal int64 wantErr error }{ { @@ -634,7 +634,7 @@ func TestCache_SRem(t *testing.T) { }, key: "test", val: []any{"hello world"}, - wantVal: []any{"hello world"}, + wantVal: 1, }, { name: "srem value ignore", @@ -649,7 +649,7 @@ func TestCache_SRem(t *testing.T) { }, key: "test", val: []any{"hello ecache"}, - wantVal: []any{}, + wantVal: 0, }, { name: "srem value nil", @@ -675,21 +675,15 @@ func TestCache_SRem(t *testing.T) { for _, tc := range testCase { t.Run(tc.name, func(t *testing.T) { + defer tc.after(t) ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*5) defer cancelFunc() c := NewCache(lru) tc.before(t) - val := c.SRem(ctx, tc.key, tc.val...) - defer tc.after(t) - if val.Err != nil { - assert.Equal(t, tc.wantErr, val.Err) - return - } - - result, ok := val.Val.([]any) - assert.Equal(t, true, ok) - assert.Equal(t, true, reflect.DeepEqual(tc.wantVal, result)) + val, err := c.SRem(ctx, tc.key, tc.val...) + assert.Equal(t, tc.wantErr, err) + assert.Equal(t, tc.wantVal, val) }) } } diff --git a/memory/priority/priority.go b/memory/priority/priority.go index 45be453..6fa7d3f 100644 --- a/memory/priority/priority.go +++ b/memory/priority/priority.go @@ -16,6 +16,6 @@ package priority // Priority 如果传进来的元素没有实现该接口,则默认优先级为0 type Priority interface { - // GetPriority 获取元素的优先级 - GetPriority() int + // Priority 获取元素的优先级 + Priority() int } diff --git a/memory/priority/rbtree_priority_cache.go b/memory/priority/rbtree_priority_cache.go index 689a751..db78dfd 100644 --- a/memory/priority/rbtree_priority_cache.go +++ b/memory/priority/rbtree_priority_cache.go @@ -40,12 +40,6 @@ var ( errOnlyNumCanDecrBy = errors.New("ecache: 只有数字类型的数据,才能执行 DecrBy") ) -var ( - // todo 这两个变量没有想到更好办法处理,如果外部不调用option进行设置,最后还是需要内部定义一个默认值 - priorityQueueInitSize = 8 //优先级队列的初始大小 - mapSetInitSize = 8 //缓存结点中set.MapSet的初始大小 -) - type RBTreePriorityCache struct { globalLock *sync.RWMutex //内部全局读写锁,保护缓存数据和优先级数据 cacheData *tree.RBTree[string, *rbTreeCacheNode] //缓存数据 @@ -53,31 +47,41 @@ type RBTreePriorityCache struct { cacheLimit int //键值对数量限制,默认MaxInt32,约等于没有限制 priorityData *queue.PriorityQueue[*rbTreeCacheNode] //优先级数据 defaultPriority int //默认优先级 + cleanInterval time.Duration + // 集合类型的值的初始化容量 + collectionCap int } func NewRBTreePriorityCache(opts ...option.Option[RBTreePriorityCache]) (*RBTreePriorityCache, error) { cache, _ := newRBTreePriorityCache(opts...) - // todo 自动清理过期缓存的时间间隔,暂时先写死1s,后面再考虑怎么暴露出去 - go cache.autoClean(time.Second) - + go cache.autoClean() return cache, nil } func newRBTreePriorityCache(opts ...option.Option[RBTreePriorityCache]) (*RBTreePriorityCache, error) { rbTree, _ := tree.NewRBTree[string, *rbTreeCacheNode](comparatorRBTreeCacheNodeByKey()) - priorityQueue := queue.NewPriorityQueue[*rbTreeCacheNode](priorityQueueInitSize, comparatorRBTreeCacheNodeByPriority()) + + const ( + priorityQueueDefaultSize = 8 //优先级队列的初始大小 + collectionDefaultCap = 8 //缓存结点中set.MapSet的初始大小 + ) + priorityQueue := queue.NewPriorityQueue[*rbTreeCacheNode](priorityQueueDefaultSize, comparatorRBTreeCacheNodeByPriority()) cache := &RBTreePriorityCache{ globalLock: &sync.RWMutex{}, cacheData: rbTree, cacheNum: 0, cacheLimit: math.MaxInt32, priorityData: priorityQueue, + // 暂时设置为一秒间隔 + cleanInterval: time.Second, + collectionCap: collectionDefaultCap, } option.Apply(cache, opts...) return cache, nil } +// WithCacheLimit 设置所允许的最大键值对数量 func WithCacheLimit(cacheLimit int) option.Option[RBTreePriorityCache] { return func(opt *RBTreePriorityCache) { opt.cacheLimit = cacheLimit @@ -90,7 +94,7 @@ func WithDefaultPriority(priority int) option.Option[RBTreePriorityCache] { } } -func (r *RBTreePriorityCache) Set(ctx context.Context, key string, val any, expiration time.Duration) error { +func (r *RBTreePriorityCache) Set(_ context.Context, key string, val any, expiration time.Duration) error { r.globalLock.Lock() defer r.globalLock.Unlock() @@ -101,11 +105,9 @@ func (r *RBTreePriorityCache) Set(ctx context.Context, key string, val any, expi } node = newKVRBTreeCacheNode(key, val, expiration) r.addNode(node) - return nil } node.replace(val, expiration) - return nil } @@ -271,7 +273,7 @@ func (r *RBTreePriorityCache) SAdd(ctx context.Context, key string, members ...a if r.isFull() { r.deleteNodeByPriority() } - node = newSetRBTreeCacheNode(key, mapSetInitSize) + node = newSetRBTreeCacheNode(key, r.collectionCap) r.addNode(node) } @@ -292,27 +294,21 @@ func (r *RBTreePriorityCache) SAdd(ctx context.Context, key string, members ...a return successNum, nil } -func (r *RBTreePriorityCache) SRem(ctx context.Context, key string, members ...any) ecache.Value { +func (r *RBTreePriorityCache) SRem(_ context.Context, key string, members ...any) (int64, error) { r.globalLock.Lock() defer r.globalLock.Unlock() - var retVal ecache.Value - node, cacheErr := r.cacheData.Find(key) if cacheErr != nil { - retVal.Err = errs.ErrKeyNotExist - - return retVal + return 0, errs.ErrKeyNotExist } nodeVal, ok := node.value.(*set.MapSet[any]) if !ok { - retVal.Err = errOnlySetCanSRem - - return retVal + return 0, errOnlySetCanSRem } - successNum := 0 + var successNum int64 for _, item := range members { isExist := nodeVal.Exist(item) if isExist { @@ -320,13 +316,11 @@ func (r *RBTreePriorityCache) SRem(ctx context.Context, key string, members ...a successNum++ } } - retVal.Val = int64(successNum) if len(nodeVal.Keys()) == 0 { r.deleteNode(node) //如果集合为空,删除缓存结点 } - - return retVal + return successNum, nil } func (r *RBTreePriorityCache) IncrBy(ctx context.Context, key string, value int64) (int64, error) { @@ -384,7 +378,7 @@ func (r *RBTreePriorityCache) calculatePriority(node *rbTreeCacheNode) int { //如果实现了Priority接口,那么就用接口的方法获取优先级权重 val, ok := node.value.(Priority) if ok { - priority = val.GetPriority() + priority = val.Priority() } return priority @@ -429,10 +423,9 @@ func (r *RBTreePriorityCache) deleteNodeByPriority() { } // autoClean 自动清理过期缓存 -func (r *RBTreePriorityCache) autoClean(interval time.Duration) { - ticker := time.NewTicker(interval) +func (r *RBTreePriorityCache) autoClean() { + ticker := time.NewTicker(r.cleanInterval) defer ticker.Stop() - for range ticker.C { r.globalLock.RLock() _, values := r.cacheData.KeyValues() diff --git a/memory/priority/rbtree_priority_cache_test.go b/memory/priority/rbtree_priority_cache_test.go index 98395ee..0b79179 100644 --- a/memory/priority/rbtree_priority_cache_test.go +++ b/memory/priority/rbtree_priority_cache_test.go @@ -32,7 +32,7 @@ type testStructForPriority struct { priority int } -func (ts testStructForPriority) GetPriority() int { +func (ts testStructForPriority) Priority() int { return ts.priority } @@ -1031,9 +1031,9 @@ func TestRBTreePriorityCache_SAdd(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1046,9 +1046,9 @@ func TestRBTreePriorityCache_SAdd(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1059,10 +1059,10 @@ func TestRBTreePriorityCache_SAdd(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") valSet1.Add("value2") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1075,9 +1075,9 @@ func TestRBTreePriorityCache_SAdd(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1088,9 +1088,9 @@ func TestRBTreePriorityCache_SAdd(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1109,10 +1109,10 @@ func TestRBTreePriorityCache_SAdd(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") valSet1.Add("value2") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1125,9 +1125,9 @@ func TestRBTreePriorityCache_SAdd(t *testing.T) { cache, _ := NewRBTreePriorityCache(WithCacheLimit(1)) cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1138,9 +1138,9 @@ func TestRBTreePriorityCache_SAdd(t *testing.T) { cache, _ := NewRBTreePriorityCache(WithCacheLimit(1)) cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value2") - node1 := newSetRBTreeCacheNode("key2", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key2", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1214,9 +1214,9 @@ func TestRBTreePriorityCache_SRem(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1227,9 +1227,9 @@ func TestRBTreePriorityCache_SRem(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) cache.deleteNode(node1) @@ -1243,9 +1243,9 @@ func TestRBTreePriorityCache_SRem(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1256,9 +1256,9 @@ func TestRBTreePriorityCache_SRem(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1271,10 +1271,10 @@ func TestRBTreePriorityCache_SRem(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") valSet1.Add("value2") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) return cache @@ -1285,10 +1285,10 @@ func TestRBTreePriorityCache_SRem(t *testing.T) { cache, _ := NewRBTreePriorityCache() cache.globalLock.Lock() defer cache.globalLock.Unlock() - valSet1 := set.NewMapSet[any](mapSetInitSize) + valSet1 := set.NewMapSet[any](8) valSet1.Add("value1") valSet1.Add("value2") - node1 := newSetRBTreeCacheNode("key1", mapSetInitSize) + node1 := newSetRBTreeCacheNode("key1", 8) node1.value = valSet1 cache.addNode(node1) cache.deleteNode(node1) @@ -1321,13 +1321,10 @@ func TestRBTreePriorityCache_SRem(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { startCache := tc.startCache() - value := startCache.SRem(context.Background(), tc.key, tc.values...) - assert.Equal(t, tc.wantErr, value.Err) - if value.Err != nil { - return - } - assert.Equal(t, tc.wantRet, value.Val) - assert.Equal(t, true, compareTwoRBTreeClient(startCache, tc.wantCache())) + value, err := startCache.SRem(context.Background(), tc.key, tc.values...) + assert.Equal(t, tc.wantErr, err) + assert.Equal(t, tc.wantRet, value) + assert.True(t, compareTwoRBTreeClient(startCache, tc.wantCache())) }) } } diff --git a/redis/cache.go b/redis/cache.go index 1b0ce1d..7e2ef24 100644 --- a/redis/cache.go +++ b/redis/cache.go @@ -78,9 +78,8 @@ func (c *Cache) SAdd(ctx context.Context, key string, members ...any) (int64, er return c.client.SAdd(ctx, key, members...).Result() } -func (c *Cache) SRem(ctx context.Context, key string, members ...any) (result ecache.Value) { - result.Val, result.Err = c.client.SRem(ctx, key, members...).Result() - return +func (c *Cache) SRem(ctx context.Context, key string, members ...any) (int64, error) { + return c.client.SRem(ctx, key, members...).Result() } func (c *Cache) IncrBy(ctx context.Context, key string, value int64) (int64, error) { diff --git a/redis/cache_e2e_test.go b/redis/cache_e2e_test.go index 5fe923e..d734d04 100644 --- a/redis/cache_e2e_test.go +++ b/redis/cache_e2e_test.go @@ -577,9 +577,9 @@ func TestCache_e2e_SRem(t *testing.T) { defer cancelFunc() c := NewCache(rdb) tc.before(ctx, t) - val := c.SRem(ctx, tc.key, tc.val...) - assert.Equal(t, val.Val, tc.wantVal) - assert.Equal(t, val.Err, tc.wantErr) + val, err := c.SRem(ctx, tc.key, tc.val...) + assert.Equal(t, tc.wantErr, err) + assert.Equal(t, tc.wantVal, val) tc.after(ctx, t) }) } diff --git a/redis/cache_test.go b/redis/cache_test.go index 9f25629..e69fb9f 100644 --- a/redis/cache_test.go +++ b/redis/cache_test.go @@ -583,9 +583,9 @@ func TestCache_SRem(t *testing.T) { defer ctrl.Finish() c := NewCache(tc.mock(ctrl)) - result := c.SRem(context.Background(), tc.key, tc.val...) - assert.Equal(t, result.Val, tc.wantVal) - assert.Equal(t, result.Err, tc.wantErr) + result, err := c.SRem(context.Background(), tc.key, tc.val...) + assert.Equal(t, tc.wantErr, err) + assert.Equal(t, tc.wantVal, result) }) } } diff --git a/types.go b/types.go index 6c8ab94..7c57ad5 100644 --- a/types.go +++ b/types.go @@ -23,14 +23,12 @@ import ( "github.com/ecodeclub/ekit" ) -var ( - ErrKeyNeverExpireNotSupported = errors.New("不支持key永不过期") -) - type Cache interface { - // Set 设置一个键值对,并且设置过期时间,当过期时间为0时,表示永不过期 + // Set 设置一个键值对,并且设置过期时间. + // 当过期时间为0时,表示永不过期 Set(ctx context.Context, key string, val any, expiration time.Duration) error - // SetNX 设置一个键值对如果key不存在则写入反之失败,并且设置过期时间.当过期时间为0时,表示永不过期 + // SetNX 设置一个键值对如果key不存在则写入反之失败,并且设置过期时间. + // 当过期时间为0时,表示永不过期 SetNX(ctx context.Context, key string, val any, expiration time.Duration) (bool, error) // Get 返回一个 Value // 如果你需要检测 Err,可以使用 Value.Err @@ -49,12 +47,16 @@ type Cache interface { // SAdd 命令将一个或多个成员元素加入到集合中,已经存在于集合的成员元素将被忽略。 SAdd(ctx context.Context, key string, members ...any) (int64, error) // SRem 移除集合中的一个或多个成员元素,不存在的成员元素会被忽略。 - SRem(ctx context.Context, key string, members ...any) Value + // 返回最终删除了多少个原色 + SRem(ctx context.Context, key string, members ...any) (int64, error) // IncrBy 设置一个key并自增 1 或者指定的值 + // 返回增加后的值 IncrBy(ctx context.Context, key string, value int64) (int64, error) // DecrBy 将 key 中储存的数字值减一 + // 返回减少后的值 DecrBy(ctx context.Context, key string, value int64) (int64, error) // IncrByFloat 为 key 中所储存的值加上指定的浮点数增量值。 + // 返回增加后的值 IncrByFloat(ctx context.Context, key string, value float64) (float64, error) }