Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka 3.8.0 #840

Merged
merged 6 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ This library attempts to provide an intuitive API while interacting with Kafka t

## Features

- Feature complete client (Kafka >= 0.8.0 through v3.4+)
- Feature complete client (Kafka >= 0.8.0 through v3.8+) _minus_ the next generation group protocol
- Full Exactly-Once-Semantics (EOS)
- Idempotent & transactional producers
- Simple (legacy) consumer
Expand Down Expand Up @@ -403,11 +403,13 @@ generation.
| [KIP-841](https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft) — `AlterPartition.TopicID` | 3.3 | Supported |
| [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) — Next gen consumer rebalance protocol | 3.7 | Unsupported (proto supported) |
| [KIP-866](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) — ZK to Raft RPC changes | 3.4 | Supported |
| [KIP-890](https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense) — Transactions server side defense | 3.8 (partial) | Supported |
| [KIP-893](https://cwiki.apache.org/confluence/display/KAFKA/KIP-893%3A+The+Kafka+protocol+should+support+nullable+structs) — Nullable structs in the protocol | 3.5 | Supported |
| [KIP-899](https://cwiki.apache.org/confluence/display/KAFKA/KIP-899%3A+Allow+clients+to+rebootstrap) — Allow clients to rebootstrap | ? | Supported (`UpdateSeedBrokers`) |
| [KIP-903](https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR) — Stale broker epoch fencing | 3.5 | Supported (proto) |
| [KIP-919](https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration) — Admin client talk to KRaft , Controller registration | 3.7 | Supported (proto) |
| [KIP-951](https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client) — Leader discovery optimizations | 3.7 | Supported |
| [KIP-994](https://cwiki.apache.org/confluence/display/KAFKA/KIP-994%3A+Minor+Enhancements+to+ListTransactions+and+DescribeTransactions+APIs) — List/Describe transactions enhancements | 3.8 (partial) | Supported |

Missing from above but included in librdkafka is:

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
require (
github.com/klauspost/compress v1.17.8
github.com/pierrec/lz4/v4 v4.1.21
github.com/twmb/franz-go/pkg/kmsg v1.8.0
github.com/twmb/franz-go/pkg/kmsg v1.9.0
golang.org/x/crypto v0.23.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0N
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
20 changes: 20 additions & 0 deletions pkg/kerr/kerr.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,21 @@ var (
MismatchedEndpointType = &Error{"MISMATCHED_ENDPOINT_TYPE", 114, false, "The request was sent to an endpoint of the wrong type."}
UnsupportedEndpointType = &Error{"UNSUPPORTED_ENDPOINT_TYPE", 115, false, "This endpoint type is not supported yet."}
UnknownControllerID = &Error{"UNKNOWN_CONTROLLER_ID", 116, false, "This controller ID is not known"}

// UnknownSubscriptionID = &Error{"UNKNOWN_SUBSCRIPTION_ID", 117, false, "Client sent a push telemetry request with an invalid or outdated subscription ID."}
// TelemetryTooLarge = &Error{"TELEMETRY_TOO_LARGE", 118, false, "Client sent a push telemetry request larger than the maximum size the broker will accept."}
// InvalidRegistration = &Error{"INVALID_REGISTRATION", 119, false, "The controller has considered the broker registration to be invalid."}

TransactionAbortable = &Error{"TRANSACTION_ABORTABLE", 120, false, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID."}

// InvalidRecordState = &Error{"INVALID_RECORD_STATE", 121, false, "The record state is invalid. The acknowledgement of delivery could not be completed."}
// ShareSessionNowFound = &Error{"SHARE_SESSION_NOT_FOUND", 122, false, "The share session was not found."}
// InvalidShareSessionEpoch = &Error{"INVALID_SHARE_SESSION_EPOCH", 123, false, "The share session epoch is invalid."}
// FencedStateEpoch = &Error{"FENCED_STATE_EPOCH", 124, false, "The share coordinator rejected the request because the share-group state epoch did not match."}
// InvalidVoterKey = &Error{"INVALID_VOTER_KEY", 125, false, "The voter key doesn't match the receiving replica's key."}
// DuplicateVoter = &Error{"DUPLICATE_VOTER", 126, false, "The voter is already part of the set of voters."}
// VoterNotFound = &Error{"VOTER_NOT_FOUND", 127, false, "The voter is not part of the set of voters."}
// InvalidRegularExpression = &Error{"INVALID_REGULAR_EXPRESSION", 128, false, "The regular expression is not valid."}
)

var code2err = map[int16]error{
Expand Down Expand Up @@ -312,4 +327,9 @@ var code2err = map[int16]error{
115: UnsupportedEndpointType, // ""
116: UnknownControllerID, // ""

// 117: UnknownSubscriptionID, // KIP-714 f1819f448 KAFKA-15778 & KAFKA-15779
// 118: TelemetryTooLarge, // ""
// 119: InvalidRegistration, // KIP-858 f467f6bb4 KAFKA-15361

120: TransactionAbortable, // KIP-890 2e8d69b78 KAFKA-16314
}
11 changes: 8 additions & 3 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func (s *sink) produce(sem <-chan struct{}) bool {

if txnReq != nil {
// txnReq can fail from:
// - TransactionAbortable
// - retry failure
// - auth failure
// - producer id mapping / epoch errors
Expand All @@ -417,6 +418,10 @@ func (s *sink) produce(sem <-chan struct{}) bool {
batchesStripped, err := s.doTxnReq(req, txnReq)
if err != nil {
switch {
case errors.Is(err, kerr.TransactionAbortable):
// If we get TransactionAbortable, we continue into producing.
// The produce will fail with the same error, and this is the
// only way to notify the user to abort the txn.
case isRetryableBrokerErr(err) || isDialNonTimeoutErr(err):
s.cl.bumpRepeatedLoadErr(err)
s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retryable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err)
Expand All @@ -431,8 +436,8 @@ func (s *sink) produce(sem <-chan struct{}) bool {
// with produce request vs. end txn (KAFKA-12671)
s.cl.failProducerID(id, epoch, err)
s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", logID(s.nodeID), "err", err)
return false
}
return false
}

// If we stripped everything, ensure we backoff to force a
Expand Down Expand Up @@ -563,7 +568,7 @@ func (s *sink) issueTxnReq(
continue
}
for _, partition := range topic.Partitions {
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
if err := kerr.ErrorForCode(partition.ErrorCode); err != nil && err != kerr.TransactionAbortable { // see below for txn abortable
// OperationNotAttempted is set for all partitions that are authorized
// if any partition is unauthorized _or_ does not exist. We simply remove
// unattempted partitions and treat them as retryable.
Expand Down Expand Up @@ -2057,7 +2062,7 @@ func (b *recBatch) tryBuffer(pr promisedRec, produceVersion, maxBatchBytes int32
//////////////

func (*produceRequest) Key() int16 { return 0 }
func (*produceRequest) MaxVersion() int16 { return 10 }
func (*produceRequest) MaxVersion() int16 { return 11 }
func (p *produceRequest) SetVersion(v int16) { p.version = v }
func (p *produceRequest) GetVersion() int16 { return p.version }
func (p *produceRequest) IsFlexible() bool { return p.version >= 9 }
Expand Down
15 changes: 11 additions & 4 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
errors.Is(err, kerr.CoordinatorLoadInProgress),
errors.Is(err, kerr.NotCoordinator),
errors.Is(err, kerr.ConcurrentTransactions),
errors.Is(err, kerr.UnknownServerError):
errors.Is(err, kerr.UnknownServerError),
errors.Is(err, kerr.TransactionAbortable):
return true
}
return false
Expand Down Expand Up @@ -408,6 +409,11 @@ retry:
willTryCommit = false
goto retry

case errors.Is(endTxnErr, kerr.TransactionAbortable):
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction returned TransactionAbortable; retrying as abort")
willTryCommit = false
goto retry

case errors.Is(endTxnErr, kerr.UnknownServerError):
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit unknown server error; retrying")
after := time.NewTimer(s.cl.cfg.retryBackoff(tries))
Expand Down Expand Up @@ -517,7 +523,7 @@ const (
// Deprecated: Kafka 3.6 removed support for the hacky behavior that
// this option was abusing. Thus, as of Kafka 3.6, this option does not
// work against Kafka. This option also has never worked for Redpanda
// becuse Redpanda always strictly validated that partitions were a
// because Redpanda always strictly validated that partitions were a
// part of a transaction. Later versions of Kafka and Redpanda will
// remove the need for AddPartitionsToTxn at all and thus this option
// ultimately will be unnecessary anyway.
Expand Down Expand Up @@ -820,8 +826,9 @@ func (cl *Client) UnsafeAbortBufferedRecords() {
//
// If the producer ID has an error and you are trying to commit, this will
// return with kerr.OperationNotAttempted. If this happened, retry
// EndTransaction with TryAbort. Not other error is retryable, and you should
// not retry with TryAbort.
// EndTransaction with TryAbort. If this returns kerr.TransactionAbortable, you
// can retry with TryAbort. No other error is retryable, and you should not
// retry with TryAbort.
//
// If records failed with UnknownProducerID and your Kafka version is at least
// 2.5, then aborting here will potentially allow the client to recover for
Expand Down
36 changes: 34 additions & 2 deletions pkg/kversion/kversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var versions = []struct {
{"v3.5", V3_5_0()},
{"v3.6", V3_6_0()},
{"v3.7", V3_7_0()},
{"v3.8", V3_8_0()},
}

// VersionStrings returns all recognized versions, minus any patch, that can be
Expand Down Expand Up @@ -333,7 +334,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
//
// TODO: add introduced-version to differentiate some specific
// keys.
skipKeys: []int16{4, 5, 6, 7, 27, 52, 53, 54, 55, 56, 57, 58, 59, 62, 63, 64, 67},
skipKeys: []int16{4, 5, 6, 7, 27, 52, 53, 54, 55, 56, 57, 58, 59, 62, 63, 64, 67, 74, 75},
}
for _, opt := range opts {
opt.apply(&cfg)
Expand Down Expand Up @@ -378,6 +379,7 @@ func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
{max350, "v3.5"},
{max360, "v3.6"},
{max370, "v3.7"},
{max380, "v3.8"},
} {
for k, v := range comparison.cmp.filter(cfg.listener) {
if v == -1 {
Expand Down Expand Up @@ -520,6 +522,7 @@ func V3_4_0() *Versions { return zkBrokerOf(max340) }
func V3_5_0() *Versions { return zkBrokerOf(max350) }
func V3_6_0() *Versions { return zkBrokerOf(max360) }
func V3_7_0() *Versions { return zkBrokerOf(max370) }
func V3_8_0() *Versions { return zkBrokerOf(max380) }

func zkBrokerOf(lks listenerKeys) *Versions {
return &Versions{lks.filter(zkBroker)}
Expand Down Expand Up @@ -1158,8 +1161,37 @@ var max370 = nextMax(max360, func(v listenerKeys) listenerKeys {
return v
})

var max380 = nextMax(max370, func(v listenerKeys) listenerKeys {
// KAFKA-16314 2e8d69b78ca52196decd851c8520798aa856c073 KIP-890
// Then error rename in cf1ba099c0723f9cf65dda4cd334d36b7ede6327
v[0].inc() // 11 produce
v[10].inc() // 5 find coordinator
v[22].inc() // 5 init producer id
v[24].inc() // 5 add partitions to txn
v[25].inc() // 4 add offsets to txn
v[26].inc() // 4 end txn
v[28].inc() // 4 txn offset commit

// KAFKA-15460 68745ef21a9d8fe0f37a8c5fbc7761a598718d46 KIP-848
v[16].inc() // 5 list groups

// KAFKA-14509 90e646052a17e3f6ec1a013d76c1e6af2fbb756e KIP-848 added
// 7b0352f1bd9b923b79e60b18b40f570d4bfafcc0
// b7c99e22a77392d6053fe231209e1de32b50a98b
// 68389c244e720566aaa8443cd3fc0b9d2ec4bb7a
// 5f410ceb04878ca44d2d007655155b5303a47907 stabilized
v = append(v,
k(zkBroker, rBroker), // 69 consumer group describe
)

// KAFKA-16265 b4e96913cc6c827968e47a31261e0bd8fdf677b5 KIP-994 (part 1)
v[66].inc()

return v
})

var (
maxStable = max370
maxStable = max380
maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys {
return v
})
Expand Down