Skip to content

Commit

Permalink
调整 SRem 的语义到 Redis (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
flycash authored Oct 13, 2023
1 parent dda9743 commit 10ff4bc
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 113 deletions.
7 changes: 5 additions & 2 deletions internal/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@ package errs

import "errors"

var ErrKeyNotExist = errors.New("key 不存在")
var ErrDeleteKeyFailed = errors.New("删除key失败")
var (
ErrKeyNotExist = errors.New("key 不存在")
ErrDeleteKeyFailed = errors.New("删除key失败")
ErrKeyNeverExpireNotSupported = errors.New("不支持key永不过期")
)
16 changes: 6 additions & 10 deletions memory/lru/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,32 +215,28 @@ func (c *Cache) SAdd(ctx context.Context, key string, members ...any) (int64, er
return int64(len(s.Keys())), nil
}

func (c *Cache) SRem(ctx context.Context, key string, members ...any) (val ecache.Value) {
func (c *Cache) SRem(ctx context.Context, key string, members ...any) (int64, error) {
c.lock.Lock()
defer c.lock.Unlock()

result, ok := c.client.Get(key)
if !ok {
val.Err = errs.ErrKeyNotExist
return
return 0, errs.ErrKeyNotExist
}

s, ok := result.(set.Set[any])
if !ok {
val.Err = errors.New("当前key已存在不是set类型")
return
return 0, errors.New("当前key已存在不是set类型")
}

var rems = make([]any, 0, cap(members))
var rems int64
for _, member := range members {
if s.Exist(member) {
rems = append(rems, member)
s.Delete(member)
rems++
}
}

val.Val = rems
return
return rems, nil
}

func (c *Cache) IncrBy(ctx context.Context, key string, value int64) (int64, error) {
Expand Down
26 changes: 10 additions & 16 deletions memory/lru/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package lru
import (
"context"
"errors"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -614,9 +613,10 @@ func TestCache_SRem(t *testing.T) {
before func(t *testing.T)
after func(t *testing.T)

key string
val []any
wantVal []any
key string
val []any

wantVal int64
wantErr error
}{
{
Expand All @@ -634,7 +634,7 @@ func TestCache_SRem(t *testing.T) {
},
key: "test",
val: []any{"hello world"},
wantVal: []any{"hello world"},
wantVal: 1,
},
{
name: "srem value ignore",
Expand All @@ -649,7 +649,7 @@ func TestCache_SRem(t *testing.T) {
},
key: "test",
val: []any{"hello ecache"},
wantVal: []any{},
wantVal: 0,
},
{
name: "srem value nil",
Expand All @@ -675,21 +675,15 @@ func TestCache_SRem(t *testing.T) {

for _, tc := range testCase {
t.Run(tc.name, func(t *testing.T) {
defer tc.after(t)
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*5)
defer cancelFunc()
c := NewCache(lru)

tc.before(t)
val := c.SRem(ctx, tc.key, tc.val...)
defer tc.after(t)
if val.Err != nil {
assert.Equal(t, tc.wantErr, val.Err)
return
}

result, ok := val.Val.([]any)
assert.Equal(t, true, ok)
assert.Equal(t, true, reflect.DeepEqual(tc.wantVal, result))
val, err := c.SRem(ctx, tc.key, tc.val...)
assert.Equal(t, tc.wantErr, err)
assert.Equal(t, tc.wantVal, val)
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions memory/priority/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ package priority

// Priority 如果传进来的元素没有实现该接口,则默认优先级为0
type Priority interface {
// GetPriority 获取元素的优先级
GetPriority() int
// Priority 获取元素的优先级
Priority() int
}
55 changes: 24 additions & 31 deletions memory/priority/rbtree_priority_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,44 +40,48 @@ var (
errOnlyNumCanDecrBy = errors.New("ecache: 只有数字类型的数据,才能执行 DecrBy")
)

var (
// todo 这两个变量没有想到更好办法处理,如果外部不调用option进行设置,最后还是需要内部定义一个默认值
priorityQueueInitSize = 8 //优先级队列的初始大小
mapSetInitSize = 8 //缓存结点中set.MapSet的初始大小
)

type RBTreePriorityCache struct {
globalLock *sync.RWMutex //内部全局读写锁,保护缓存数据和优先级数据
cacheData *tree.RBTree[string, *rbTreeCacheNode] //缓存数据
cacheNum int //缓存中总键值对数量
cacheLimit int //键值对数量限制,默认MaxInt32,约等于没有限制
priorityData *queue.PriorityQueue[*rbTreeCacheNode] //优先级数据
defaultPriority int //默认优先级
cleanInterval time.Duration
// 集合类型的值的初始化容量
collectionCap int
}

func NewRBTreePriorityCache(opts ...option.Option[RBTreePriorityCache]) (*RBTreePriorityCache, error) {
cache, _ := newRBTreePriorityCache(opts...)
// todo 自动清理过期缓存的时间间隔,暂时先写死1s,后面再考虑怎么暴露出去
go cache.autoClean(time.Second)

go cache.autoClean()
return cache, nil
}

func newRBTreePriorityCache(opts ...option.Option[RBTreePriorityCache]) (*RBTreePriorityCache, error) {
rbTree, _ := tree.NewRBTree[string, *rbTreeCacheNode](comparatorRBTreeCacheNodeByKey())
priorityQueue := queue.NewPriorityQueue[*rbTreeCacheNode](priorityQueueInitSize, comparatorRBTreeCacheNodeByPriority())

const (
priorityQueueDefaultSize = 8 //优先级队列的初始大小
collectionDefaultCap = 8 //缓存结点中set.MapSet的初始大小
)
priorityQueue := queue.NewPriorityQueue[*rbTreeCacheNode](priorityQueueDefaultSize, comparatorRBTreeCacheNodeByPriority())
cache := &RBTreePriorityCache{
globalLock: &sync.RWMutex{},
cacheData: rbTree,
cacheNum: 0,
cacheLimit: math.MaxInt32,
priorityData: priorityQueue,
// 暂时设置为一秒间隔
cleanInterval: time.Second,
collectionCap: collectionDefaultCap,
}
option.Apply(cache, opts...)

return cache, nil
}

// WithCacheLimit 设置所允许的最大键值对数量
func WithCacheLimit(cacheLimit int) option.Option[RBTreePriorityCache] {
return func(opt *RBTreePriorityCache) {
opt.cacheLimit = cacheLimit
Expand All @@ -90,7 +94,7 @@ func WithDefaultPriority(priority int) option.Option[RBTreePriorityCache] {
}
}

func (r *RBTreePriorityCache) Set(ctx context.Context, key string, val any, expiration time.Duration) error {
func (r *RBTreePriorityCache) Set(_ context.Context, key string, val any, expiration time.Duration) error {
r.globalLock.Lock()
defer r.globalLock.Unlock()

Expand All @@ -101,11 +105,9 @@ func (r *RBTreePriorityCache) Set(ctx context.Context, key string, val any, expi
}
node = newKVRBTreeCacheNode(key, val, expiration)
r.addNode(node)

return nil
}
node.replace(val, expiration)

return nil
}

Expand Down Expand Up @@ -271,7 +273,7 @@ func (r *RBTreePriorityCache) SAdd(ctx context.Context, key string, members ...a
if r.isFull() {
r.deleteNodeByPriority()
}
node = newSetRBTreeCacheNode(key, mapSetInitSize)
node = newSetRBTreeCacheNode(key, r.collectionCap)
r.addNode(node)
}

Expand All @@ -292,41 +294,33 @@ func (r *RBTreePriorityCache) SAdd(ctx context.Context, key string, members ...a
return successNum, nil
}

func (r *RBTreePriorityCache) SRem(ctx context.Context, key string, members ...any) ecache.Value {
func (r *RBTreePriorityCache) SRem(_ context.Context, key string, members ...any) (int64, error) {
r.globalLock.Lock()
defer r.globalLock.Unlock()

var retVal ecache.Value

node, cacheErr := r.cacheData.Find(key)
if cacheErr != nil {
retVal.Err = errs.ErrKeyNotExist

return retVal
return 0, errs.ErrKeyNotExist
}

nodeVal, ok := node.value.(*set.MapSet[any])
if !ok {
retVal.Err = errOnlySetCanSRem

return retVal
return 0, errOnlySetCanSRem
}

successNum := 0
var successNum int64
for _, item := range members {
isExist := nodeVal.Exist(item)
if isExist {
nodeVal.Delete(item)
successNum++
}
}
retVal.Val = int64(successNum)

if len(nodeVal.Keys()) == 0 {
r.deleteNode(node) //如果集合为空,删除缓存结点
}

return retVal
return successNum, nil
}

func (r *RBTreePriorityCache) IncrBy(ctx context.Context, key string, value int64) (int64, error) {
Expand Down Expand Up @@ -384,7 +378,7 @@ func (r *RBTreePriorityCache) calculatePriority(node *rbTreeCacheNode) int {
//如果实现了Priority接口,那么就用接口的方法获取优先级权重
val, ok := node.value.(Priority)
if ok {
priority = val.GetPriority()
priority = val.Priority()
}

return priority
Expand Down Expand Up @@ -429,10 +423,9 @@ func (r *RBTreePriorityCache) deleteNodeByPriority() {
}

// autoClean 自动清理过期缓存
func (r *RBTreePriorityCache) autoClean(interval time.Duration) {
ticker := time.NewTicker(interval)
func (r *RBTreePriorityCache) autoClean() {
ticker := time.NewTicker(r.cleanInterval)
defer ticker.Stop()

for range ticker.C {
r.globalLock.RLock()
_, values := r.cacheData.KeyValues()
Expand Down
Loading

0 comments on commit 10ff4bc

Please sign in to comment.