Skip to content

Commit

Permalink
kadm: change FetchOffsetsForTopics to only return requested topics by…
Browse files Browse the repository at this point in the history
… default

Closes #524.
  • Loading branch information
twmb committed Sep 16, 2023
1 parent 4ad9ea5 commit 5556f88
Showing 1 changed file with 49 additions and 1 deletion.
50 changes: 49 additions & 1 deletion pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,17 +830,47 @@ func (cl *Client) FetchOffsets(ctx context.Context, group string) (OffsetRespons
return rs, nil
}

// FetchAllGroupTopics is a kadm "internal" topic name that can be used in
// [FetchOffsetsForTopics]. By default, [FetchOffsetsForTopics] only returns
// topics that are explicitly requested. Other topics that may be committed to
// in the group are not returned. Using FetchAllRequestedTopics switches the
// behavior to return the union of all committed topics and all requested
// topics.
const FetchAllGroupTopics = "|fetch-all-group-topics|"

// FetchOffsetsForTopics is a helper function that returns the currently
// committed offsets for the given group, as well as default -1 offsets for any
// topic/partition that does not yet have a commit.
// topic/partition that does not yet have a commit. Note that if
//
// If any partition fetched or listed has an error, this function returns an
// error. The returned offset responses are ready to be used or converted
// directly to pure offsets with `Into`, and again into kgo offsets with
// another `Into`.
//
// By default, this function returns offsets for only the requested topics. You
// can use the special "topic" [FetchAllGroupTopics] to return all committed-to
// topics in addition to all requested topics.
func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topics ...string) (OffsetResponses, error) {
os := make(Offsets)

var all bool
keept := topics[:0]
for _, topic := range topics {
if topic == FetchAllGroupTopics {
all = true
continue
}
keept = append(keept, topic)
}
topics = keept

if !all && len(topics) == 0 {
return make(OffsetResponses), nil
}

// We have to request metadata to learn all partitions in all the
// topics. The default returned offset for all partitions is filled in
// to be -1.
if len(topics) > 0 {
listed, err := cl.ListTopics(ctx, topics...)
if err != nil {
Expand All @@ -865,11 +895,29 @@ func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topic
if err := resps.Error(); err != nil {
return nil, fmt.Errorf("offset fetches had a load error, first error: %w", err)
}

// For any topic (and any partition) we explicitly asked for, if the
// partition does not exist in the response, we fill the default -1
// from above.
os.Each(func(o Offset) {
if _, ok := resps.Lookup(o.Topic, o.Partition); !ok {
resps.Add(OffsetResponse{Offset: o})
}
})

// If we are not requesting all group offsets, then we strip any topic
// that was not explicitly requested.
if !all {
tset := make(map[string]struct{})
for _, t := range topics {
tset[t] = struct{}{}
}
for t := range resps {
if _, ok := tset[t]; !ok {
delete(resps, t)
}
}
}
return resps, nil
}

Expand Down

0 comments on commit 5556f88

Please sign in to comment.