Skip to content

Commit

Permalink
Add PLAIN sasl
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Mar 12, 2024
1 parent e0f1cd5 commit 0bf6a12
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 4 deletions.
7 changes: 6 additions & 1 deletion flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kslog"
"github.com/yuin/gopher-lua"
Expand Down Expand Up @@ -44,11 +45,15 @@ func NewKafkaConnector(
optionalOpts = append(optionalOpts, kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS13}))
}
if config.Username != "" {
auth := scram.Auth{User: config.Username, Pass: config.Password}
switch config.Sasl {
case "PLAIN":
auth := plain.Auth{User: config.Username, Pass: config.Password}
optionalOpts = append(optionalOpts, kgo.SASL(auth.AsMechanism()))
case "SCRAM-SHA-256":
auth := scram.Auth{User: config.Username, Pass: config.Password}
optionalOpts = append(optionalOpts, kgo.SASL(auth.AsSha256Mechanism()))
case "SCRAM-SHA-512":
auth := scram.Auth{User: config.Username, Pass: config.Password}
optionalOpts = append(optionalOpts, kgo.SASL(auth.AsSha512Mechanism()))
default:
return nil, fmt.Errorf("unsupported SASL mechanism: %s", config.Sasl)
Expand Down
2 changes: 1 addition & 1 deletion ui/app/peers/create/[peerType]/helpers/ka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ export const blankKaSetting: KafkaConfig = {
servers: [],
username: '',
password: '',
sasl: 'SCRAM-SHA-512',
sasl: 'PLAIN',
disableTls: false,
};
4 changes: 2 additions & 2 deletions ui/app/peers/create/[peerType]/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ export const kaSchema = z.object({
.string({ required_error: 'Password is required' })
.min(1, { message: 'Password cannot be empty' }),
sasl: z
.union([z.literal('SCRAM-SHA-256'), z.literal('SCRAM-SHA-512')], {
.union([z.literal('PLAIN'), z.literal('SCRAM-SHA-256'), z.literal('SCRAM-SHA-512')], {
errorMap: () => ({
message: 'Either SCRAM-SHA-256 or SCRAM-SHA-512 is required.',
message: 'One of PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 is required.',
}),
})
.optional(),
Expand Down
1 change: 1 addition & 0 deletions ui/components/PeerForms/KafkaConfig.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ interface KafkaProps {
}

const saslOptions = [
{ value: 'PLAIN', label: 'PLAIN' },
{ value: 'SCRAM-SHA-256', label: 'SCRAM-SHA-256' },
{ value: 'SCRAM-SHA-512', label: 'SCRAM-SHA-512' },
];
Expand Down

0 comments on commit 0bf6a12

Please sign in to comment.