Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kgo: be sure to use topics when other topics are paused #610

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1763,10 +1763,16 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
// guard this entire function.

debug := s.c.cl.cfg.logger.Level() >= LogLevelDebug
var using, reloading map[string]map[int32]EpochOffset

var using map[string]map[int32]EpochOffset
type epochOffsetWhy struct {
EpochOffset
error
}
var reloading map[string]map[int32]epochOffsetWhy
if debug {
using = make(map[string]map[int32]EpochOffset)
reloading = make(map[string]map[int32]EpochOffset)
reloading = make(map[string]map[int32]epochOffsetWhy)
defer func() {
t := "list"
if loaded.loadType == loadTypeEpoch {
Expand Down Expand Up @@ -1818,10 +1824,10 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
if debug {
treloading := reloading[load.topic]
if treloading == nil {
treloading = make(map[int32]EpochOffset)
treloading = make(map[int32]epochOffsetWhy)
reloading[load.topic] = treloading
}
treloading[load.partition] = EpochOffset{load.leaderEpoch, load.offset}
treloading[load.partition] = epochOffsetWhy{EpochOffset{load.leaderEpoch, load.offset}, load.err}
}
}
}
Expand Down
156 changes: 130 additions & 26 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,19 @@ func TestAddRemovePartitions(t *testing.T) {
}
}

func closed(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
return false
}
}

func TestPauseIssue489(t *testing.T) {
t.Parallel()

t1, cleanup := tmpTopicPartitions(t, 2)
t1, cleanup := tmpTopicPartitions(t, 3)
defer cleanup()

cl, _ := NewClient(
Expand All @@ -282,47 +291,142 @@ func TestPauseIssue489(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
var exit atomic.Bool
var zeroOne uint8
var which uint8
for !exit.Load() {
r := StringRecord("v")
r.Partition = int32(zeroOne % 2)
zeroOne++
r.Partition = int32(which % 3)
which++
cl.Produce(ctx, r, func(r *Record, err error) {
if err == context.Canceled {
exit.Store(true)
}
})
time.Sleep(100 * time.Microsecond)
}
}()
defer cancel()

for i := 0; i < 10; i++ {
var sawZero, sawOne bool
for !sawZero || !sawOne {
fs := cl.PollFetches(ctx)
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
})
}
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
sawZero, sawOne = false, false
for _, pollfn := range []struct {
name string
fn func(context.Context) Fetches
}{
{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
} {
for i := 0; i < 10; i++ {
var fs Fetches
if i < 5 {
fs = cl.PollFetches(ctx)
} else {
fs = cl.PollRecords(ctx, 2)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
var sawZero, sawOne, sawTwo bool
for (!sawZero || !sawOne || !sawTwo) && !closed(ctx.Done()) {
fs := pollfn.fn(ctx)
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
sawTwo = sawTwo || r.Partition == 2
})
}
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
sawZero, sawOne, sawTwo = false, false, false
for i := 0; i < 10 && !closed(ctx.Done()); i++ {
fs := pollfn.fn(ctx)
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
sawTwo = sawTwo || r.Partition == 2
})
}
cancel()
if sawZero {
t.Fatalf("%s: saw partition zero even though it was paused", pollfn.name)
}
if !sawOne {
t.Fatalf("%s: did not see partition one even though it was not paused", pollfn.name)
}
if !sawTwo {
t.Fatalf("%s: did not see partition two even though it was not paused", pollfn.name)
}
cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
}
}
}

func TestPauseIssueOct2023(t *testing.T) {
t.Parallel()

t1, cleanup1 := tmpTopicPartitions(t, 1)
t2, cleanup2 := tmpTopicPartitions(t, 1)
t3, cleanup3 := tmpTopicPartitions(t, 1)
defer cleanup1()
defer cleanup2()
defer cleanup3()
ts := []string{t1, t2, t3}

cl, _ := NewClient(
getSeedBrokers(),
UnknownTopicRetries(-1),
ConsumeTopics(ts...),
MetadataMinAge(50*time.Millisecond),
FetchMaxWait(100*time.Millisecond),
)
defer cl.Close()

ctx, cancel := context.WithCancel(context.Background())
go func() {
var exit atomic.Bool
var which int
for !exit.Load() {
r := StringRecord("v")
r.Topic = ts[which%len(ts)]
which++
cl.Produce(ctx, r, func(r *Record, err error) {
if err == context.Canceled {
exit.Store(true)
}
})
time.Sleep(100 * time.Microsecond)
}
if sawZero {
t.Error("saw partition zero even though it was paused")
}()
defer cancel()

for _, pollfn := range []struct {
name string
fn func(context.Context) Fetches
}{
{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
} {
for i := 0; i < 10; i++ {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
var sawt1, sawt2, sawt3 bool
for (!sawt1 || !sawt2 || !sawt3) && !closed(ctx.Done()) {
fs := pollfn.fn(ctx)
fs.EachRecord(func(r *Record) {
sawt1 = sawt1 || r.Topic == t1
sawt2 = sawt2 || r.Topic == t2
sawt3 = sawt3 || r.Topic == t3
})
}
cl.PauseFetchTopics(t1)
sawt1, sawt2, sawt3 = false, false, false
for i := 0; i < 10 && !closed(ctx.Done()); i++ {
fs := pollfn.fn(ctx)
fs.EachRecord(func(r *Record) {
sawt1 = sawt1 || r.Topic == t1
sawt2 = sawt2 || r.Topic == t2
sawt3 = sawt3 || r.Topic == t3
})
}
cancel()
if sawt1 {
t.Fatalf("%s: saw topic t1 even though it was paused", pollfn.name)
}
if !sawt2 {
t.Fatalf("%s: did not see topic t2 even though it was not paused", pollfn.name)
}
if !sawt3 {
t.Fatalf("%s: did not see topic t3 even though it was not paused", pollfn.name)
}
cl.ResumeFetchTopics(t1)
}
cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
}
}

Expand Down
27 changes: 23 additions & 4 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
// and strip the topic entirely.
pps, ok := paused.t(t)
if !ok {
for _, o := range ps {
o.from.setOffset(o.cursorOffset)
o.from.allowUsable()
}
continue
}
if strip == nil {
Expand All @@ -368,7 +372,6 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
continue
}
stript := make(map[int32]struct{})
strip[t] = stript
for _, o := range ps {
if _, ok := pps.m[o.from.partition]; ok {
o.from.allowUsable()
Expand All @@ -378,6 +381,15 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
o.from.setOffset(o.cursorOffset)
o.from.allowUsable()
}
// We only add stript to strip if there are any
// stripped partitions. We could have a paused
// partition that is on another broker, while this
// broker has no paused partitions -- if we add stript
// here, our logic below (stripping this entire topic)
// is more confusing (present nil vs. non-present nil).
if len(stript) > 0 {
twmb marked this conversation as resolved.
Show resolved Hide resolved
strip[t] = stript
}
}
})
if strip != nil {
Expand Down Expand Up @@ -435,9 +447,15 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
continue
}

r.Topics = append(r.Topics, *t)
rt := &r.Topics[len(r.Topics)-1]
rt.Partitions = nil
var rt *FetchTopic
ensureTopicAdded := func() {
if rt != nil {
return
}
r.Topics = append(r.Topics, *t)
rt = &r.Topics[len(r.Topics)-1]
rt.Partitions = nil
}

tCursors := b.usedOffsets[t.Topic]

Expand All @@ -455,6 +473,7 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
continue
}

ensureTopicAdded()
rt.Partitions = append(rt.Partitions, *p)
rp := &rt.Partitions[len(rt.Partitions)-1]

Expand Down
Loading