Skip to content

Commit

Permalink
support Kafka scram-sha-256, scram-sha-512 and aws-msk-iam SASL
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Nov 2, 2024
1 parent f0a8743 commit 4418b72
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions internal/consuming/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"github.com/centrifugal/centrifuge"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
)

type KafkaConfig struct {
Expand Down Expand Up @@ -157,13 +159,30 @@ func (c *KafkaConsumer) initClient() (*kgo.Client, error) {
}

if c.config.SASLMechanism != "" {
if c.config.SASLMechanism != "plain" {
return nil, fmt.Errorf("only plain SASL auth mechanism is supported")
switch c.config.SASLMechanism {
case "plain":
opts = append(opts, kgo.SASL(plain.Auth{
User: c.config.SASLUser,
Pass: c.config.SASLPassword,
}.AsMechanism()))
case "scram-sha-256":
opts = append(opts, kgo.SASL(scram.Auth{
User: c.config.SASLUser,
Pass: c.config.SASLPassword,
}.AsSha256Mechanism()))
case "scram-sha-512":
opts = append(opts, kgo.SASL(scram.Auth{
User: c.config.SASLUser,
Pass: c.config.SASLPassword,
}.AsSha512Mechanism()))
case "aws-msk-iam":
opts = append(opts, kgo.SASL(aws.Auth{
AccessKey: c.config.SASLUser,
SecretKey: c.config.SASLPassword,
}.AsManagedStreamingIAMMechanism()))
default:
return nil, fmt.Errorf("unsupported SASL mechanism: %s", c.config.SASLMechanism)
}
opts = append(opts, kgo.SASL(plain.Auth{
User: c.config.SASLUser,
Pass: c.config.SASLPassword,
}.AsMechanism()))
}

client, err := kgo.NewClient(opts...)
Expand Down

0 comments on commit 4418b72

Please sign in to comment.