From c013050edf5432b0a52aaa1736ce6f726f9a8214 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 21 Oct 2023 12:01:51 -0600 Subject: [PATCH] kgo: avoid rare panic Scenario is: * Metadata update is actively running and has stopped an active session, returning all topicPartitions that were actively in list/epoch. These list/epoch loads are stored in reloadOffsets. Metadata grabs the session change mutex. * Client.Close is now called, stores client.consumer.kill(true). The Close is blocked briefly because Close calls assignPartitions which tries to lock to stop the session. Close is now paused -- however, importantly, the consumer.kill atomic is set to true. * Metadata tries to start a new session. startNewSession returns noConsumerSession because consumer.kill is now true. * Metadata calls reloadOffsets.loadWithSession, which panics once the session tries to access the client variable c. This panic can only happen if all of the following are true: * Client.Close is being called * Metadata is updating * Metadata response is moving a partition from one broker to another * The timing is perfect The fix to this is to check in listOrEpoch if the consumerSession is noConsumerSession. If so, return early. Note that doOnMetadataUpdate, incWorker, and decWorker already checked noConsumerSession. The other methods do not need to check: * mapLoadsToBrokers is called in listOrEpochs on a valid session * handleListOrEpochResults is the same * desireFetch is only called in source after noConsumerSession is checked, and manageFetchConcurrency is called only in desireFetch Closes redpanda-data/redpanda#13791. --- pkg/kgo/consumer.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 475270ed..ba781964 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1667,6 +1667,15 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession { func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool, why string) { defer s.decWorker() + // It is possible for a metadata update to try to migrate partition + // loads if the update moves partitions between brokers. If we are + // closing the client, the consumer session could already be stopped, + // but this stops before the metadata goroutine is killed. So, if we + // are in this function but actually have no session, we return. + if s == noConsumerSession { + return + } + wait := true if immediate { s.c.cl.triggerUpdateMetadataNow(why)