From f23f7d661debb1b5fd662b155fe2155e02a86433 Mon Sep 17 00:00:00 2001 From: OpenIM-Gordon <1432970085@qq.com> Date: Fri, 3 Jan 2025 19:48:08 +0800 Subject: [PATCH] feat: add a function to quickly retrieve the context messages for a given message. (#827) * fix: quote message change to revoke message when app from background to foreground and message status update. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * feat: add a function to quickly retrieve the context messages for a given message. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: the SDK interface using the pb protocol to replace json. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> --------- Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 +- internal/conversation_msg/api.go | 59 ++++++++++++++++++ internal/conversation_msg/conversation.go | 22 +++---- internal/conversation_msg/conversation_msg.go | 62 ++----------------- internal/conversation_msg/message_check.go | 18 +++--- internal/conversation_msg/read_drawing.go | 4 +- internal/interaction/long_conn_mgr.go | 2 + internal/interaction/msg_sync.go | 46 +++++++++++++- internal/user/user.go | 5 +- open_im_sdk/conversation_msg.go | 4 +- pkg/cache/conversation_seq_cache.go | 32 ++++++++++ pkg/cache/{manager.go => user_cache.go} | 21 ++++--- pkg/constant/constant.go | 1 + .../conversation_msg_sdk_struct.go | 1 + pkg/sdkerrs/code.go | 1 + pkg/sdkerrs/predefine.go | 1 + test/conversation_test.go | 3 + test/create_msg_test.go | 2 +- 19 files changed, 191 insertions(+), 99 deletions(-) create mode 100644 pkg/cache/conversation_seq_cache.go rename pkg/cache/{manager.go => user_cache.go} (82%) diff --git a/go.mod b/go.mod index a161cc33d..6879626c8 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require golang.org/x/net v0.29.0 // indirect require ( github.com/google/go-cmp v0.6.0 - github.com/openimsdk/protocol v0.0.72-alpha.63 + github.com/openimsdk/protocol v0.0.72-alpha.70 github.com/openimsdk/tools v0.0.50-alpha.21 github.com/patrickmn/go-cache v2.1.0+incompatible golang.org/x/image v0.15.0 diff --git a/go.sum b/go.sum index be5153407..895dd6738 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/openimsdk/protocol v0.0.72-alpha.63 h1:IyPBibEvwBtTmD8DSrlqcekfEXe74k4+KeeHsgdhGh0= -github.com/openimsdk/protocol v0.0.72-alpha.63/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= +github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ= +github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= github.com/openimsdk/tools v0.0.50-alpha.21 h1:ZKgSFkiBjz6KcNZlNwvrSoUYJ7K5Flan8wHuRBH3VqY= github.com/openimsdk/tools v0.0.50-alpha.21/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= diff --git a/internal/conversation_msg/api.go b/internal/conversation_msg/api.go index 6c09891e0..ddf36c737 100644 --- a/internal/conversation_msg/api.go +++ b/internal/conversation_msg/api.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/openimsdk/openim-sdk-core/v3/pkg/cache" pconstant "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/errs" @@ -22,6 +23,7 @@ import ( "github.com/openimsdk/openim-sdk-core/v3/pkg/content_type" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback" + sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback" "github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs" "github.com/openimsdk/openim-sdk-core/v3/pkg/server_api_params" "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" @@ -1014,3 +1016,60 @@ func (c *Conversation) SearchConversation(ctx context.Context, searchParam strin // Return the list of conversations return apiConversations, nil } +func (c *Conversation) GetInputStates(ctx context.Context, conversationID string, userID string) ([]int32, error) { + return c.typing.GetInputStates(conversationID, userID), nil +} + +func (c *Conversation) ChangeInputStates(ctx context.Context, conversationID string, focus bool) error { + return c.typing.ChangeInputStates(ctx, conversationID, focus) +} + +func (c *Conversation) FetchSurroundingMessages(ctx context.Context, s *sdk_struct.MsgStruct, before int, after int) ([]*sdk_struct.MsgStruct, error) { + conversationID := utils.GetConversationIDByMsg(s) + var message *model_struct.LocalChatLog + message, err := c.db.GetMessage(ctx, conversationID, s.ClientMsgID) + if err == nil { + if message.Status >= constant.MsgStatusHasDeleted { + return nil, sdkerrs.ErrMsgHasDeleted + } + } else { + if s.Seq == 0 { + return nil, sdkerrs.ErrMsgHasDeleted + } + var messages []*model_struct.LocalChatLog + c.fetchAndMergeMissingMessages(ctx, conversationID, []int64{s.Seq}, false, 1, 0, &messages, &sdk.GetAdvancedHistoryMessageListCallback{}) + if len(messages) < 1 { + return nil, sdkerrs.ErrMsgHasDeleted + } + message = messages[0] + } + + result := make([]*sdk_struct.MsgStruct, 0, before+after+1) + if before > 0 { + req := sdk.GetAdvancedHistoryMessageListParams{ + ConversationID: conversationID, + Count: before, + StartClientMsgID: s.ClientMsgID, + ViewType: cache.ViewSearch, + } + val, err := c.getAdvancedHistoryMessageList(ctx, req, false) + if err != nil { + return nil, err + } + result = append(result, val.MessageList...) + } + result = append(result, LocalChatLogToMsgStruct(message)) + if after > 0 { + req := sdk.GetAdvancedHistoryMessageListParams{ + ConversationID: conversationID, + Count: after, + StartClientMsgID: s.ClientMsgID, + } + val, err := c.getAdvancedHistoryMessageList(ctx, req, true) + if err != nil { + return nil, err + } + result = append(result, val.MessageList...) + } + return result, nil +} diff --git a/internal/conversation_msg/conversation.go b/internal/conversation_msg/conversation.go index 3e45e8018..53568f9cd 100644 --- a/internal/conversation_msg/conversation.go +++ b/internal/conversation_msg/conversation.go @@ -65,12 +65,12 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd startTime = m.SendTime } else { // Clear both maps when the user enters the conversation - c.messagePullForwardEndSeqMap.Delete(conversationID) - c.messagePullReverseEndSeqMap.Delete(conversationID) + c.messagePullForwardEndSeqMap.Delete(conversationID, req.ViewType) + c.messagePullReverseEndSeqMap.Delete(conversationID, req.ViewType) } log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID", conversationID, "startTime:", startTime, "count:", req.Count, "startTime", startTime) - list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, isReverse, &messageListCallback) + list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, isReverse, req.ViewType, &messageListCallback) if err != nil { return nil, err } @@ -91,7 +91,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd } func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string, - count int, startTime int64, isReverse bool, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) { + count int, startTime int64, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) { var list, validMessages []*model_struct.LocalChatLog @@ -125,8 +125,8 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati } if !isReverse { if thisEndSeq != 0 { - c.messagePullForwardEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool { - lastEndSeq, _ := c.messagePullForwardEndSeqMap.Load(key) + c.messagePullForwardEndSeqMap.StoreWithFunc(conversationID, viewType, thisEndSeq, func(key string, value int64) bool { + lastEndSeq, _ := c.messagePullForwardEndSeqMap.Load(key, viewType) if value < lastEndSeq || lastEndSeq == 0 { log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value) return true @@ -138,8 +138,8 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati } } else { if thisEndSeq != 0 { - c.messagePullReverseEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool { - lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(key) + c.messagePullReverseEndSeqMap.StoreWithFunc(conversationID, viewType, thisEndSeq, func(key string, value int64) bool { + lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(key, viewType) if value > lastEndSeq || lastEndSeq == 0 { log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value) return true @@ -175,10 +175,10 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq) t = time.Now() c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID, - isReverse, count, startTime, &list, messageListCallback) + isReverse, viewType, count, startTime, &list, messageListCallback) log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq) t = time.Now() - c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse, + c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse, viewType, count, startTime, &list, messageListCallback) log.ZDebug(ctx, "end continuity check", "cost time", time.Since(t)) // If the number of valid messages retrieved is less than the count, @@ -188,7 +188,7 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati newStartTime := getNewStartTime(list) log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID", conversationID, "newStartTime", newStartTime) - missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, messageListCallback) + missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, viewType, messageListCallback) if err != nil { return nil, err } diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 897e851f6..38f39b5bf 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -8,8 +8,6 @@ import ( "math" "sync" - sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback" - "github.com/openimsdk/openim-sdk-core/v3/pkg/api" "github.com/openimsdk/openim-sdk-core/v3/pkg/cache" "github.com/openimsdk/tools/utils/stringutil" @@ -67,8 +65,8 @@ type Conversation struct { file *file.File cache *cache.Cache[string, *model_struct.LocalConversation] maxSeqRecorder MaxSeqRecorder - messagePullForwardEndSeqMap *cache.Cache[string, int64] - messagePullReverseEndSeqMap *cache.Cache[string, int64] + messagePullForwardEndSeqMap *cache.ConversationSeqContextCache + messagePullReverseEndSeqMap *cache.ConversationSeqContextCache IsExternalExtensions bool msgOffset int progress int @@ -108,8 +106,8 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr, file: file, IsExternalExtensions: info.IsExternalExtensions(), maxSeqRecorder: NewMaxSeqRecorder(), - messagePullForwardEndSeqMap: cache.NewCache[string, int64](), - messagePullReverseEndSeqMap: cache.NewCache[string, int64](), + messagePullForwardEndSeqMap: cache.NewConversationSeqContextCache(), + messagePullReverseEndSeqMap: cache.NewConversationSeqContextCache(), msgOffset: 0, progress: 0, } @@ -906,55 +904,3 @@ func (c *Conversation) getUserNameAndFaceURL(ctx context.Context, userID string) } return userInfo.FaceURL, userInfo.Nickname, nil } - -func (c *Conversation) GetInputStates(ctx context.Context, conversationID string, userID string) ([]int32, error) { - return c.typing.GetInputStates(conversationID, userID), nil -} - -func (c *Conversation) ChangeInputStates(ctx context.Context, conversationID string, focus bool) error { - return c.typing.ChangeInputStates(ctx, conversationID, focus) -} - -func (c *Conversation) FetchSurroundingMessages(ctx context.Context, conversationID string, seq int64, before int64, after int64) ([]*sdk_struct.MsgStruct, error) { - c.fetchAndMergeMissingMessages(ctx, conversationID, []int64{seq}, false, 0, 0, &[]*model_struct.LocalChatLog{}, &sdk.GetAdvancedHistoryMessageListCallback{}) - res, err := c.db.GetMessagesBySeqs(ctx, conversationID, []int64{seq}) - if err != nil { - return nil, err - } - if len(res) == 0 { - return []*sdk_struct.MsgStruct{}, nil - } - //_, msgList := c.LocalChatLog2MsgStruct []*model_struct.LocalChatLog{res[0]}) - //if len(msgList) == 0 { - // return []*sdk_struct.MsgStruct{}, nil - //} - //msg := msgList[0] - result := make([]*sdk_struct.MsgStruct, 0, before+after+1) - //if before > 0 { - // req := sdk.GetAdvancedHistoryMessageListParams{ - // ConversationID: conversationID, - // Count: int(before), - // StartClientMsgID: msg.ClientMsgID, - // } - // val, err := c.getAdvancedHistoryMessageList(ctx, req, false) - // if err != nil { - // return nil, err - // } - // result = append(result, val.MessageList...) - //} - //result = append(result, msg) - //if after > 0 { - // req := sdk.GetAdvancedHistoryMessageListParams{ - // ConversationID: conversationID, - // Count: int(after), - // StartClientMsgID: msg.ClientMsgID, - // } - // val, err := c.getAdvancedHistoryMessageList(ctx, req, true) - // if err != nil { - // return nil, err - // } - // result = append(result, val.MessageList...) - //} - //sort.Sort(sdk_struct.NewMsgList(result)) - return result, nil -} diff --git a/internal/conversation_msg/message_check.go b/internal/conversation_msg/message_check.go index c7627a63b..a000fc58e 100644 --- a/internal/conversation_msg/message_check.go +++ b/internal/conversation_msg/message_check.go @@ -43,17 +43,17 @@ func (c *Conversation) validateAndFillInternalGaps(ctx context.Context, conversa // validateAndFillInterBlockGaps checks for continuity between blocks of messages. If a gap is identified, it retrieves the missing messages // to bridge the gap. The function returns a boolean indicating whether the blocks are continuous. func (c *Conversation) validateAndFillInterBlockGaps(ctx context.Context, thisStartSeq int64, conversationID string, - isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) { + isReverse bool, viewType, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) { var lastEndSeq, startSeq, endSeq int64 var isLostSeq bool if isReverse { - lastEndSeq, _ = c.messagePullReverseEndSeqMap.Load(conversationID) + lastEndSeq, _ = c.messagePullReverseEndSeqMap.Load(conversationID, viewType) isLostSeq = lastEndSeq+1 != thisStartSeq startSeq = lastEndSeq + 1 endSeq = thisStartSeq - 1 } else { - lastEndSeq, _ = c.messagePullForwardEndSeqMap.Load(conversationID) + lastEndSeq, _ = c.messagePullForwardEndSeqMap.Load(conversationID, viewType) isLostSeq = thisStartSeq+1 != lastEndSeq startSeq = thisStartSeq + 1 endSeq = lastEndSeq - 1 @@ -73,15 +73,15 @@ func (c *Conversation) validateAndFillInterBlockGaps(ctx context.Context, thisSt // internal and inter-block continuity checks but contains fewer messages than `count`, this function verifies if the end // of the message history has been reached. If not, it attempts to retrieve any missing messages to ensure continuity. func (c *Conversation) validateAndFillEndBlockContinuity(ctx context.Context, conversationID string, - isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) { - isShouldFetchMessage, lostSeqList := c.checkEndBlock(ctx, conversationID, isReverse, count, list, messageListCallback) + isReverse bool, viewType, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) { + isShouldFetchMessage, lostSeqList := c.checkEndBlock(ctx, conversationID, isReverse, viewType, count, list, messageListCallback) if isShouldFetchMessage { c.fetchAndMergeMissingMessages(ctx, conversationID, lostSeqList, isReverse, count, startTime, list, messageListCallback) - _, _ = c.checkEndBlock(ctx, conversationID, isReverse, count, list, messageListCallback) + _, _ = c.checkEndBlock(ctx, conversationID, isReverse, viewType, count, list, messageListCallback) } } -func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string, isReverse bool, count int, +func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string, isReverse bool, viewType, count int, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) (isShouldFetchMessage bool, seqList []int64) { // Perform an end-of-block check if the retrieved message count is less than requested if len(*list) < count { @@ -94,7 +94,7 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string, if maxSeq >= currentMaxSeq { messageListCallback.IsEnd = true } else { - lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(conversationID) + lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(conversationID, viewType) log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "lastEndSeq", lastEndSeq, "conversationID", conversationID) // If `maxSeq` is zero and `lastEndSeq` is at the maximum server sequence, this batch is fully local if maxSeq == 0 && lastEndSeq >= currentMaxSeq { // All messages in this batch are local messages, @@ -124,7 +124,7 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string, if minSeq <= userCanPullMinSeq { messageListCallback.IsEnd = true } else { - lastMinSeq, _ := c.messagePullForwardEndSeqMap.Load(conversationID) + lastMinSeq, _ := c.messagePullForwardEndSeqMap.Load(conversationID, viewType) log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "lastMinSeq", lastMinSeq, "conversationID", conversationID) // If `minSeq` is zero and `lastMinSeq` is at the minimum server sequence, this batch is fully local if minSeq == 0 && lastMinSeq <= userCanPullMinSeq { // All messages in this batch are local messages, diff --git a/internal/conversation_msg/read_drawing.go b/internal/conversation_msg/read_drawing.go index b015461cb..36c675c22 100644 --- a/internal/conversation_msg/read_drawing.go +++ b/internal/conversation_msg/read_drawing.go @@ -22,7 +22,6 @@ import ( "github.com/openimsdk/openim-sdk-core/v3/pkg/common" "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" - "github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs" "github.com/openimsdk/openim-sdk-core/v3/pkg/utils" "github.com/openimsdk/openim-sdk-core/v3/sdk_struct" "github.com/openimsdk/tools/errs" @@ -52,7 +51,8 @@ func (c *Conversation) markConversationMessageAsRead(ctx context.Context, conver return err } if conversation.UnreadCount == 0 { - return sdkerrs.ErrUnreadCount + log.ZWarn(ctx, "unread count is 0", nil, "conversationID", conversationID) + return nil } // get the maximum sequence number of messages in the table that are not sent by oneself peerUserMaxSeq, err := c.db.GetConversationPeerNormalMsgSeq(ctx, conversationID) diff --git a/internal/interaction/long_conn_mgr.go b/internal/interaction/long_conn_mgr.go index c872de1eb..252f6ded2 100644 --- a/internal/interaction/long_conn_mgr.go +++ b/internal/interaction/long_conn_mgr.go @@ -506,6 +506,8 @@ func (c *LongConnMgr) handleMessage(message []byte) error { fallthrough case constant.GetConvMaxReadSeq: fallthrough + case constant.PullConvLastMessage: + fallthrough case constant.SendMsg: fallthrough case constant.SendSignalMsg: diff --git a/internal/interaction/msg_sync.go b/internal/interaction/msg_sync.go index 113c79739..814bc1679 100644 --- a/internal/interaction/msg_sync.go +++ b/internal/interaction/msg_sync.go @@ -30,6 +30,7 @@ import ( "github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct" "github.com/openimsdk/openim-sdk-core/v3/sdk_struct" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/errs" "github.com/openimsdk/protocol/sdkws" @@ -100,7 +101,7 @@ func (m *MsgSyncer) loadSeq(ctx context.Context) error { } } - // TODO With a large number of sessions, this could potentially cause blocking and needs optimization. + // TODO With a large number of sessions(10w), this could potentially cause blocking and needs optimization. type SyncedSeq struct { ConversationID string @@ -476,6 +477,7 @@ func (m *MsgSyncer) syncAndTriggerReinstallMsgs(ctx context.Context, seqMap map[ log.ZError(ctx, "syncMsgFromServer err", err, "tempSeqMap", tpSeqMap) return err } + m.checkMessagesAndGetLastMessage(ctx, resp.Msgs) _ = m.triggerReinstallConversation(ctx, resp.Msgs, total) for conversationID, seqs := range tpSeqMap { m.syncedMaxSeqsLock.Lock() @@ -495,6 +497,7 @@ func (m *MsgSyncer) syncAndTriggerReinstallMsgs(ctx context.Context, seqMap map[ log.ZError(ctx, "syncMsgFromServer err", err, "seqMap", seqMap) return err } + m.checkMessagesAndGetLastMessage(ctx, resp.Msgs) _ = m.triggerReinstallConversation(ctx, resp.Msgs, total) for conversationID, seqs := range seqMap { m.syncedMaxSeqsLock.Lock() @@ -512,6 +515,33 @@ func (m *MsgSyncer) syncAndTriggerReinstallMsgs(ctx context.Context, seqMap map[ } return nil } +func (m *MsgSyncer) checkMessagesAndGetLastMessage(ctx context.Context, messages map[string]*sdkws.PullMsgs) { + var conversationIDs []string + + for conversationID, message := range messages { + allInValid := true + for _, data := range message.Msgs { + if data.Status < constant.MsgStatusHasDeleted { + allInValid = false + break + } + } + if allInValid { + conversationIDs = append(conversationIDs, conversationID) + } + } + if len(conversationIDs) > 0 { + resp, err := m.fetchLatestValidMessages(ctx, conversationIDs) + if err != nil { + log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs) + return + } + for conversationID, message := range resp.Msgs { + messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}} + } + } + +} func (m *MsgSyncer) splitSeqs(split int, seqsNeedSync []int64) (splitSeqs [][]int64) { if len(seqsNeedSync) <= split { @@ -547,6 +577,20 @@ func (m *MsgSyncer) pullMsgBySeqRange(ctx context.Context, seqMap map[string][2] return resp, nil } +func (m *MsgSyncer) fetchLatestValidMessages(ctx context.Context, conversationID []string) (resp *msg.GetLastMessageResp, err error) { + log.ZDebug(ctx, "fetchLatestValidMessages", "conversationID", conversationID) + + req := msg.GetLastMessageReq{ + UserID: m.loginUserID, + ConversationIDs: conversationID, + } + resp = &msg.GetLastMessageResp{} + if err := m.longConnMgr.SendReqWaitResp(ctx, &req, constant.PullConvLastMessage, resp); err != nil { + return nil, err + } + return resp, nil +} + // synchronizes messages by SEQs. func (m *MsgSyncer) syncMsgBySeqs(ctx context.Context, conversationID string, seqsNeedSync []int64) (allMsgs []*sdkws.MsgData, err error) { pullMsgReq := sdkws.PullMessageBySeqsReq{} diff --git a/internal/user/user.go b/internal/user/user.go index f114e9528..4ceab8ed6 100644 --- a/internal/user/user.go +++ b/internal/user/user.go @@ -17,6 +17,7 @@ package user import ( "context" "fmt" + "github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs" "github.com/openimsdk/tools/log" @@ -36,7 +37,7 @@ func NewUser(dataBase db_interface.DataBase, loginUserID string, conversationCh user := &User{DataBase: dataBase, loginUserID: loginUserID, conversationCh: conversationCh} user.initSyncer() //user.OnlineStatusCache = cache.NewCache[string, *userPb.OnlineStatus]() - user.UserCache = cache.NewManager[string, *model_struct.LocalUser]( + user.UserCache = cache.NewUserCache[string, *model_struct.LocalUser]( func(value *model_struct.LocalUser) string { return value.UserID }, nil, user.GetLoginUser, @@ -53,7 +54,7 @@ type User struct { userSyncer *syncer.Syncer[*model_struct.LocalUser, syncer.NoResp, string] commandSyncer *syncer.Syncer[*model_struct.LocalUserCommand, syncer.NoResp, string] conversationCh chan common.Cmd2Value - UserCache *cache.Manager[string, *model_struct.LocalUser] + UserCache *cache.UserCache[string, *model_struct.LocalUser] //OnlineStatusCache *cache.Cache[string, *userPb.OnlineStatus] } diff --git a/open_im_sdk/conversation_msg.go b/open_im_sdk/conversation_msg.go index 4ee6cbecb..8eef52088 100644 --- a/open_im_sdk/conversation_msg.go +++ b/open_im_sdk/conversation_msg.go @@ -195,6 +195,6 @@ func GetInputStates(callback open_im_sdk_callback.Base, operationID string, conv call(callback, operationID, UserForSDK.Conversation().GetInputStates, conversationID, userID) } -func FetchSurroundingMessages(callback open_im_sdk_callback.Base, operationID string, conversationID string, seq int64, before int64, after int64) { - call(callback, operationID, UserForSDK.Conversation().FetchSurroundingMessages, conversationID, seq, before, after) +func FetchSurroundingMessages(callback open_im_sdk_callback.Base, operationID string, message string, before int, after int) { + call(callback, operationID, UserForSDK.Conversation().FetchSurroundingMessages, message, before, after) } diff --git a/pkg/cache/conversation_seq_cache.go b/pkg/cache/conversation_seq_cache.go new file mode 100644 index 000000000..257e545ae --- /dev/null +++ b/pkg/cache/conversation_seq_cache.go @@ -0,0 +1,32 @@ +package cache + +import "github.com/openimsdk/tools/utils/stringutil" + +const ( + ViewHistory = iota + ViewSearch +) + +type ConversationSeqContextCache struct { + *Cache[string, int64] +} + +func NewConversationSeqContextCache() *ConversationSeqContextCache { + return &ConversationSeqContextCache{Cache: NewCache[string, int64]()} +} + +func (c ConversationSeqContextCache) Load(conversationID string, viewType int) (int64, bool) { + return c.Cache.Load(c.getConversationViewTypeKey(conversationID, viewType)) + +} +func (c ConversationSeqContextCache) Delete(conversationID string, viewType int) { + c.Cache.Delete(c.getConversationViewTypeKey(conversationID, viewType)) + +} +func (c ConversationSeqContextCache) StoreWithFunc(conversationID string, viewType int, thisEndSeq int64, fn func(key string, value int64) bool) { + + c.Cache.StoreWithFunc(c.getConversationViewTypeKey(conversationID, viewType), thisEndSeq, fn) +} +func (c ConversationSeqContextCache) getConversationViewTypeKey(conversationID string, viewType int) string { + return conversationID + "_" + stringutil.IntToString(viewType) +} diff --git a/pkg/cache/manager.go b/pkg/cache/user_cache.go similarity index 82% rename from pkg/cache/manager.go rename to pkg/cache/user_cache.go index a43048a6c..464bd85a2 100644 --- a/pkg/cache/manager.go +++ b/pkg/cache/user_cache.go @@ -2,18 +2,19 @@ package cache import ( "context" + "github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs" "github.com/openimsdk/tools/utils/datautil" ) -func NewManager[K comparable, V any]( +func NewUserCache[K comparable, V any]( getKeyFunc func(value V) K, batchDBFunc func(ctx context.Context, keys []K) ([]V, error), singleDBFunc func(ctx context.Context, keys K) (V, error), queryFunc func(ctx context.Context, keys []K) ([]V, error), -) *Manager[K, V] { - return &Manager[K, V]{ - Cache: Cache[K, V]{}, +) *UserCache[K, V] { + return &UserCache[K, V]{ + Cache: NewCache[K, V](), getKeyFunc: getKeyFunc, batchDBFunc: batchDBFunc, singleDBFunc: singleDBFunc, @@ -21,15 +22,15 @@ func NewManager[K comparable, V any]( } } -type Manager[K comparable, V any] struct { - Cache[K, V] +type UserCache[K comparable, V any] struct { + *Cache[K, V] getKeyFunc func(value V) K batchDBFunc func(ctx context.Context, keys []K) ([]V, error) singleDBFunc func(ctx context.Context, keys K) (V, error) queryFunc func(ctx context.Context, keys []K) ([]V, error) } -func (m *Manager[K, V]) BatchFetch(ctx context.Context, keys []K) (map[K]V, error) { +func (m *UserCache[K, V]) BatchFetch(ctx context.Context, keys []K) (map[K]V, error) { var ( res = make(map[K]V) queryKeys []K @@ -56,7 +57,7 @@ func (m *Manager[K, V]) BatchFetch(ctx context.Context, keys []K) (map[K]V, erro return res, nil } -func (m *Manager[K, V]) Fetch(ctx context.Context, key K) (V, error) { +func (m *UserCache[K, V]) Fetch(ctx context.Context, key K) (V, error) { var nilData V if data, ok := m.Load(key); ok { @@ -71,7 +72,7 @@ func (m *Manager[K, V]) Fetch(ctx context.Context, key K) (V, error) { return fetchedData, nil } -func (m *Manager[K, V]) batchFetch(ctx context.Context, keys []K) ([]V, error) { +func (m *UserCache[K, V]) batchFetch(ctx context.Context, keys []K) ([]V, error) { if len(keys) == 0 { return nil, nil } @@ -106,7 +107,7 @@ func (m *Manager[K, V]) batchFetch(ctx context.Context, keys []K) ([]V, error) { return writeData, nil } -func (m *Manager[K, V]) fetch(ctx context.Context, key K) (V, error) { +func (m *UserCache[K, V]) fetch(ctx context.Context, key K) (V, error) { var writeData V if m.singleDBFunc != nil { dbData, err := m.singleDBFunc(ctx, key) diff --git a/pkg/constant/constant.go b/pkg/constant/constant.go index 3b62a6141..9fd66cb9c 100644 --- a/pkg/constant/constant.go +++ b/pkg/constant/constant.go @@ -200,6 +200,7 @@ const ( SendSignalMsg = 1004 PullMsgBySeqList = 1005 GetConvMaxReadSeq = 1006 + PullConvLastMessage = 1007 PushMsg = 2001 KickOnlineMsg = 2002 LogoutMsg = 2003 diff --git a/pkg/sdk_params_callback/conversation_msg_sdk_struct.go b/pkg/sdk_params_callback/conversation_msg_sdk_struct.go index eff3e6174..bce86d0bf 100644 --- a/pkg/sdk_params_callback/conversation_msg_sdk_struct.go +++ b/pkg/sdk_params_callback/conversation_msg_sdk_struct.go @@ -32,6 +32,7 @@ type GetAdvancedHistoryMessageListParams struct { ConversationID string `json:"conversationID"` StartClientMsgID string `json:"startClientMsgID"` Count int `json:"count"` + ViewType int `json:"viewType"` } type GetAdvancedHistoryMessageListCallback struct { diff --git a/pkg/sdkerrs/code.go b/pkg/sdkerrs/code.go index bbcd37dbe..f2c19935b 100644 --- a/pkg/sdkerrs/code.go +++ b/pkg/sdkerrs/code.go @@ -39,6 +39,7 @@ const ( MsgRepeatError = 10204 // Message repeated MsgContentTypeNotSupportError = 10205 // Message content type not supported MsgHasNoSeqError = 10206 // Message does not have a sequence number + MsgHasDeletedError = 10207 // Message has been deleted // Conversation-related errors NotSupportOptError = 10301 // Operation not supported diff --git a/pkg/sdkerrs/predefine.go b/pkg/sdkerrs/predefine.go index d08d1472a..97a5aaea8 100644 --- a/pkg/sdkerrs/predefine.go +++ b/pkg/sdkerrs/predefine.go @@ -37,6 +37,7 @@ var ( ErrMsgRepeated = errs.NewCodeError(MsgRepeatError, "Only failed messages can be resent") ErrMsgContentTypeNotSupport = errs.NewCodeError(MsgContentTypeNotSupportError, "Message content type not supported") ErrMsgHasNoSeq = errs.NewCodeError(MsgHasNoSeqError, "Message has no sequence number") + ErrMsgHasDeleted = errs.NewCodeError(MsgHasDeletedError, "Message has been deleted") // Conversation-related errors ErrNotSupportOpt = errs.NewCodeError(NotSupportOptError, "Operation not supported for supergroup") diff --git a/test/conversation_test.go b/test/conversation_test.go index 1daec28c4..33080867f 100644 --- a/test/conversation_test.go +++ b/test/conversation_test.go @@ -17,6 +17,7 @@ package test import ( "context" "testing" + "time" "github.com/openimsdk/openim-sdk-core/v3/open_im_sdk" "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback" @@ -32,6 +33,8 @@ func Test_GetAllConversationList(t *testing.T) { for _, conversation := range conversations { t.Log(conversation) } + t.Log(len(conversations)) + time.Sleep(time.Second * 100) } func Test_GetConversationListSplit(t *testing.T) { diff --git a/test/create_msg_test.go b/test/create_msg_test.go index b5d8fa721..6ce151785 100644 --- a/test/create_msg_test.go +++ b/test/create_msg_test.go @@ -147,7 +147,7 @@ func Test_CreateForwardMessage(t *testing.T) { } func Test_FetchSurroundingMessages(t *testing.T) { - msgs, err := open_im_sdk.UserForSDK.Conversation().FetchSurroundingMessages(ctx, "sg_3559850526", 15, 14, 8) + msgs, err := open_im_sdk.UserForSDK.Conversation().FetchSurroundingMessages(ctx, &sdk_struct.MsgStruct{}, 15, 14) if err != nil { t.Error(err) return