diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ad6e4ce2..fff3448e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Confluent's Golang client for Apache Kafka +## v2.2.1 + +This is a maintenance release: + +* Bundles librdkafka v2.2.0. +* SPIRE example [producer](examples/spire_producer.example), [consumer](examples/spire_consumer_example) for clients to fetch JWT token by communicating with SPIRE agent. + + +confluent-kafka-go is based on librdkafka v2.2.0, see the +[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.2.0) +for a complete list of changes, enhancements, fixes and upgrade considerations. # v2.2.0 diff --git a/examples/.gitignore b/examples/.gitignore index 3519e7b27..acc862078 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -33,5 +33,7 @@ producer_custom_channel_example/producer_custom_channel_example producer_example/producer_example protobuf_consumer_example/protobuf_consumer_example protobuf_producer_example/protobuf_producer_example +spire_consumer_example/spire_consumer_example +spire_producer.example/spire_producer.example stats_example/stats_example transactions_example/transactions_example diff --git a/examples/go.mod b/examples/go.mod index 1d1e84e5c..77e36b04e 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -10,5 +10,6 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/confluentinc/confluent-kafka-go/v2 v2.2.0-RC1 github.com/gdamore/tcell v1.4.0 + github.com/spiffe/go-spiffe/v2 v2.1.6 // indirect google.golang.org/protobuf v1.30.0 ) diff --git a/examples/go.sum b/examples/go.sum index 0aaf7eb65..3db96927d 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -617,6 +617,8 @@ github.com/Microsoft/go-winio v0.4.17/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOp github.com/Microsoft/go-winio v0.5.1/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= +github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg= +github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= github.com/Microsoft/hcsshim v0.8.6/go.mod h1:Op3hHsoHPAvb6lceZHDtd9OkTew38wNoXnJs8iY7rUg= github.com/Microsoft/hcsshim v0.8.7-0.20190325164909-8abdbb8205e4/go.mod h1:Op3hHsoHPAvb6lceZHDtd9OkTew38wNoXnJs8iY7rUg= github.com/Microsoft/hcsshim v0.8.7/go.mod h1:OHd7sQqRFrYd3RmSgbgji+ctCwkbq2wbEYNSzOYtcBQ= @@ -951,6 +953,8 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo= +github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= @@ -1488,6 +1492,8 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/spiffe/go-spiffe/v2 v2.1.6 h1:4SdizuQieFyL9eNU+SPiCArH4kynzaKOOj0VvM8R7Xo= +github.com/spiffe/go-spiffe/v2 v2.1.6/go.mod h1:eVDqm9xFvyqao6C+eQensb9ZPkyNEeaUbqbBpOhBnNk= github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -1551,6 +1557,8 @@ github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA= github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs= +github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -1615,6 +1623,7 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -1625,6 +1634,8 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1680,6 +1691,7 @@ golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1754,6 +1766,7 @@ golang.org/x/net v0.0.0-20220909164309-bea034e7d591/go.mod h1:YDH+HFinaLZZlnHAfS golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= +golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= @@ -1933,6 +1946,7 @@ golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= @@ -1944,6 +1958,7 @@ golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20220526004731-065cf7ba2467/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= +golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= @@ -2049,6 +2064,7 @@ golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -2310,10 +2326,12 @@ google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCD google.golang.org/grpc v1.50.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= +google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= +google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20/go.mod h1:Nr5H8+MlGWr5+xX/STzdoEqJrO+YteqFbMyCsrb6mH0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/examples/spire_consumer_example/spire_consumer_example.go b/examples/spire_consumer_example/spire_consumer_example.go new file mode 100644 index 000000000..3e06ac6d5 --- /dev/null +++ b/examples/spire_consumer_example/spire_consumer_example.go @@ -0,0 +1,165 @@ +/** + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Example consumer with a custom SPIRE token implementation. +package main + +import ( + "context" + "fmt" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/spiffe/go-spiffe/v2/svid/jwtsvid" + "github.com/spiffe/go-spiffe/v2/workloadapi" + "os" + "os/signal" + "syscall" + "time" +) + +// handleJWTTokenRefreshEvent retrieves JWT from the SPIFFE workload API and +// sets the token on the client for use in any future authentication attempt. +// It must be invoked whenever kafka.OAuthBearerTokenRefresh appears on the client's event channel, +// which will occur whenever the client requires a token (i.e. when it first starts and when the +// previously-received token is 80% of the way to its expiration time). +func handleJWTTokenRefreshEvent(ctx context.Context, client kafka.Handle, principal, socketPath string, audience []string) { + fmt.Fprintf(os.Stderr, "Token refresh\n") + oauthBearerToken, closer, retrieveErr := retrieveJWTToken(ctx, principal, socketPath, audience) + defer closer() + if retrieveErr != nil { + fmt.Fprintf(os.Stderr, "%% Token retrieval error: %v\n", retrieveErr) + client.SetOAuthBearerTokenFailure(retrieveErr.Error()) + } else { + setTokenError := client.SetOAuthBearerToken(oauthBearerToken) + if setTokenError != nil { + fmt.Fprintf(os.Stderr, "%% Error setting token and extensions: %v\n", setTokenError) + client.SetOAuthBearerTokenFailure(setTokenError.Error()) + } + } +} + +func retrieveJWTToken(ctx context.Context, principal, socketPath string, audience []string) (kafka.OAuthBearerToken, func() error, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + jwtSource, err := workloadapi.NewJWTSource( + ctx, + workloadapi.WithClientOptions(workloadapi.WithAddr(socketPath)), + ) + + if err != nil { + return kafka.OAuthBearerToken{}, nil, fmt.Errorf("unable to create JWTSource: %w", err) + } + + defer jwtSource.Close() + + params := jwtsvid.Params{ + Audience: audience[0], + // Other fields... + } + + jwtSVID, err := jwtSource.FetchJWTSVID(ctx, params) + if err != nil { + return kafka.OAuthBearerToken{}, nil, fmt.Errorf("unable to fetch JWT SVID: %w", err) + } + + extensions := map[string]string{ + "logicalCluster": "lkc-0yoqvq", + "identityPoolId": "pool-W9j5", + } + oauthBearerToken := kafka.OAuthBearerToken{ + TokenValue: jwtSVID.Marshal(), + Expiration: jwtSVID.Expiry, + Principal: principal, + Extensions: extensions, + } + + return oauthBearerToken, jwtSource.Close, nil +} + +func main() { + if len(os.Args) != 5 { + fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) + os.Exit(1) + } + + bootstrapServers := os.Args[1] + topic := os.Args[2] + principal := os.Args[3] + socketPath := os.Args[4] + audience := []string{"audience1", "audience2"} + + config := kafka.ConfigMap{ + "bootstrap.servers": bootstrapServers, + "security.protocol": "SASL_SSL", + "sasl.mechanisms": "OAUTHBEARER", + "sasl.oauthbearer.config": principal, + } + + c, err := kafka.NewConsumer(&config) + + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err) + os.Exit(1) + } + + fmt.Printf("Created Consumer %v\n", c) + + err = c.SubscribeTopics([]string{topic}, nil) + + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to subscribe to topic: %s\n", topic) + os.Exit(1) + } + + run := true + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) + + ctx := context.Background() + + for run { + select { + case sig := <-signalChannel: + fmt.Printf("Caught signal %v: terminating\n", sig) + run = false + default: + ev := c.Poll(100) + if ev == nil { + continue + } + + switch e := ev.(type) { + case *kafka.Message: + fmt.Printf("%% Message on %s:\n%s\n", + e.TopicPartition, string(e.Value)) + if e.Headers != nil { + fmt.Printf("%% Headers: %v\n", e.Headers) + } + case kafka.Error: + // Errors should generally be considered + // informational, the client will try to + // automatically recover. + fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) + case kafka.OAuthBearerTokenRefresh: + handleJWTTokenRefreshEvent(ctx, c, principal, socketPath, audience) + default: + fmt.Printf("Ignored %v\n", e) + } + } + } + + fmt.Printf("Closing consumer\n") + c.Close() +} diff --git a/examples/spire_producer.example/spire_producer.example.go b/examples/spire_producer.example/spire_producer.example.go new file mode 100644 index 000000000..6622c1063 --- /dev/null +++ b/examples/spire_producer.example/spire_producer.example.go @@ -0,0 +1,172 @@ +/** + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// Example producer with a custom SPIRE token implementation. +package main + +import ( + "context" + "fmt" + "github.com/spiffe/go-spiffe/v2/svid/jwtsvid" + "github.com/spiffe/go-spiffe/v2/workloadapi" + "os" + "os/signal" + "syscall" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +// handleJWTTokenRefreshEvent retrieves JWT from the SPIFFE workload API and +// sets the token on the client for use in any future authentication attempt. +// It must be invoked whenever kafka.OAuthBearerTokenRefresh appears on the client's event channel, +// which will occur whenever the client requires a token (i.e. when it first starts and when the +// previously-received token is 80% of the way to its expiration time). +func handleJWTTokenRefreshEvent(ctx context.Context, client kafka.Handle, principal, socketPath string, audience []string) { + fmt.Fprintf(os.Stderr, "Token refresh\n") + oauthBearerToken, closer, retrieveErr := retrieveJWTToken(ctx, principal, socketPath, audience) + defer closer() + if retrieveErr != nil { + fmt.Fprintf(os.Stderr, "%% Token retrieval error: %v\n", retrieveErr) + client.SetOAuthBearerTokenFailure(retrieveErr.Error()) + } else { + setTokenError := client.SetOAuthBearerToken(oauthBearerToken) + if setTokenError != nil { + fmt.Fprintf(os.Stderr, "%% Error setting token and extensions: %v\n", setTokenError) + client.SetOAuthBearerTokenFailure(setTokenError.Error()) + } + } +} + +func retrieveJWTToken(ctx context.Context, principal, socketPath string, audience []string) (kafka.OAuthBearerToken, func() error, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + jwtSource, err := workloadapi.NewJWTSource( + ctx, + workloadapi.WithClientOptions(workloadapi.WithAddr(socketPath)), + ) + + if err != nil { + return kafka.OAuthBearerToken{}, nil, fmt.Errorf("unable to create JWTSource: %w", err) + } + + defer jwtSource.Close() + + params := jwtsvid.Params{ + // initialize the fields of Params here + Audience: audience[0], + // Other fields... + } + + jwtSVID, err := jwtSource.FetchJWTSVID(ctx, params) + if err != nil { + return kafka.OAuthBearerToken{}, nil, fmt.Errorf("unable to fetch JWT SVID: %w", err) + } + + extensions := map[string]string{ + "logicalCluster": "lkc-0yoqvq", + "identityPoolId": "pool-W9j5", + } + oauthBearerToken := kafka.OAuthBearerToken{ + TokenValue: jwtSVID.Marshal(), + Expiration: jwtSVID.Expiry, + Principal: principal, + Extensions: extensions, + } + + return oauthBearerToken, jwtSource.Close, nil +} + +func main() { + + if len(os.Args) != 5 { + fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) + os.Exit(1) + } + + bootstrapServers := os.Args[1] + topic := os.Args[2] + principal := os.Args[3] + socketPath := os.Args[4] + audience := []string{"audience1", "audience2"} + + // You'll probably need to modify this configuration to + // match your environment. + config := kafka.ConfigMap{ + "bootstrap.servers": bootstrapServers, + "security.protocol": "SASL_SSL", + "sasl.mechanisms": "OAUTHBEARER", + "sasl.oauthbearer.config": principal, + } + + p, err := kafka.NewProducer(&config) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to create producer: %s\n", err) + os.Exit(1) + } + + // Token refresh events are posted on the Events channel, instructing + // the application to refresh its token. + ctx := context.Background() + + go func(eventsChan chan kafka.Event) { + for ev := range eventsChan { + _, ok := ev.(kafka.OAuthBearerTokenRefresh) + if !ok { + // Ignore other event types + continue + } + + handleJWTTokenRefreshEvent(ctx, p, principal, socketPath, audience) + } + }(p.Events()) + + run := true + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) + + msgcnt := 0 + for run { + select { + case sig := <-signalChannel: + fmt.Printf("Caught signal %v: terminating\n", sig) + run = false + default: + value := fmt.Sprintf("Producer example, message #%d", msgcnt) + err = p.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, + Value: []byte(value), + Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}}, + }, nil) + + if err != nil { + if err.(kafka.Error).Code() == kafka.ErrQueueFull { + // Producer queue is full, wait 1s for messages + // to be delivered then try again. + time.Sleep(time.Second) + continue + } + fmt.Printf("Failed to produce message: %v\n", err) + } else { + fmt.Printf("Produced message: %s\n", value) + } + + time.Sleep(1 * time.Second) + msgcnt++ + } + } + + p.Close() +}