diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go index 96a6b10..1018697 100644 --- a/pkg/connector/backfill.go +++ b/pkg/connector/backfill.go @@ -59,15 +59,21 @@ func (s *SlackClient) FetchMessages(ctx context.Context, params bridgev2.FetchMe return nil, err } convertedMessages := make([]*bridgev2.BackfillMessage, len(chunk.Messages)) + var maxMsgID string for i, msg := range chunk.Messages { convertedMessages[i] = s.wrapBackfillMessage(ctx, params.Portal, &msg.Msg) + if maxMsgID < msg.Timestamp { + maxMsgID = msg.Timestamp + } } slices.Reverse(convertedMessages) + lastRead := s.getLastReadCache(channelID) return &bridgev2.FetchMessagesResponse{ Messages: convertedMessages, Cursor: networkid.PaginationCursor(chunk.ResponseMetadata.Cursor), HasMore: chunk.HasMore, Forward: params.Forward, + MarkRead: lastRead != "" && maxMsgID != "" && lastRead >= maxMsgID, }, nil } diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 6b75c9d..919f22d 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -72,6 +72,7 @@ func (s *SlackConnector) LoadUserLogin(ctx context.Context, login *bridgev2.User TeamID: teamID, chatInfoCache: make(map[string]chatInfoCacheEntry), + lastReadCache: make(map[string]string), } } teamPortalKey := networkid.PortalKey{ID: slackid.MakeTeamPortalID(teamID)} @@ -101,6 +102,8 @@ type SlackClient struct { chatInfoCache map[string]chatInfoCacheEntry chatInfoCacheLock sync.Mutex + lastReadCache map[string]string + lastReadCacheLock sync.Mutex } var _ bridgev2.NetworkAPI = (*SlackClient)(nil) @@ -157,6 +160,18 @@ func (s *SlackClient) syncTeamPortal(ctx context.Context) error { return nil } +func (s *SlackClient) setLastReadCache(channelID, ts string) { + s.lastReadCacheLock.Lock() + s.lastReadCache[channelID] = ts + s.lastReadCacheLock.Unlock() +} + +func (s *SlackClient) getLastReadCache(channelID string) string { + s.lastReadCacheLock.Lock() + defer s.lastReadCacheLock.Unlock() + return s.lastReadCache[channelID] +} + func (s *SlackClient) SyncChannels(ctx context.Context) { log := zerolog.Ctx(ctx) clientCounts, err := s.Client.ClientCountsContext(ctx, &slack.ClientCountsParams{ @@ -169,15 +184,22 @@ func (s *SlackClient) SyncChannels(ctx context.Context) { return } latestMessageIDs := make(map[string]string, len(clientCounts.Channels)+len(clientCounts.MpIMs)+len(clientCounts.IMs)) + lastReadCache := make(map[string]string, len(clientCounts.Channels)+len(clientCounts.MpIMs)+len(clientCounts.IMs)) for _, ch := range clientCounts.Channels { latestMessageIDs[ch.ID] = ch.Latest + lastReadCache[ch.ID] = ch.LastRead } for _, ch := range clientCounts.MpIMs { latestMessageIDs[ch.ID] = ch.Latest + lastReadCache[ch.ID] = ch.LastRead } for _, ch := range clientCounts.IMs { latestMessageIDs[ch.ID] = ch.Latest + lastReadCache[ch.ID] = ch.LastRead } + s.lastReadCacheLock.Lock() + s.lastReadCache = lastReadCache + s.lastReadCacheLock.Unlock() userPortals, err := s.UserLogin.Bridge.DB.UserPortal.GetAllForLogin(ctx, s.UserLogin.UserLogin) if err != nil { log.Err(err).Msg("Failed to fetch user portals") diff --git a/pkg/connector/handleslack.go b/pkg/connector/handleslack.go index 2e8fbf8..26547f4 100644 --- a/pkg/connector/handleslack.go +++ b/pkg/connector/handleslack.go @@ -146,12 +146,15 @@ func (s *SlackClient) wrapEvent(ctx context.Context, rawEvt any) (bridgev2.Remot case *slack.ChannelMarkedEvent: meta, metaErr = s.makeEventMeta(ctx, evt.Channel, nil, s.UserID, evt.Timestamp) + s.setLastReadCache(evt.Channel, evt.Timestamp) wrapped = wrapReadReceipt(&meta) case *slack.IMMarkedEvent: meta, metaErr = s.makeEventMeta(ctx, evt.Channel, nil, s.UserID, evt.Timestamp) + s.setLastReadCache(evt.Channel, evt.Timestamp) wrapped = wrapReadReceipt(&meta) case *slack.GroupMarkedEvent: meta, metaErr = s.makeEventMeta(ctx, evt.Channel, nil, s.UserID, evt.Timestamp) + s.setLastReadCache(evt.Channel, evt.Timestamp) wrapped = wrapReadReceipt(&meta) case *slack.ChannelJoinedEvent: