Skip to content

Commit

Permalink
kgo: avoid rare panic
Browse files Browse the repository at this point in the history
Scenario is:
* Metadata update is actively running and has stopped an active session,
  returning all topicPartitions that were actively in list/epoch. These
  list/epoch loads are stored in reloadOffsets. Metadata grabs the
  session change mutex.
* Client.Close is now called, stores client.consumer.kill(true). The
  Close is blocked briefly because Close calls assignPartitions which
  tries to lock to stop the session. Close is now paused -- however,
  importantly, the consumer.kill atomic is set to true.
* Metadata tries to start a new session. startNewSession returns
  noConsumerSession because consumer.kill is now true.
* Metadata calls reloadOffsets.loadWithSession, which panics once
  the session tries to access the client variable c.

This panic can only happen if all of the following are true:
* Client.Close is being called
* Metadata is updating
* Metadata response is moving a partition from one broker to another
* The timing is perfect

The fix to this is to check in listOrEpoch if the consumerSession is
noConsumerSession. If so, return early.

Note that doOnMetadataUpdate, incWorker, and decWorker already checked
noConsumerSession. The other methods do not need to check:
* mapLoadsToBrokers is called in listOrEpochs on a valid session
* handleListOrEpochResults is the same
* desireFetch is only called in source after noConsumerSession is
  checked, and manageFetchConcurrency is called only in desireFetch

Closes redpanda-data/redpanda#13791.
  • Loading branch information
twmb committed Oct 21, 2023
1 parent 6a961da commit c013050
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,15 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession {
func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool, why string) {
defer s.decWorker()

// It is possible for a metadata update to try to migrate partition
// loads if the update moves partitions between brokers. If we are
// closing the client, the consumer session could already be stopped,
// but this stops before the metadata goroutine is killed. So, if we
// are in this function but actually have no session, we return.
if s == noConsumerSession {
return
}

wait := true
if immediate {
s.c.cl.triggerUpdateMetadataNow(why)
Expand Down

0 comments on commit c013050

Please sign in to comment.