From 4ab819db56bffbe20b11b30fccdd8d864ba1eafe Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Tue, 31 Oct 2023 08:13:42 -0600
Subject: [PATCH] kgo: be sure to use topics when other topics are paused

Following up from #585, there was a bug in that commit: if any topic was
paused, then all non-paused topics would be returned once, but they would
not be marked as fetchable after that. I _think_ the non-fetchability
would eventually be cleared on a metadata update, _but_ the source would
re-fetch from the old position again. The only way the topic would advance
would be if no topics were paused after the metadata update. Either way,
the behavior is confusing, and this patch is required.

This also patches a second bug in PollFetches with pausing: if a topic has
a paused partition but the fetch response does NOT contain any paused
partitions, the logic would strip the entire topic from the response.

The pause tests have been strengthened a good bit -- all lines but one are
hit. The one line that is not hit could more easily be hit if more
partitions were added to the topic or a cluster of size one were used; it
is currently not hit because it requires one paused partition and one
unpaused partition to be returned from the same broker at the same time.
---
 pkg/kgo/consumer_direct_test.go | 155 ++++++++++++++++++++++++++------
 pkg/kgo/source.go               |  21 ++++-
 2 files changed, 146 insertions(+), 30 deletions(-)

diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go
index ac385e8e..9fae1249 100644
--- a/pkg/kgo/consumer_direct_test.go
+++ b/pkg/kgo/consumer_direct_test.go
@@ -263,10 +263,19 @@ func TestAddRemovePartitions(t *testing.T) {
 	}
 }
 
+func closed(ch <-chan struct{}) bool {
+	select {
+	case <-ch:
+		return true
+	default:
+		return false
+	}
+}
+
 func TestPauseIssue489(t *testing.T) {
 	t.Parallel()
 
-	t1, cleanup := tmpTopicPartitions(t, 2)
+	t1, cleanup := tmpTopicPartitions(t, 3)
 	defer cleanup()
 
 	cl, _ := NewClient(
@@ -282,47 +291,141 @@ func TestPauseIssue489(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	go func() {
 		var exit atomic.Bool
-		var zeroOne uint8
+		var which uint8
 		for !exit.Load() {
 			r := StringRecord("v")
-			r.Partition = int32(zeroOne % 2)
-			zeroOne++
+			r.Partition = int32(which % 3)
+			which++
 			cl.Produce(ctx, r, func(r *Record, err error) {
 				if err == context.Canceled {
 					exit.Store(true)
 				}
 			})
+			time.Sleep(100 * time.Microsecond)
 		}
 	}()
 	defer cancel()
 
-	for i := 0; i < 10; i++ {
-		var sawZero, sawOne bool
-		for !sawZero || !sawOne {
-			fs := cl.PollFetches(ctx)
-			fs.EachRecord(func(r *Record) {
-				sawZero = sawZero || r.Partition == 0
-				sawOne = sawOne || r.Partition == 1
-			})
-		}
-		cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
-		sawZero, sawOne = false, false
-		for i := 0; i < 10; i++ {
-			var fs Fetches
-			if i < 5 {
-				fs = cl.PollFetches(ctx)
-			} else {
-				fs = cl.PollRecords(ctx, 2)
-			}
-			fs.EachRecord(func(r *Record) {
-				sawZero = sawZero || r.Partition == 0
-				sawOne = sawOne || r.Partition == 1
-			})
-		}
-		if sawZero {
-			t.Error("saw partition zero even though it was paused")
-		}
-		cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
-	}
-}
+	for _, pollfn := range []struct {
+		name string
+		fn   func(context.Context) Fetches
+	}{
+		{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
+		{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
+	} {
+		for i := 0; i < 10; i++ {
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			var sawZero, sawOne, sawTwo bool
+			for (!sawZero || !sawOne || !sawTwo) && !closed(ctx.Done()) {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawZero = sawZero || r.Partition == 0
+					sawOne = sawOne || r.Partition == 1
+					sawTwo = sawTwo || r.Partition == 2
+				})
+			}
+			cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
+			sawZero, sawOne, sawTwo = false, false, false
+			for i := 0; i < 50 && !closed(ctx.Done()); i++ {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawZero = sawZero || r.Partition == 0
+					sawOne = sawOne || r.Partition == 1
+					sawTwo = sawTwo || r.Partition == 2
+				})
+			}
+			cancel()
+			if sawZero {
+				t.Fatalf("%s: saw partition zero even though it was paused", pollfn.name)
+			}
+			if !sawOne {
+				t.Fatalf("%s: did not see partition one even though it was not paused", pollfn.name)
+			}
+			if !sawTwo {
+				t.Fatalf("%s: did not see partition two even though it was not paused", pollfn.name)
+			}
+			cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
+		}
+	}
+}
+
+func TestPauseIssueOct2023(t *testing.T) {
+	t.Parallel()
+
+	t1, cleanup1 := tmpTopicPartitions(t, 1)
+	t2, cleanup2 := tmpTopicPartitions(t, 1)
+	t3, cleanup3 := tmpTopicPartitions(t, 1)
+	defer cleanup1()
+	defer cleanup2()
+	defer cleanup3()
+	ts := []string{t1, t2, t3}
+
+	cl, _ := NewClient(
+		getSeedBrokers(),
+		UnknownTopicRetries(-1),
+		ConsumeTopics(ts...),
+		FetchMaxWait(100*time.Millisecond),
+	)
+	defer cl.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		var exit atomic.Bool
+		var which int
+		for !exit.Load() {
+			r := StringRecord("v")
+			r.Topic = ts[which%len(ts)]
+			which++
+			cl.Produce(ctx, r, func(r *Record, err error) {
+				if err == context.Canceled {
+					exit.Store(true)
+				}
+			})
+			time.Sleep(100 * time.Microsecond)
+		}
+	}()
+	defer cancel()
+
+	for _, pollfn := range []struct {
+		name string
+		fn   func(context.Context) Fetches
+	}{
+		{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
+		{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
+	} {
+		for i := 0; i < 10; i++ {
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			var sawt1, sawt2, sawt3 bool
+			for (!sawt1 || !sawt2) && !closed(ctx.Done()) {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawt1 = sawt1 || r.Topic == t1
+					sawt2 = sawt2 || r.Topic == t2
+					sawt3 = sawt3 || r.Topic == t3
+				})
+			}
+			cl.PauseFetchTopics(t1)
+			sawt1, sawt2, sawt3 = false, false, false
+			for i := 0; i < 50 && !closed(ctx.Done()); i++ {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawt1 = sawt1 || r.Topic == t1
+					sawt2 = sawt2 || r.Topic == t2
+					sawt3 = sawt3 || r.Topic == t3
+				})
+			}
+			cancel()
+			if sawt1 {
+				t.Fatalf("%s: saw topic t1 even though it was paused", pollfn.name)
+			}
+			if !sawt2 {
+				t.Fatalf("%s: did not see topic t2 even though it was not paused", pollfn.name)
+			}
+			if !sawt3 {
+				t.Fatalf("%s: did not see topic t3 even though it was not paused", pollfn.name)
+			}
+			cl.ResumeFetchTopics(t1)
+		}
 	}
 }
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 3086e60d..75a8c884 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -355,6 +355,10 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 			// and strip the topic entirely.
 			pps, ok := paused.t(t)
 			if !ok {
+				for _, o := range ps {
+					o.from.setOffset(o.cursorOffset)
+					o.from.allowUsable()
+				}
 				continue
 			}
 			if strip == nil {
@@ -368,7 +372,6 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 				continue
 			}
 			stript := make(map[int32]struct{})
-			strip[t] = stript
 			for _, o := range ps {
 				if _, ok := pps.m[o.from.partition]; ok {
 					o.from.allowUsable()
@@ -378,6 +381,9 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 				o.from.setOffset(o.cursorOffset)
 				o.from.allowUsable()
 			}
+			if len(stript) > 0 {
+				strip[t] = stript
+			}
 		}
 	})
 	if strip != nil {
@@ -435,9 +441,15 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
 			continue
 		}
 
-		r.Topics = append(r.Topics, *t)
-		rt := &r.Topics[len(r.Topics)-1]
-		rt.Partitions = nil
+		var rt *FetchTopic
+		ensureTopicAdded := func() {
+			if rt != nil {
+				return
+			}
+			r.Topics = append(r.Topics, *t)
+			rt = &r.Topics[len(r.Topics)-1]
+			rt.Partitions = nil
+		}
 
 		tCursors := b.usedOffsets[t.Topic]
 
@@ -455,6 +467,7 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
 				continue
 			}
 
+			ensureTopicAdded()
 			rt.Partitions = append(rt.Partitions, *p)
 			rp := &rt.Partitions[len(rt.Partitions)-1]
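
Note (not part of the patch): the consumer-side pattern the new tests exercise is, roughly, pause a topic, keep polling, and expect the remaining consumed topics to keep advancing. Below is a minimal sketch of that pattern, assuming an already-constructed client; the package name and the pollWhilePaused helper are hypothetical, while PauseFetchTopics, ResumeFetchTopics, PollFetches, and EachRecord are the kgo client methods used by the tests above.

package pausesketch

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// pollWhilePaused pauses one topic, polls a few times, and then resumes it.
// With this patch, polling while a topic is paused no longer stops the other
// consumed topics from being fetched and advancing.
func pollWhilePaused(ctx context.Context, cl *kgo.Client, topic string) {
	cl.PauseFetchTopics(topic)
	defer cl.ResumeFetchTopics(topic)

	for i := 0; i < 10; i++ {
		fs := cl.PollFetches(ctx)
		fs.EachRecord(func(r *kgo.Record) {
			// Records for the paused topic are not returned here; records
			// for the other, non-paused topics continue to arrive.
			_ = r
		})
	}
}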