-
Notifications
You must be signed in to change notification settings - Fork 663
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AUTHN-1982] Implement OAuthBearer mechanism for Kafka client to fetch JWT token by communicating with SPIRE agent #1015
base: master
Are you sure you want to change the base?
Changes from 8 commits
f3f7a55
1fdc6b0
d1f1fbd
87ba39f
1f69a9e
9e3286f
ff33ee2
728a155
92056a9
decdb12
89573cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
/** | ||
* Copyright 2023 Confluent Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
// Example consumer with a custom SPIRE token implementation. | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"github.com/confluentinc/confluent-kafka-go/v2/kafka" | ||
"github.com/spiffe/go-spiffe/v2/svid/jwtsvid" | ||
"github.com/spiffe/go-spiffe/v2/workloadapi" | ||
"os" | ||
"os/signal" | ||
"syscall" | ||
"time" | ||
) | ||
|
||
// handleJWTTokenRefreshEvent retrieves JWT from the SPIRE workload API and | ||
// sets the token on the client for use in any future authentication attempt. | ||
// It must be invoked whenever kafka.OAuthBearerTokenRefresh appears on the client's event channel, | ||
// which will occur whenever the client requires a token (i.e. when it first starts and when the | ||
// previously-received token is 80% of the way to its expiration time). | ||
func handleJWTTokenRefreshEvent(ctx context.Context, client kafka.Handle, principal, socketPath string, audience []string) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @chang-you I don't think we're using the principal here correctly. In the oauth producer case, the principal needs to be passed in because it's the principal for which we get a token, and the token would then contain the principal in the For spire though, the principal value should be a spiffe id, and we should be using this spiffe id to look through all the svids returned by the spire agent to see if one of them matches the given principal, and use that one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @arvindth Do you suggest we should use FetchJWTSVIDs instead of FetchJWTSVID and use the JWTSVID with its |
||
fmt.Fprintf(os.Stderr, "Token refresh\n") | ||
oauthBearerToken, closer, retrieveErr := retrieveJWTToken(ctx, principal, socketPath, audience) | ||
defer closer() | ||
if retrieveErr != nil { | ||
fmt.Fprintf(os.Stderr, "%% Token retrieval error: %v\n", retrieveErr) | ||
client.SetOAuthBearerTokenFailure(retrieveErr.Error()) | ||
} else { | ||
setTokenError := client.SetOAuthBearerToken(oauthBearerToken) | ||
if setTokenError != nil { | ||
fmt.Fprintf(os.Stderr, "%% Error setting token and extensions: %v\n", setTokenError) | ||
client.SetOAuthBearerTokenFailure(setTokenError.Error()) | ||
} | ||
} | ||
} | ||
|
||
func retrieveJWTToken(ctx context.Context, principal, socketPath string, audience []string) (kafka.OAuthBearerToken, func() error, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a comment for this method. We don't need to describe the exact logic that the method uses (in fact, it's probably a fair idea to say that we can use any arbitrary logic, like here we are using the go-spiffe library as long as we construct a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the comments. To clarify:
Here do you suggest that we don't need to include a description of this method in the readme, or is there something specific I should modify within this method? |
||
ctx, cancel := context.WithTimeout(ctx, time.Second) | ||
defer cancel() | ||
jwtSource, err := workloadapi.NewJWTSource( | ||
ctx, | ||
workloadapi.WithClientOptions(workloadapi.WithAddr(socketPath)), | ||
) | ||
|
||
if err != nil { | ||
return kafka.OAuthBearerToken{}, nil, fmt.Errorf("unable to create JWTSource: %w", err) | ||
} | ||
|
||
defer jwtSource.Close() | ||
|
||
params := jwtsvid.Params{ | ||
Audience: audience[0], | ||
// Other fields... | ||
} | ||
|
||
jwtSVID, err := jwtSource.FetchJWTSVID(ctx, params) | ||
if err != nil { | ||
return kafka.OAuthBearerToken{}, nil, fmt.Errorf("unable to fetch JWT SVID: %w", err) | ||
} | ||
|
||
extensions := map[string]string{ | ||
"logicalCluster": "lkc-0yoqvq", | ||
"identityPoolId": "pool-W9j5", | ||
} | ||
oauthBearerToken := kafka.OAuthBearerToken{ | ||
TokenValue: jwtSVID.Marshal(), | ||
Expiration: jwtSVID.Expiry, | ||
Principal: principal, | ||
Extensions: extensions, | ||
} | ||
|
||
return oauthBearerToken, jwtSource.Close, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't seem necessary to return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you! |
||
} | ||
|
||
func main() { | ||
if len(os.Args) != 5 { | ||
fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <topic> <principal> <socketPath> \n", os.Args[0]) | ||
os.Exit(1) | ||
} | ||
|
||
bootstrapServers := os.Args[1] | ||
topic := os.Args[2] | ||
principal := os.Args[3] | ||
socketPath := os.Args[4] | ||
audience := []string{"audience1", "audience2"} | ||
|
||
config := kafka.ConfigMap{ | ||
"bootstrap.servers": bootstrapServers, | ||
"security.protocol": "SASL_SSL", | ||
"sasl.mechanisms": "OAUTHBEARER", | ||
"sasl.oauthbearer.config": principal, | ||
"group.id": "myGroup", | ||
"session.timeout.ms": 6000, | ||
"auto.offset.reset": "earliest", | ||
"enable.auto.offset.store": false, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove this config, as it's not relevant to this example and will complicate things |
||
} | ||
|
||
c, err := kafka.NewConsumer(&config) | ||
|
||
if err != nil { | ||
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) | ||
os.Exit(1) | ||
} | ||
|
||
fmt.Printf("Created Consumer %v\n", c) | ||
|
||
err = c.SubscribeTopics([]string{topic}, nil) | ||
|
||
if err != nil { | ||
fmt.Fprintf(os.Stderr, "Failed to subscribe to topic: %s\n", topic) | ||
os.Exit(1) | ||
} | ||
|
||
run := true | ||
signalChannel := make(chan os.Signal, 1) | ||
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) | ||
|
||
ctx := context.Background() | ||
|
||
for run { | ||
select { | ||
case sig := <-signalChannel: | ||
fmt.Printf("Caught signal %v: terminating\n", sig) | ||
run = false | ||
default: | ||
ev := c.Poll(100) | ||
if ev == nil { | ||
continue | ||
} | ||
|
||
switch e := ev.(type) { | ||
case *kafka.Message: | ||
fmt.Printf("%% Message on %s:\n%s\n", | ||
e.TopicPartition, string(e.Value)) | ||
if e.Headers != nil { | ||
fmt.Printf("%% Headers: %v\n", e.Headers) | ||
} | ||
_, err := c.StoreMessage(e) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove the StoreMessage and the subsequent error check, as it's not needed after removing "enable.auto.offset.store" |
||
if err != nil { | ||
fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n", | ||
e.TopicPartition) | ||
} | ||
case kafka.Error: | ||
// Errors should generally be considered | ||
// informational, the client will try to | ||
// automatically recover. | ||
// But in this example we choose to terminate | ||
// the application if all brokers are down. | ||
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) | ||
if e.Code() == kafka.ErrAllBrokersDown { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This if block isn't needed, you can remove the comment too, and replace with just
|
||
run = false | ||
} | ||
case kafka.OAuthBearerTokenRefresh: | ||
handleJWTTokenRefreshEvent(ctx, c, principal, socketPath, audience) | ||
default: | ||
fmt.Printf("Ignored %v\n", e) | ||
} | ||
} | ||
} | ||
|
||
fmt.Printf("Closing consumer\n") | ||
c.Close() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for correct terminology.