From af5bc1f957562a03a26eb2317e03b1a507339f93 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Tue, 31 Oct 2023 08:13:42 -0600
Subject: [PATCH] kgo: be sure to use topics when other topics are paused

Follow-up to #585: there was a bug in that commit. If any topic was
paused, then all non-paused topics would be returned once, but they
would not be marked as fetchable after that. I _think_ the
non-fetchability would eventually be cleared on a metadata update,
_but_ the source would then re-fetch from the old position again. The
only way the topic would advance is if no topics were paused after the
metadata update. This behavior is confusing at best, and regardless,
this patch is required.

This also patches a second bug in PollFetches with pausing: if a topic
has a paused partition but the fetch response does NOT contain any
paused partitions, then the logic would actually strip the entire
topic.

The pause tests have been strengthened a good bit -- all lines but one
are hit. The one remaining line could be hit more easily if more
partitions were added to the topic or a cluster of size one were used;
it is currently not hit because it requires one paused partition and
one unpaused partition to be returned from the same broker at the same
time.

Lastly, this adds an error reason to the debug log of why a list or
epoch offset is reloading, which was used briefly while investigating
test slowness.
---
 pkg/kgo/consumer.go             |  14 ++-
 pkg/kgo/consumer_direct_test.go | 156 ++++++++++++++++++++++++++------
 pkg/kgo/source.go               |  27 +++++-
 3 files changed, 163 insertions(+), 34 deletions(-)
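Note (illustration only, not part of the patch): the consume pattern the fix
targets looks roughly like the sketch below; the broker address and topic
names are placeholders. Pausing one topic while continuing to poll should
keep the other topics advancing, which is what the strengthened tests assert.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder seed broker
		kgo.ConsumeTopics("a", "b"),       // placeholder topics
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ctx := context.Background()

	// Pause topic "a". Before this patch, topic "b" could be returned once
	// and then stall, because its cursors were not marked fetchable again
	// after the paused-topic filtering.
	cl.PauseFetchTopics("a")
	for i := 0; i < 3; i++ {
		fs := cl.PollFetches(ctx)
		fs.EachRecord(func(r *kgo.Record) {
			fmt.Println(r.Topic, r.Partition, string(r.Value))
		})
	}
	cl.ResumeFetchTopics("a")
}
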
diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index 6569a60d..ceaed081 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -1763,10 +1763,16 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
 	// guard this entire function.
 
 	debug := s.c.cl.cfg.logger.Level() >= LogLevelDebug
-	var using, reloading map[string]map[int32]EpochOffset
+
+	var using map[string]map[int32]EpochOffset
+	type epochOffsetWhy struct {
+		EpochOffset
+		error
+	}
+	var reloading map[string]map[int32]epochOffsetWhy
 	if debug {
 		using = make(map[string]map[int32]EpochOffset)
-		reloading = make(map[string]map[int32]EpochOffset)
+		reloading = make(map[string]map[int32]epochOffsetWhy)
 		defer func() {
 			t := "list"
 			if loaded.loadType == loadTypeEpoch {
@@ -1818,10 +1824,10 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
 			if debug {
 				treloading := reloading[load.topic]
 				if treloading == nil {
-					treloading = make(map[int32]EpochOffset)
+					treloading = make(map[int32]epochOffsetWhy)
 					reloading[load.topic] = treloading
 				}
-				treloading[load.partition] = EpochOffset{load.leaderEpoch, load.offset}
+				treloading[load.partition] = epochOffsetWhy{EpochOffset{load.leaderEpoch, load.offset}, load.err}
 			}
 		}
 	}
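A note on the epochOffsetWhy type above: it exists purely for the debug log,
pairing the offset being reloaded with the error that caused the reload. A
standalone sketch of the same embedding pattern (the EpochOffset struct below
is a local stand-in, not the kgo type):

package main

import "fmt"

// EpochOffset is a stand-in for the kgo type: a leader epoch plus an offset.
type EpochOffset struct {
	Epoch  int32
	Offset int64
}

// epochOffsetWhy mirrors the patch's debug-only type: the embedded error
// records why this offset is being reloaded.
type epochOffsetWhy struct {
	EpochOffset
	error
}

func main() {
	why := epochOffsetWhy{EpochOffset{3, 128}, fmt.Errorf("example reload reason")}
	// A debug line can now print the offset and the reason together.
	fmt.Printf("reloading: %+v\n", why)
}
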
diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go
index ac385e8e..f4ddc4db 100644
--- a/pkg/kgo/consumer_direct_test.go
+++ b/pkg/kgo/consumer_direct_test.go
@@ -263,10 +263,19 @@ func TestAddRemovePartitions(t *testing.T) {
 	}
 }
 
+func closed(ch <-chan struct{}) bool {
+	select {
+	case <-ch:
+		return true
+	default:
+		return false
+	}
+}
+
 func TestPauseIssue489(t *testing.T) {
 	t.Parallel()
 
-	t1, cleanup := tmpTopicPartitions(t, 2)
+	t1, cleanup := tmpTopicPartitions(t, 3)
 	defer cleanup()
 
 	cl, _ := NewClient(
@@ -282,47 +291,142 @@ func TestPauseIssue489(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	go func() {
 		var exit atomic.Bool
-		var zeroOne uint8
+		var which uint8
 		for !exit.Load() {
 			r := StringRecord("v")
-			r.Partition = int32(zeroOne % 2)
-			zeroOne++
+			r.Partition = int32(which % 3)
+			which++
 			cl.Produce(ctx, r, func(r *Record, err error) {
 				if err == context.Canceled {
 					exit.Store(true)
 				}
 			})
+			time.Sleep(100 * time.Microsecond)
 		}
 	}()
 	defer cancel()
 
-	for i := 0; i < 10; i++ {
-		var sawZero, sawOne bool
-		for !sawZero || !sawOne {
-			fs := cl.PollFetches(ctx)
-			fs.EachRecord(func(r *Record) {
-				sawZero = sawZero || r.Partition == 0
-				sawOne = sawOne || r.Partition == 1
-			})
-		}
-		cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
-		sawZero, sawOne = false, false
+	for _, pollfn := range []struct {
+		name string
+		fn   func(context.Context) Fetches
+	}{
+		{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
+		{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
+	} {
 		for i := 0; i < 10; i++ {
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			var sawZero, sawOne, sawTwo bool
+			for (!sawZero || !sawOne || !sawTwo) && !closed(ctx.Done()) {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawZero = sawZero || r.Partition == 0
+					sawOne = sawOne || r.Partition == 1
+					sawTwo = sawTwo || r.Partition == 2
+				})
 			}
-			var fs Fetches
-			if i < 5 {
-				fs = cl.PollFetches(ctx)
-			} else {
-				fs = cl.PollRecords(ctx, 2)
+			cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
+			sawZero, sawOne, sawTwo = false, false, false
+			for i := 0; i < 10 && !closed(ctx.Done()); i++ {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawZero = sawZero || r.Partition == 0
+					sawOne = sawOne || r.Partition == 1
+					sawTwo = sawTwo || r.Partition == 2
+				})
+			}
+			cancel()
+			if sawZero {
+				t.Fatalf("%s: saw partition zero even though it was paused", pollfn.name)
+			}
+			if !sawOne {
+				t.Fatalf("%s: did not see partition one even though it was not paused", pollfn.name)
+			}
+			if !sawTwo {
+				t.Fatalf("%s: did not see partition two even though it was not paused", pollfn.name)
+			}
+			cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
+		}
+	}
+}
+
+func TestPauseIssueOct2023(t *testing.T) {
+	t.Parallel()
+
+	t1, cleanup1 := tmpTopicPartitions(t, 1)
+	t2, cleanup2 := tmpTopicPartitions(t, 1)
+	t3, cleanup3 := tmpTopicPartitions(t, 1)
+	defer cleanup1()
+	defer cleanup2()
+	defer cleanup3()
+	ts := []string{t1, t2, t3}
+
+	cl, _ := NewClient(
+		getSeedBrokers(),
+		UnknownTopicRetries(-1),
+		ConsumeTopics(ts...),
+		MetadataMinAge(50*time.Millisecond),
+		FetchMaxWait(100*time.Millisecond),
+	)
+	defer cl.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		var exit atomic.Bool
+		var which int
+		for !exit.Load() {
+			r := StringRecord("v")
+			r.Topic = ts[which%len(ts)]
+			which++
+			cl.Produce(ctx, r, func(r *Record, err error) {
+				if err == context.Canceled {
+					exit.Store(true)
+				}
 			})
+			time.Sleep(100 * time.Microsecond)
 		}
-		if sawZero {
-			t.Error("saw partition zero even though it was paused")
+	}()
+	defer cancel()
+
+	for _, pollfn := range []struct {
+		name string
+		fn   func(context.Context) Fetches
+	}{
+		{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
+		{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
+	} {
+		for i := 0; i < 10; i++ {
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			var sawt1, sawt2, sawt3 bool
+			for (!sawt1 || !sawt2 || !sawt3) && !closed(ctx.Done()) {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawt1 = sawt1 || r.Topic == t1
+					sawt2 = sawt2 || r.Topic == t2
+					sawt3 = sawt3 || r.Topic == t3
+				})
+			}
+			cl.PauseFetchTopics(t1)
+			sawt1, sawt2, sawt3 = false, false, false
+			for i := 0; i < 10 && !closed(ctx.Done()); i++ {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawt1 = sawt1 || r.Topic == t1
+					sawt2 = sawt2 || r.Topic == t2
+					sawt3 = sawt3 || r.Topic == t3
+				})
+			}
+			cancel()
+			if sawt1 {
+				t.Fatalf("%s: saw topic t1 even though it was paused", pollfn.name)
+			}
+			if !sawt2 {
+				t.Fatalf("%s: did not see topic t2 even though it was not paused", pollfn.name)
+			}
+			if !sawt3 {
+				t.Fatalf("%s: did not see topic t3 even though it was not paused", pollfn.name)
+			}
+			cl.ResumeFetchTopics(t1)
 		}
-		cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
 	}
 }
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 3086e60d..25a3c2d3 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -355,6 +355,10 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 			// and strip the topic entirely.
 			pps, ok := paused.t(t)
 			if !ok {
+				for _, o := range ps {
+					o.from.setOffset(o.cursorOffset)
+					o.from.allowUsable()
+				}
 				continue
 			}
 			if strip == nil {
@@ -368,7 +372,6 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 				continue
 			}
 			stript := make(map[int32]struct{})
-			strip[t] = stript
 			for _, o := range ps {
 				if _, ok := pps.m[o.from.partition]; ok {
 					o.from.allowUsable()
@@ -378,6 +381,15 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 				o.from.setOffset(o.cursorOffset)
 				o.from.allowUsable()
 			}
+			// We only add stript to strip if there are any
+			// stripped partitions. We could have a paused
+			// partition that is on another broker, while this
+			// broker has no paused partitions -- if we add stript
+			// here, our logic below (stripping this entire topic)
+			// is more confusing (present nil vs. non-present nil).
+			if len(stript) > 0 {
+				strip[t] = stript
+			}
 		}
 	})
 	if strip != nil {
@@ -435,9 +447,15 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
 			continue
 		}
 
-		r.Topics = append(r.Topics, *t)
-		rt := &r.Topics[len(r.Topics)-1]
-		rt.Partitions = nil
+		var rt *FetchTopic
+		ensureTopicAdded := func() {
+			if rt != nil {
+				return
+			}
+			r.Topics = append(r.Topics, *t)
+			rt = &r.Topics[len(r.Topics)-1]
+			rt.Partitions = nil
+		}
 
 		tCursors := b.usedOffsets[t.Topic]
 
@@ -455,6 +473,7 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
 				continue
 			}
 
+			ensureTopicAdded()
 			rt.Partitions = append(rt.Partitions, *p)
 			rp := &rt.Partitions[len(rt.Partitions)-1]
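A side note on the takeBuffered comment above (standalone illustration, not
from the patch): the "present nil vs. non-present nil" distinction it refers
to is only visible through the two-value map lookup or len, since a plain
read returns nil either way:

package main

import "fmt"

func main() {
	strip := make(map[string]map[int32]struct{})

	strip["paused-topic"] = nil // key present, value nil

	v1, ok1 := strip["paused-topic"] // v1 == nil, ok1 == true
	v2, ok2 := strip["other-topic"]  // v2 == nil, ok2 == false

	fmt.Println(v1 == nil, ok1) // true true
	fmt.Println(v2 == nil, ok2) // true false
	fmt.Println(len(strip))     // 1: the nil-valued key still counts toward the map's length
}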