Skip to content

Commit

Permalink
backfill: mark as read after forward backfill if channel is read
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Jul 12, 2024
1 parent 1cd16ee commit 1c9a035
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/connector/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,21 @@ func (s *SlackClient) FetchMessages(ctx context.Context, params bridgev2.FetchMe
return nil, err
}
convertedMessages := make([]*bridgev2.BackfillMessage, len(chunk.Messages))
var maxMsgID string
for i, msg := range chunk.Messages {
convertedMessages[i] = s.wrapBackfillMessage(ctx, params.Portal, &msg.Msg)
if maxMsgID < msg.Timestamp {
maxMsgID = msg.Timestamp
}
}
slices.Reverse(convertedMessages)
lastRead := s.getLastReadCache(channelID)
return &bridgev2.FetchMessagesResponse{
Messages: convertedMessages,
Cursor: networkid.PaginationCursor(chunk.ResponseMetadata.Cursor),
HasMore: chunk.HasMore,
Forward: params.Forward,
MarkRead: lastRead != "" && maxMsgID != "" && lastRead >= maxMsgID,
}, nil
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (s *SlackConnector) LoadUserLogin(ctx context.Context, login *bridgev2.User
TeamID: teamID,

chatInfoCache: make(map[string]chatInfoCacheEntry),
lastReadCache: make(map[string]string),
}
}
teamPortalKey := networkid.PortalKey{ID: slackid.MakeTeamPortalID(teamID)}
Expand Down Expand Up @@ -101,6 +102,8 @@ type SlackClient struct {

chatInfoCache map[string]chatInfoCacheEntry
chatInfoCacheLock sync.Mutex
lastReadCache map[string]string
lastReadCacheLock sync.Mutex
}

var _ bridgev2.NetworkAPI = (*SlackClient)(nil)
Expand Down Expand Up @@ -157,6 +160,18 @@ func (s *SlackClient) syncTeamPortal(ctx context.Context) error {
return nil
}

func (s *SlackClient) setLastReadCache(channelID, ts string) {
s.lastReadCacheLock.Lock()
s.lastReadCache[channelID] = ts
s.lastReadCacheLock.Unlock()
}

func (s *SlackClient) getLastReadCache(channelID string) string {
s.lastReadCacheLock.Lock()
defer s.lastReadCacheLock.Unlock()
return s.lastReadCache[channelID]
}

func (s *SlackClient) SyncChannels(ctx context.Context) {
log := zerolog.Ctx(ctx)
clientCounts, err := s.Client.ClientCountsContext(ctx, &slack.ClientCountsParams{
Expand All @@ -169,15 +184,22 @@ func (s *SlackClient) SyncChannels(ctx context.Context) {
return
}
latestMessageIDs := make(map[string]string, len(clientCounts.Channels)+len(clientCounts.MpIMs)+len(clientCounts.IMs))
lastReadCache := make(map[string]string, len(clientCounts.Channels)+len(clientCounts.MpIMs)+len(clientCounts.IMs))
for _, ch := range clientCounts.Channels {
latestMessageIDs[ch.ID] = ch.Latest
lastReadCache[ch.ID] = ch.LastRead
}
for _, ch := range clientCounts.MpIMs {
latestMessageIDs[ch.ID] = ch.Latest
lastReadCache[ch.ID] = ch.LastRead
}
for _, ch := range clientCounts.IMs {
latestMessageIDs[ch.ID] = ch.Latest
lastReadCache[ch.ID] = ch.LastRead
}
s.lastReadCacheLock.Lock()
s.lastReadCache = lastReadCache
s.lastReadCacheLock.Unlock()
userPortals, err := s.UserLogin.Bridge.DB.UserPortal.GetAllForLogin(ctx, s.UserLogin.UserLogin)
if err != nil {
log.Err(err).Msg("Failed to fetch user portals")
Expand Down
3 changes: 3 additions & 0 deletions pkg/connector/handleslack.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,15 @@ func (s *SlackClient) wrapEvent(ctx context.Context, rawEvt any) (bridgev2.Remot

case *slack.ChannelMarkedEvent:
meta, metaErr = s.makeEventMeta(ctx, evt.Channel, nil, s.UserID, evt.Timestamp)
s.setLastReadCache(evt.Channel, evt.Timestamp)
wrapped = wrapReadReceipt(&meta)
case *slack.IMMarkedEvent:
meta, metaErr = s.makeEventMeta(ctx, evt.Channel, nil, s.UserID, evt.Timestamp)
s.setLastReadCache(evt.Channel, evt.Timestamp)
wrapped = wrapReadReceipt(&meta)
case *slack.GroupMarkedEvent:
meta, metaErr = s.makeEventMeta(ctx, evt.Channel, nil, s.UserID, evt.Timestamp)
s.setLastReadCache(evt.Channel, evt.Timestamp)
wrapped = wrapReadReceipt(&meta)

case *slack.ChannelJoinedEvent:
Expand Down

0 comments on commit 1c9a035

Please sign in to comment.