kgo: be sure to use topics when other topics are paused
Follow-up from #585: there was a bug in that commit. If any topic was
paused, then all non-paused topics would be returned once, but they
would not be marked as fetchable after that.

I _think_ the non-fetchability would eventually be cleared on a
metadata update, _but_ the source would then re-fetch from the old
position again. The only way the topic would advance would be if no
topics were paused after the metadata update.

However, this is a bit confusing, and this patch is required regardless.
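
For illustration, a minimal sketch of the affected pattern (the seed
address and topic names are placeholders, not from this commit): with
one topic paused, polling should keep returning -- and keep advancing --
the remaining topics.

    package main

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    func main() {
        cl, err := kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"),   // placeholder broker
            kgo.ConsumeTopics("t1", "t2", "t3"), // placeholder topics
        )
        if err != nil {
            panic(err)
        }
        defer cl.Close()

        cl.PauseFetchTopics("t1") // t2 and t3 must remain fetchable

        for i := 0; i < 5; i++ {
            fs := cl.PollFetches(context.Background())
            fs.EachRecord(func(r *kgo.Record) {
                // Before this patch, t2/t3 could be returned once and
                // then stall: their cursors were never marked fetchable
                // again while t1 stayed paused.
                fmt.Println(r.Topic, r.Partition, r.Offset)
            })
        }
    }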

This also patches a second bug in PollFetches with pausing: if a topic
has a paused partition, and the fetch response does NOT contain any
paused partitions, the logic would strip the entire topic.
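
A sketch of the second case (the function and names here are
illustrative, not part of this commit): pause one partition of a
multi-partition topic, and records from the topic's other partitions
must still come through.

    // pollUnpausedPartitions pauses partition 0 of topic t and polls once.
    // Assumes cl is already consuming t, which has several partitions.
    func pollUnpausedPartitions(ctx context.Context, cl *kgo.Client, t string) {
        cl.PauseFetchPartitions(map[string][]int32{t: {0}})
        defer cl.ResumeFetchPartitions(map[string][]int32{t: {0}})

        fs := cl.PollFetches(ctx)
        fs.EachRecord(func(r *kgo.Record) {
            // Before this patch, if the buffered fetch for t happened
            // to contain no paused partitions, the whole topic was
            // stripped and even these unpaused records were dropped.
            fmt.Printf("%s[%d]@%d\n", r.Topic, r.Partition, r.Offset)
        })
    }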

The pause tests have been strengthened a good bit -- all lines but one
are hit. The one remaining line could be hit more easily if more
partitions were added to the topic or a cluster of size one were used;
it is currently not hit because it requires one paused partition and
one unpaused partition to be returned from the same broker at the same
time.
twmb committed Oct 31, 2023
1 parent 6ebcb43 commit 4ab819d
Showing 2 changed files with 146 additions and 30 deletions.

pkg/kgo/consumer_direct_test.go: 155 changes (129 additions & 26 deletions)

@@ -263,10 +263,19 @@ func TestAddRemovePartitions(t *testing.T) {
 	}
 }
 
+func closed(ch <-chan struct{}) bool {
+	select {
+	case <-ch:
+		return true
+	default:
+		return false
+	}
+}
+
 func TestPauseIssue489(t *testing.T) {
 	t.Parallel()
 
-	t1, cleanup := tmpTopicPartitions(t, 2)
+	t1, cleanup := tmpTopicPartitions(t, 3)
 	defer cleanup()
 
 	cl, _ := NewClient(
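
The closed helper added above is a non-blocking test of whether a
channel has been closed; the tests use it to bound their polling loops
against a context deadline, roughly as follows (a sketch, assuming any
ctx carrying a deadline):

    for i := 0; i < 50 && !closed(ctx.Done()); i++ {
        // keep polling until done or the deadline fires
    }
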
@@ -282,47 +291,141 @@ func TestPauseIssue489(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	go func() {
 		var exit atomic.Bool
-		var zeroOne uint8
+		var which uint8
 		for !exit.Load() {
 			r := StringRecord("v")
-			r.Partition = int32(zeroOne % 2)
-			zeroOne++
+			r.Partition = int32(which % 3)
+			which++
 			cl.Produce(ctx, r, func(r *Record, err error) {
 				if err == context.Canceled {
 					exit.Store(true)
 				}
 			})
 			time.Sleep(100 * time.Microsecond)
 		}
 	}()
 	defer cancel()
 
-	for i := 0; i < 10; i++ {
-		var sawZero, sawOne bool
-		for !sawZero || !sawOne {
-			fs := cl.PollFetches(ctx)
-			fs.EachRecord(func(r *Record) {
-				sawZero = sawZero || r.Partition == 0
-				sawOne = sawOne || r.Partition == 1
-			})
-		}
-		cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
-		sawZero, sawOne = false, false
+	for _, pollfn := range []struct {
+		name string
+		fn   func(context.Context) Fetches
+	}{
+		{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
+		{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
+	} {
 		for i := 0; i < 10; i++ {
-			var fs Fetches
-			if i < 5 {
-				fs = cl.PollFetches(ctx)
-			} else {
-				fs = cl.PollRecords(ctx, 2)
-			}
-			fs.EachRecord(func(r *Record) {
-				sawZero = sawZero || r.Partition == 0
-				sawOne = sawOne || r.Partition == 1
-			})
-		}
-		if sawZero {
-			t.Error("saw partition zero even though it was paused")
-		}
-		cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
-	}
-}
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			var sawZero, sawOne, sawTwo bool
+			for (!sawZero || !sawOne || !sawTwo) && !closed(ctx.Done()) {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawZero = sawZero || r.Partition == 0
+					sawOne = sawOne || r.Partition == 1
+					sawTwo = sawTwo || r.Partition == 2
+				})
+			}
+			cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
+			sawZero, sawOne, sawTwo = false, false, false
+			for i := 0; i < 50 && !closed(ctx.Done()); i++ {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawZero = sawZero || r.Partition == 0
+					sawOne = sawOne || r.Partition == 1
+					sawTwo = sawTwo || r.Partition == 2
+				})
+			}
+			cancel()
+			if sawZero {
+				t.Fatalf("%s: saw partition zero even though it was paused", pollfn.name)
+			}
+			if !sawOne {
+				t.Fatalf("%s: did not see partition one even though it was not paused", pollfn.name)
+			}
+			if !sawTwo {
+				t.Fatalf("%s: did not see partition two even though it was not paused", pollfn.name)
+			}
+			cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
+		}
+	}
+}
+
+func TestPauseIssueOct2023(t *testing.T) {
+	t.Parallel()
+
+	t1, cleanup1 := tmpTopicPartitions(t, 1)
+	t2, cleanup2 := tmpTopicPartitions(t, 1)
+	t3, cleanup3 := tmpTopicPartitions(t, 1)
+	defer cleanup1()
+	defer cleanup2()
+	defer cleanup3()
+	ts := []string{t1, t2, t3}
+
+	cl, _ := NewClient(
+		getSeedBrokers(),
+		UnknownTopicRetries(-1),
+		ConsumeTopics(ts...),
+		FetchMaxWait(100*time.Millisecond),
+	)
+	defer cl.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		var exit atomic.Bool
+		var which int
+		for !exit.Load() {
+			r := StringRecord("v")
+			r.Topic = ts[which%len(ts)]
+			which++
+			cl.Produce(ctx, r, func(r *Record, err error) {
+				if err == context.Canceled {
+					exit.Store(true)
+				}
+			})
+			time.Sleep(100 * time.Microsecond)
+		}
+	}()
+	defer cancel()
+
+	for _, pollfn := range []struct {
+		name string
+		fn   func(context.Context) Fetches
+	}{
+		{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
+		{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
+	} {
+		for i := 0; i < 10; i++ {
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			var sawt1, sawt2, sawt3 bool
+			for (!sawt1 || !sawt2) && !closed(ctx.Done()) {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawt1 = sawt1 || r.Topic == t1
+					sawt2 = sawt2 || r.Topic == t2
+					sawt3 = sawt3 || r.Topic == t3
+				})
+			}
+			cl.PauseFetchTopics(t1)
+			sawt1, sawt2, sawt3 = false, false, false
+			for i := 0; i < 50 && !closed(ctx.Done()); i++ {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawt1 = sawt1 || r.Topic == t1
+					sawt2 = sawt2 || r.Topic == t2
+					sawt3 = sawt3 || r.Topic == t3
+				})
+			}
+			cancel()
+			if sawt1 {
+				t.Fatalf("%s: saw topic t1 even though it was paused", pollfn.name)
+			}
+			if !sawt2 {
+				t.Fatalf("%s: did not see topic t2 even though it was not paused", pollfn.name)
+			}
+			if !sawt3 {
+				t.Fatalf("%s: did not see topic t3 even though it was not paused", pollfn.name)
+			}
+			cl.ResumeFetchTopics(t1)
+		}
+	}
+}

pkg/kgo/source.go: 21 changes (17 additions & 4 deletions)

@@ -355,6 +355,10 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 			// and strip the topic entirely.
 			pps, ok := paused.t(t)
 			if !ok {
+				for _, o := range ps {
+					o.from.setOffset(o.cursorOffset)
+					o.from.allowUsable()
+				}
 				continue
 			}
 			if strip == nil {

@@ -368,7 +372,6 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 				continue
 			}
 			stript := make(map[int32]struct{})
-			strip[t] = stript
 			for _, o := range ps {
 				if _, ok := pps.m[o.from.partition]; ok {
 					o.from.allowUsable()

@@ -378,6 +381,9 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 				o.from.setOffset(o.cursorOffset)
 				o.from.allowUsable()
 			}
+			if len(stript) > 0 {
+				strip[t] = stript
+			}
 		}
 	})
 	if strip != nil {
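
Both takeBuffered fixes are visible in the hunks above: topics with no
paused partitions now restore their cursor offsets and are marked
usable again (the first bug), and a topic is registered in strip only
when at least one of its partitions was actually stripped (the second
bug). The latter is the usual attach-only-when-non-empty map idiom; a
generic sketch, with partitions, paused, strip, and topic standing in
for the surrounding context:

    stript := make(map[int32]struct{})
    for _, p := range partitions {
        if paused(p) {
            stript[p] = struct{}{}
        }
    }
    // Attach only when non-empty, so "topic present in strip" reliably
    // means "this topic has partitions to remove".
    if len(stript) > 0 {
        strip[topic] = stript
    }
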
@@ -435,9 +441,15 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
 			continue
 		}
 
-		r.Topics = append(r.Topics, *t)
-		rt := &r.Topics[len(r.Topics)-1]
-		rt.Partitions = nil
+		var rt *FetchTopic
+		ensureTopicAdded := func() {
+			if rt != nil {
+				return
+			}
+			r.Topics = append(r.Topics, *t)
+			rt = &r.Topics[len(r.Topics)-1]
+			rt.Partitions = nil
+		}
 
 		tCursors := b.usedOffsets[t.Topic]
 
@@ -455,6 +467,7 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
 				continue
 			}
 
+			ensureTopicAdded()
 			rt.Partitions = append(rt.Partitions, *p)
 			rp := &rt.Partitions[len(rt.Partitions)-1]
 
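
The takeNBuffered change replaces an eager topic append with a lazy
one: the topic is added to the returned fetch only once one of its
partitions actually survives filtering, so a topic whose buffered
partitions are all paused no longer appears as an empty entry. A
self-contained sketch of the pattern (the types and names here are
illustrative, not the kgo internals):

    type topic struct {
        name       string
        partitions []int32
    }

    // filterTopics keeps only unpaused partitions, appending each
    // topic lazily on its first surviving partition so fully paused
    // topics never appear, even empty, in the result.
    func filterTopics(in []topic, paused func(string, int32) bool) []topic {
        var out []topic
        for i := range in {
            t := &in[i]
            var ot *topic // set on the first kept partition
            ensureTopicAdded := func() {
                if ot != nil {
                    return
                }
                out = append(out, topic{name: t.name})
                ot = &out[len(out)-1]
            }
            for _, p := range t.partitions {
                if paused(t.name, p) {
                    continue
                }
                ensureTopicAdded()
                ot.partitions = append(ot.partitions, p)
            }
        }
        return out
    }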
