Skip to content

Commit

Permalink
Merge pull request #560 from twmb/524
Browse files Browse the repository at this point in the history
kadm: change FetchOffsetsForTopics to only return requested topics by default
  • Loading branch information
twmb authored Sep 21, 2023
2 parents d762f76 + 00ac608 commit d3d9005
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,14 @@ func (cl *Client) FetchOffsets(ctx context.Context, group string) (OffsetRespons
return rs, nil
}

// FetchAllGroupTopics is a kadm "internal" topic name that can be used in
// [FetchOffsetsForTopics]. By default, [FetchOffsetsForTopics] only returns
// topics that are explicitly requested. Other topics that may be committed to
// in the group are not returned. Using FetchAllRequestedTopics switches the
// behavior to return the union of all committed topics and all requested
// topics.
const FetchAllGroupTopics = "|fetch-all-group-topics|"

// FetchOffsetsForTopics is a helper function that returns the currently
// committed offsets for the given group, as well as default -1 offsets for any
// topic/partition that does not yet have a commit.
Expand All @@ -838,9 +846,31 @@ func (cl *Client) FetchOffsets(ctx context.Context, group string) (OffsetRespons
// error. The returned offset responses are ready to be used or converted
// directly to pure offsets with `Into`, and again into kgo offsets with
// another `Into`.
//
// By default, this function returns offsets for only the requested topics. You
// can use the special "topic" [FetchAllGroupTopics] to return all committed-to
// topics in addition to all requested topics.
func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topics ...string) (OffsetResponses, error) {
os := make(Offsets)

var all bool
keept := topics[:0]
for _, topic := range topics {
if topic == FetchAllGroupTopics {
all = true
continue
}
keept = append(keept, topic)
}
topics = keept

if !all && len(topics) == 0 {
return make(OffsetResponses), nil
}

// We have to request metadata to learn all partitions in all the
// topics. The default returned offset for all partitions is filled in
// to be -1.
if len(topics) > 0 {
listed, err := cl.ListTopics(ctx, topics...)
if err != nil {
Expand All @@ -865,11 +895,29 @@ func (cl *Client) FetchOffsetsForTopics(ctx context.Context, group string, topic
if err := resps.Error(); err != nil {
return nil, fmt.Errorf("offset fetches had a load error, first error: %w", err)
}

// For any topic (and any partition) we explicitly asked for, if the
// partition does not exist in the response, we fill the default -1
// from above.
os.Each(func(o Offset) {
if _, ok := resps.Lookup(o.Topic, o.Partition); !ok {
resps.Add(OffsetResponse{Offset: o})
}
})

// If we are not requesting all group offsets, then we strip any topic
// that was not explicitly requested.
if !all {
tset := make(map[string]struct{})
for _, t := range topics {
tset[t] = struct{}{}
}
for t := range resps {
if _, ok := tset[t]; !ok {
delete(resps, t)
}
}
}
return resps, nil
}

Expand Down

0 comments on commit d3d9005

Please sign in to comment.