diff --git a/README.md b/README.md
index 76cbfb01..2a0d01bb 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@ This library attempts to provide an intuitive API while interacting with Kafka t
 
 ## Features
 
-- Feature complete client (Kafka >= 0.8.0 through v3.4+)
+- Feature complete client (Kafka >= 0.8.0 through v3.8+) _minus_ the next generation group protocol
 - Full Exactly-Once-Semantics (EOS)
 - Idempotent & transactional producers
 - Simple (legacy) consumer
@@ -403,11 +403,13 @@ generation.
 | [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft) — `AlterPartition.TopicID` | 3.3 | Supported |
 | [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) — Next gen consumer rebalance protocol | 3.7 | Unsupported (proto supported) |
 | [KIP-866](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) — ZK to Raft RPC changes | 3.4 | Supported |
+| [KIP-890](https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense) — Transactions server-side defense | 3.8 (partial) | Supported |
 | [KIP-893](https://cwiki.apache.org/confluence/display/KAFKA/KIP-893%3A+The+Kafka+protocol+should+support+nullable+structs) — Nullable structs in the protocol | 3.5 | Supported |
 | [KIP-899](https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+clients+to+rebootstrap) — Allow clients to rebootstrap | ? | Supported (`UpdateSeedBrokers`) |
 | [KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR) — Stale broker epoch fencing | 3.5 | Supported (proto) |
 | [KIP-919](https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration) — Admin client talk to KRaft , Controller registration | 3.7 | Supported (proto) |
 | [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client) — Leader discovery optimizations | 3.7 | Supported |
+| [KIP-994](https://cwiki.apache.org/confluence/display/KAFKA/KIP-994%3A+Minor+Enhancements+to+ListTransactions+and+DescribeTransactions+APIs) — List/Describe transactions enhancements | 3.8 (partial) | Supported |
 
 Missing from above but included in librdkafka is:
 
diff --git a/go.mod b/go.mod
index b29fcb56..2fa3ece2 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.21
 
 require (
 	github.com/klauspost/compress v1.17.8
 	github.com/pierrec/lz4/v4 v4.1.21
-	github.com/twmb/franz-go/pkg/kmsg v1.8.0
+	github.com/twmb/franz-go/pkg/kmsg v1.9.0
 	golang.org/x/crypto v0.23.0
 )
diff --git a/go.sum b/go.sum
index 80dc0fb0..757ece4c 100644
--- a/go.sum
+++ b/go.sum
@@ -2,7 +2,7 @@ github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0N
 github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
 github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
 github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
-github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
-github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
+github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
+github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
 golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
 golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
diff --git a/pkg/kerr/kerr.go b/pkg/kerr/kerr.go
index 731a23a1..1f408783 100644
--- a/pkg/kerr/kerr.go
+++ b/pkg/kerr/kerr.go
@@ -190,6 +190,21 @@ var (
 	MismatchedEndpointType  = &Error{"MISMATCHED_ENDPOINT_TYPE", 114, false, "The request was sent to an endpoint of the wrong type."}
 	UnsupportedEndpointType = &Error{"UNSUPPORTED_ENDPOINT_TYPE", 115, false, "This endpoint type is not supported yet."}
 	UnknownControllerID     = &Error{"UNKNOWN_CONTROLLER_ID", 116, false, "This controller ID is not known"}
+
+	// UnknownSubscriptionID = &Error{"UNKNOWN_SUBSCRIPTION_ID", 117, false, "Client sent a push telemetry request with an invalid or outdated subscription ID."}
+	// TelemetryTooLarge = &Error{"TELEMETRY_TOO_LARGE", 118, false, "Client sent a push telemetry request larger than the maximum size the broker will accept."}
+	// InvalidRegistration = &Error{"INVALID_REGISTRATION", 119, false, "The controller has considered the broker registration to be invalid."}
+
+	TransactionAbortable = &Error{"TRANSACTION_ABORTABLE", 120, false, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID."}
+
+	// InvalidRecordState = &Error{"INVALID_RECORD_STATE", 121, false, "The record state is invalid. The acknowledgement of delivery could not be completed."}
+	// ShareSessionNotFound = &Error{"SHARE_SESSION_NOT_FOUND", 122, false, "The share session was not found."}
+	// InvalidShareSessionEpoch = &Error{"INVALID_SHARE_SESSION_EPOCH", 123, false, "The share session epoch is invalid."}
+	// FencedStateEpoch = &Error{"FENCED_STATE_EPOCH", 124, false, "The share coordinator rejected the request because the share-group state epoch did not match."}
+	// InvalidVoterKey = &Error{"INVALID_VOTER_KEY", 125, false, "The voter key doesn't match the receiving replica's key."}
+	// DuplicateVoter = &Error{"DUPLICATE_VOTER", 126, false, "The voter is already part of the set of voters."}
+	// VoterNotFound = &Error{"VOTER_NOT_FOUND", 127, false, "The voter is not part of the set of voters."}
+	// InvalidRegularExpression = &Error{"INVALID_REGULAR_EXPRESSION", 128, false, "The regular expression is not valid."}
 )
 
 var code2err = map[int16]error{
@@ -312,4 +327,9 @@ var code2err = map[int16]error{
 	115: UnsupportedEndpointType, // ""
 	116: UnknownControllerID,     // ""
+	// 117: UnknownSubscriptionID, // KIP-714 f1819f448 KAFKA-15778 & KAFKA-15779
+	// 118: TelemetryTooLarge,     // ""
+	// 119: InvalidRegistration,   // KIP-858 f467f6bb4 KAFKA-15361
+
+	120: TransactionAbortable, // KIP-890 2e8d69b78 KAFKA-16314
 }
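For reference, a minimal sketch (not part of this change) of how the new TRANSACTION_ABORTABLE code surfaces through the existing `kerr` helpers. Nothing beyond `kerr.ErrorForCode`, `kerr.IsRetriable`, and the new `kerr.TransactionAbortable` value is assumed:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/twmb/franz-go/pkg/kerr"
)

func main() {
	// kerr.ErrorForCode maps a protocol error code to its typed error;
	// code 120 is the TRANSACTION_ABORTABLE code introduced by KIP-890.
	err := kerr.ErrorForCode(120)

	// The error is not marked retryable: the intended recovery is for the
	// client to abort the transaction, not to retry the failed request.
	fmt.Println(errors.Is(err, kerr.TransactionAbortable)) // true
	fmt.Println(kerr.IsRetriable(err))                     // false
}
```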
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 6d0f3dfe..fa66e304 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -406,6 +406,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {
 
 	if txnReq != nil {
 		// txnReq can fail from:
+		//  - TransactionAbortable
 		//  - retry failure
 		//  - auth failure
 		//  - producer id mapping / epoch errors
@@ -417,6 +418,10 @@
 		batchesStripped, err := s.doTxnReq(req, txnReq)
 		if err != nil {
 			switch {
+			case errors.Is(err, kerr.TransactionAbortable):
+				// If we get TransactionAbortable, we continue into producing.
+				// The produce will fail with the same error, and this is the
+				// only way to notify the user to abort the txn.
 			case isRetryableBrokerErr(err) || isDialNonTimeoutErr(err):
 				s.cl.bumpRepeatedLoadErr(err)
 				s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retryable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err)
@@ -431,8 +436,8 @@
 				// with produce request vs. end txn (KAFKA-12671)
 				s.cl.failProducerID(id, epoch, err)
 				s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", logID(s.nodeID), "err", err)
+				return false
 			}
-			return false
 		}
 
 		// If we stripped everything, ensure we backoff to force a
@@ -563,7 +568,7 @@ func (s *sink) issueTxnReq(
 			continue
 		}
 		for _, partition := range topic.Partitions {
-			if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
+			if err := kerr.ErrorForCode(partition.ErrorCode); err != nil && err != kerr.TransactionAbortable { // see below for txn abortable
 				// OperationNotAttempted is set for all partitions that are authorized
 				// if any partition is unauthorized _or_ does not exist. We simply remove
 				// unattempted partitions and treat them as retryable.
@@ -2057,7 +2062,7 @@ func (b *recBatch) tryBuffer(pr promisedRec, produceVersion, maxBatchBytes int32
 
 //////////////
 
 func (*produceRequest) Key() int16           { return 0 }
-func (*produceRequest) MaxVersion() int16    { return 10 }
+func (*produceRequest) MaxVersion() int16    { return 11 }
 func (p *produceRequest) SetVersion(v int16) { p.version = v }
 func (p *produceRequest) GetVersion() int16  { return p.version }
 func (p *produceRequest) IsFlexible() bool   { return p.version >= 9 }
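With the sink change above, a TRANSACTION_ABORTABLE returned to AddPartitionsToTxn is surfaced as the produce error itself, so application code sees it on the record promise (or `ProduceSync` result) and is expected to abort. Below is a rough sketch of that application-side handling, not part of the diff; the seed broker, transactional ID, and topic name are placeholders, and only public `kgo`/`kerr` API is used:

```go
package main

import (
	"context"
	"errors"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// The seed broker, transactional ID, and topic are placeholders.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("example-txnal-id"),
		kgo.DefaultProduceTopic("events"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	if err := produceInTxn(context.Background(), cl); err != nil {
		panic(err)
	}
}

// produceInTxn produces one record inside a transaction and aborts if the
// broker reports TRANSACTION_ABORTABLE.
func produceInTxn(ctx context.Context, cl *kgo.Client) error {
	if err := cl.BeginTransaction(); err != nil {
		return err
	}

	// Per the sink change, a TRANSACTION_ABORTABLE from AddPartitionsToTxn
	// shows up as the produce error itself.
	res := cl.ProduceSync(ctx, &kgo.Record{Value: []byte("v")})
	if err := res.FirstErr(); err != nil {
		if errors.Is(err, kerr.TransactionAbortable) {
			// The broker asked us to abort: clear anything still buffered,
			// then end the transaction as an abort so the caller can retry.
			if abortErr := cl.AbortBufferedRecords(ctx); abortErr != nil {
				return abortErr
			}
			return cl.EndTransaction(ctx, kgo.TryAbort)
		}
		return err
	}

	return cl.EndTransaction(ctx, kgo.TryCommit)
}
```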
diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go
index 25cfd443..68cba7cd 100644
--- a/pkg/kgo/txn.go
+++ b/pkg/kgo/txn.go
@@ -281,7 +281,8 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
 			errors.Is(err, kerr.CoordinatorLoadInProgress),
 			errors.Is(err, kerr.NotCoordinator),
 			errors.Is(err, kerr.ConcurrentTransactions),
-			errors.Is(err, kerr.UnknownServerError):
+			errors.Is(err, kerr.UnknownServerError),
+			errors.Is(err, kerr.TransactionAbortable):
 			return true
 		}
 		return false
@@ -408,6 +409,11 @@ retry:
 			willTryCommit = false
 			goto retry
 
+		case errors.Is(endTxnErr, kerr.TransactionAbortable):
+			s.cl.cfg.logger.Log(LogLevelInfo, "end transaction returned TransactionAbortable; retrying as abort")
+			willTryCommit = false
+			goto retry
+
 		case errors.Is(endTxnErr, kerr.UnknownServerError):
 			s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit unknown server error; retrying")
 			after := time.NewTimer(s.cl.cfg.retryBackoff(tries))
@@ -517,7 +523,7 @@ const (
 	// Deprecated: Kafka 3.6 removed support for the hacky behavior that
 	// this option was abusing. Thus, as of Kafka 3.6, this option does not
 	// work against Kafka. This option also has never worked for Redpanda
-	// becuse Redpanda always strictly validated that partitions were a
+	// because Redpanda always strictly validated that partitions were a
 	// part of a transaction. Later versions of Kafka and Redpanda will
 	// remove the need for AddPartitionsToTxn at all and thus this option
 	// ultimately will be unnecessary anyway.
@@ -820,8 +826,9 @@ func (cl *Client) UnsafeAbortBufferedRecords() {
 //
 // If the producer ID has an error and you are trying to commit, this will
 // return with kerr.OperationNotAttempted. If this happened, retry
-// EndTransaction with TryAbort. Not other error is retryable, and you should
-// not retry with TryAbort.
+// EndTransaction with TryAbort. If this returns kerr.TransactionAbortable, you
+// can retry with TryAbort. No other error is retryable, and you should not
+// retry with TryAbort.
 //
 // If records failed with UnknownProducerID and your Kafka version is at least
 // 2.5, then aborting here will potentially allow the client to recover for
diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go
index 3081c346..2267b919 100644
--- a/pkg/kversion/kversion.go
+++ b/pkg/kversion/kversion.go
@@ -67,6 +67,7 @@ var versions = []struct {
 	{"v3.5", V3_5_0()},
 	{"v3.6", V3_6_0()},
 	{"v3.7", V3_7_0()},
+	{"v3.8", V3_8_0()},
 }
 
 // VersionStrings returns all recognized versions, minus any patch, that can be
@@ -333,7 +334,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
 		//
 		// TODO: add introduced-version to differentiate some specific
 		// keys.
-		skipKeys: []int16{4, 5, 6, 7, 27, 52, 53, 54, 55, 56, 57, 58, 59, 62, 63, 64, 67},
+		skipKeys: []int16{4, 5, 6, 7, 27, 52, 53, 54, 55, 56, 57, 58, 59, 62, 63, 64, 67, 74, 75},
 	}
 	for _, opt := range opts {
 		opt.apply(&cfg)
@@ -378,6 +379,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
 		{max350, "v3.5"},
 		{max360, "v3.6"},
 		{max370, "v3.7"},
+		{max380, "v3.8"},
 	} {
 		for k, v := range comparison.cmp.filter(cfg.listener) {
 			if v == -1 {
@@ -520,6 +522,7 @@ func V3_4_0() *Versions { return zkBrokerOf(max340) }
 func V3_5_0() *Versions { return zkBrokerOf(max350) }
 func V3_6_0() *Versions { return zkBrokerOf(max360) }
 func V3_7_0() *Versions { return zkBrokerOf(max370) }
+func V3_8_0() *Versions { return zkBrokerOf(max380) }
 
 func zkBrokerOf(lks listenerKeys) *Versions {
 	return &Versions{lks.filter(zkBroker)}
@@ -1158,8 +1161,37 @@ var max370 = nextMax(max360, func(v listenerKeys) listenerKeys {
 	return v
 })
 
+var max380 = nextMax(max370, func(v listenerKeys) listenerKeys {
+	// KAFKA-16314 2e8d69b78ca52196decd851c8520798aa856c073 KIP-890
+	// Then error rename in cf1ba099c0723f9cf65dda4cd334d36b7ede6327
+	v[0].inc()  // 11 produce
+	v[10].inc() // 5 find coordinator
+	v[22].inc() // 5 init producer id
+	v[24].inc() // 5 add partitions to txn
+	v[25].inc() // 4 add offsets to txn
+	v[26].inc() // 4 end txn
+	v[28].inc() // 4 txn offset commit
+
+	// KAFKA-15460 68745ef21a9d8fe0f37a8c5fbc7761a598718d46 KIP-848
+	v[16].inc() // 5 list groups
+
+	// KAFKA-14509 90e646052a17e3f6ec1a013d76c1e6af2fbb756e KIP-848 added
+	//             7b0352f1bd9b923b79e60b18b40f570d4bfafcc0
+	//             b7c99e22a77392d6053fe231209e1de32b50a98b
+	//             68389c244e720566aaa8443cd3fc0b9d2ec4bb7a
+	//             5f410ceb04878ca44d2d007655155b5303a47907 stabilized
+	v = append(v,
+		k(zkBroker, rBroker), // 69 consumer group describe
+	)
+
+	// KAFKA-16265 b4e96913cc6c827968e47a31261e0bd8fdf677b5 KIP-994 (part 1)
+	v[66].inc()
+
+	return v
+})
+
 var (
-	maxStable = max370
+	maxStable = max380
 	maxTip    = nextMax(maxStable, func(v listenerKeys) listenerKeys {
 		return v
 	})
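Putting the txn.go and kversion.go pieces together: `GroupTransactSession.End` now retries a commit as an abort on its own when it sees TransactionAbortable, while callers of the lower-level `EndTransaction` perform that retry themselves, as the updated doc comment describes. A rough usage sketch, not part of the diff, with the seed broker and transactional ID as placeholders and the transaction body elided; it also caps the client at the newly added v3.8 request versions via `kgo.MaxVersions` and `kversion.V3_8_0`:

```go
package main

import (
	"context"
	"errors"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kversion"
)

func main() {
	// Cap negotiated request versions at v3.8; the seed broker and
	// transactional ID are placeholders.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("example-txnal-id"),
		kgo.MaxVersions(kversion.V3_8_0()),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	ctx := context.Background()

	// BeginTransaction, producing, and flushing are elided here.

	// Per the updated EndTransaction docs, a commit that fails with
	// TransactionAbortable can be retried as an abort. GroupTransactSession.End
	// does this retry internally; with the lower-level API we do it ourselves.
	if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil {
		if errors.Is(err, kerr.TransactionAbortable) {
			err = cl.EndTransaction(ctx, kgo.TryAbort)
		}
		if err != nil {
			panic(err)
		}
	}
}
```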