Skip to content

Commit

Permalink
kgo: do not add all topics to internal tps map when regex consuming
Browse files Browse the repository at this point in the history
The internal tps map is meant to be what we store topicPartitions in
that we are candidates to be consumed. This is filtered in
assignPartitions to only opt-in partitions that are actually being
consumed.

It's not BAD if we store all topics in that map, but it's not the
intent. The rest of the client worked fine even with extra topics in the
map.

When regex consuming, the metadata function previously put all topics
into the map always. Now, we move the regex evaluation logic --
duplicated in both the direct and group consumers -- into one function
and use that for filtering within metadata.

This introduces a required sequence of filtering THEN finding
assignments, which is fine / was the way things operated anyway.

Moving the filtering to metadata (only in the regex consuming logic)
means that we no longer store information for topics we are not
consuming. Indirectly, this fixes a bug where `GetConsumeTopics` would
always return ALL topics when regex consuming, because
`GetConsumeTopics` always just returned what was in the `tps` field.

This adds a test for the fixed behavior, as well as tests that NOT regex
consuming always returns all topics the user is interested in.

Closes #810.
  • Loading branch information
twmb committed Oct 14, 2024
1 parent b66ceb7 commit 47f4858
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 40 deletions.
38 changes: 38 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,44 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
}
}

// filterMetadataAllTopics, called BEFORE doOnMetadataUpdate, evaluates
// all topics received against the user provided regex.
func (c *consumer) filterMetadataAllTopics(topics []string) []string {
c.mu.Lock()
defer c.mu.Unlock()

var rns reNews
defer rns.log(&c.cl.cfg)

var reSeen map[string]bool
if c.d != nil {
reSeen = c.d.reSeen
} else {
reSeen = c.g.reSeen
}

keep := topics[:0]
for _, topic := range topics {
want, seen := reSeen[topic]
if !seen {
for rawRe, re := range c.cl.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
reSeen[topic] = want
}
if want {
keep = append(keep, topic)
}
}
return keep
}

func (c *consumer) doOnMetadataUpdate() {
if !c.consuming() {
return
Expand Down
20 changes: 1 addition & 19 deletions pkg/kgo/consumer_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,11 @@ func (*directConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset
func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
topics := d.tps.load()

var rns reNews
if d.cfg.regex {
defer rns.log(d.cfg)
}

toUse := make(map[string]map[int32]Offset, 10)
for topic, topicPartitions := range topics {
var useTopic bool
if d.cfg.regex {
want, seen := d.reSeen[topic]
if !seen {
for rawRe, re := range d.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
d.reSeen[topic] = want
}
useTopic = want
useTopic, _ = d.reSeen[topic]

Check failure on line 72 in pkg/kgo/consumer_direct.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

S1005: unnecessary assignment to the blank identifier (gosimple)

Check failure on line 72 in pkg/kgo/consumer_direct.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

S1005: unnecessary assignment to the blank identifier (gosimple)
} else {
useTopic = d.m.onlyt(topic)
}
Expand Down
66 changes: 66 additions & 0 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -536,3 +537,68 @@ func TestIssue648(t *testing.T) {
t.Errorf("did not see ErrUnknownTopicOrPartition")
}
}

func TestIssue810(t *testing.T) {
t.Parallel()

t1, cleanup1 := tmpTopicPartitions(t, 1)
defer cleanup1()

_, cleanup2 := tmpTopicPartitions(t, 1)
defer cleanup2()

// Non-regex consuming: topics are available immediately.
{
cl, _ := newTestClient(
ConsumeTopics(t1),
UnknownTopicRetries(-1),
)
defer cl.Close()

topics := cl.GetConsumeTopics()
exp := []string{t1}

if !reflect.DeepEqual(topics, exp) {
t.Errorf("non-regex got %v != exp %v", topics, exp)
}
}

// Regex consuming: topics are available only after discovery.
{
cl, _ := newTestClient(
ConsumeTopics(t1),
ConsumeRegex(),
UnknownTopicRetries(-1),
)
defer cl.Close()

var (
ticker = time.NewTicker(100 * time.Millisecond)
fail = time.NewTimer(5 * time.Second)
failed bool
lastSaw []string
exp = []string{t1}
)

defer ticker.Stop()
defer fail.Stop()

out:
for {
select {
case <-ticker.C:
lastSaw = cl.GetConsumeTopics()
if reflect.DeepEqual(lastSaw, exp) {
break out
}
case <-fail.C:
failed = true
break out
}
}

if failed {
t.Errorf("did not see expected topics in time, last saw %v != exp %v", lastSaw, exp)
}
}
}
23 changes: 2 additions & 21 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ type groupConsumer struct {
cooperative atomicBool // true if the group balancer chosen during Join is cooperative

// The data for topics that the user assigned. Metadata updates the
// atomic.Value in each pointer atomically. If we are consuming via
// regex, metadata grabs the lock to add new topics.
// atomic.Value in each pointer atomically.
tps *topicsPartitions

reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not
Expand Down Expand Up @@ -1714,11 +1713,6 @@ func (g *groupConsumer) findNewAssignments() {
delta int
}

var rns reNews
if g.cfg.regex {
defer rns.log(&g.cl.cfg)
}

var numNewTopics int
toChange := make(map[string]change, len(topics))
for topic, topicPartitions := range topics {
Expand All @@ -1741,20 +1735,7 @@ func (g *groupConsumer) findNewAssignments() {
// support adding new regex).
useTopic := true
if g.cfg.regex {
want, seen := g.reSeen[topic]
if !seen {
for rawRe, re := range g.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
g.reSeen[topic] = want
}
useTopic = want
useTopic, _ = g.reSeen[topic]

Check failure on line 1738 in pkg/kgo/consumer_group.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

S1005: unnecessary assignment to the blank identifier (gosimple)

Check failure on line 1738 in pkg/kgo/consumer_group.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

S1005: unnecessary assignment to the blank identifier (gosimple)
}

// We only track using the topic if there are partitions for
Expand Down
8 changes: 8 additions & 0 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,14 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
for topic := range latest {
allTopics = append(allTopics, topic)
}

// We filter out topics will not match any of our regex's.
// This ensures that the `tps` field does not contain topics
// we will never use (the client works with misc. topics in
// there, but it's better to avoid it -- and allows us to use
// `tps` in GetConsumeTopics).
allTopics = c.filterMetadataAllTopics(allTopics)

tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics)
defer tpsConsumer.storeData(tpsConsumerLoad)

Expand Down

0 comments on commit 47f4858

Please sign in to comment.