
my producer use too many memory #1326

Open
ucanme opened this issue Oct 31, 2024 · 1 comment

Comments


ucanme commented Oct 31, 2024

Description

I use Kafka to send a large volume of messages from a 6C/8G Kubernetes pod. The pod's memory usage grows to 90%-100% after about a day, and then the pod is OOM-killed.
[screenshot: pod memory usage]

However, pprof shows only a small amount of memory in use, so I suspect the Kafka SDK may have a memory problem on the cgo side.
[screenshot: pprof heap profile]

How to reproduce

Here is my producer init code. I set batch_produce to false and go.delivery.reports to false:

func NewAsyncProducer(kafkaConf ProducerAsyncConfig) (*Producer, error) {
	conf := kafka.ConfigMap{
		"bootstrap.servers":   strings.Join(kafkaConf.Addr, ","),
		"sasl.mechanisms":     "PLAIN",
		"security.protocol":   "SASL_PLAINTEXT",
		"sasl.username":       kafkaConf.UserName,
		"sasl.password":       kafkaConf.Password,
		"acks":                kafkaConf.Acks,
		"go.batch.producer":   false,
		"go.delivery.reports": kafkaConf.DeliveryReports,
	}


	if kafkaConf.BatchProduce {
		conf["go.batch.producer"] = true
		conf["batch.size"] = kafkaConf.BatchSize
		conf["go.events.channel.size"] = kafkaConf.EventsChannelSize
		conf["go.produce.channel.size"] = kafkaConf.ProduceChannelSize
	}

	if kafkaConf.CompressionType != "" {
		conf["compression.type"] = kafkaConf.CompressionType
	}

	producer, err := kafka.NewProducer(&conf)
	if err != nil {
		return nil, err
	}
	ctx := context.Background()
	go func() {
		for e := range producer.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
				logger.Error(ctx, "kafka delivery failed: %v, value: %s\n", ev.TopicPartition, string(ev.Value))
				}
			}
		}
	}()

	return &Producer{producer}, err
}

func (p *Producer) ProduceAsyncMsg(topic, value string, partition int32) error {
	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: partition},
		Value:          []byte(value),
		Headers:        []kafka.Header{},
	}, nil)
	return err
}
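One thing worth checking alongside the code above: librdkafka buffers produced messages in an internal C-side queue, which pprof cannot see. By default that queue is bounded at 100000 messages and 1048576 KiB (about 1 GiB), so a slow or stalled broker can legitimately push RSS up by that much before `Produce` starts failing with `ErrQueueFull`. A sketch of tightening those bounds (the exact values below are illustrative, not recommendations):

```go
// Fragment only, assuming the same ConfigMap as above. These are standard
// librdkafka producer properties that cap the C-side buffer.
conf := kafka.ConfigMap{
	"bootstrap.servers":            strings.Join(kafkaConf.Addr, ","),
	"queue.buffering.max.messages": 50000,  // cap on buffered message count (default 100000)
	"queue.buffering.max.kbytes":   131072, // cap on buffered bytes, 128 MiB (default ~1 GiB)
}
```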

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): 2.4.0
  • Apache Kafka broker version: v.0.11.0.2
  • Client configuration: ConfigMap{...}
	conf := kafka.ConfigMap{
		"bootstrap.servers":   strings.Join(kafkaConf.Addr, ","),
		"sasl.mechanisms":     "PLAIN",
		"security.protocol":   "SASL_PLAINTEXT",
		"sasl.username":       kafkaConf.UserName,
		"sasl.password":       kafkaConf.Password,
		"acks":                kafkaConf.Acks,
		"go.batch.producer":   false,
		"go.delivery.reports": kafkaConf.DeliveryReports,
	}
  • Operating system:
  • k8s pod rocky8.0
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue
@jizhilong

Here is my producer init code. I set batch_produce to false and go.delivery.reports to false:

func NewAsyncProducer(kafkaConf ProducerAsyncConfig) (*Producer, error) {
	conf := kafka.ConfigMap{
		"bootstrap.servers":   strings.Join(kafkaConf.Addr, ","),
		"sasl.mechanisms":     "PLAIN",
		"security.protocol":   "SASL_PLAINTEXT",
		"sasl.username":       kafkaConf.UserName,
		"sasl.password":       kafkaConf.Password,
		"acks":                kafkaConf.Acks,
		"go.batch.producer":   false,
		"go.delivery.reports": kafkaConf.DeliveryReports,
	}

	if kafkaConf.BatchProduce {
		conf["go.batch.producer"] = true // the second assignment
		conf["batch.size"] = kafkaConf.BatchSize
		conf["go.events.channel.size"] = kafkaConf.EventsChannelSize
		conf["go.produce.channel.size"] = kafkaConf.ProduceChannelSize
	}

It's hard to be sure of the actual value of 'go.delivery.reports' here, since there are two assignments to go.batch.producer.
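The ambiguity goes away if the key is assigned exactly once, directly from the caller's flag. A self-contained sketch using a plain map as a stand-in for kafka.ConfigMap (the real type is not needed to show the point):

```go
package main

import "fmt"

// ConfigMap is a stand-in for kafka.ConfigMap; a plain map is enough
// to demonstrate the single-assignment fix.
type ConfigMap map[string]interface{}

// buildConf sets go.batch.producer exactly once, so the effective
// value is never in doubt regardless of which branches run later.
func buildConf(batchProduce bool) ConfigMap {
	conf := ConfigMap{
		"go.batch.producer": batchProduce,
	}
	if batchProduce {
		// batch-only tuning stays guarded, but no longer re-assigns the flag
		conf["go.events.channel.size"] = 100000
	}
	return conf
}

func main() {
	fmt.Println(buildConf(false)["go.batch.producer"]) // false
	fmt.Println(buildConf(true)["go.batch.producer"])  // true
}
```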
