Skip to content

Commit

Permalink
kgo: allow PreTxnCommitFnContext to modify empty offsets
Browse files Browse the repository at this point in the history
This builds the TxnOffsetCommitRequest early so that the hook can modify
it. If the modified request has no topics to commit, then we abort as
though uncommitted was empty.
  • Loading branch information
iamnoah authored and twmb committed Oct 21, 2023
1 parent 39e28c0 commit 54a7418
Showing 1 changed file with 53 additions and 50 deletions.
103 changes: 53 additions & 50 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"sync"
"time"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"

"github.com/twmb/franz-go/pkg/kerr"
)

// TransactionEndTry is simply a named bool.
Expand Down Expand Up @@ -1060,7 +1061,13 @@ func (cl *Client) commitTransactionOffsets(
onDone(kmsg.NewPtrTxnOffsetCommitRequest(), kmsg.NewPtrTxnOffsetCommitResponse(), errNotGroup)
return nil
}
if len(uncommitted) == 0 {

req, err := g.prepareTxnOffsetCommit(ctx, uncommitted)
if err != nil {
onDone(req, kmsg.NewPtrTxnOffsetCommitResponse(), err)
return g
}
if len(req.Topics) == 0 {
onDone(kmsg.NewPtrTxnOffsetCommitRequest(), kmsg.NewPtrTxnOffsetCommitResponse(), nil)
return g
}
Expand Down Expand Up @@ -1088,7 +1095,7 @@ func (cl *Client) commitTransactionOffsets(
g.mu.Lock()
defer g.mu.Unlock()

g.commitTxn(ctx, uncommitted, unblockJoinSync)
g.commitTxn(ctx, req, unblockJoinSync)
return g
}

Expand Down Expand Up @@ -1139,18 +1146,10 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
// commitTxn is ALMOST EXACTLY THE SAME as commit, but changed for txn types
// and we avoid updateCommitted. We avoid updating because we manually
// SetOffsets when ending the transaction.
func (g *groupConsumer) commitTxn(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
onDone func(*kmsg.TxnOffsetCommitRequest, *kmsg.TxnOffsetCommitResponse, error),
) {
func (g *groupConsumer) commitTxn(ctx context.Context, req *kmsg.TxnOffsetCommitRequest, onDone func(*kmsg.TxnOffsetCommitRequest, *kmsg.TxnOffsetCommitResponse, error)) {
if onDone == nil { // note we must always call onDone
onDone = func(_ *kmsg.TxnOffsetCommitRequest, _ *kmsg.TxnOffsetCommitResponse, _ error) {}
}
if len(uncommitted) == 0 { // only empty if called thru autocommit / default revoke
onDone(kmsg.NewPtrTxnOffsetCommitRequest(), kmsg.NewPtrTxnOffsetCommitResponse(), nil)
return
}

if g.commitCancel != nil {
g.commitCancel() // cancel any prior commit
Expand All @@ -1169,22 +1168,6 @@ func (g *groupConsumer) commitTxn(
g.commitCancel = commitCancel
g.commitDone = commitDone

// We issue this request even if the producer ID is failed; the request
// will fail if it is.
//
// The id must have been set at least once by this point because of
// addOffsetsToTxn.
id, epoch, _ := g.cl.producerID()
req := kmsg.NewPtrTxnOffsetCommitRequest()
req.TransactionalID = *g.cl.cfg.txnID
req.Group = g.cfg.group
req.ProducerID = id
req.ProducerEpoch = epoch
memberID, generation := g.memberGen.load()
req.Generation = generation
req.MemberID = memberID
req.InstanceID = g.cfg.instanceID

if ctx.Done() != nil {
go func() {
select {
Expand All @@ -1207,28 +1190,7 @@ func (g *groupConsumer) commitTxn(
<-priorDone // wait for any prior request to finish
}
}
g.cl.cfg.logger.Log(LogLevelDebug, "issuing txn offset commit", "uncommitted", uncommitted)

for topic, partitions := range uncommitted {
reqTopic := kmsg.NewTxnOffsetCommitRequestTopic()
reqTopic.Topic = topic
for partition, eo := range partitions {
reqPartition := kmsg.NewTxnOffsetCommitRequestTopicPartition()
reqPartition.Partition = partition
reqPartition.Offset = eo.Offset
reqPartition.LeaderEpoch = eo.Epoch
reqPartition.Metadata = &req.MemberID
reqTopic.Partitions = append(reqTopic.Partitions, reqPartition)
}
req.Topics = append(req.Topics, reqTopic)
}

if fn, ok := ctx.Value(txnCommitContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
if err := fn(req); err != nil {
onDone(req, nil, err)
return
}
}
g.cl.cfg.logger.Log(LogLevelDebug, "issuing txn offset commit", "uncommitted", req)

var resp *kmsg.TxnOffsetCommitResponse
var err error
Expand All @@ -1242,3 +1204,44 @@ func (g *groupConsumer) commitTxn(
onDone(req, resp, nil)
}()
}

func (g *groupConsumer) prepareTxnOffsetCommit(ctx context.Context, uncommitted map[string]map[int32]EpochOffset) (*kmsg.TxnOffsetCommitRequest, error) {
req := kmsg.NewPtrTxnOffsetCommitRequest()

// We're now generating the producerID before addOffsetsToTxn.
// We will not make this request until after addOffsetsToTxn, but it's possible to fail here due to a failed producerID.
id, epoch, err := g.cl.producerID()
if err != nil {
return req, err
}

req.TransactionalID = *g.cl.cfg.txnID
req.Group = g.cfg.group
req.ProducerID = id
req.ProducerEpoch = epoch
memberID, generation := g.memberGen.load()
req.Generation = generation
req.MemberID = memberID
req.InstanceID = g.cfg.instanceID

for topic, partitions := range uncommitted {
reqTopic := kmsg.NewTxnOffsetCommitRequestTopic()
reqTopic.Topic = topic
for partition, eo := range partitions {
reqPartition := kmsg.NewTxnOffsetCommitRequestTopicPartition()
reqPartition.Partition = partition
reqPartition.Offset = eo.Offset
reqPartition.LeaderEpoch = eo.Epoch
reqPartition.Metadata = &req.MemberID
reqTopic.Partitions = append(reqTopic.Partitions, reqPartition)
}
req.Topics = append(req.Topics, reqTopic)
}

if fn, ok := ctx.Value(txnCommitContextFn).(func(*kmsg.TxnOffsetCommitRequest) error); ok {
if err := fn(req); err != nil {
return req, err
}
}
return req, nil
}

0 comments on commit 54a7418

Please sign in to comment.