Skip to content

Commit

Permalink
feat: add a function to quickly retrieve the context messages for a g…
Browse files Browse the repository at this point in the history
…iven message. (#827)

* fix: quote message change to revoke message when app from background to foreground and message status update.

Signed-off-by: Gordon <[email protected]>

* feat: add a function to quickly retrieve the context messages for a given message.

Signed-off-by: Gordon <[email protected]>

* refactor: the SDK interface using the pb protocol to replace json.

Signed-off-by: Gordon <[email protected]>

---------

Signed-off-by: Gordon <[email protected]>
  • Loading branch information
FGadvancer authored Jan 3, 2025
1 parent 9270ddc commit f23f7d6
Show file tree
Hide file tree
Showing 19 changed files with 191 additions and 99 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require golang.org/x/net v0.29.0 // indirect

require (
github.com/google/go-cmp v0.6.0
github.com/openimsdk/protocol v0.0.72-alpha.63
github.com/openimsdk/protocol v0.0.72-alpha.70
github.com/openimsdk/tools v0.0.50-alpha.21
github.com/patrickmn/go-cache v2.1.0+incompatible
golang.org/x/image v0.15.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/openimsdk/protocol v0.0.72-alpha.63 h1:IyPBibEvwBtTmD8DSrlqcekfEXe74k4+KeeHsgdhGh0=
github.com/openimsdk/protocol v0.0.72-alpha.63/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ=
github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.21 h1:ZKgSFkiBjz6KcNZlNwvrSoUYJ7K5Flan8wHuRBH3VqY=
github.com/openimsdk/tools v0.0.50-alpha.21/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
Expand Down
59 changes: 59 additions & 0 deletions internal/conversation_msg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/openimsdk/openim-sdk-core/v3/pkg/cache"
pconstant "github.com/openimsdk/protocol/constant"

"github.com/openimsdk/tools/errs"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/openimsdk/openim-sdk-core/v3/pkg/content_type"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
"github.com/openimsdk/openim-sdk-core/v3/pkg/server_api_params"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
Expand Down Expand Up @@ -1014,3 +1016,60 @@ func (c *Conversation) SearchConversation(ctx context.Context, searchParam strin
// Return the list of conversations
return apiConversations, nil
}
func (c *Conversation) GetInputStates(ctx context.Context, conversationID string, userID string) ([]int32, error) {
return c.typing.GetInputStates(conversationID, userID), nil
}

func (c *Conversation) ChangeInputStates(ctx context.Context, conversationID string, focus bool) error {
return c.typing.ChangeInputStates(ctx, conversationID, focus)
}

func (c *Conversation) FetchSurroundingMessages(ctx context.Context, s *sdk_struct.MsgStruct, before int, after int) ([]*sdk_struct.MsgStruct, error) {
conversationID := utils.GetConversationIDByMsg(s)
var message *model_struct.LocalChatLog
message, err := c.db.GetMessage(ctx, conversationID, s.ClientMsgID)
if err == nil {
if message.Status >= constant.MsgStatusHasDeleted {
return nil, sdkerrs.ErrMsgHasDeleted
}
} else {
if s.Seq == 0 {
return nil, sdkerrs.ErrMsgHasDeleted
}
var messages []*model_struct.LocalChatLog
c.fetchAndMergeMissingMessages(ctx, conversationID, []int64{s.Seq}, false, 1, 0, &messages, &sdk.GetAdvancedHistoryMessageListCallback{})
if len(messages) < 1 {
return nil, sdkerrs.ErrMsgHasDeleted
}
message = messages[0]
}

result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
if before > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
ConversationID: conversationID,
Count: before,
StartClientMsgID: s.ClientMsgID,
ViewType: cache.ViewSearch,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
result = append(result, LocalChatLogToMsgStruct(message))
if after > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
ConversationID: conversationID,
Count: after,
StartClientMsgID: s.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
return result, nil
}
22 changes: 11 additions & 11 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
startTime = m.SendTime
} else {
// Clear both maps when the user enters the conversation
c.messagePullForwardEndSeqMap.Delete(conversationID)
c.messagePullReverseEndSeqMap.Delete(conversationID)
c.messagePullForwardEndSeqMap.Delete(conversationID, req.ViewType)
c.messagePullReverseEndSeqMap.Delete(conversationID, req.ViewType)
}
log.ZDebug(ctx, "Assembly conversation parameters", "cost time", time.Since(t), "conversationID",
conversationID, "startTime:", startTime, "count:", req.Count, "startTime", startTime)
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, isReverse, &messageListCallback)
list, err := c.fetchMessagesWithGapCheck(ctx, conversationID, req.Count, startTime, isReverse, req.ViewType, &messageListCallback)
if err != nil {
return nil, err
}
Expand All @@ -91,7 +91,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
}

func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversationID string,
count int, startTime int64, isReverse bool, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {
count int, startTime int64, isReverse bool, viewType int, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) ([]*model_struct.LocalChatLog, error) {

var list, validMessages []*model_struct.LocalChatLog

Expand Down Expand Up @@ -125,8 +125,8 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
}
if !isReverse {
if thisEndSeq != 0 {
c.messagePullForwardEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool {
lastEndSeq, _ := c.messagePullForwardEndSeqMap.Load(key)
c.messagePullForwardEndSeqMap.StoreWithFunc(conversationID, viewType, thisEndSeq, func(key string, value int64) bool {
lastEndSeq, _ := c.messagePullForwardEndSeqMap.Load(key, viewType)
if value < lastEndSeq || lastEndSeq == 0 {
log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value)
return true
Expand All @@ -138,8 +138,8 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
}
} else {
if thisEndSeq != 0 {
c.messagePullReverseEndSeqMap.StoreWithFunc(conversationID, thisEndSeq, func(key string, value int64) bool {
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(key)
c.messagePullReverseEndSeqMap.StoreWithFunc(conversationID, viewType, thisEndSeq, func(key string, value int64) bool {
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(key, viewType)
if value > lastEndSeq || lastEndSeq == 0 {
log.ZDebug(ctx, "update the end sequence of the message", "lastEndSeq", lastEndSeq, "thisEndSeq", value)
return true
Expand Down Expand Up @@ -175,10 +175,10 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
log.ZDebug(ctx, "internal continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
t = time.Now()
c.validateAndFillInterBlockGaps(ctx, thisStartSeq, conversationID,
isReverse, count, startTime, &list, messageListCallback)
isReverse, viewType, count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "between continuity check", "cost time", time.Since(t), "thisStartSeq", thisStartSeq)
t = time.Now()
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse,
c.validateAndFillEndBlockContinuity(ctx, conversationID, isReverse, viewType,
count, startTime, &list, messageListCallback)
log.ZDebug(ctx, "end continuity check", "cost time", time.Since(t))
// If the number of valid messages retrieved is less than the count,
Expand All @@ -188,7 +188,7 @@ func (c *Conversation) fetchMessagesWithGapCheck(ctx context.Context, conversati
newStartTime := getNewStartTime(list)
log.ZDebug(ctx, "fetch more messages", "missingCount", missingCount, "conversationID",
conversationID, "newStartTime", newStartTime)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, messageListCallback)
missingMessages, err := c.fetchMessagesWithGapCheck(ctx, conversationID, missingCount, newStartTime, isReverse, viewType, messageListCallback)
if err != nil {
return nil, err
}
Expand Down
62 changes: 4 additions & 58 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"math"
"sync"

sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"

"github.com/openimsdk/openim-sdk-core/v3/pkg/api"
"github.com/openimsdk/openim-sdk-core/v3/pkg/cache"
"github.com/openimsdk/tools/utils/stringutil"
Expand Down Expand Up @@ -67,8 +65,8 @@ type Conversation struct {
file *file.File
cache *cache.Cache[string, *model_struct.LocalConversation]
maxSeqRecorder MaxSeqRecorder
messagePullForwardEndSeqMap *cache.Cache[string, int64]
messagePullReverseEndSeqMap *cache.Cache[string, int64]
messagePullForwardEndSeqMap *cache.ConversationSeqContextCache
messagePullReverseEndSeqMap *cache.ConversationSeqContextCache
IsExternalExtensions bool
msgOffset int
progress int
Expand Down Expand Up @@ -108,8 +106,8 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr,
file: file,
IsExternalExtensions: info.IsExternalExtensions(),
maxSeqRecorder: NewMaxSeqRecorder(),
messagePullForwardEndSeqMap: cache.NewCache[string, int64](),
messagePullReverseEndSeqMap: cache.NewCache[string, int64](),
messagePullForwardEndSeqMap: cache.NewConversationSeqContextCache(),
messagePullReverseEndSeqMap: cache.NewConversationSeqContextCache(),
msgOffset: 0,
progress: 0,
}
Expand Down Expand Up @@ -906,55 +904,3 @@ func (c *Conversation) getUserNameAndFaceURL(ctx context.Context, userID string)
}
return userInfo.FaceURL, userInfo.Nickname, nil
}

func (c *Conversation) GetInputStates(ctx context.Context, conversationID string, userID string) ([]int32, error) {
return c.typing.GetInputStates(conversationID, userID), nil
}

func (c *Conversation) ChangeInputStates(ctx context.Context, conversationID string, focus bool) error {
return c.typing.ChangeInputStates(ctx, conversationID, focus)
}

func (c *Conversation) FetchSurroundingMessages(ctx context.Context, conversationID string, seq int64, before int64, after int64) ([]*sdk_struct.MsgStruct, error) {
c.fetchAndMergeMissingMessages(ctx, conversationID, []int64{seq}, false, 0, 0, &[]*model_struct.LocalChatLog{}, &sdk.GetAdvancedHistoryMessageListCallback{})
res, err := c.db.GetMessagesBySeqs(ctx, conversationID, []int64{seq})
if err != nil {
return nil, err
}
if len(res) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
//_, msgList := c.LocalChatLog2MsgStruct []*model_struct.LocalChatLog{res[0]})
//if len(msgList) == 0 {
// return []*sdk_struct.MsgStruct{}, nil
//}
//msg := msgList[0]
result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
//if before > 0 {
// req := sdk.GetAdvancedHistoryMessageListParams{
// ConversationID: conversationID,
// Count: int(before),
// StartClientMsgID: msg.ClientMsgID,
// }
// val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
// if err != nil {
// return nil, err
// }
// result = append(result, val.MessageList...)
//}
//result = append(result, msg)
//if after > 0 {
// req := sdk.GetAdvancedHistoryMessageListParams{
// ConversationID: conversationID,
// Count: int(after),
// StartClientMsgID: msg.ClientMsgID,
// }
// val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
// if err != nil {
// return nil, err
// }
// result = append(result, val.MessageList...)
//}
//sort.Sort(sdk_struct.NewMsgList(result))
return result, nil
}
18 changes: 9 additions & 9 deletions internal/conversation_msg/message_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ func (c *Conversation) validateAndFillInternalGaps(ctx context.Context, conversa
// validateAndFillInterBlockGaps checks for continuity between blocks of messages. If a gap is identified, it retrieves the missing messages
// to bridge the gap. The function returns a boolean indicating whether the blocks are continuous.
func (c *Conversation) validateAndFillInterBlockGaps(ctx context.Context, thisStartSeq int64, conversationID string,
isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
isReverse bool, viewType, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {

var lastEndSeq, startSeq, endSeq int64
var isLostSeq bool
if isReverse {
lastEndSeq, _ = c.messagePullReverseEndSeqMap.Load(conversationID)
lastEndSeq, _ = c.messagePullReverseEndSeqMap.Load(conversationID, viewType)
isLostSeq = lastEndSeq+1 != thisStartSeq
startSeq = lastEndSeq + 1
endSeq = thisStartSeq - 1
} else {
lastEndSeq, _ = c.messagePullForwardEndSeqMap.Load(conversationID)
lastEndSeq, _ = c.messagePullForwardEndSeqMap.Load(conversationID, viewType)
isLostSeq = thisStartSeq+1 != lastEndSeq
startSeq = thisStartSeq + 1
endSeq = lastEndSeq - 1
Expand All @@ -73,15 +73,15 @@ func (c *Conversation) validateAndFillInterBlockGaps(ctx context.Context, thisSt
// internal and inter-block continuity checks but contains fewer messages than `count`, this function verifies if the end
// of the message history has been reached. If not, it attempts to retrieve any missing messages to ensure continuity.
func (c *Conversation) validateAndFillEndBlockContinuity(ctx context.Context, conversationID string,
isReverse bool, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
isShouldFetchMessage, lostSeqList := c.checkEndBlock(ctx, conversationID, isReverse, count, list, messageListCallback)
isReverse bool, viewType, count int, startTime int64, list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) {
isShouldFetchMessage, lostSeqList := c.checkEndBlock(ctx, conversationID, isReverse, viewType, count, list, messageListCallback)
if isShouldFetchMessage {
c.fetchAndMergeMissingMessages(ctx, conversationID, lostSeqList, isReverse, count, startTime, list, messageListCallback)
_, _ = c.checkEndBlock(ctx, conversationID, isReverse, count, list, messageListCallback)
_, _ = c.checkEndBlock(ctx, conversationID, isReverse, viewType, count, list, messageListCallback)
}

}
func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string, isReverse bool, count int,
func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string, isReverse bool, viewType, count int,
list *[]*model_struct.LocalChatLog, messageListCallback *sdk.GetAdvancedHistoryMessageListCallback) (isShouldFetchMessage bool, seqList []int64) {
// Perform an end-of-block check if the retrieved message count is less than requested
if len(*list) < count {
Expand All @@ -94,7 +94,7 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string,
if maxSeq >= currentMaxSeq {
messageListCallback.IsEnd = true
} else {
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(conversationID)
lastEndSeq, _ := c.messagePullReverseEndSeqMap.Load(conversationID, viewType)
log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "lastEndSeq", lastEndSeq, "conversationID", conversationID)
// If `maxSeq` is zero and `lastEndSeq` is at the maximum server sequence, this batch is fully local
if maxSeq == 0 && lastEndSeq >= currentMaxSeq { // All messages in this batch are local messages,
Expand Down Expand Up @@ -124,7 +124,7 @@ func (c *Conversation) checkEndBlock(ctx context.Context, conversationID string,
if minSeq <= userCanPullMinSeq {
messageListCallback.IsEnd = true
} else {
lastMinSeq, _ := c.messagePullForwardEndSeqMap.Load(conversationID)
lastMinSeq, _ := c.messagePullForwardEndSeqMap.Load(conversationID, viewType)
log.ZDebug(ctx, "validateAndFillEndBlockContinuity", "lastMinSeq", lastMinSeq, "conversationID", conversationID)
// If `minSeq` is zero and `lastMinSeq` is at the minimum server sequence, this batch is fully local
if minSeq == 0 && lastMinSeq <= userCanPullMinSeq { // All messages in this batch are local messages,
Expand Down
4 changes: 2 additions & 2 deletions internal/conversation_msg/read_drawing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/openimsdk/openim-sdk-core/v3/pkg/common"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
"github.com/openimsdk/tools/errs"
Expand Down Expand Up @@ -52,7 +51,8 @@ func (c *Conversation) markConversationMessageAsRead(ctx context.Context, conver
return err
}
if conversation.UnreadCount == 0 {
return sdkerrs.ErrUnreadCount
log.ZWarn(ctx, "unread count is 0", nil, "conversationID", conversationID)
return nil
}
// get the maximum sequence number of messages in the table that are not sent by oneself
peerUserMaxSeq, err := c.db.GetConversationPeerNormalMsgSeq(ctx, conversationID)
Expand Down
2 changes: 2 additions & 0 deletions internal/interaction/long_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ func (c *LongConnMgr) handleMessage(message []byte) error {
fallthrough
case constant.GetConvMaxReadSeq:
fallthrough
case constant.PullConvLastMessage:
fallthrough
case constant.SendMsg:
fallthrough
case constant.SendSignalMsg:
Expand Down
Loading

0 comments on commit f23f7d6

Please sign in to comment.