Skip to content

Commit

Permalink
kgo: add NewErrFetch
Browse files Browse the repository at this point in the history
Closes #552.
  • Loading branch information
twmb committed Sep 16, 2023
1 parent 01651af commit 310a5da
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,9 @@ func (c *consumer) addFakeReadyForDraining(topic string, partition int32, err er
c.sourcesReadyCond.Broadcast()
}

func errFetch(err error) Fetches {
// NewErrFetch returns a fake fetch containing a single empty topic with a
// single zero partition with the given error.
func NewErrFetch(err error) Fetches {
return []Fetch{{
Topics: []FetchTopic{{
Topic: "",
Expand Down Expand Up @@ -408,7 +410,7 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
if ctx != nil {
select {
case <-ctx.Done():
return errFetch(ctx.Err())
return NewErrFetch(ctx.Err())
default:
}
}
Expand Down Expand Up @@ -511,10 +513,10 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
select {
case <-cl.ctx.Done():
exit()
return errFetch(ErrClientClosed)
return NewErrFetch(ErrClientClosed)
case <-ctx.Done():
exit()
return errFetch(ctx.Err())
return NewErrFetch(ctx.Err())
case <-done:
}

Expand Down

0 comments on commit 310a5da

Please sign in to comment.