Skip to content

Commit

Permalink
缓存使用新版本
Browse files Browse the repository at this point in the history
  • Loading branch information
jerbe committed Sep 13, 2023
1 parent 1d738d6 commit a5bb0b2
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 131 deletions.
28 changes: 14 additions & 14 deletions database/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,13 @@ func AddChatMessage(msg *ChatMessage) error {
// 将聊天数据推送到缓存定长队列中去
// @TODO 暂时这样push,但是这样处理不准确,需要做优化. 因为每个响应的是时长不一样,可能导致顺序不是正确的,甚至是断续的. 如 [1,3,2,4,7,6,5,8,9,11,10,19]
cacheKey := cacheKeyFormatLastMessageList(msg.RoomID, msg.SessionType)
err = jcache.Push(GlobCtx, cacheKey, msg)
err = GlobCache.Push(GlobCtx, cacheKey, msg)
if err != nil && err.Error() == "WRONGTYPE Operation against a key holding the wrong kind of value" {
jcache.Del(GlobCtx, cacheKey)
GlobCache.Del(GlobCtx, cacheKey)
}

if err == nil {
jcache.Expire(GlobCtx, cacheKey, jcache.RandomExpirationDuration())
GlobCache.Expire(GlobCtx, cacheKey, jcache.RandomExpirationDuration())
}

return nil
Expand Down Expand Up @@ -340,13 +340,13 @@ func AddChatMessageTx(msg *ChatMessage) error {

// 将聊天数据推送到缓存定长队列中去
cacheKey := cacheKeyFormatLastMessageList(msg.RoomID, msg.SessionType)
err = jcache.Push(GlobCtx, cacheKey, msg)
err = GlobCache.Push(GlobCtx, cacheKey, msg)
if err != nil {
jcache.Del(GlobCtx, cacheKey)
GlobCache.Del(GlobCtx, cacheKey)
log.Error().Err(err).Msgf("推送消息到缓存列表失败:\n %+v", err)
}
if err == nil {
jcache.Expire(GlobCtx, cacheKey, jcache.RandomExpirationDuration())
GlobCache.Expire(GlobCtx, cacheKey, jcache.RandomExpirationDuration())
}

return nil, nil
Expand Down Expand Up @@ -494,10 +494,10 @@ func GetLastChatMessageList(roomID string, sessionType int, opts ...*GetOptions)

// 如果有使用缓存,则从缓存中获取
if opt.UseCache() {
exists, _ := jcache.Exists(GlobCtx, cacheKey)
if exists {
exists, _ := GlobCache.Exists(GlobCtx, cacheKey)
if exists > 0 {
var messages []*ChatMessage
err := jcache.RangAndScan(GlobCtx, &messages, cacheKey, defaultLastLimit)
err := GlobCache.RangAndScan(GlobCtx, &messages, cacheKey, 0, defaultLastLimit-1)
if err == nil {
return messages, nil
}
Expand All @@ -521,7 +521,7 @@ func GetLastChatMessageList(roomID string, sessionType int, opts ...*GetOptions)
// @todo 设置缓存为找不到记录

cacheKey := cacheKeyFormatLastMessageList(roomID, sessionType)
if err := jcache.SetNX(GlobCtx, cacheKey, nil, jcache.DefaultEmptySetNXDuration); err != nil {
if err := GlobCache.SetNX(GlobCtx, cacheKey, nil, jcache.DefaultEmptySetNXDuration); err != nil {
log.Error().Err(err).Str("cache_key", cacheKey).Msg("缓存写入失败")
}

Expand All @@ -543,16 +543,16 @@ func GetLastChatMessageList(roomID string, sessionType int, opts ...*GetOptions)
pushData = append(pushData, msg)
}

err = jcache.Push(GlobCtx, cacheKey, pushData...)
err = GlobCache.Push(GlobCtx, cacheKey, pushData...)
if err != nil && err.Error() == "WRONGTYPE Operation against a key holding the wrong kind of value" {
jcache.Del(GlobCtx, cacheKey)
err = jcache.Push(GlobCtx, cacheKey, pushData...)
GlobCache.Del(GlobCtx, cacheKey)
err = GlobCache.Push(GlobCtx, cacheKey, pushData...)
if err != nil {
log.Warn().Err(err).Msg("缓存插入聊天消息失败")
}
}
if err == nil {
jcache.Expire(GlobCtx, cacheKey, jcache.RandomExpirationDuration())
GlobCache.Expire(GlobCtx, cacheKey, jcache.RandomExpirationDuration())
}

// 设置缓存
Expand Down
20 changes: 20 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"github.com/jerbe/jim/config"
"github.com/jerbe/jim/errors"

"github.com/jerbe/jcache"
"github.com/jerbe/jcache/driver"

_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -77,6 +80,8 @@ var (

GlobDB *Database

GlobCache *jcache.Client

initialized bool
)

Expand All @@ -87,6 +92,21 @@ func Init(cfg config.Config) (db *Database, err error) {

CacheKeyPrefix = cfg.Main.ServerName

// 初始化缓存
c := driver.RedisConfig{
Mode: cfg.Redis.Mode,
MasterName: cfg.Redis.MasterName,
Addrs: cfg.Redis.Addrs,
Database: cfg.Redis.Database,
Username: cfg.Redis.Username,
Password: cfg.Redis.Password,
}
cache := jcache.NewClient(
driver.NewMemory(),
driver.NewRedis(driver.RedisOptions().Config(&c)))

GlobCache = cache

// 初始化MYSQL
mysqlCfg := cfg.MySQL
mysqlConnStr := mysqlCfg.URI
Expand Down
64 changes: 32 additions & 32 deletions database/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func AddGroup(group *Group, opts ...*SetOptions) (int64, error) {
group.ID = insertId

if opt.UpdateCache() {
jcache.Set(GlobCtx, cacheKeyFormatGroupID(insertId), group, jcache.DefaultExpirationDuration)
GlobCache.Set(GlobCtx, cacheKeyFormatGroupID(insertId), group, jcache.DefaultExpirationDuration)
}

return insertId, nil
Expand All @@ -99,7 +99,7 @@ func GetGroup(id int64, opts ...*GetOptions) (*Group, error) {
if opt.UseCache() {
cacheKey := cacheKeyFormatGroupID(id)
group := new(Group)
err := jcache.CheckAndScan(GlobCtx, group, cacheKey)
err := GlobCache.CheckAndScan(GlobCtx, group, cacheKey)
if err == nil {
return group, nil
}
Expand All @@ -118,7 +118,7 @@ func GetGroup(id int64, opts ...*GetOptions) (*Group, error) {
if errors.IsNoRecord(err) {
// 写入缓存,如果key不存在的话
var cacheKey = cacheKeyFormatGroupID(id)
if e := jcache.SetNX(GlobCtx, cacheKey, nil, jcache.DefaultEmptySetNXDuration); e != nil {
if e := GlobCache.SetNX(GlobCtx, cacheKey, nil, jcache.DefaultEmptySetNXDuration); e != nil {
log.Error().Err(e).Str("err_format", fmt.Sprintf("%+v", e)).Str("cache_key", cacheKey).Msg("缓存写入失败")
}
}
Expand All @@ -127,7 +127,7 @@ func GetGroup(id int64, opts ...*GetOptions) (*Group, error) {

if opt.UpdateCache() {
cacheKey := cacheKeyFormatGroupID(id)
jcache.Set(GlobCtx, cacheKey, group, jcache.RandomExpirationDuration())
GlobCache.Set(GlobCtx, cacheKey, group, jcache.RandomExpirationDuration())
}

return group, nil
Expand Down Expand Up @@ -193,7 +193,7 @@ func UpdateGroup(groupID int64, data *UpdateGroupData, opts ...*SetOptions) erro

defer func() {
if opt.UpdateCache() {
jcache.Del(GlobCtx, cacheKeyFormatGroupID(groupID))
GlobCache.Del(GlobCtx, cacheKeyFormatGroupID(groupID))
}
}()

Expand Down Expand Up @@ -288,7 +288,7 @@ func AddGroupMembers(groupID int64, data *AddGroupMembersData, opts ...*SetOptio
return 0, errors.Wrap(err)
}
if opt.UpdateCache() {
jcache.Del(GlobCtx, cacheKeyFormatGroupMembers(groupID))
GlobCache.Del(GlobCtx, cacheKeyFormatGroupMembers(groupID))
}

rowsAffected, err := rs.RowsAffected()
Expand All @@ -300,9 +300,9 @@ func GetGroupMemberCount(groupID int64, opts ...*GetOptions) (int64, error) {
opt := MergeGetOptions(opts)
if opt.UseCache() {
cacheKey := cacheKeyFormatGroupMembers(groupID)
exists, _ := jcache.Exists(GlobCtx, cacheKey)
if exists {
cnt, err := jcache.HLen(GlobCtx, cacheKey)
exists, _ := GlobCache.Exists(GlobCtx, cacheKey)
if exists > 0 {
cnt, err := GlobCache.HLen(GlobCtx, cacheKey)
if err == nil {
return cnt, nil
}
Expand All @@ -324,9 +324,9 @@ func GetGroupMemberIDs(groupID int64, opts ...*GetOptions) ([]int64, error) {
var userIDs []int64
if opt.UseCache() {
cacheKey := cacheKeyFormatGroupMembers(groupID)
exists, _ := jcache.Exists(GlobCtx, cacheKey)
if exists {
err := jcache.HKeysAndScan(GlobCtx, cacheKey, userIDs)
exists, _ := GlobCache.Exists(GlobCtx, cacheKey)
if exists > 0 {
err := GlobCache.HKeysAndScan(GlobCtx, userIDs, cacheKey)
if err == nil {
return userIDs, nil
}
Expand Down Expand Up @@ -359,9 +359,9 @@ func GetGroupMemberIDsString(groupID int64, opts ...*GetOptions) ([]string, erro
var userIDs []string
if opt.UseCache() {
cacheKey := cacheKeyFormatGroupMembers(groupID)
exists, _ := jcache.Exists(GlobCtx, cacheKey)
if exists {
keys, err := jcache.HKeys(GlobCtx, cacheKey)
exists, _ := GlobCache.Exists(GlobCtx, cacheKey)
if exists > 0 {
keys, err := GlobCache.HKeys(GlobCtx, cacheKey)
if err == nil {
return keys, nil
}
Expand Down Expand Up @@ -433,7 +433,7 @@ func RemoveGroupMembers(filter *RemoveGroupMembersFilter, opts ...*SetOptions) (
for i := 0; i < len(membersIDs); i++ {
keys = append(keys, strconv.FormatInt(membersIDs[i], 10))
}
jcache.HDel(GlobCtx, cacheKeyFormatGroupMembers(groupID), keys...)
GlobCache.HDel(GlobCtx, cacheKeyFormatGroupMembers(groupID), keys...)
}

rowsAffected, err := rs.RowsAffected()
Expand All @@ -446,9 +446,9 @@ func GetGroupAllMembers(groupID int64, opts ...*GetOptions) ([]*GroupMember, err
gms := make([]*GroupMember, 0)
if opt.UseCache() {
cacheKey := cacheKeyFormatGroupMembers(groupID)
exists, _ := jcache.Exists(GlobCtx, cacheKey)
if exists {
err := jcache.HValsScan(GlobCtx, gms, cacheKeyFormatGroupMembers(groupID))
exists, _ := GlobCache.Exists(GlobCtx, cacheKey)
if exists > 0 {
err := GlobCache.HValsAndScan(GlobCtx, gms, cacheKeyFormatGroupMembers(groupID))
if err == nil {
return gms, nil
}
Expand All @@ -475,8 +475,8 @@ func GetGroupAllMembers(groupID int64, opts ...*GetOptions) ([]*GroupMember, err
m[strconv.FormatInt(gms[i].UserID, 10)] = gms[i]
}

jcache.HSet(GlobCtx, cacheKey, m)
jcache.Expire(GlobCtx, cacheKey, jcache.RandomExpirationDuration())
GlobCache.HSet(GlobCtx, cacheKey, m)
GlobCache.Expire(GlobCtx, cacheKey, jcache.RandomExpirationDuration())
}

return gms, nil
Expand All @@ -488,14 +488,14 @@ func GetGroupMembers(groupID int64, memberIDs []int64, opts ...*GetOptions) ([]*
gms := make([]*GroupMember, 0)
if opt.UseCache() {
cacheKey := cacheKeyFormatGroupMembers(groupID)
exists, _ := jcache.Exists(GlobCtx, cacheKey)
if exists {
exists, _ := GlobCache.Exists(GlobCtx, cacheKey)
if exists > 0 {
var memberKeys = make([]string, len(memberIDs))
for i := 0; i < len(memberIDs); i++ {
memberKeys[i] = fmt.Sprintf("%d", memberIDs[i])
}

err := jcache.HMGetAndScan(GlobCtx, gms, cacheKeyFormatGroupMembers(groupID), memberKeys...)
err := GlobCache.HMGetAndScan(GlobCtx, gms, cacheKeyFormatGroupMembers(groupID), memberKeys...)
if err == nil && len(gms) == len(memberIDs) {
return gms, nil
}
Expand Down Expand Up @@ -526,8 +526,8 @@ func GetGroupMembers(groupID int64, memberIDs []int64, opts ...*GetOptions) ([]*
m[strconv.FormatInt(gms[i].UserID, 10)] = gms[i]
}

jcache.HSet(GlobCtx, cacheKey, m)
jcache.Expire(GlobCtx, cacheKey, jcache.RandomExpirationDuration())
GlobCache.HSet(GlobCtx, cacheKey, m)
GlobCache.Expire(GlobCtx, cacheKey, jcache.RandomExpirationDuration())
}

return gms, nil
Expand All @@ -539,9 +539,9 @@ func GetGroupMember(groupID, memberID int64, opts ...*GetOptions) (*GroupMember,
member := new(GroupMember)
if opt.UseCache() {
cacheKey := cacheKeyFormatGroupMembers(groupID)
exists, _ := jcache.Exists(GlobCtx, cacheKey)
if exists {
err := jcache.HGetAndScan(GlobCtx, member, cacheKeyFormatGroupMembers(groupID), fmt.Sprintf("%d", memberID))
exists, _ := GlobCache.Exists(GlobCtx, cacheKey)
if exists > 0 {
err := GlobCache.HGetAndScan(GlobCtx, member, cacheKeyFormatGroupMembers(groupID), fmt.Sprintf("%d", memberID))
if err == nil {
return member, nil
}
Expand All @@ -558,8 +558,8 @@ func GetGroupMember(groupID, memberID int64, opts ...*GetOptions) (*GroupMember,
if opt.UpdateCache() {
cacheKey := cacheKeyFormatGroupMembers(groupID)

if ok, _ := jcache.Exists(GlobCtx, cacheKey); ok {
jcache.HSet(GlobCtx, cacheKey, fmt.Sprintf("%d", memberID), member)
if exists, _ := GlobCache.Exists(GlobCtx, cacheKey); exists > 0 {
GlobCache.HSet(GlobCtx, cacheKey, fmt.Sprintf("%d", memberID), member)
}
}

Expand Down Expand Up @@ -616,7 +616,7 @@ func UpdateGroupMember(groupID, userID int64, data *UpdateGroupMemberData, opts

if opt.UpdateCache() {
cacheKey := cacheKeyFormatGroupMembers(groupID)
jcache.Del(GlobCtx, cacheKey)
GlobCache.Del(GlobCtx, cacheKey)
}
return nil
}
Expand Down
Loading

0 comments on commit a5bb0b2

Please sign in to comment.