Description

I use Kafka to send a large volume of messages from a 6-CPU / 8 GB Kubernetes pod. The pod's memory usage climbs to 90%–100% after about a day, and the pod is then OOM-killed. However, pprof shows only a small amount of memory in use, so I suspect the Kafka SDK may have a memory problem on the cgo side.

How to reproduce

Here is my producer init code. I set batch_produce to false and go.delivery.reports to false:
```go
func NewAsyncProducer(kafkaConf ProducerAsyncConfig) (*Producer, error) {
	conf := kafka.ConfigMap{
		"bootstrap.servers":   strings.Join(kafkaConf.Addr, ","),
		"sasl.mechanisms":     "PLAIN",
		"security.protocol":   "SASL_PLAINTEXT",
		"sasl.username":       kafkaConf.UserName,
		"sasl.password":       kafkaConf.Password,
		"acks":                kafkaConf.Acks,
		"go.batch.producer":   false,
		"go.delivery.reports": kafkaConf.DeliveryReports,
	}
	if kafkaConf.BatchProduce {
		conf["go.batch.producer"] = true
		conf["batch.size"] = kafkaConf.BatchSize
		conf["go.events.channel.size"] = kafkaConf.EventsChannelSize
		conf["go.produce.channel.size"] = kafkaConf.ProduceChannelSize
	}
	if kafkaConf.CompressionType != "" {
		conf["compression.type"] = kafkaConf.CompressionType
	}
	producer, err := kafka.NewProducer(&conf)
	if err != nil {
		return nil, err
	}
	ctx := context.Background()
	go func() {
		for e := range producer.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					// note: format string fixed to match both arguments
					logger.Error(ctx, "kafka delivery failed: %v %s\n", ev.TopicPartition, string(ev.Value))
				}
			}
		}
	}()
	return &Producer{producer}, err
}

func (p *Producer) ProduceAsyncMsg(topic, value string, partition int32) error {
	return p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: partition},
		Value:          []byte(value),
		Headers:        []kafka.Header{},
	}, nil)
}
```
Checklist

Please provide the following information:

- confluent-kafka-go and librdkafka version (LibraryVersion()): 2.4.0
- Client configuration: ConfigMap{...} (see the producer init code above)
- Provide client logs (with "debug": ".." as necessary)
```go
func NewAsyncProducer(kafkaConf ProducerAsyncConfig) (*Producer, error) {
	conf := kafka.ConfigMap{
		"bootstrap.servers":   strings.Join(kafkaConf.Addr, ","),
		"sasl.mechanisms":     "PLAIN",
		"security.protocol":   "SASL_PLAINTEXT",
		"sasl.username":       kafkaConf.UserName,
		"sasl.password":       kafkaConf.Password,
		"acks":                kafkaConf.Acks,
		"go.batch.producer":   false,
		"go.delivery.reports": kafkaConf.DeliveryReports,
	}
	if kafkaConf.BatchProduce {
		conf["go.batch.producer"] = true // the second assignment
		conf["batch.size"] = kafkaConf.BatchSize
		conf["go.events.channel.size"] = kafkaConf.EventsChannelSize
		conf["go.produce.channel.size"] = kafkaConf.ProduceChannelSize
	}
```
It's hard to be sure of the actual runtime value of go.batch.producer, since there are two assignments to it: false in the ConfigMap literal, then true when kafkaConf.BatchProduce is set.