diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index 6569a60d..ceaed081 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -1763,10 +1763,16 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
 	// guard this entire function.
 	debug := s.c.cl.cfg.logger.Level() >= LogLevelDebug
-	var using, reloading map[string]map[int32]EpochOffset
+
+	var using map[string]map[int32]EpochOffset
+	type epochOffsetWhy struct {
+		EpochOffset
+		error
+	}
+	var reloading map[string]map[int32]epochOffsetWhy
 	if debug {
 		using = make(map[string]map[int32]EpochOffset)
-		reloading = make(map[string]map[int32]EpochOffset)
+		reloading = make(map[string]map[int32]epochOffsetWhy)
 		defer func() {
 			t := "list"
 			if loaded.loadType == loadTypeEpoch {
 				t = "epoch"
@@ -1818,10 +1824,10 @@
 		if debug {
 			treloading := reloading[load.topic]
 			if treloading == nil {
-				treloading = make(map[int32]EpochOffset)
+				treloading = make(map[int32]epochOffsetWhy)
 				reloading[load.topic] = treloading
 			}
-			treloading[load.partition] = EpochOffset{load.leaderEpoch, load.offset}
+			treloading[load.partition] = epochOffsetWhy{EpochOffset{load.leaderEpoch, load.offset}, load.err}
 		}
 	}
 }
diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go
index ac385e8e..f4ddc4db 100644
--- a/pkg/kgo/consumer_direct_test.go
+++ b/pkg/kgo/consumer_direct_test.go
@@ -263,10 +263,19 @@ func TestAddRemovePartitions(t *testing.T) {
 	}
 }
 
+func closed(ch <-chan struct{}) bool {
+	select {
+	case <-ch:
+		return true
+	default:
+		return false
+	}
+}
+
 func TestPauseIssue489(t *testing.T) {
 	t.Parallel()
 
-	t1, cleanup := tmpTopicPartitions(t, 2)
+	t1, cleanup := tmpTopicPartitions(t, 3)
 	defer cleanup()
 
 	cl, _ := NewClient(
@@ -282,47 +291,142 @@ func TestPauseIssue489(t *testing.T) {
 
 	ctx, cancel := context.WithCancel(context.Background())
 	go func() {
 		var exit atomic.Bool
-		var zeroOne uint8
+		var which uint8
 		for !exit.Load() {
 			r := StringRecord("v")
-			r.Partition = int32(zeroOne % 2)
-			zeroOne++
+			r.Partition = int32(which % 3)
+			which++
 			cl.Produce(ctx, r, func(r *Record, err error) {
 				if err == context.Canceled {
 					exit.Store(true)
 				}
 			})
+			time.Sleep(100 * time.Microsecond)
 		}
 	}()
 	defer cancel()
 
-	for i := 0; i < 10; i++ {
-		var sawZero, sawOne bool
-		for !sawZero || !sawOne {
-			fs := cl.PollFetches(ctx)
-			fs.EachRecord(func(r *Record) {
-				sawZero = sawZero || r.Partition == 0
-				sawOne = sawOne || r.Partition == 1
-			})
-		}
-		cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
-		sawZero, sawOne = false, false
+	for _, pollfn := range []struct {
+		name string
+		fn   func(context.Context) Fetches
+	}{
+		{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
+		{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
+	} {
 		for i := 0; i < 10; i++ {
-			var fs Fetches
-			if i < 5 {
-				fs = cl.PollFetches(ctx)
-			} else {
-				fs = cl.PollRecords(ctx, 2)
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			var sawZero, sawOne, sawTwo bool
+			for (!sawZero || !sawOne || !sawTwo) && !closed(ctx.Done()) {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawZero = sawZero || r.Partition == 0
+					sawOne = sawOne || r.Partition == 1
+					sawTwo = sawTwo || r.Partition == 2
+				})
 			}
-			fs.EachRecord(func(r *Record) {
-				sawZero = sawZero || r.Partition == 0
-				sawOne = sawOne || r.Partition == 1
+			cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
+			sawZero, sawOne, sawTwo = false, false, false
+			for i := 0; i < 10 && !closed(ctx.Done()); i++ {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawZero = sawZero || r.Partition == 0
+					sawOne = sawOne || r.Partition == 1
+					sawTwo = sawTwo || r.Partition == 2
+				})
+			}
+			cancel()
+			if sawZero {
+				t.Fatalf("%s: saw partition zero even though it was paused", pollfn.name)
+			}
+			if !sawOne {
+				t.Fatalf("%s: did not see partition one even though it was not paused", pollfn.name)
+			}
+			if !sawTwo {
+				t.Fatalf("%s: did not see partition two even though it was not paused", pollfn.name)
+			}
+			cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
+		}
+	}
+}
+
+func TestPauseIssueOct2023(t *testing.T) {
+	t.Parallel()
+
+	t1, cleanup1 := tmpTopicPartitions(t, 1)
+	t2, cleanup2 := tmpTopicPartitions(t, 1)
+	t3, cleanup3 := tmpTopicPartitions(t, 1)
+	defer cleanup1()
+	defer cleanup2()
+	defer cleanup3()
+	ts := []string{t1, t2, t3}
+
+	cl, _ := NewClient(
+		getSeedBrokers(),
+		UnknownTopicRetries(-1),
+		ConsumeTopics(ts...),
+		MetadataMinAge(50*time.Millisecond),
+		FetchMaxWait(100*time.Millisecond),
+	)
+	defer cl.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		var exit atomic.Bool
+		var which int
+		for !exit.Load() {
+			r := StringRecord("v")
+			r.Topic = ts[which%len(ts)]
+			which++
+			cl.Produce(ctx, r, func(r *Record, err error) {
+				if err == context.Canceled {
+					exit.Store(true)
+				}
 			})
+			time.Sleep(100 * time.Microsecond)
 		}
-		if sawZero {
-			t.Error("saw partition zero even though it was paused")
+	}()
+	defer cancel()
+
+	for _, pollfn := range []struct {
+		name string
+		fn   func(context.Context) Fetches
+	}{
+		{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
+		{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
+	} {
+		for i := 0; i < 10; i++ {
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			var sawt1, sawt2, sawt3 bool
+			for (!sawt1 || !sawt2 || !sawt3) && !closed(ctx.Done()) {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawt1 = sawt1 || r.Topic == t1
+					sawt2 = sawt2 || r.Topic == t2
+					sawt3 = sawt3 || r.Topic == t3
+				})
+			}
+			cl.PauseFetchTopics(t1)
+			sawt1, sawt2, sawt3 = false, false, false
+			for i := 0; i < 10 && !closed(ctx.Done()); i++ {
+				fs := pollfn.fn(ctx)
+				fs.EachRecord(func(r *Record) {
+					sawt1 = sawt1 || r.Topic == t1
+					sawt2 = sawt2 || r.Topic == t2
+					sawt3 = sawt3 || r.Topic == t3
+				})
+			}
+			cancel()
+			if sawt1 {
+				t.Fatalf("%s: saw topic t1 even though it was paused", pollfn.name)
+			}
+			if !sawt2 {
+				t.Fatalf("%s: did not see topic t2 even though it was not paused", pollfn.name)
+			}
+			if !sawt3 {
+				t.Fatalf("%s: did not see topic t3 even though it was not paused", pollfn.name)
+			}
+			cl.ResumeFetchTopics(t1)
 		}
-		cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
 	}
 }
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 3086e60d..25a3c2d3 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -355,6 +355,10 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 			// and strip the topic entirely.
 			pps, ok := paused.t(t)
 			if !ok {
+				for _, o := range ps {
+					o.from.setOffset(o.cursorOffset)
+					o.from.allowUsable()
+				}
 				continue
 			}
 			if strip == nil {
@@ -368,7 +372,6 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 				continue
 			}
 			stript := make(map[int32]struct{})
-			strip[t] = stript
 			for _, o := range ps {
 				if _, ok := pps.m[o.from.partition]; ok {
 					o.from.allowUsable()
@@ -378,6 +381,15 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
 				o.from.setOffset(o.cursorOffset)
 				o.from.allowUsable()
 			}
+			// We only add stript to strip if there are any
+			// stripped partitions. We could have a paused
+			// partition that is on another broker, while this
+			// broker has no paused partitions -- if we add stript
+			// here, our logic below (stripping this entire topic)
+			// is more confusing (present nil vs. non-present nil).
+			if len(stript) > 0 {
+				strip[t] = stript
+			}
 		}
 	})
 	if strip != nil {
@@ -435,9 +447,15 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
 			continue
 		}
 
-		r.Topics = append(r.Topics, *t)
-		rt := &r.Topics[len(r.Topics)-1]
-		rt.Partitions = nil
+		var rt *FetchTopic
+		ensureTopicAdded := func() {
+			if rt != nil {
+				return
+			}
+			r.Topics = append(r.Topics, *t)
+			rt = &r.Topics[len(r.Topics)-1]
+			rt.Partitions = nil
+		}
 
 		tCursors := b.usedOffsets[t.Topic]
 
@@ -455,6 +473,7 @@
 				continue
 			}
 
+			ensureTopicAdded()
 			rt.Partitions = append(rt.Partitions, *p)
 			rp := &rt.Partitions[len(rt.Partitions)-1]