Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): add mTLS for Kafka #367

Merged
merged 5 commits into from
Dec 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion transport/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package kafka
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"flag"
"fmt"
"io"
"net"
"os"
"strconv"
Expand All @@ -18,7 +20,12 @@ import (
)

type KafkaDriver struct {
kafkaTLS bool
kafkaTLS bool
kafkaClientCert string
kafkaClientKey string
kafkaServerCA string
kafkaTlsInsecure bool

kafkaSASL string
kafkaTopic string
kafkaSrv string
Expand Down Expand Up @@ -83,6 +90,12 @@ var (

func (d *KafkaDriver) Prepare() error {
flag.BoolVar(&d.kafkaTLS, "transport.kafka.tls", false, "Use TLS to connect to Kafka")

flag.StringVar(&d.kafkaClientCert, "transport.kafka.tls.client", "", "Kafka client certificate")
flag.StringVar(&d.kafkaClientKey, "transport.kafka.tls.key", "", "Kafka client key")
flag.StringVar(&d.kafkaServerCA, "transport.kafka.tls.ca", "", "Kafka certificate authority")
flag.BoolVar(&d.kafkaTlsInsecure, "transport.kafka.tls.insecure", false, "Skips TLS verification")

flag.StringVar(&d.kafkaSASL, "transport.kafka.sasl", "none",
fmt.Sprintf(
"Use SASL to connect to Kafka, available settings: %s (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)",
Expand Down Expand Up @@ -151,6 +164,49 @@ func (d *KafkaDriver) Init() error {
RootCAs: rootCAs,
MinVersion: tls.VersionTLS12,
}

kafkaConfig.Net.TLS.Config.InsecureSkipVerify = d.kafkaTlsInsecure

if d.kafkaServerCA != "" {
serverCaFile, err := os.Open(d.kafkaServerCA)
if err != nil {
return fmt.Errorf("error initializing server CA: %v", err)
}

serverCaBytes, err := io.ReadAll(serverCaFile)
serverCaFile.Close()
if err != nil {
return fmt.Errorf("error reading server CA: %v", err)
}

block, _ := pem.Decode(serverCaBytes)

serverCa, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return fmt.Errorf("error parsing server CA: %v", err)
}

certPool := x509.NewCertPool()
certPool.AddCert(serverCa)

kafkaConfig.Net.TLS.Config.RootCAs = certPool
}

if d.kafkaClientCert != "" && d.kafkaClientKey != "" {
_, err := tls.LoadX509KeyPair(d.kafkaClientCert, d.kafkaClientKey)
if err != nil {
return fmt.Errorf("error initializing mTLS: %v", err)
}

kafkaConfig.Net.TLS.Config.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
cert, err := tls.LoadX509KeyPair(d.kafkaClientCert, d.kafkaClientKey)
if err != nil {
return nil, err
}
return &cert, nil
}
}

}

if d.kafkaHashing {
Expand Down
Loading