Skip to content

Commit

Permalink
fix:optimize login (#571)
Browse files Browse the repository at this point in the history
* fix:optimize login

* fix:optimize login
  • Loading branch information
icey-yu authored Jul 4, 2024
1 parent 38e2d36 commit 2aeab12
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 37 deletions.
70 changes: 35 additions & 35 deletions internal/interaction/msg_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@ package interaction

import (
"context"
"strings"
"sync"
"time"

"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"

"github.com/openimsdk/openim-sdk-core/v3/pkg/common"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
"strings"
"sync"

"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
Expand Down Expand Up @@ -88,44 +85,47 @@ func (m *MsgSyncer) loadSeq(ctx context.Context) error {
MaxSyncedSeq int64
Err error
}
concurrency := 10
t2 := time.Now()
SyncedSeqs := make(chan SyncedSeq, len(conversationIDList))
sem := make(chan struct{}, concurrency)

concurrency := 20
partSize := len(conversationIDList) / concurrency
var wg sync.WaitGroup
for _, v := range conversationIDList {
resultMaps := make([]map[string]SyncedSeq, concurrency)

for i := 0; i < concurrency; i++ {
wg.Add(1)
sem <- struct{}{} // Acquire a token
go func(conversationID string) {
defer wg.Done()
defer func() { <-sem }() // Release the token
start := i * partSize
end := start + partSize
if i == concurrency-1 {
end = len(conversationIDList)
}

maxSyncedSeq, err := m.db.GetConversationNormalMsgSeq(ctx, conversationID)
SyncedSeqs <- SyncedSeq{
ConversationID: conversationID,
MaxSyncedSeq: maxSyncedSeq,
Err: err,
resultMaps[i] = make(map[string]SyncedSeq)

go func(i, start, end int) {
defer wg.Done()
for _, v := range conversationIDList[start:end] {
maxSyncedSeq, err := m.db.GetConversationNormalMsgSeqNoInit(ctx, v)
resultMaps[i][v] = SyncedSeq{
ConversationID: v,
MaxSyncedSeq: maxSyncedSeq,
Err: err,
}
}
}(v)
log.ZDebug(ctx, "goroutine done.", "goroutine cost time", time.Since(t2))
}(i, start, end)
}

// Close the results channel once all goroutines have finished
go func() {
wg.Wait()
close(SyncedSeqs)
}()

// Collect the results
for res := range SyncedSeqs {
if res.Err != nil {
log.ZError(ctx, "get group normal seq failed", res.Err, "conversationID", res.ConversationID)
} else {
m.syncedMaxSeqs[res.ConversationID] = res.MaxSyncedSeq
wg.Wait()

// merge map
for _, resultMap := range resultMaps {
for k, v := range resultMap {
if v.Err != nil {
log.ZError(ctx, "get group normal seq failed", v.Err, "conversationID", k)
continue
}
m.syncedMaxSeqs[k] = v.MaxSyncedSeq
}
}

notificationSeqs, err := m.db.GetNotificationAllSeqs(ctx)
if err != nil {
log.ZError(ctx, "get notification seq failed", err)
Expand Down
12 changes: 11 additions & 1 deletion pkg/db/chat_log_model_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"

"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
Expand All @@ -46,6 +45,11 @@ func (d *DataBase) initChatLog(ctx context.Context, conversationID string) {
}
}
}

func (d *DataBase) checkTable(ctx context.Context, tableName string) bool {
return d.conn.Migrator().HasTable(tableName)
}

func (d *DataBase) UpdateMessage(ctx context.Context, conversationID string, c *model_struct.LocalChatLog) error {
t := d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Updates(c)
if t.RowsAffected == 0 {
Expand Down Expand Up @@ -334,6 +338,12 @@ func (d *DataBase) GetConversationNormalMsgSeq(ctx context.Context, conversation
return seq, errs.WrapMsg(err, "GetConversationNormalMsgSeq")
}

func (d *DataBase) GetConversationNormalMsgSeqNoInit(ctx context.Context, conversationID string) (int64, error) {
var seq int64
err := d.conn.WithContext(ctx).Table(utils.GetConversationTableName(conversationID)).Select("IFNULL(max(seq),0)").Find(&seq).Error
return seq, errs.WrapMsg(err, "GetConversationNormalMsgSeq")
}

func (d *DataBase) GetConversationPeerNormalMsgSeq(ctx context.Context, conversationID string) (int64, error) {
var seq int64
err := d.conn.WithContext(ctx).Table(utils.GetConversationTableName(conversationID)).Select("IFNULL(max(seq),0)").Where("send_id != ?", d.loginUserID).Find(&seq).Error
Expand Down
19 changes: 18 additions & 1 deletion pkg/db/conversation_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (
"gorm.io/gorm"
)

const (
batchSize = 200
)

func (d *DataBase) GetConversationByUserID(ctx context.Context, userID string) (*model_struct.LocalConversation, error) {
var conversation model_struct.LocalConversation
err := errs.WrapMsg(d.conn.WithContext(ctx).Where("user_id=?", userID).Find(&conversation).Error, "GetConversationByUserID error")
Expand Down Expand Up @@ -89,10 +93,23 @@ func (d *DataBase) BatchInsertConversationList(ctx context.Context, conversation
if conversationList == nil {
return nil
}

d.mRWMutex.Lock()
defer d.mRWMutex.Unlock()

return errs.WrapMsg(d.conn.WithContext(ctx).Create(conversationList).Error, "BatchInsertConversationList failed")
for i := 0; i < len(conversationList); i += batchSize {
end := i + batchSize
if end > len(conversationList) {
end = len(conversationList)
}

batch := conversationList[i:end]
if err := d.conn.WithContext(ctx).Create(batch).Error; err != nil {
return errs.WrapMsg(err, "BatchInsertConversationList failed")
}
}

return nil
}

func (d *DataBase) UpdateOrCreateConversations(ctx context.Context, conversationList []*model_struct.LocalConversation) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/db/db_interface/databse.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type MessageModel interface {
GetMessagesByClientMsgIDs(ctx context.Context, conversationID string, msgIDs []string) (result []*model_struct.LocalChatLog, err error)
GetMessagesBySeqs(ctx context.Context, conversationID string, seqs []int64) (result []*model_struct.LocalChatLog, err error)
GetConversationNormalMsgSeq(ctx context.Context, conversationID string) (int64, error)
GetConversationNormalMsgSeqNoInit(ctx context.Context, conversationID string) (int64, error)
GetConversationPeerNormalMsgSeq(ctx context.Context, conversationID string) (int64, error)

GetTestMessage(ctx context.Context, seq uint32) (*model_struct.LocalChatLog, error)
Expand Down

0 comments on commit 2aeab12

Please sign in to comment.