From 861efe07084892b749a5d2f56189c5353a59b223 Mon Sep 17 00:00:00 2001
From: darkz
Date: Mon, 1 Apr 2024 14:20:14 +0800
Subject: [PATCH 1/3] Return a clearer topic error message

Topic errors were previously caught as a bare ErrInvalidArg; report the
missing-topic case more clearly.
---
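Note: as ordered here, `msg == nil` no longer short-circuits ahead of the
topic dereference: when msg.TopicPartition.Topic is nil, evaluating
len(*msg.TopicPartition.Topic) panics before the new ErrUnknownTopic branch
can be reached. A minimal illustrative sketch (assumes the usual
confluent-kafka-go import and an already-constructed producer p; not part
of this patch):

	// Hypothetical caller: Topic is nil, so the first condition
	// dereferences a nil pointer and panics instead of returning
	// the intended error.
	msg := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: nil, Partition: kafka.PartitionAny},
		Value:          []byte("payload"),
	}
	err := p.Produce(msg, nil)
	_ = err

This ordering is reworked in PATCH 2/3.
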
 kafka/producer.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/kafka/producer.go b/kafka/producer.go
index 47bc2ee94..ca4a681a8 100644
--- a/kafka/producer.go
+++ b/kafka/producer.go
@@ -173,8 +173,10 @@ func (p *Producer) gethandle() *handle {
 }
 
 func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
-	if msg == nil || msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 {
+	if msg == nil || len(*msg.TopicPartition.Topic) == 0 {
 		return newErrorFromString(ErrInvalidArg, "")
+	}else if msg.TopicPartition.Topic == nil{
+		return newErrorFromString(ErrUnknownTopic, "topic is null, please check")
 	}
 
 	crkt := p.handle.getRkt(*msg.TopicPartition.Topic)

From 1a1b1cc6ecb255dca3cc94e6ebecc09783bfe359 Mon Sep 17 00:00:00 2001
From: darkz
Date: Wed, 17 Apr 2024 19:59:53 +0800
Subject: [PATCH 2/3] Improve produce() argument validation

Check for a nil message first, then validate that the topic is neither
nil nor empty, so the nil-topic case is caught before the topic pointer
is dereferenced. Both cases now return ErrInvalidArg with a descriptive
message.
---
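A minimal sketch of the caller-visible behavior after this change
(illustrative only; the broker address is an assumption, and the
validation runs before any network I/O):

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Nil message: rejected synchronously with ErrInvalidArg.
	err = p.Produce(nil, nil) // "msg cannot be nil"

	// Nil or empty topic: also ErrInvalidArg, instead of a nil-pointer panic.
	err = p.Produce(&kafka.Message{}, nil) // "topic cannot be nil or empty"

	// The error code can be inspected with err.(kafka.Error).Code().
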
 kafka/producer.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/kafka/producer.go b/kafka/producer.go
index ca4a681a8..f66a23a86 100644
--- a/kafka/producer.go
+++ b/kafka/producer.go
@@ -173,12 +173,12 @@ func (p *Producer) gethandle() *handle {
 }
 
 func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
-	if msg == nil || len(*msg.TopicPartition.Topic) == 0 {
-		return newErrorFromString(ErrInvalidArg, "")
-	}else if msg.TopicPartition.Topic == nil{
-		return newErrorFromString(ErrUnknownTopic, "topic is null, please check")
+	if msg == nil {
+		return newErrorFromString(ErrInvalidArg, "msg cannot be nil")
+	}else if msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 {
+		return newErrorFromString(ErrInvalidArg, "topic cannot be nil or empty")
 	}
-
+	
 	crkt := p.handle.getRkt(*msg.TopicPartition.Topic)

From d842be0e7d9a58d2a80e83fc2fe105bd9f8844d7 Mon Sep 17 00:00:00 2001
From: darkz
Date: Tue, 7 May 2024 19:52:10 +0800
Subject: [PATCH 3/3] Reformat kafka/producer.go

---
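The hunks below appear to be formatting-only (indentation and comment
layout); each -/+ pair is identical up to whitespace. Assuming the hashes
above, this can be spot-checked with:

	git show -w d842be0e7d9a58d2a80e83fc2fe105bd9f8844d7 -- kafka/producer.go   # empty diff body if whitespace-only
	gofmt -l kafka/   # prints nothing once the file is gofmt-clean
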
 kafka/producer.go | 1076 ++++++++++++++++++++++-----------------------
 1 file changed, 538 insertions(+), 538 deletions(-)

diff --git a/kafka/producer.go b/kafka/producer.go
index f66a23a86..fe0b1838b 100644
--- a/kafka/producer.go
+++ b/kafka/producer.go
@@ -17,11 +17,11 @@ package kafka
 */
 import (
-    "context"
-    "fmt"
-    "sync/atomic"
-    "time"
-    "unsafe"
+	"context"
+	"fmt"
+	"sync/atomic"
+	"time"
+	"unsafe"
 )
 
 /*
@@ -131,170 +131,170 @@ import "C"
 // minInt64 finds the minimum of two int64s.
 // Required until we start using Go 1.18 with generic functions.
 func minInt64(a int64, b int64) int64 {
-    if a > b {
-        return b
-    }
-    return a
+	if a > b {
+		return b
+	}
+	return a
 }
 
 // Producer implements a High-level Apache Kafka Producer instance
 type Producer struct {
-    events         chan Event
-    produceChannel chan *Message
-    handle         handle
+	events         chan Event
+	produceChannel chan *Message
+	handle         handle
 
-    // Terminates the poller() goroutine
-    pollerTermChan chan bool
+	// Terminates the poller() goroutine
+	pollerTermChan chan bool
 
-    // checks if Producer has been closed or not.
-    isClosed uint32
+	// checks if Producer has been closed or not.
+	isClosed uint32
 }
 
 // IsClosed returns boolean representing if client is closed or not
 func (p *Producer) IsClosed() bool {
-    return atomic.LoadUint32(&p.isClosed) == 1
+	return atomic.LoadUint32(&p.isClosed) == 1
 }
 
 func (p *Producer) verifyClient() error {
-    if p.IsClosed() {
-        return getOperationNotAllowedErrorForClosedClient()
-    }
-    return nil
+	if p.IsClosed() {
+		return getOperationNotAllowedErrorForClosedClient()
+	}
+	return nil
 }
 
 // String returns a human readable name for a Producer instance
 func (p *Producer) String() string {
-    return p.handle.String()
+	return p.handle.String()
 }
 
 // get_handle implements the Handle interface
 func (p *Producer) gethandle() *handle {
-    return &p.handle
+	return &p.handle
}
 
 func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
-    if msg == nil {
-        return newErrorFromString(ErrInvalidArg, "msg cannot be nil")
-    }else if msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 {
-        return newErrorFromString(ErrInvalidArg, "topic cannot be nil or empty")
-    }
-
-    crkt := p.handle.getRkt(*msg.TopicPartition.Topic)
-
-    // Three problems:
-    //  1) There's a difference between an empty Value or Key (length 0, proper pointer) and
-    //     a null Value or Key (length 0, null pointer).
-    //  2) we need to be able to send a null Value or Key, but the unsafe.Pointer(&slice[0])
-    //     dereference can't be performed on a nil slice.
-    //  3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to be made
-    //     in the call to the C function.
-    //
-    // Solution:
-    //  Keep track of whether the Value or Key were nil (1), but let the valp and keyp pointers
-    //  point to a 1-byte slice (but the length to send is still 0) so that the dereference (2)
-    //  works.
-    //  Then perform the unsafe.Pointer() on the valp and keyp pointers (which now either point
-    //  to the original msg.Value and msg.Key or to the 1-byte slices) in the call to C (3).
-    //
-    var valp []byte
-    var keyp []byte
-    oneByte := []byte{0}
-    var valIsNull C.int
-    var keyIsNull C.int
-    var valLen int
-    var keyLen int
-
-    if msg.Value == nil {
-        valIsNull = 1
-        valLen = 0
-        valp = oneByte
-    } else {
-        valLen = len(msg.Value)
-        if valLen > 0 {
-            valp = msg.Value
-        } else {
-            valp = oneByte
-        }
-    }
-
-    if msg.Key == nil {
-        keyIsNull = 1
-        keyLen = 0
-        keyp = oneByte
-    } else {
-        keyLen = len(msg.Key)
-        if keyLen > 0 {
-            keyp = msg.Key
-        } else {
-            keyp = oneByte
-        }
-    }
-
-    var cgoid int
-
-    // Per-message state that needs to be retained through the C code:
-    //   delivery channel (if specified)
-    //   message opaque   (if specified)
-    // Since these cant be passed as opaque pointers to the C code,
-    // due to cgo constraints, we add them to a per-producer map for lookup
-    // when the C code triggers the callbacks or events.
-    if deliveryChan != nil || msg.Opaque != nil {
-        cgoid = p.handle.cgoPut(cgoDr{deliveryChan: deliveryChan, opaque: msg.Opaque})
-    }
-
-    var timestamp int64
-    if !msg.Timestamp.IsZero() {
-        timestamp = msg.Timestamp.UnixNano() / 1000000
-    }
-
-    // Convert headers to C-friendly tmphdrs
-    var tmphdrs []C.tmphdr_t
-    tmphdrsCnt := len(msg.Headers)
-
-    if tmphdrsCnt > 0 {
-        tmphdrs = make([]C.tmphdr_t, tmphdrsCnt)
-
-        for n, hdr := range msg.Headers {
-            // Make a copy of the key
-            // to avoid runtime panic with
-            // foreign Go pointers in cgo.
-            tmphdrs[n].key = C.CString(hdr.Key)
-            if hdr.Value != nil {
-                tmphdrs[n].size = C.ssize_t(len(hdr.Value))
-                if tmphdrs[n].size > 0 {
-                    // Make a copy of the value
-                    // to avoid runtime panic with
-                    // foreign Go pointers in cgo.
-                    tmphdrs[n].val = C.CBytes(hdr.Value)
-                }
-            } else {
-                // null value
-                tmphdrs[n].size = C.ssize_t(-1)
-            }
-        }
-    } else {
-        // no headers, need a dummy tmphdrs of size 1 to avoid index
-        // out of bounds panic in do_produce() call below.
-        // tmphdrsCnt will be 0.
-        tmphdrs = []C.tmphdr_t{{nil, nil, 0}}
-    }
-
-    cErr := C.do_produce(p.handle.rk, crkt,
-        C.int32_t(msg.TopicPartition.Partition),
-        C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
-        valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
-        keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
-        C.int64_t(timestamp),
-        (*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
-        (C.uintptr_t)(cgoid))
-    if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
-        if cgoid != 0 {
-            p.handle.cgoGet(cgoid)
-        }
-        return newError(cErr)
-    }
-
-    return nil
+	if msg == nil {
+		return newErrorFromString(ErrInvalidArg, "msg cannot be nil")
+	} else if msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 {
+		return newErrorFromString(ErrInvalidArg, "topic cannot be nil or empty")
+	}
+
+	crkt := p.handle.getRkt(*msg.TopicPartition.Topic)
+
+	// Three problems:
+	//  1) There's a difference between an empty Value or Key (length 0, proper pointer) and
+	//     a null Value or Key (length 0, null pointer).
+	//  2) we need to be able to send a null Value or Key, but the unsafe.Pointer(&slice[0])
+	//     dereference can't be performed on a nil slice.
+	//  3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to be made
+	//     in the call to the C function.
+	//
+	// Solution:
+	//  Keep track of whether the Value or Key were nil (1), but let the valp and keyp pointers
+	//  point to a 1-byte slice (but the length to send is still 0) so that the dereference (2)
+	//  works.
+	//  Then perform the unsafe.Pointer() on the valp and keyp pointers (which now either point
+	//  to the original msg.Value and msg.Key or to the 1-byte slices) in the call to C (3).
+	//
+	var valp []byte
+	var keyp []byte
+	oneByte := []byte{0}
+	var valIsNull C.int
+	var keyIsNull C.int
+	var valLen int
+	var keyLen int
+
+	if msg.Value == nil {
+		valIsNull = 1
+		valLen = 0
+		valp = oneByte
+	} else {
+		valLen = len(msg.Value)
+		if valLen > 0 {
+			valp = msg.Value
+		} else {
+			valp = oneByte
+		}
+	}
+
+	if msg.Key == nil {
+		keyIsNull = 1
+		keyLen = 0
+		keyp = oneByte
+	} else {
+		keyLen = len(msg.Key)
+		if keyLen > 0 {
+			keyp = msg.Key
+		} else {
+			keyp = oneByte
+		}
+	}
+
+	var cgoid int
+
+	// Per-message state that needs to be retained through the C code:
+	//   delivery channel (if specified)
+	//   message opaque   (if specified)
+	// Since these cant be passed as opaque pointers to the C code,
+	// due to cgo constraints, we add them to a per-producer map for lookup
+	// when the C code triggers the callbacks or events.
+	if deliveryChan != nil || msg.Opaque != nil {
+		cgoid = p.handle.cgoPut(cgoDr{deliveryChan: deliveryChan, opaque: msg.Opaque})
+	}
+
+	var timestamp int64
+	if !msg.Timestamp.IsZero() {
+		timestamp = msg.Timestamp.UnixNano() / 1000000
+	}
+
+	// Convert headers to C-friendly tmphdrs
+	var tmphdrs []C.tmphdr_t
+	tmphdrsCnt := len(msg.Headers)
+
+	if tmphdrsCnt > 0 {
+		tmphdrs = make([]C.tmphdr_t, tmphdrsCnt)
+
+		for n, hdr := range msg.Headers {
+			// Make a copy of the key
+			// to avoid runtime panic with
+			// foreign Go pointers in cgo.
+			tmphdrs[n].key = C.CString(hdr.Key)
+			if hdr.Value != nil {
+				tmphdrs[n].size = C.ssize_t(len(hdr.Value))
+				if tmphdrs[n].size > 0 {
+					// Make a copy of the value
+					// to avoid runtime panic with
+					// foreign Go pointers in cgo.
+					tmphdrs[n].val = C.CBytes(hdr.Value)
+				}
+			} else {
+				// null value
+				tmphdrs[n].size = C.ssize_t(-1)
+			}
+		}
+	} else {
+		// no headers, need a dummy tmphdrs of size 1 to avoid index
+		// out of bounds panic in do_produce() call below.
+		// tmphdrsCnt will be 0.
+		tmphdrs = []C.tmphdr_t{{nil, nil, 0}}
+	}
+
+	cErr := C.do_produce(p.handle.rk, crkt,
+		C.int32_t(msg.TopicPartition.Partition),
+		C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
+		valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
+		keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
+		C.int64_t(timestamp),
+		(*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
+		(C.uintptr_t)(cgoid))
+	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		if cgoid != 0 {
+			p.handle.cgoGet(cgoid)
+		}
+		return newError(cErr)
+	}
+
+	return nil
 }
 
 // Produce single message.
@@ -308,11 +308,11 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)
 // api.version.request=true, and broker >= 0.11.0.0.
 // Returns an error if message could not be enqueued.
 func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    return p.produce(msg, 0, deliveryChan)
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	return p.produce(msg, 0, deliveryChan)
 }
 
 // Produce a batch of messages.
@@ -321,29 +321,29 @@ func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {
 // These batches do not relate to Kafka message batches in any way.
 // WARNING: This is an experimental API.
 // NOTE: timestamps and headers are not supported with this API.
 func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) error {
-    crkt := p.handle.getRkt(topic)
-
-    cmsgs := make([]C.rd_kafka_message_t, len(msgs))
-    for i, m := range msgs {
-        p.handle.messageToC(m, &cmsgs[i])
-    }
-    r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
-        (*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
-    if r == -1 {
-        return newError(C.rd_kafka_last_error())
-    }
-
-    return nil
+	crkt := p.handle.getRkt(topic)
+
+	cmsgs := make([]C.rd_kafka_message_t, len(msgs))
+	for i, m := range msgs {
+		p.handle.messageToC(m, &cmsgs[i])
+	}
+	r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
+		(*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
+	if r == -1 {
+		return newError(C.rd_kafka_last_error())
+	}
+
+	return nil
 }
 
 // Events returns the Events channel (read)
 func (p *Producer) Events() chan Event {
-    return p.events
+	return p.events
 }
 
 // Logs returns the Log channel (if enabled), else nil
 func (p *Producer) Logs() chan LogEvent {
-    return p.handle.logs
+	return p.handle.logs
 }
 
 // ProduceChannel returns the produce *Message channel (write)
@@ -352,14 +352,14 @@ func (p *Producer) Logs() chan LogEvent {
 // of Produce().
 // Flush() and Len() are not guaranteed to be reliable with ProduceChannel.
 func (p *Producer) ProduceChannel() chan *Message {
-    return p.produceChannel
+	return p.produceChannel
 }
 
 // Len returns the number of messages and requests waiting to be transmitted to the broker
 // as well as delivery reports queued for the application.
 // BUG: Tries to include messages on ProduceChannel, but it's not guaranteed to be reliable.
 func (p *Producer) Len() int {
-    return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
+	return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
 }
 
 // Flush and wait for outstanding messages and requests to complete delivery.
@@ -367,76 +367,76 @@ func (p *Producer) Len() int {
 // Returns the number of outstanding events still un-flushed.
 // BUG: Tries to include messages on ProduceChannel, but it's not guaranteed to be reliable.
 func (p *Producer) Flush(timeoutMs int) int {
-    termChan := make(chan bool) // unused stand-in termChan
-    // used to specify timeout for the underlying flush
-    flushIntervalChan := make(chan int64)
-
-    // Keep calling rd_kafka_flush to ignore queue.buffering.max.ms, and
-    // account for any other state changes that the underlying library
-    // might do in case it is flushing.
-    go func() {
-        for flushInterval := range flushIntervalChan {
-            C.rd_kafka_flush(p.handle.rk, C.int(flushInterval))
-        }
-    }()
-
-    defer close(flushIntervalChan)
-
-    timeoutDuration := time.Duration(timeoutMs) * time.Millisecond
-    tEnd := time.Now().Add(timeoutDuration)
-    for p.Len() > 0 {
-        remain := time.Until(tEnd).Milliseconds()
-        if remain <= 0 {
-            return p.Len()
-        }
-
-        tWait := minInt64(100, remain)
-
-        // If the previous eventPoll returned immediately, and the previous
-        // rd_kafka_flush did not, we'll end up in a situation where this
-        // channel blocks. However, this is acceptable as it may block for
-        // a maximum of 100ms.
-        flushIntervalChan <- tWait
-        p.handle.eventPoll(p.events,
-            int(tWait), 1000, termChan)
-    }
-
-    return 0
+	termChan := make(chan bool) // unused stand-in termChan
+	// used to specify timeout for the underlying flush
+	flushIntervalChan := make(chan int64)
+
+	// Keep calling rd_kafka_flush to ignore queue.buffering.max.ms, and
+	// account for any other state changes that the underlying library
+	// might do in case it is flushing.
+	go func() {
+		for flushInterval := range flushIntervalChan {
+			C.rd_kafka_flush(p.handle.rk, C.int(flushInterval))
+		}
+	}()
+
+	defer close(flushIntervalChan)
+
+	timeoutDuration := time.Duration(timeoutMs) * time.Millisecond
+	tEnd := time.Now().Add(timeoutDuration)
+	for p.Len() > 0 {
+		remain := time.Until(tEnd).Milliseconds()
+		if remain <= 0 {
+			return p.Len()
+		}
+
+		tWait := minInt64(100, remain)
+
+		// If the previous eventPoll returned immediately, and the previous
+		// rd_kafka_flush did not, we'll end up in a situation where this
+		// channel blocks. However, this is acceptable as it may block for
+		// a maximum of 100ms.
+		flushIntervalChan <- tWait
+		p.handle.eventPoll(p.events,
+			int(tWait), 1000, termChan)
+	}
+
+	return 0
 }
 
 // Close a Producer instance.
 // The Producer object or its channels are no longer usable after this call.
 func (p *Producer) Close() {
-    if !atomic.CompareAndSwapUint32(&p.isClosed, 0, 1) {
-        return
-    }
+	if !atomic.CompareAndSwapUint32(&p.isClosed, 0, 1) {
+		return
+	}
 
-    // Wait for poller() (signaled by closing pollerTermChan)
-    // and channel_producer() (signaled by closing ProduceChannel)
-    close(p.pollerTermChan)
-    close(p.produceChannel)
-    p.handle.waitGroup.Wait()
+	// Wait for poller() (signaled by closing pollerTermChan)
+	// and channel_producer() (signaled by closing ProduceChannel)
+	close(p.pollerTermChan)
+	close(p.produceChannel)
+	p.handle.waitGroup.Wait()
 
-    close(p.events)
+	close(p.events)
 
-    p.handle.cleanup()
+	p.handle.cleanup()
 
-    C.rd_kafka_destroy(p.handle.rk)
+	C.rd_kafka_destroy(p.handle.rk)
 }
 
 const (
-    // PurgeInFlight purges messages in-flight to or from the broker.
-    // Purging these messages will void any future acknowledgements from the
-    // broker, making it impossible for the application to know if these
-    // messages were successfully delivered or not.
-    // Retrying these messages may lead to duplicates.
-    PurgeInFlight = int(C.RD_KAFKA_PURGE_F_INFLIGHT)
-
-    // PurgeQueue Purge messages in internal queues.
-    PurgeQueue = int(C.RD_KAFKA_PURGE_F_QUEUE)
-
-    // PurgeNonBlocking Don't wait for background thread queue purging to finish.
-    PurgeNonBlocking = int(C.RD_KAFKA_PURGE_F_NON_BLOCKING)
+	// PurgeInFlight purges messages in-flight to or from the broker.
+	// Purging these messages will void any future acknowledgements from the
+	// broker, making it impossible for the application to know if these
+	// messages were successfully delivered or not.
+	// Retrying these messages may lead to duplicates.
+	PurgeInFlight = int(C.RD_KAFKA_PURGE_F_INFLIGHT)
+
+	// PurgeQueue Purge messages in internal queues.
+	PurgeQueue = int(C.RD_KAFKA_PURGE_F_QUEUE)
+
+	// PurgeNonBlocking Don't wait for background thread queue purging to finish.
+	PurgeNonBlocking = int(C.RD_KAFKA_PURGE_F_NON_BLOCKING)
 )
 
 // Purge messages currently handled by this producer instance.
@@ -463,16 +463,16 @@ const (
 //
 // Returns nil on success, ErrInvalidArg if the purge flags are invalid or unknown.
 func (p *Producer) Purge(flags int) error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    cErr := C.rd_kafka_purge(p.handle.rk, C.int(flags))
-    if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
-        return newError(cErr)
-    }
-
-    return nil
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	cErr := C.rd_kafka_purge(p.handle.rk, C.int(flags))
+	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return newError(cErr)
+	}
+
+	return nil
 }
 
 // NewProducer creates a new high-level Producer instance.
@@ -481,219 +481,219 @@ func (p *Producer) Purge(flags int) error {
 //
 // Supported special configuration properties (type, default):
 //
-//   go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance).
-//                                     These batches do not relate to Kafka message batches in any way.
-//                                     Note: timestamps and headers are not supported with this interface.
-//   go.delivery.reports (bool, true) - Forward per-message delivery reports to the
-//                                      Events() channel.
-//   go.delivery.report.fields (string, "key,value") - Comma separated list of fields to enable for delivery reports.
-//                                     Allowed values: all, none (or empty string), key, value, headers
-//                                     Warning: There is a performance penalty to include headers in the delivery report.
-//   go.events.channel.size (int, 1000000) - Events().
-//   go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
-//   go.logs.channel.enable (bool, false) - Forward log to Logs() channel.
-//   go.logs.channel (chan kafka.LogEvent, nil) - Forward logs to application-provided channel instead of Logs(). Requires go.logs.channel.enable=true.
+//	go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance).
+//	These batches do not relate to Kafka message batches in any way.
+//	Note: timestamps and headers are not supported with this interface.
+//	go.delivery.reports (bool, true) - Forward per-message delivery reports to the
+//	Events() channel.
+//	go.delivery.report.fields (string, "key,value") - Comma separated list of fields to enable for delivery reports.
+//	Allowed values: all, none (or empty string), key, value, headers
+//	Warning: There is a performance penalty to include headers in the delivery report.
+//	go.events.channel.size (int, 1000000) - Events().
+//	go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
+//	go.logs.channel.enable (bool, false) - Forward log to Logs() channel.
+//	go.logs.channel (chan kafka.LogEvent, nil) - Forward logs to application-provided channel instead of Logs(). Requires go.logs.channel.enable=true.
 func NewProducer(conf *ConfigMap) (*Producer, error) {
-    err := versionCheck()
-    if err != nil {
-        return nil, err
-    }
-
-    p := &Producer{}
-
-    // before we do anything with the configuration, create a copy such that
-    // the original is not mutated.
-    confCopy := conf.clone()
-
-    v, err := confCopy.extract("delivery.report.only.error", false)
-    if v == true {
-        // FIXME: The filtering of successful DRs must be done in
-        //        the Go client to avoid cgoDr memory leaks.
-        return nil, newErrorFromString(ErrUnsupportedFeature,
-            "delivery.report.only.error=true is not currently supported by the Go client")
-    }
-
-    v, err = confCopy.extract("go.batch.producer", false)
-    if err != nil {
-        return nil, err
-    }
-    batchProducer := v.(bool)
-
-    v, err = confCopy.extract("go.delivery.reports", true)
-    if err != nil {
-        return nil, err
-    }
-    p.handle.fwdDr = v.(bool)
-
-    v, err = confCopy.extract("go.delivery.report.fields", "key,value")
-    if err != nil {
-        return nil, err
-    }
-
-    p.handle.msgFields, err = newMessageFieldsFrom(v)
-    if err != nil {
-        return nil, err
-    }
-
-    v, err = confCopy.extract("go.events.channel.size", 1000000)
-    if err != nil {
-        return nil, err
-    }
-    eventsChanSize := v.(int)
-
-    v, err = confCopy.extract("go.produce.channel.size", 1000000)
-    if err != nil {
-        return nil, err
-    }
-    produceChannelSize := v.(int)
-
-    logsChanEnable, logsChan, err := confCopy.extractLogConfig()
-    if err != nil {
-        return nil, err
-    }
-
-    if int(C.rd_kafka_version()) < 0x01000000 {
-        // produce.offset.report is no longer used in librdkafka >= v1.0.0
-        v, _ = confCopy.extract("{topic}.produce.offset.report", nil)
-        if v == nil {
-            // Enable offset reporting by default, unless overriden.
-            confCopy.SetKey("{topic}.produce.offset.report", true)
-        }
-    }
-
-    // Convert ConfigMap to librdkafka conf_t
-    cConf, err := confCopy.convert()
-    if err != nil {
-        return nil, err
-    }
-
-    cErrstr := (*C.char)(C.malloc(C.size_t(256)))
-    defer C.free(unsafe.Pointer(cErrstr))
-
-    C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR|C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH)
-
-    // Create librdkafka producer instance
-    p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
-    if p.handle.rk == nil {
-        return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
-    }
-
-    p.handle.p = p
-    p.handle.setup()
-    p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk)
-    p.events = make(chan Event, eventsChanSize)
-    p.produceChannel = make(chan *Message, produceChannelSize)
-    p.pollerTermChan = make(chan bool)
-    p.isClosed = 0
-
-    if logsChanEnable {
-        p.handle.setupLogQueue(logsChan, p.pollerTermChan)
-    }
-
-    p.handle.waitGroup.Add(1)
-    go func() {
-        poller(p, p.pollerTermChan)
-        p.handle.waitGroup.Done()
-    }()
-
-    // non-batch or batch producer, only one must be used
-    var producer func(*Producer)
-    if batchProducer {
-        producer = channelBatchProducer
-    } else {
-        producer = channelProducer
-    }
-
-    p.handle.waitGroup.Add(1)
-    go func() {
-        producer(p)
-        p.handle.waitGroup.Done()
-    }()
-
-    return p, nil
+	err := versionCheck()
+	if err != nil {
+		return nil, err
+	}
+
+	p := &Producer{}
+
+	// before we do anything with the configuration, create a copy such that
+	// the original is not mutated.
+	confCopy := conf.clone()
+
+	v, err := confCopy.extract("delivery.report.only.error", false)
+	if v == true {
+		// FIXME: The filtering of successful DRs must be done in
+		//        the Go client to avoid cgoDr memory leaks.
+		return nil, newErrorFromString(ErrUnsupportedFeature,
+			"delivery.report.only.error=true is not currently supported by the Go client")
+	}
+
+	v, err = confCopy.extract("go.batch.producer", false)
+	if err != nil {
+		return nil, err
+	}
+	batchProducer := v.(bool)
+
+	v, err = confCopy.extract("go.delivery.reports", true)
+	if err != nil {
+		return nil, err
+	}
+	p.handle.fwdDr = v.(bool)
+
+	v, err = confCopy.extract("go.delivery.report.fields", "key,value")
+	if err != nil {
+		return nil, err
+	}
+
+	p.handle.msgFields, err = newMessageFieldsFrom(v)
+	if err != nil {
+		return nil, err
+	}
+
+	v, err = confCopy.extract("go.events.channel.size", 1000000)
+	if err != nil {
+		return nil, err
+	}
+	eventsChanSize := v.(int)
+
+	v, err = confCopy.extract("go.produce.channel.size", 1000000)
+	if err != nil {
+		return nil, err
+	}
+	produceChannelSize := v.(int)
+
+	logsChanEnable, logsChan, err := confCopy.extractLogConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	if int(C.rd_kafka_version()) < 0x01000000 {
+		// produce.offset.report is no longer used in librdkafka >= v1.0.0
+		v, _ = confCopy.extract("{topic}.produce.offset.report", nil)
+		if v == nil {
+			// Enable offset reporting by default, unless overriden.
+			confCopy.SetKey("{topic}.produce.offset.report", true)
+		}
+	}
+
+	// Convert ConfigMap to librdkafka conf_t
+	cConf, err := confCopy.convert()
+	if err != nil {
+		return nil, err
+	}
+
+	cErrstr := (*C.char)(C.malloc(C.size_t(256)))
+	defer C.free(unsafe.Pointer(cErrstr))
+
+	C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR|C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH)
+
+	// Create librdkafka producer instance
+	p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
+	if p.handle.rk == nil {
+		return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+	}
+
+	p.handle.p = p
+	p.handle.setup()
+	p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk)
+	p.events = make(chan Event, eventsChanSize)
+	p.produceChannel = make(chan *Message, produceChannelSize)
+	p.pollerTermChan = make(chan bool)
+	p.isClosed = 0
+
+	if logsChanEnable {
+		p.handle.setupLogQueue(logsChan, p.pollerTermChan)
+	}
+
+	p.handle.waitGroup.Add(1)
+	go func() {
+		poller(p, p.pollerTermChan)
+		p.handle.waitGroup.Done()
+	}()
+
+	// non-batch or batch producer, only one must be used
+	var producer func(*Producer)
+	if batchProducer {
+		producer = channelBatchProducer
+	} else {
+		producer = channelProducer
+	}
+
+	p.handle.waitGroup.Add(1)
+	go func() {
+		producer(p)
+		p.handle.waitGroup.Done()
+	}()
+
+	return p, nil
 }
 
 // channel_producer serves the ProduceChannel channel
 func channelProducer(p *Producer) {
-    for m := range p.produceChannel {
-        err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)
-        if err != nil {
-            m.TopicPartition.Error = err
-            p.events <- m
-        }
-    }
+	for m := range p.produceChannel {
+		err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)
+		if err != nil {
+			m.TopicPartition.Error = err
+			p.events <- m
+		}
+	}
 }
 
 // channelBatchProducer serves the ProduceChannel channel and attempts to
 // improve cgo performance by using the produceBatch() interface.
 func channelBatchProducer(p *Producer) {
-    var buffered = make(map[string][]*Message)
-    bufferedCnt := 0
-    const batchSize int = 1000000
-    totMsgCnt := 0
-    totBatchCnt := 0
-
-    for m := range p.produceChannel {
-        buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
-        bufferedCnt++
-
-    loop2:
-        for true {
-            select {
-            case m, ok := <-p.produceChannel:
-                if !ok {
-                    break loop2
-                }
-                if m == nil {
-                    panic("nil message received on ProduceChannel")
-                }
-                if m.TopicPartition.Topic == nil {
-                    panic(fmt.Sprintf("message without Topic received on ProduceChannel: %v", m))
-                }
-                buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
-                bufferedCnt++
-                if bufferedCnt >= batchSize {
-                    break loop2
-                }
-            default:
-                break loop2
-            }
-        }
-
-        totBatchCnt++
-        totMsgCnt += len(buffered)
-
-        for topic, buffered2 := range buffered {
-            err := p.produceBatch(topic, buffered2, C.RD_KAFKA_MSG_F_BLOCK)
-            if err != nil {
-                for _, m = range buffered2 {
-                    m.TopicPartition.Error = err
-                    p.events <- m
-                }
-            }
-        }
-
-        buffered = make(map[string][]*Message)
-        bufferedCnt = 0
-    }
+	var buffered = make(map[string][]*Message)
+	bufferedCnt := 0
+	const batchSize int = 1000000
+	totMsgCnt := 0
+	totBatchCnt := 0
+
+	for m := range p.produceChannel {
+		buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
+		bufferedCnt++
+
+	loop2:
+		for true {
+			select {
+			case m, ok := <-p.produceChannel:
+				if !ok {
+					break loop2
+				}
+				if m == nil {
+					panic("nil message received on ProduceChannel")
+				}
+				if m.TopicPartition.Topic == nil {
+					panic(fmt.Sprintf("message without Topic received on ProduceChannel: %v", m))
+				}
+				buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
+				bufferedCnt++
+				if bufferedCnt >= batchSize {
+					break loop2
+				}
+			default:
+				break loop2
+			}
+		}
+
+		totBatchCnt++
+		totMsgCnt += len(buffered)
+
+		for topic, buffered2 := range buffered {
+			err := p.produceBatch(topic, buffered2, C.RD_KAFKA_MSG_F_BLOCK)
+			if err != nil {
+				for _, m = range buffered2 {
+					m.TopicPartition.Error = err
+					p.events <- m
+				}
+			}
+		}
+
+		buffered = make(map[string][]*Message)
+		bufferedCnt = 0
+	}
 }
 
 // poller polls the rd_kafka_t handle for events until signalled for termination
 func poller(p *Producer, termChan chan bool) {
-    for {
-        select {
-        case _ = <-termChan:
-            return
-
-        default:
-            _, term := p.handle.eventPoll(p.events, 100, 1000, termChan)
-            if term {
-                return
-            }
-            break
-        }
-    }
+	for {
+		select {
+		case _ = <-termChan:
+			return
+
+		default:
+			_, term := p.handle.eventPoll(p.events, 100, 1000, termChan)
+			if term {
+				return
+			}
+			break
+		}
+	}
 }
 
 // GetMetadata queries broker for cluster and topic metadata.
@@ -702,21 +702,21 @@ func poller(p *Producer, termChan chan bool) {
 // else information about all topics is returned.
 // GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
 func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
-    err := p.verifyClient()
-    if err != nil {
-        return nil, err
-    }
-    return getMetadata(p, topic, allTopics, timeoutMs)
+	err := p.verifyClient()
+	if err != nil {
+		return nil, err
+	}
+	return getMetadata(p, topic, allTopics, timeoutMs)
 }
 
 // QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
 // and partition.
 func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
-    err = p.verifyClient()
-    if err != nil {
-        return -1, -1, err
-    }
-    return queryWatermarkOffsets(p, topic, partition, timeoutMs)
+	err = p.verifyClient()
+	if err != nil {
+		return -1, -1, err
+	}
+	return queryWatermarkOffsets(p, topic, partition, timeoutMs)
 }
 
 // OffsetsForTimes looks up offsets by timestamp for the given partitions.
@@ -735,26 +735,26 @@ func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutM
 // Duplicate Topic+Partitions are not supported.
 // Per-partition errors may be returned in the `.Error` field.
 func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
-    err = p.verifyClient()
-    if err != nil {
-        return nil, err
-    }
-    return offsetsForTimes(p, times, timeoutMs)
+	err = p.verifyClient()
+	if err != nil {
+		return nil, err
+	}
+	return offsetsForTimes(p, times, timeoutMs)
 }
 
 // GetFatalError returns an Error object if the client instance has raised a fatal error, else nil.
 func (p *Producer) GetFatalError() error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    return getFatalError(p)
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	return getFatalError(p)
 }
 
 // TestFatalError triggers a fatal error in the underlying client.
 // This is to be used strictly for testing purposes.
 func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode {
-    return testFatalError(p, code, str)
+	return testFatalError(p, code, str)
 }
 
 // SetOAuthBearerToken sets the the data to be transmitted
@@ -768,11 +768,11 @@ func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode {
 // 3) SASL/OAUTHBEARER is supported but is not configured as the client's
 // authentication mechanism.
 func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    return p.handle.setOAuthBearerToken(oauthBearerToken)
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	return p.handle.setOAuthBearerToken(oauthBearerToken)
 }
 
 // SetOAuthBearerTokenFailure sets the error message describing why token
@@ -783,11 +783,11 @@ func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
 // 2) SASL/OAUTHBEARER is supported but is not configured as the client's
 // authentication mechanism.
 func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    return p.handle.setOAuthBearerTokenFailure(errstr)
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	return p.handle.setOAuthBearerTokenFailure(errstr)
 }
 
 // Transactional API
@@ -822,17 +822,17 @@ func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error {
 // by calling `err.(kafka.Error).IsRetriable()`, or whether a fatal
 // error has been raised by calling `err.(kafka.Error).IsFatal()`.
 func (p *Producer) InitTransactions(ctx context.Context) error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    cError := C.rd_kafka_init_transactions(p.handle.rk,
-        cTimeoutFromContext(ctx))
-    if cError != nil {
-        return newErrorFromCErrorDestroy(cError)
-    }
-
-    return nil
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	cError := C.rd_kafka_init_transactions(p.handle.rk,
+		cTimeoutFromContext(ctx))
+	if cError != nil {
+		return newErrorFromCErrorDestroy(cError)
+	}
+
+	return nil
 }
 
 // BeginTransaction starts a new transaction.
@@ -864,16 +864,16 @@ func (p *Producer) InitTransactions(ctx context.Context) error {
 // Any produce call outside an on-going transaction, or for a failed
 // transaction, will fail.
 func (p *Producer) BeginTransaction() error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    cError := C.rd_kafka_begin_transaction(p.handle.rk)
-    if cError != nil {
-        return newErrorFromCErrorDestroy(cError)
-    }
-
-    return nil
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	cError := C.rd_kafka_begin_transaction(p.handle.rk)
+	if cError != nil {
+		return newErrorFromCErrorDestroy(cError)
+	}
+
+	return nil
 }
 
 // SendOffsetsToTransaction sends a list of topic partition offsets to the
@@ -913,32 +913,32 @@ func (p *Producer) BeginTransaction() error {
 // `err.(kafka.Error).TxnRequiresAbort()` or `err.(kafka.Error).IsFatal()`
 // respectively.
 func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []TopicPartition, consumerMetadata *ConsumerGroupMetadata) error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    var cOffsets *C.rd_kafka_topic_partition_list_t
-    if offsets != nil {
-        cOffsets = newCPartsFromTopicPartitions(offsets)
-        defer C.rd_kafka_topic_partition_list_destroy(cOffsets)
-    }
-
-    cgmd, err := deserializeConsumerGroupMetadata(consumerMetadata.serialized)
-    if err != nil {
-        return err
-    }
-    defer C.rd_kafka_consumer_group_metadata_destroy(cgmd)
-
-    cError := C.rd_kafka_send_offsets_to_transaction(
-        p.handle.rk,
-        cOffsets,
-        cgmd,
-        cTimeoutFromContext(ctx))
-    if cError != nil {
-        return newErrorFromCErrorDestroy(cError)
-    }
-
-    return nil
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	var cOffsets *C.rd_kafka_topic_partition_list_t
+	if offsets != nil {
+		cOffsets = newCPartsFromTopicPartitions(offsets)
+		defer C.rd_kafka_topic_partition_list_destroy(cOffsets)
+	}
+
+	cgmd, err := deserializeConsumerGroupMetadata(consumerMetadata.serialized)
+	if err != nil {
+		return err
+	}
+	defer C.rd_kafka_consumer_group_metadata_destroy(cgmd)
+
+	cError := C.rd_kafka_send_offsets_to_transaction(
+		p.handle.rk,
+		cOffsets,
+		cgmd,
+		cTimeoutFromContext(ctx))
+	if cError != nil {
+		return newErrorFromCErrorDestroy(cError)
+	}
+
+	return nil
 }
 
 // CommitTransaction commits the current transaction.
@@ -973,17 +973,17 @@ func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []Topic
 // `err.(kafka.Error).TxnRequiresAbort()` or `err.(kafka.Error).IsFatal()`
 // respectively.
 func (p *Producer) CommitTransaction(ctx context.Context) error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    cError := C.rd_kafka_commit_transaction(p.handle.rk,
-        cTimeoutFromContext(ctx))
-    if cError != nil {
-        return newErrorFromCErrorDestroy(cError)
-    }
-
-    return nil
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	cError := C.rd_kafka_commit_transaction(p.handle.rk,
+		cTimeoutFromContext(ctx))
+	if cError != nil {
+		return newErrorFromCErrorDestroy(cError)
+	}
+
+	return nil
 }
 
 // AbortTransaction aborts the ongoing transaction.
@@ -1013,17 +1013,17 @@ func (p *Producer) CommitTransaction(ctx context.Context) error {
 // by calling `err.(kafka.Error).IsRetriable()`, or whether a fatal error
 // has been raised by calling `err.(kafka.Error).IsFatal()`.
 func (p *Producer) AbortTransaction(ctx context.Context) error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    cError := C.rd_kafka_abort_transaction(p.handle.rk,
-        cTimeoutFromContext(ctx))
-    if cError != nil {
-        return newErrorFromCErrorDestroy(cError)
-    }
-
-    return nil
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	cError := C.rd_kafka_abort_transaction(p.handle.rk,
+		cTimeoutFromContext(ctx))
+	if cError != nil {
+		return newErrorFromCErrorDestroy(cError)
+	}
+
+	return nil
 }
 
 // SetSaslCredentials sets the SASL credentials used for this producer. The new credentials
@@ -1033,9 +1033,9 @@ func (p *Producer) AbortTransaction(ctx context.Context) error {
 // existing broker connections that were established with the old credentials.
 // This method applies only to the SASL PLAIN and SCRAM mechanisms.
 func (p *Producer) SetSaslCredentials(username, password string) error {
-    err := p.verifyClient()
-    if err != nil {
-        return err
-    }
-    return setSaslCredentials(p.handle.rk, username, password)
+	err := p.verifyClient()
+	if err != nil {
+		return err
+	}
+	return setSaslCredentials(p.handle.rk, username, password)
 }