diff --git a/internal/util/kafka.go b/internal/util/kafka.go index a9ae46f4..f032c602 100644 --- a/internal/util/kafka.go +++ b/internal/util/kafka.go @@ -24,16 +24,14 @@ type kafkautil struct{} func (k kafkautil) NewReader(topic string) *kafka.Reader { var dialer *kafka.Dialer - if config.DefaultConfig.KafkaCAPath != "" { - saslMechanism, tlsConfig := getSaslAndTLSConfig() - dialer = &kafka.Dialer{ - Timeout: 10 * time.Second, - DualStack: true, - TLS: tlsConfig, - } - if config.DefaultConfig.KafkaUsername != "" && config.DefaultConfig.KafkaPassword != "" { - dialer.SASLMechanism = saslMechanism - } + saslMechanism, tlsConfig := getSaslAndTLSConfig() + dialer = &kafka.Dialer{ + Timeout: 10 * time.Second, + DualStack: true, + TLS: tlsConfig, + } + if config.DefaultConfig.KafkaUsername != "" && config.DefaultConfig.KafkaPassword != "" { + dialer.SASLMechanism = saslMechanism } return kafka.NewReader(kafka.ReaderConfig{ @@ -49,15 +47,12 @@ func (k kafkautil) NewReader(topic string) *kafka.Reader { func (k kafkautil) NewWriter(topic string) *kafka.Writer { var transport *kafka.Transport = kafka.DefaultTransport.(*kafka.Transport) - if config.DefaultConfig.KafkaCAPath != "" { - saslMechanism, tlsConfig := getSaslAndTLSConfig() - - transport = &kafka.Transport{ - TLS: tlsConfig, - } - if config.DefaultConfig.KafkaUsername != "" && config.DefaultConfig.KafkaPassword != "" { - transport.SASL = saslMechanism - } + saslMechanism, tlsConfig := getSaslAndTLSConfig() + transport = &kafka.Transport{ + TLS: tlsConfig, + } + if config.DefaultConfig.KafkaUsername != "" && config.DefaultConfig.KafkaPassword != "" { + transport.SASL = saslMechanism } return &kafka.Writer{