Skip to content

Commit

Permalink
fix: solve multiplatform sync concurrency error. (#578)
Browse files Browse the repository at this point in the history
* fix: solve multiplatform sync concurrency error.

* feat: update errRecordNotFound uniform error.

* fix: return correct arg.

* fix: update uncorrect condition.

* fix: solve uncorrect mutex use.

* remove not used imports.

* update mutex lock implement.

* remove uncorrect mutex use.
  • Loading branch information
mo3et authored Jul 9, 2024
1 parent c9b0c7b commit 28de626
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 20 deletions.
13 changes: 11 additions & 2 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"

"github.com/openimsdk/openim-sdk-core/v3/internal/business"
"github.com/openimsdk/openim-sdk-core/v3/internal/cache"
"github.com/openimsdk/openim-sdk-core/v3/internal/file"
Expand All @@ -38,11 +39,12 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"

"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
"sort"
"time"

"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"

"github.com/jinzhu/copier"
)

Expand Down Expand Up @@ -950,10 +952,13 @@ func (c *Conversation) batchAddFaceURLAndName(ctx context.Context, conversations
groupIDs = append(groupIDs, conversation.GroupID)
}
}

// if userIDs = nil, return nil, nil
users, err := c.batchGetUserNameAndFaceURL(ctx, userIDs...)
if err != nil {
return err
}

groups, err := c.full.GetGroupsInfo(ctx, groupIDs...)
if err != nil {
return err
Expand Down Expand Up @@ -987,6 +992,10 @@ func (c *Conversation) batchGetUserNameAndFaceURL(ctx context.Context, userIDs .
var notCachedUserIDs []string
var notInFriend []string

if len(userIDs) == 0 {
return m, nil
}

friendList, err := c.friend.Db().GetFriendInfoList(ctx, userIDs)
if err != nil {
log.ZWarn(ctx, "BatchGetUserNameAndFaceURL", err, "userIDs", userIDs)
Expand Down
2 changes: 2 additions & 0 deletions internal/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package group

import (
"context"
"sync"

"github.com/openimsdk/openim-sdk-core/v3/internal/util"
"github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback"
Expand Down Expand Up @@ -64,6 +65,7 @@ type Group struct {
conversationCh chan common.Cmd2Value
// memberSyncMutex sync.RWMutex

groupSyncMutex sync.Mutex
listenerForService open_im_sdk_callback.OnListenerForService
}

Expand Down
4 changes: 4 additions & 0 deletions internal/group/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package group
import (
"context"
"fmt"

"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/tools/errs"
Expand All @@ -27,6 +28,9 @@ import (
)

func (g *Group) DoNotification(ctx context.Context, msg *sdkws.MsgData) {
g.groupSyncMutex.Lock()
defer g.groupSyncMutex.Unlock()

go func() {
if err := g.doNotification(ctx, msg); err != nil {
log.ZError(ctx, "DoGroupNotification failed", err)
Expand Down
35 changes: 31 additions & 4 deletions internal/group/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package group

import (
"context"
"github.com/openimsdk/tools/errs"
"gorm.io/gorm"
"time"

"github.com/openimsdk/tools/errs"

"github.com/openimsdk/tools/utils/datautil"

"github.com/openimsdk/openim-sdk-core/v3/pkg/datafetcher"
Expand Down Expand Up @@ -47,6 +47,10 @@ func (g *Group) CreateGroup(ctx context.Context, req *group.CreateGroupReq) (*sd
if err != nil {
return nil, err
}

g.groupSyncMutex.Lock()
defer g.groupSyncMutex.Unlock()

if err := g.IncrSyncJoinGroup(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -101,6 +105,10 @@ func (g *Group) ChangeGroupMute(ctx context.Context, groupID string, isMute bool
if err != nil {
return err
}

g.groupSyncMutex.Lock()
defer g.groupSyncMutex.Unlock()

if err := g.IncrSyncGroupAndMember(ctx, groupID); err != nil {
return err
}
Expand All @@ -123,6 +131,10 @@ func (g *Group) TransferGroupOwner(ctx context.Context, groupID, newOwnerUserID
if err := util.ApiPost(ctx, constant.TransferGroupRouter, &group.TransferGroupOwnerReq{GroupID: groupID, OldOwnerUserID: g.loginUserID, NewOwnerUserID: newOwnerUserID}, nil); err != nil {
return err
}

g.groupSyncMutex.Lock()
defer g.groupSyncMutex.Unlock()

if err := g.IncrSyncGroupAndMember(ctx, groupID); err != nil {
return err
}
Expand All @@ -133,20 +145,32 @@ func (g *Group) KickGroupMember(ctx context.Context, groupID string, reason stri
if err := util.ApiPost(ctx, constant.KickGroupMemberRouter, &group.KickGroupMemberReq{GroupID: groupID, KickedUserIDs: userIDList, Reason: reason}, nil); err != nil {
return err
}

g.groupSyncMutex.Lock()
defer g.groupSyncMutex.Unlock()

return g.IncrSyncGroupAndMember(ctx, groupID)
}

func (g *Group) SetGroupInfo(ctx context.Context, groupInfo *sdkws.GroupInfoForSet) error {
if err := util.ApiPost(ctx, constant.SetGroupInfoRouter, &group.SetGroupInfoReq{GroupInfoForSet: groupInfo}, nil); err != nil {
return err
}

g.groupSyncMutex.Lock()
defer g.groupSyncMutex.Unlock()

return g.IncrSyncJoinGroup(ctx)
}

func (g *Group) SetGroupMemberInfo(ctx context.Context, groupMemberInfo *group.SetGroupMemberInfo) error {
if err := util.ApiPost(ctx, constant.SetGroupMemberInfoRouter, &group.SetGroupMemberInfoReq{Members: []*group.SetGroupMemberInfo{groupMemberInfo}}, nil); err != nil {
return err
}

g.groupSyncMutex.Lock()
defer g.groupSyncMutex.Unlock()

return g.IncrSyncGroupAndMember(ctx, groupMemberInfo.GroupID)
}

Expand Down Expand Up @@ -356,7 +380,7 @@ func (g *Group) GetSpecifiedGroupMembersInfo(ctx context.Context, groupID string

_, err := g.db.GetVersionSync(ctx, g.groupAndMemberVersionTableName(), groupID)
if err != nil {
if errs.Unwrap(err) != gorm.ErrRecordNotFound {
if errs.Unwrap(err) != errs.ErrRecordNotFound {
return nil, err
}
err := g.IncrSyncGroupAndMember(ctx, groupID)
Expand Down Expand Up @@ -414,7 +438,7 @@ func (g *Group) GetGroupMemberList(ctx context.Context, groupID string, filter,

_, err := g.db.GetVersionSync(ctx, g.groupAndMemberVersionTableName(), groupID)
if err != nil {
if errs.Unwrap(err) != gorm.ErrRecordNotFound {
if errs.Unwrap(err) != errs.ErrRecordNotFound {
return nil, err
}
err := g.IncrSyncGroupAndMember(ctx, groupID)
Expand Down Expand Up @@ -571,6 +595,9 @@ func (g *Group) InviteUserToGroup(ctx context.Context, groupID, reason string, u
return err
}

g.groupSyncMutex.Lock()
defer g.groupSyncMutex.Unlock()

if err := g.IncrSyncGroupAndMember(ctx, groupID); err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions internal/group/sync2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"sync"

"gorm.io/gorm"

"github.com/openimsdk/openim-sdk-core/v3/internal/incrversion"
"github.com/openimsdk/openim-sdk-core/v3/internal/util"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
Expand Down Expand Up @@ -77,11 +75,12 @@ func (g *Group) IncrSyncGroupAndMember(ctx context.Context, groupIDs ...string)
req := group.GetIncrementalGroupMemberReq{
GroupID: groupID,
}

lvs, err := g.db.GetVersionSync(ctx, g.groupAndMemberVersionTableName(), groupID)
if err == nil {
req.VersionID = lvs.VersionID
req.Version = lvs.Version
} else if errs.Unwrap(err) != gorm.ErrRecordNotFound {
} else if errs.Unwrap(err) != errs.ErrRecordNotFound {
return err
}
groups = append(groups, &req)
Expand All @@ -96,10 +95,11 @@ func (g *Group) IncrSyncGroupAndMember(ctx context.Context, groupIDs ...string)
tempGroupID := groupID
wg.Add(1)
go func() error {
defer wg.Done()
if err := g.syncGroupAndMember(ctx, tempGroupID, tempResp); err != nil {
return err
log.ZError(ctx, "sync Group And Member error", errs.Wrap(err))
return errs.Wrap(err)
}
wg.Done()
return nil
}()
delete(groupIDSet, tempGroupID)
Expand Down
3 changes: 1 addition & 2 deletions internal/incrversion/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"gorm.io/gorm"
)

type VersionSynchronizer[V, R any] struct {
Expand All @@ -36,7 +35,7 @@ type VersionSynchronizer[V, R any] struct {

func (o *VersionSynchronizer[V, R]) getVersionInfo() (*model_struct.LocalVersionSync, error) {
versionInfo, err := o.DB.GetVersionSync(o.Ctx, o.TableName, o.EntityID)
if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound {
if err != nil && errs.Unwrap(err) != errs.ErrRecordNotFound {
log.ZWarn(o.Ctx, "get version info", err)
return nil, err

Expand Down
6 changes: 3 additions & 3 deletions internal/user/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package user
import (
"context"
"errors"

"github.com/openimsdk/openim-sdk-core/v3/internal/util"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
Expand All @@ -25,7 +26,6 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"gorm.io/gorm"
)

func (u *User) SyncLoginUserInfo(ctx context.Context) error {
Expand All @@ -34,7 +34,7 @@ func (u *User) SyncLoginUserInfo(ctx context.Context) error {
return err
}
localUser, err := u.GetLoginUser(ctx, u.loginUserID)
if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound {
if err != nil && errs.Unwrap(err) != errs.ErrRecordNotFound {
log.ZError(ctx, "SyncLoginUserInfo", err)
}
var localUsers []*model_struct.LocalUser
Expand All @@ -50,7 +50,7 @@ func (u *User) SyncLoginUserInfoWithoutNotice(ctx context.Context) error {
return err
}
localUser, err := u.GetLoginUser(ctx, u.loginUserID)
if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound {
if err != nil && errs.Unwrap(err) != errs.ErrRecordNotFound {
log.ZError(ctx, "SyncLoginUserInfo", err)
}
var localUsers []*model_struct.LocalUser
Expand Down
6 changes: 5 additions & 1 deletion pkg/db/app_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ import (

func (d *DataBase) GetAppSDKVersion(ctx context.Context) (*model_struct.LocalAppSDKVersion, error) {
var appVersion model_struct.LocalAppSDKVersion
return &appVersion, errs.Wrap(d.conn.WithContext(ctx).Take(&appVersion).Error)
err := d.conn.WithContext(ctx).Take(&appVersion).Error
if err == gorm.ErrRecordNotFound {
err = errs.ErrRecordNotFound
}
return &appVersion, errs.Wrap(err)
}

func (d *DataBase) SetAppSDKVersion(ctx context.Context, appVersion *model_struct.LocalAppSDKVersion) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/db_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (d *DataBase) initDB(ctx context.Context, logLevel int) error {

func (d *DataBase) versionDataMigrate(ctx context.Context) error {
verModel, err := d.GetAppSDKVersion(ctx)
if errs.Unwrap(err) == gorm.ErrRecordNotFound {
if errs.Unwrap(err) == errs.ErrRecordNotFound {
err = d.conn.AutoMigrate(
&model_struct.LocalAppSDKVersion{},
&model_struct.LocalFriend{},
Expand Down
9 changes: 8 additions & 1 deletion pkg/db/user_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,14 @@ func (d *DataBase) GetLoginUser(ctx context.Context, userID string) (*model_stru
d.userMtx.RLock()
defer d.userMtx.RUnlock()
var user model_struct.LocalUser
return &user, errs.WrapMsg(d.conn.WithContext(ctx).Where("user_id = ? ", userID).Take(&user).Error, "GetLoginUserInfo failed")
err := d.conn.WithContext(ctx).Where("user_id = ? ", userID).Take(&user).Error
if err != nil {
if err == errs.ErrRecordNotFound {
return nil, errs.ErrRecordNotFound.Wrap()
}
return nil, errs.Wrap(err)
}
return &user, nil
}

func (d *DataBase) UpdateLoginUser(ctx context.Context, user *model_struct.LocalUser) error {
Expand Down
9 changes: 8 additions & 1 deletion pkg/db/version_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ func (d *DataBase) GetVersionSync(ctx context.Context, tableName, entityID strin
d.versionMtx.RLock()
defer d.versionMtx.RUnlock()
var res model_struct.LocalVersionSync
return &res, errs.Wrap(d.conn.WithContext(ctx).Where("`table_name` = ? and `entity_id` = ?", tableName, entityID).Take(&res).Error)
err := d.conn.WithContext(ctx).Where("`table_name` = ? and `entity_id` = ?", tableName, entityID).Take(&res).Error
if err != nil {
if err == gorm.ErrRecordNotFound {
err = errs.ErrRecordNotFound
}
return nil, errs.Wrap(err)
}
return &res, errs.Wrap(err)
}

func (d *DataBase) SetVersionSync(ctx context.Context, lv *model_struct.LocalVersionSync) error {
Expand Down

0 comments on commit 28de626

Please sign in to comment.