Skip to content

Commit

Permalink
feat(transport): add mTLS for Kafka (#367)
Browse files Browse the repository at this point in the history
Adds Kafka transport CLI options to use a specific CA and client certificates.
  • Loading branch information
lspgn authored Dec 31, 2024
1 parent a762550 commit 99f3629
Showing 1 changed file with 57 additions and 1 deletion.
58 changes: 57 additions & 1 deletion transport/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package kafka
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"flag"
"fmt"
"io"
"net"
"os"
"strconv"
Expand All @@ -18,7 +20,12 @@ import (
)

type KafkaDriver struct {
kafkaTLS bool
kafkaTLS bool
kafkaClientCert string
kafkaClientKey string
kafkaServerCA string
kafkaTlsInsecure bool

kafkaSASL string
kafkaTopic string
kafkaSrv string
Expand Down Expand Up @@ -83,6 +90,12 @@ var (

func (d *KafkaDriver) Prepare() error {
flag.BoolVar(&d.kafkaTLS, "transport.kafka.tls", false, "Use TLS to connect to Kafka")

flag.StringVar(&d.kafkaClientCert, "transport.kafka.tls.client", "", "Kafka client certificate")
flag.StringVar(&d.kafkaClientKey, "transport.kafka.tls.key", "", "Kafka client key")
flag.StringVar(&d.kafkaServerCA, "transport.kafka.tls.ca", "", "Kafka certificate authority")
flag.BoolVar(&d.kafkaTlsInsecure, "transport.kafka.tls.insecure", false, "Skips TLS verification")

flag.StringVar(&d.kafkaSASL, "transport.kafka.sasl", "none",
fmt.Sprintf(
"Use SASL to connect to Kafka, available settings: %s (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)",
Expand Down Expand Up @@ -151,6 +164,49 @@ func (d *KafkaDriver) Init() error {
RootCAs: rootCAs,
MinVersion: tls.VersionTLS12,
}

kafkaConfig.Net.TLS.Config.InsecureSkipVerify = d.kafkaTlsInsecure

if d.kafkaServerCA != "" {
serverCaFile, err := os.Open(d.kafkaServerCA)
if err != nil {
return fmt.Errorf("error initializing server CA: %v", err)
}

serverCaBytes, err := io.ReadAll(serverCaFile)
serverCaFile.Close()
if err != nil {
return fmt.Errorf("error reading server CA: %v", err)
}

block, _ := pem.Decode(serverCaBytes)

serverCa, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return fmt.Errorf("error parsing server CA: %v", err)
}

certPool := x509.NewCertPool()
certPool.AddCert(serverCa)

kafkaConfig.Net.TLS.Config.RootCAs = certPool
}

if d.kafkaClientCert != "" && d.kafkaClientKey != "" {
_, err := tls.LoadX509KeyPair(d.kafkaClientCert, d.kafkaClientKey)
if err != nil {
return fmt.Errorf("error initializing mTLS: %v", err)
}

kafkaConfig.Net.TLS.Config.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
cert, err := tls.LoadX509KeyPair(d.kafkaClientCert, d.kafkaClientKey)
if err != nil {
return nil, err
}
return &cert, nil
}
}

}

if d.kafkaHashing {
Expand Down

0 comments on commit 99f3629

Please sign in to comment.