Skip to content

Commit

Permalink
feat<memory: priority> add method IncrByFloat, Delete (#28)
Browse files Browse the repository at this point in the history
* feat<memory: priority> (add method IncrByFloat, Delete, UpdatePriority) 新增rbtree_priority_cache实现Cache的两个方法IncrByFloat, Delete和一个修改优先级的方法UpdatePriority

* refector<priority> 抽取通用方法findOrCreateNode

* feat<priority> 删除updatePriority方法

* fix<priority> 修复单测

* feat<priority> 优化findOrCreate函数入参,延迟初始化节点零值
  • Loading branch information
XiaKuan authored Oct 31, 2023
1 parent 10ff4bc commit 8c6eedc
Show file tree
Hide file tree
Showing 3 changed files with 351 additions and 61 deletions.
28 changes: 14 additions & 14 deletions memory/priority/rbtree_cache_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,34 @@ type rbTreeCacheNode struct {
isDeleted bool //是否被删除
}

func newKVRBTreeCacheNode(key string, value any, expiration time.Duration) *rbTreeCacheNode {
node := &rbTreeCacheNode{
// newRBTreeCacheNode 创建红黑树节点,注意如果是容器类型节点要value传递初始化一个零值
func newRBTreeCacheNode(key string, value any) *rbTreeCacheNode {
return &rbTreeCacheNode{
key: key,
value: value,
}
}

func newKVRBTreeCacheNode(key string, value any, expiration time.Duration) *rbTreeCacheNode {
node := newRBTreeCacheNode(key, value)
node.setExpiration(expiration)
return node
}

func newListRBTreeCacheNode(key string) *rbTreeCacheNode {
return &rbTreeCacheNode{
key: key,
value: list.NewLinkedList[any](),
}
return newRBTreeCacheNode(key, list.NewLinkedList[any]())
}

func newSetRBTreeCacheNode(key string, initSize int) *rbTreeCacheNode {
return &rbTreeCacheNode{
key: key,
value: set.NewMapSet[any](initSize),
}
return newRBTreeCacheNode(key, set.NewMapSet[any](initSize))
}

func newIntRBTreeCacheNode(key string) *rbTreeCacheNode {
return &rbTreeCacheNode{
key: key,
value: int64(0),
}
return newRBTreeCacheNode(key, int64(0))
}

func newFloatRBTreeCacheNode(key string) *rbTreeCacheNode {
return newRBTreeCacheNode(key, float64(0))
}

// setExpiration 设置有效期
Expand Down
126 changes: 80 additions & 46 deletions memory/priority/rbtree_priority_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,8 @@ func (r *RBTreePriorityCache) Set(_ context.Context, key string, val any, expira
r.globalLock.Lock()
defer r.globalLock.Unlock()

node, cacheErr := r.cacheData.Find(key)
if cacheErr != nil {
if r.isFull() {
r.deleteNodeByPriority()
}
node = newKVRBTreeCacheNode(key, val, expiration)
r.addNode(node)
return nil
}
node := r.findOrCreateNode(key, func() any { return val })

node.replace(val, expiration)
return nil
}
Expand Down Expand Up @@ -157,6 +150,8 @@ func (r *RBTreePriorityCache) Get(ctx context.Context, key string) (val ecache.V
return
}

r.globalLock.Lock()
defer r.globalLock.Unlock()
now := time.Now()
if !node.beforeDeadline(now) {
r.doubleCheckWhenExpire(node, now)
Expand All @@ -169,11 +164,8 @@ func (r *RBTreePriorityCache) Get(ctx context.Context, key string) (val ecache.V
return
}

// doubleCheckWhenExpire 缓存过期时的二次校验,防止被抢先删除了
// doubleCheckWhenExpire 缓存过期时的二次校验,防止被抢先删除了【调用该方法必须先获得锁】
func (r *RBTreePriorityCache) doubleCheckWhenExpire(node *rbTreeCacheNode, now time.Time) {
r.globalLock.Lock()
defer r.globalLock.Unlock()

checkNode, checkCacheErr := r.cacheData.Find(node.key)
if checkCacheErr != nil {
return //被抢先删除了
Expand Down Expand Up @@ -212,15 +204,9 @@ func (r *RBTreePriorityCache) LPush(ctx context.Context, key string, val ...any)
r.globalLock.Lock()
defer r.globalLock.Unlock()

node, cacheErr := r.cacheData.Find(key)
if cacheErr != nil {
if r.isFull() {
r.deleteNodeByPriority()
}
node = newListRBTreeCacheNode(key)
r.addNode(node)
}

node := r.findOrCreateNode(key, func() any {
return list.NewLinkedList[any]()
})
nodeVal, ok := node.value.(*list.LinkedList[any])
if !ok {
return 0, errOnlyListCanLPUSH
Expand Down Expand Up @@ -268,15 +254,9 @@ func (r *RBTreePriorityCache) SAdd(ctx context.Context, key string, members ...a
r.globalLock.Lock()
defer r.globalLock.Unlock()

node, cacheErr := r.cacheData.Find(key)
if cacheErr != nil {
if r.isFull() {
r.deleteNodeByPriority()
}
node = newSetRBTreeCacheNode(key, r.collectionCap)
r.addNode(node)
}

node := r.findOrCreateNode(key, func() any {
return set.NewMapSet[any](r.collectionCap)
})
nodeVal, ok := node.value.(*set.MapSet[any])
if !ok {
return 0, errOnlySetCanSAdd
Expand Down Expand Up @@ -327,14 +307,7 @@ func (r *RBTreePriorityCache) IncrBy(ctx context.Context, key string, value int6
r.globalLock.Lock()
defer r.globalLock.Unlock()

node, cacheErr := r.cacheData.Find(key)
if cacheErr != nil {
if r.isFull() {
r.deleteNodeByPriority()
}
node = newIntRBTreeCacheNode(key)
r.addNode(node)
}
node := r.findOrCreateNode(key, func() any { return int64(0) })

nodeVal, ok := node.value.(int64)
if !ok {
Expand All @@ -347,18 +320,64 @@ func (r *RBTreePriorityCache) IncrBy(ctx context.Context, key string, value int6
return newVal, nil
}

func (r *RBTreePriorityCache) DecrBy(ctx context.Context, key string, value int64) (int64, error) {
func (r *RBTreePriorityCache) IncrByFloat(ctx context.Context, key string, value float64) (float64, error) {
r.globalLock.Lock()
defer r.globalLock.Unlock()

node, cacheErr := r.cacheData.Find(key)
if cacheErr != nil {
if r.isFull() {
r.deleteNodeByPriority()
node := r.findOrCreateNode(key, func() any { return float64(0) })
nodeVal, ok := node.value.(float64)
if !ok {
//如果是int类型可以尝试转换
intNodeVal, ok := node.value.(int64)
if !ok {
return 0, errOnlyNumCanIncrBy
}
node = newIntRBTreeCacheNode(key)
r.addNode(node)
nodeVal = float64(intNodeVal)
}

newVal := nodeVal + value
node.value = newVal

return newVal, nil
}

func (r *RBTreePriorityCache) Delete(ctx context.Context, keys ...string) (int64, error) {
delCount := int64(0)
now := time.Now()
for _, key := range keys {
r.globalLock.RLock()
_, cacheErr := r.cacheData.Find(key)
r.globalLock.RUnlock()
if cacheErr != nil {
continue
}

r.globalLock.Lock()
node, cacheErr := r.cacheData.Find(key)
if cacheErr != nil {
r.globalLock.Unlock()
continue
}

// 过期删除不添加计数
if !node.beforeDeadline(now) {
r.deleteNode(node)
r.globalLock.Unlock()
continue
}

r.deleteNode(node)
r.globalLock.Unlock()
delCount++
}
return delCount, nil
}

func (r *RBTreePriorityCache) DecrBy(ctx context.Context, key string, value int64) (int64, error) {
r.globalLock.Lock()
defer r.globalLock.Unlock()

node := r.findOrCreateNode(key, func() any { return int64(0) })

nodeVal, ok := node.value.(int64)
if !ok {
Expand Down Expand Up @@ -403,6 +422,19 @@ func (r *RBTreePriorityCache) isFull() bool {
return r.cacheNum >= r.cacheLimit
}

// findOrCreateNode 查找节点,不存在时使用默认值创建节点【调用该方法必须先获得锁】
func (r *RBTreePriorityCache) findOrCreateNode(key string, initFunc func() any) *rbTreeCacheNode {
node, cacheErr := r.cacheData.Find(key)
if cacheErr != nil {
if r.isFull() {
r.deleteNodeByPriority()
}
node = newRBTreeCacheNode(key, initFunc())
r.addNode(node)
}
return node
}

// deleteNodeByPriority 根据优先级淘汰缓存结点【调用该方法必须先获得锁】
func (r *RBTreePriorityCache) deleteNodeByPriority() {
for {
Expand Down Expand Up @@ -434,7 +466,9 @@ func (r *RBTreePriorityCache) autoClean() {
now := time.Now()
for _, value := range values {
if !value.beforeDeadline(now) {
r.globalLock.Lock()
r.doubleCheckWhenExpire(value, now)
r.globalLock.Unlock()
}
}
}
Expand Down
Loading

0 comments on commit 8c6eedc

Please sign in to comment.