From 2aeab121a0a7af975903e823f06fafb30162a4e4 Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Thu, 4 Jul 2024 14:17:06 +0800 Subject: [PATCH] fix:optimize login (#571) * fix:optimize login * fix:optimize login --- internal/interaction/msg_sync.go | 70 ++++++++++++++++---------------- pkg/db/chat_log_model_v3.go | 12 +++++- pkg/db/conversation_model.go | 19 ++++++++- pkg/db/db_interface/databse.go | 1 + 4 files changed, 65 insertions(+), 37 deletions(-) diff --git a/internal/interaction/msg_sync.go b/internal/interaction/msg_sync.go index 4f28dec24..504bd8f5d 100644 --- a/internal/interaction/msg_sync.go +++ b/internal/interaction/msg_sync.go @@ -16,16 +16,13 @@ package interaction import ( "context" - "strings" - "sync" - "time" - - "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" - "github.com/openimsdk/openim-sdk-core/v3/pkg/common" "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface" + "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" "github.com/openimsdk/openim-sdk-core/v3/sdk_struct" + "strings" + "sync" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" @@ -88,44 +85,47 @@ func (m *MsgSyncer) loadSeq(ctx context.Context) error { MaxSyncedSeq int64 Err error } - concurrency := 10 - t2 := time.Now() - SyncedSeqs := make(chan SyncedSeq, len(conversationIDList)) - sem := make(chan struct{}, concurrency) + concurrency := 20 + partSize := len(conversationIDList) / concurrency var wg sync.WaitGroup - for _, v := range conversationIDList { + resultMaps := make([]map[string]SyncedSeq, concurrency) + + for i := 0; i < concurrency; i++ { wg.Add(1) - sem <- struct{}{} // Acquire a token - go func(conversationID string) { - defer wg.Done() - defer func() { <-sem }() // Release the token + start := i * partSize + end := start + partSize + if i == concurrency-1 { + end = len(conversationIDList) + } - maxSyncedSeq, err := m.db.GetConversationNormalMsgSeq(ctx, conversationID) - SyncedSeqs <- SyncedSeq{ - ConversationID: conversationID, - MaxSyncedSeq: maxSyncedSeq, - Err: err, + resultMaps[i] = make(map[string]SyncedSeq) + + go func(i, start, end int) { + defer wg.Done() + for _, v := range conversationIDList[start:end] { + maxSyncedSeq, err := m.db.GetConversationNormalMsgSeqNoInit(ctx, v) + resultMaps[i][v] = SyncedSeq{ + ConversationID: v, + MaxSyncedSeq: maxSyncedSeq, + Err: err, + } } - }(v) - log.ZDebug(ctx, "goroutine done.", "goroutine cost time", time.Since(t2)) + }(i, start, end) } - // Close the results channel once all goroutines have finished - go func() { - wg.Wait() - close(SyncedSeqs) - }() - - // Collect the results - for res := range SyncedSeqs { - if res.Err != nil { - log.ZError(ctx, "get group normal seq failed", res.Err, "conversationID", res.ConversationID) - } else { - m.syncedMaxSeqs[res.ConversationID] = res.MaxSyncedSeq + wg.Wait() + + // merge map + for _, resultMap := range resultMaps { + for k, v := range resultMap { + if v.Err != nil { + log.ZError(ctx, "get group normal seq failed", v.Err, "conversationID", k) + continue + } + m.syncedMaxSeqs[k] = v.MaxSyncedSeq } } - notificationSeqs, err := m.db.GetNotificationAllSeqs(ctx) if err != nil { log.ZError(ctx, "get notification seq failed", err) diff --git a/pkg/db/chat_log_model_v3.go b/pkg/db/chat_log_model_v3.go index da69c056a..5f7efc4c9 100644 --- a/pkg/db/chat_log_model_v3.go +++ b/pkg/db/chat_log_model_v3.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" @@ -46,6 +45,11 @@ func (d *DataBase) initChatLog(ctx context.Context, conversationID string) { } } } + +func (d *DataBase) checkTable(ctx context.Context, tableName string) bool { + return d.conn.Migrator().HasTable(tableName) +} + func (d *DataBase) UpdateMessage(ctx context.Context, conversationID string, c *model_struct.LocalChatLog) error { t := d.conn.WithContext(ctx).Table(utils.GetTableName(conversationID)).Updates(c) if t.RowsAffected == 0 { @@ -334,6 +338,12 @@ func (d *DataBase) GetConversationNormalMsgSeq(ctx context.Context, conversation return seq, errs.WrapMsg(err, "GetConversationNormalMsgSeq") } +func (d *DataBase) GetConversationNormalMsgSeqNoInit(ctx context.Context, conversationID string) (int64, error) { + var seq int64 + err := d.conn.WithContext(ctx).Table(utils.GetConversationTableName(conversationID)).Select("IFNULL(max(seq),0)").Find(&seq).Error + return seq, errs.WrapMsg(err, "GetConversationNormalMsgSeq") +} + func (d *DataBase) GetConversationPeerNormalMsgSeq(ctx context.Context, conversationID string) (int64, error) { var seq int64 err := d.conn.WithContext(ctx).Table(utils.GetConversationTableName(conversationID)).Select("IFNULL(max(seq),0)").Where("send_id != ?", d.loginUserID).Find(&seq).Error diff --git a/pkg/db/conversation_model.go b/pkg/db/conversation_model.go index f4324dabc..b5c7ad57c 100644 --- a/pkg/db/conversation_model.go +++ b/pkg/db/conversation_model.go @@ -31,6 +31,10 @@ import ( "gorm.io/gorm" ) +const ( + batchSize = 200 +) + func (d *DataBase) GetConversationByUserID(ctx context.Context, userID string) (*model_struct.LocalConversation, error) { var conversation model_struct.LocalConversation err := errs.WrapMsg(d.conn.WithContext(ctx).Where("user_id=?", userID).Find(&conversation).Error, "GetConversationByUserID error") @@ -89,10 +93,23 @@ func (d *DataBase) BatchInsertConversationList(ctx context.Context, conversation if conversationList == nil { return nil } + d.mRWMutex.Lock() defer d.mRWMutex.Unlock() - return errs.WrapMsg(d.conn.WithContext(ctx).Create(conversationList).Error, "BatchInsertConversationList failed") + for i := 0; i < len(conversationList); i += batchSize { + end := i + batchSize + if end > len(conversationList) { + end = len(conversationList) + } + + batch := conversationList[i:end] + if err := d.conn.WithContext(ctx).Create(batch).Error; err != nil { + return errs.WrapMsg(err, "BatchInsertConversationList failed") + } + } + + return nil } func (d *DataBase) UpdateOrCreateConversations(ctx context.Context, conversationList []*model_struct.LocalConversation) error { diff --git a/pkg/db/db_interface/databse.go b/pkg/db/db_interface/databse.go index 3c92407b8..d04184726 100644 --- a/pkg/db/db_interface/databse.go +++ b/pkg/db/db_interface/databse.go @@ -113,6 +113,7 @@ type MessageModel interface { GetMessagesByClientMsgIDs(ctx context.Context, conversationID string, msgIDs []string) (result []*model_struct.LocalChatLog, err error) GetMessagesBySeqs(ctx context.Context, conversationID string, seqs []int64) (result []*model_struct.LocalChatLog, err error) GetConversationNormalMsgSeq(ctx context.Context, conversationID string) (int64, error) + GetConversationNormalMsgSeqNoInit(ctx context.Context, conversationID string) (int64, error) GetConversationPeerNormalMsgSeq(ctx context.Context, conversationID string) (int64, error) GetTestMessage(ctx context.Context, seq uint32) (*model_struct.LocalChatLog, error)