diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 6648e7e76..732c05094 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/openim-sdk-core/v3/internal/business" "github.com/openimsdk/openim-sdk-core/v3/internal/cache" "github.com/openimsdk/openim-sdk-core/v3/internal/file" @@ -38,11 +39,12 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" - "github.com/openimsdk/openim-sdk-core/v3/sdk_struct" "sort" "time" + "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" + "github.com/openimsdk/openim-sdk-core/v3/sdk_struct" + "github.com/jinzhu/copier" ) @@ -950,10 +952,13 @@ func (c *Conversation) batchAddFaceURLAndName(ctx context.Context, conversations groupIDs = append(groupIDs, conversation.GroupID) } } + + // if userIDs = nil, return nil, nil users, err := c.batchGetUserNameAndFaceURL(ctx, userIDs...) if err != nil { return err } + groups, err := c.full.GetGroupsInfo(ctx, groupIDs...) if err != nil { return err @@ -987,6 +992,10 @@ func (c *Conversation) batchGetUserNameAndFaceURL(ctx context.Context, userIDs . var notCachedUserIDs []string var notInFriend []string + if len(userIDs) == 0 { + return m, nil + } + friendList, err := c.friend.Db().GetFriendInfoList(ctx, userIDs) if err != nil { log.ZWarn(ctx, "BatchGetUserNameAndFaceURL", err, "userIDs", userIDs) diff --git a/internal/group/group.go b/internal/group/group.go index 79a53613c..a00613077 100644 --- a/internal/group/group.go +++ b/internal/group/group.go @@ -16,6 +16,7 @@ package group import ( "context" + "sync" "github.com/openimsdk/openim-sdk-core/v3/internal/util" "github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback" @@ -64,6 +65,7 @@ type Group struct { conversationCh chan common.Cmd2Value // memberSyncMutex sync.RWMutex + groupSyncMutex sync.Mutex listenerForService open_im_sdk_callback.OnListenerForService } diff --git a/internal/group/notification.go b/internal/group/notification.go index 56c3338e5..fabf308d1 100644 --- a/internal/group/notification.go +++ b/internal/group/notification.go @@ -17,6 +17,7 @@ package group import ( "context" "fmt" + "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" "github.com/openimsdk/tools/errs" @@ -27,6 +28,9 @@ import ( ) func (g *Group) DoNotification(ctx context.Context, msg *sdkws.MsgData) { + g.groupSyncMutex.Lock() + defer g.groupSyncMutex.Unlock() + go func() { if err := g.doNotification(ctx, msg); err != nil { log.ZError(ctx, "DoGroupNotification failed", err) diff --git a/internal/group/sdk.go b/internal/group/sdk.go index 27e09c427..9fb78edd4 100644 --- a/internal/group/sdk.go +++ b/internal/group/sdk.go @@ -16,10 +16,10 @@ package group import ( "context" - "github.com/openimsdk/tools/errs" - "gorm.io/gorm" "time" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/openim-sdk-core/v3/pkg/datafetcher" @@ -47,6 +47,10 @@ func (g *Group) CreateGroup(ctx context.Context, req *group.CreateGroupReq) (*sd if err != nil { return nil, err } + + g.groupSyncMutex.Lock() + defer g.groupSyncMutex.Unlock() + if err := g.IncrSyncJoinGroup(ctx); err != nil { return nil, err } @@ -101,6 +105,10 @@ func (g *Group) ChangeGroupMute(ctx context.Context, groupID string, isMute bool if err != nil { return err } + + g.groupSyncMutex.Lock() + defer g.groupSyncMutex.Unlock() + if err := g.IncrSyncGroupAndMember(ctx, groupID); err != nil { return err } @@ -123,6 +131,10 @@ func (g *Group) TransferGroupOwner(ctx context.Context, groupID, newOwnerUserID if err := util.ApiPost(ctx, constant.TransferGroupRouter, &group.TransferGroupOwnerReq{GroupID: groupID, OldOwnerUserID: g.loginUserID, NewOwnerUserID: newOwnerUserID}, nil); err != nil { return err } + + g.groupSyncMutex.Lock() + defer g.groupSyncMutex.Unlock() + if err := g.IncrSyncGroupAndMember(ctx, groupID); err != nil { return err } @@ -133,6 +145,10 @@ func (g *Group) KickGroupMember(ctx context.Context, groupID string, reason stri if err := util.ApiPost(ctx, constant.KickGroupMemberRouter, &group.KickGroupMemberReq{GroupID: groupID, KickedUserIDs: userIDList, Reason: reason}, nil); err != nil { return err } + + g.groupSyncMutex.Lock() + defer g.groupSyncMutex.Unlock() + return g.IncrSyncGroupAndMember(ctx, groupID) } @@ -140,6 +156,10 @@ func (g *Group) SetGroupInfo(ctx context.Context, groupInfo *sdkws.GroupInfoForS if err := util.ApiPost(ctx, constant.SetGroupInfoRouter, &group.SetGroupInfoReq{GroupInfoForSet: groupInfo}, nil); err != nil { return err } + + g.groupSyncMutex.Lock() + defer g.groupSyncMutex.Unlock() + return g.IncrSyncJoinGroup(ctx) } @@ -147,6 +167,10 @@ func (g *Group) SetGroupMemberInfo(ctx context.Context, groupMemberInfo *group.S if err := util.ApiPost(ctx, constant.SetGroupMemberInfoRouter, &group.SetGroupMemberInfoReq{Members: []*group.SetGroupMemberInfo{groupMemberInfo}}, nil); err != nil { return err } + + g.groupSyncMutex.Lock() + defer g.groupSyncMutex.Unlock() + return g.IncrSyncGroupAndMember(ctx, groupMemberInfo.GroupID) } @@ -356,7 +380,7 @@ func (g *Group) GetSpecifiedGroupMembersInfo(ctx context.Context, groupID string _, err := g.db.GetVersionSync(ctx, g.groupAndMemberVersionTableName(), groupID) if err != nil { - if errs.Unwrap(err) != gorm.ErrRecordNotFound { + if errs.Unwrap(err) != errs.ErrRecordNotFound { return nil, err } err := g.IncrSyncGroupAndMember(ctx, groupID) @@ -414,7 +438,7 @@ func (g *Group) GetGroupMemberList(ctx context.Context, groupID string, filter, _, err := g.db.GetVersionSync(ctx, g.groupAndMemberVersionTableName(), groupID) if err != nil { - if errs.Unwrap(err) != gorm.ErrRecordNotFound { + if errs.Unwrap(err) != errs.ErrRecordNotFound { return nil, err } err := g.IncrSyncGroupAndMember(ctx, groupID) @@ -571,6 +595,9 @@ func (g *Group) InviteUserToGroup(ctx context.Context, groupID, reason string, u return err } + g.groupSyncMutex.Lock() + defer g.groupSyncMutex.Unlock() + if err := g.IncrSyncGroupAndMember(ctx, groupID); err != nil { return err } diff --git a/internal/group/sync2.go b/internal/group/sync2.go index a64a26c8b..b53fb7829 100644 --- a/internal/group/sync2.go +++ b/internal/group/sync2.go @@ -4,8 +4,6 @@ import ( "context" "sync" - "gorm.io/gorm" - "github.com/openimsdk/openim-sdk-core/v3/internal/incrversion" "github.com/openimsdk/openim-sdk-core/v3/internal/util" "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" @@ -77,11 +75,12 @@ func (g *Group) IncrSyncGroupAndMember(ctx context.Context, groupIDs ...string) req := group.GetIncrementalGroupMemberReq{ GroupID: groupID, } + lvs, err := g.db.GetVersionSync(ctx, g.groupAndMemberVersionTableName(), groupID) if err == nil { req.VersionID = lvs.VersionID req.Version = lvs.Version - } else if errs.Unwrap(err) != gorm.ErrRecordNotFound { + } else if errs.Unwrap(err) != errs.ErrRecordNotFound { return err } groups = append(groups, &req) @@ -96,10 +95,11 @@ func (g *Group) IncrSyncGroupAndMember(ctx context.Context, groupIDs ...string) tempGroupID := groupID wg.Add(1) go func() error { + defer wg.Done() if err := g.syncGroupAndMember(ctx, tempGroupID, tempResp); err != nil { - return err + log.ZError(ctx, "sync Group And Member error", errs.Wrap(err)) + return errs.Wrap(err) } - wg.Done() return nil }() delete(groupIDSet, tempGroupID) diff --git a/internal/incrversion/option.go b/internal/incrversion/option.go index 9c7024143..ca2af52f2 100644 --- a/internal/incrversion/option.go +++ b/internal/incrversion/option.go @@ -10,7 +10,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" - "gorm.io/gorm" ) type VersionSynchronizer[V, R any] struct { @@ -36,7 +35,7 @@ type VersionSynchronizer[V, R any] struct { func (o *VersionSynchronizer[V, R]) getVersionInfo() (*model_struct.LocalVersionSync, error) { versionInfo, err := o.DB.GetVersionSync(o.Ctx, o.TableName, o.EntityID) - if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound { + if err != nil && errs.Unwrap(err) != errs.ErrRecordNotFound { log.ZWarn(o.Ctx, "get version info", err) return nil, err diff --git a/internal/user/sync.go b/internal/user/sync.go index 90b6f405a..2d2d60af3 100644 --- a/internal/user/sync.go +++ b/internal/user/sync.go @@ -17,6 +17,7 @@ package user import ( "context" "errors" + "github.com/openimsdk/openim-sdk-core/v3/internal/util" "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" @@ -25,7 +26,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" - "gorm.io/gorm" ) func (u *User) SyncLoginUserInfo(ctx context.Context) error { @@ -34,7 +34,7 @@ func (u *User) SyncLoginUserInfo(ctx context.Context) error { return err } localUser, err := u.GetLoginUser(ctx, u.loginUserID) - if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound { + if err != nil && errs.Unwrap(err) != errs.ErrRecordNotFound { log.ZError(ctx, "SyncLoginUserInfo", err) } var localUsers []*model_struct.LocalUser @@ -50,7 +50,7 @@ func (u *User) SyncLoginUserInfoWithoutNotice(ctx context.Context) error { return err } localUser, err := u.GetLoginUser(ctx, u.loginUserID) - if err != nil && errs.Unwrap(err) != gorm.ErrRecordNotFound { + if err != nil && errs.Unwrap(err) != errs.ErrRecordNotFound { log.ZError(ctx, "SyncLoginUserInfo", err) } var localUsers []*model_struct.LocalUser diff --git a/pkg/db/app_version.go b/pkg/db/app_version.go index cfc391b6f..7324dfbda 100644 --- a/pkg/db/app_version.go +++ b/pkg/db/app_version.go @@ -14,7 +14,11 @@ import ( func (d *DataBase) GetAppSDKVersion(ctx context.Context) (*model_struct.LocalAppSDKVersion, error) { var appVersion model_struct.LocalAppSDKVersion - return &appVersion, errs.Wrap(d.conn.WithContext(ctx).Take(&appVersion).Error) + err := d.conn.WithContext(ctx).Take(&appVersion).Error + if err == gorm.ErrRecordNotFound { + err = errs.ErrRecordNotFound + } + return &appVersion, errs.Wrap(err) } func (d *DataBase) SetAppSDKVersion(ctx context.Context, appVersion *model_struct.LocalAppSDKVersion) error { diff --git a/pkg/db/db_init.go b/pkg/db/db_init.go index 6f64d5b95..697d0a5a0 100644 --- a/pkg/db/db_init.go +++ b/pkg/db/db_init.go @@ -148,7 +148,7 @@ func (d *DataBase) initDB(ctx context.Context, logLevel int) error { func (d *DataBase) versionDataMigrate(ctx context.Context) error { verModel, err := d.GetAppSDKVersion(ctx) - if errs.Unwrap(err) == gorm.ErrRecordNotFound { + if errs.Unwrap(err) == errs.ErrRecordNotFound { err = d.conn.AutoMigrate( &model_struct.LocalAppSDKVersion{}, &model_struct.LocalFriend{}, diff --git a/pkg/db/user_model.go b/pkg/db/user_model.go index eddbfeb14..6500fd9c5 100644 --- a/pkg/db/user_model.go +++ b/pkg/db/user_model.go @@ -29,7 +29,14 @@ func (d *DataBase) GetLoginUser(ctx context.Context, userID string) (*model_stru d.userMtx.RLock() defer d.userMtx.RUnlock() var user model_struct.LocalUser - return &user, errs.WrapMsg(d.conn.WithContext(ctx).Where("user_id = ? ", userID).Take(&user).Error, "GetLoginUserInfo failed") + err := d.conn.WithContext(ctx).Where("user_id = ? ", userID).Take(&user).Error + if err != nil { + if err == errs.ErrRecordNotFound { + return nil, errs.ErrRecordNotFound.Wrap() + } + return nil, errs.Wrap(err) + } + return &user, nil } func (d *DataBase) UpdateLoginUser(ctx context.Context, user *model_struct.LocalUser) error { diff --git a/pkg/db/version_sync.go b/pkg/db/version_sync.go index 123beab46..34a82959e 100644 --- a/pkg/db/version_sync.go +++ b/pkg/db/version_sync.go @@ -15,7 +15,14 @@ func (d *DataBase) GetVersionSync(ctx context.Context, tableName, entityID strin d.versionMtx.RLock() defer d.versionMtx.RUnlock() var res model_struct.LocalVersionSync - return &res, errs.Wrap(d.conn.WithContext(ctx).Where("`table_name` = ? and `entity_id` = ?", tableName, entityID).Take(&res).Error) + err := d.conn.WithContext(ctx).Where("`table_name` = ? and `entity_id` = ?", tableName, entityID).Take(&res).Error + if err != nil { + if err == gorm.ErrRecordNotFound { + err = errs.ErrRecordNotFound + } + return nil, errs.Wrap(err) + } + return &res, errs.Wrap(err) } func (d *DataBase) SetVersionSync(ctx context.Context, lv *model_struct.LocalVersionSync) error {