diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 475270ed..ba781964 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1667,6 +1667,15 @@ func (c *consumer) startNewSession(tps *topicsPartitions) *consumerSession { func (s *consumerSession) listOrEpoch(waiting listOrEpochLoads, immediate bool, why string) { defer s.decWorker() + // It is possible for a metadata update to try to migrate partition + // loads if the update moves partitions between brokers. If we are + // closing the client, the consumer session could already be stopped, + // but this stops before the metadata goroutine is killed. So, if we + // are in this function but actually have no session, we return. + if s == noConsumerSession { + return + } + wait := true if immediate { s.c.cl.triggerUpdateMetadataNow(why)