Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve rebalance example by simulating a user transaction an random failures #1037

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 186 additions & 23 deletions examples/consumer_rebalance_example/consumer_rebalance_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,24 @@

// Example high-level Apache Kafka consumer demonstrating use of rebalance
// callback along with manual commit.
// It processes a batch of messages in parallel and pairs Kafka commits with
// a simulated user transaction. Random failures are present
// to show how it must behave in each case.
package main

import (
"fmt"
"math/rand"
"os"
"os/signal"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

var processing []chan error

func main() {
if len(os.Args) < 4 {
fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <group> <topics..>\n",
Expand All @@ -37,9 +45,10 @@ func main() {
bootstrapServers := os.Args[1]
group := os.Args[2]
topics := os.Args[3:]
exitCode := 0

sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": bootstrapServers,
Expand All @@ -50,25 +59,38 @@ func main() {
// for this group.
"auto.offset.reset": "earliest",
// Whether or not we commit offsets automatically.
"enable.auto.commit": false,
"enable.auto.commit": false,
"enable.partition.eof": true,
})
defer func() {
fmt.Printf("%% Closing consumer\n")
c.Close()
os.Exit(exitCode)
}()

if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
exitCode = 1
return
}

fmt.Printf("%% Created Consumer %v\n", c)

// Subscribe to topics, call the rebalanceCallback on assignment/revoke.
// The rebalanceCallback can be triggered from c.Poll() and c.Close().
err = c.SubscribeTopics(topics, rebalanceCallback)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed subscribing to topics: %s\n", err)
exitCode = 1
return
}

run := true
for run == true {
for run {
select {
case sig := <-sigchan:
fmt.Printf("%% Caught signal %v: terminating\n", sig)
commit(c)
run = false
default:
ev := c.Poll(100)
Expand All @@ -79,12 +101,8 @@ func main() {
if err = processEvent(c, ev); err != nil {
fmt.Fprintf(os.Stderr, "Failed to process event: %s\n", err)
}

}
}

fmt.Printf("%% Closing consumer\n")
c.Close()
}

// processEvent processes the message/error received from the kafka Consumer's
Expand All @@ -94,12 +112,20 @@ func processEvent(c *kafka.Consumer, ev kafka.Event) error {

case *kafka.Message:
fmt.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value))
processMessage(e)

// Handle manual commit since enable.auto.commit is unset.
if err := maybeCommit(c, e.TopicPartition); err != nil {
return err
}

case kafka.PartitionEOF:
fmt.Printf("%% EOF for topic %s partition %d, committing\n", *e.Topic,
e.Partition)
if err := commit(c); err != nil {
return err
}

case kafka.Error:
// Errors should generally be considered informational, the client
// will try to automatically recover.
Expand All @@ -113,6 +139,42 @@ func processEvent(c *kafka.Consumer, ev kafka.Event) error {
return nil
}

// processMessage starts parallel processing of a message and
// appends the returned channel to the processing slice.
func processMessage(message *kafka.Message) error {
processing = append(processing, parallelProcessMessage(message))
return nil
}

// parallelProcessMessage starts parallel processing of a message and
// returns a channel were the corresponding error will be produced.
func parallelProcessMessage(message *kafka.Message) chan error {
channel := make(chan error)
go func() {
time.Sleep(100 * time.Millisecond)
channel <- randomFailure()
close(channel)
}()
return channel
}

// completeProcessing awaits all processing tasks and
// returns an error if at least one of them failed.
func completeProcessing() error {
fmt.Printf("%% Complete pending tasks\n")
var err error = nil
// Awaits a result from all the processing tasks
for _, channel := range processing {
currentErr := <-channel
if err == nil {
err = currentErr
}
}
// Clear the processing slice
processing = processing[:0]
return err
}

// maybeCommit is called for each message we receive from a Kafka topic.
// This method can be used to apply some arbitary logic/processing to the
// offsets, write the offsets into some external storage, and finally, to
Expand All @@ -126,7 +188,32 @@ func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error {
return nil
}

commitedOffsets, err := c.Commit()
fmt.Printf("%% maybeCommit: do commit\n")
return commit(c)
}

// commit completes current parallel processing,
// commits user transaction and offsets on Kafka.
// If something fails, calls abort to abort the transaction.
func commit(c *kafka.Consumer) error {
fmt.Printf("%% Committing transaction\n")

var err error
err = completeProcessing()
if err != nil {
fmt.Fprintf(os.Stderr, "Processing tasks failed, aborting\n")
abort(c)
return err
}

err = userTransactionCommit()
if err != nil {
fmt.Fprintf(os.Stderr, "User commit failed, aborting\n")
abort(c)
return err
}

committedOffsets, err := c.Commit()

// ErrNoOffset occurs when there are no stored offsets to commit. This
// can happen if we haven't stored anything since the last commit.
Expand All @@ -135,10 +222,89 @@ func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error {
// handling is illustrative of how to handle it in cases we call Commit()
// in another way, for example, every N seconds.
if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
fmt.Fprintf(os.Stderr, "Fatal: Committed offsets to user transaction but not to Kafka: %s\n", err.Error())
return err
}

fmt.Printf("%% Commited offsets to Kafka: %v\n", commitedOffsets)
fmt.Printf("%% Committed offsets to Kafka: %v\n", committedOffsets)
return nil
}

// userTransactionCommit models committing a user transaction, that can be a
// DBMS transaction of something else.
// Returns a random failure
func userTransactionCommit() error {
fmt.Printf("%% Committing user transaction\n")
return randomFailure()
}

// userTransactionAbort models aborting a user transaction, that can be a
// DBMS transaction of something else.
// Returns a random failure
func userTransactionAbort() error {
fmt.Printf("%% Aborting user transaction\n")
return randomFailure()
}

// Returns an error with probability 0.05 or no error with probability 0.95
func randomFailure() error {
if rand.Float64() < 0.05 {
return fmt.Errorf("random failure")
}
return nil
}

// abort completes current parallel processing,
// aborts user transaction and rewinds consumer to committed offsets.
func abort(c *kafka.Consumer) {
fmt.Printf("%% Aborting transaction\n")
completeProcessing()
// Ignore error, transaction is aborting anyway

var err error
// Continue retrying, if it cannot abort a transaction or seek assigned
// partitions, probably it cannot communicate with one of the two components,
// so it's not worth trying anything else.
for {
err = userTransactionAbort()
if err != nil {
fmt.Fprintf(os.Stderr, "userTransactionAbort failed, retry: %s\n", err.Error())
continue
}
err = rewindConsumerPosition(c)
if err == nil {
return
}
fmt.Fprintf(os.Stderr, "rewindConsumerPosition failed, retry: %s\n", err.Error())
// Pause between retries
time.Sleep(3 * time.Second)
}
}

// rewindConsumerPosition Rewinds consumer's position to the
// previous committed offset
func rewindConsumerPosition(c *kafka.Consumer) error {
fmt.Printf("%% Rewind to committed offsets\n")
assignment, err := c.Assignment()
if err != nil {
return err
}

committed, err := c.Committed(assignment, 30*1000 /* 30s */)
if err != nil {
return err
}

for _, tp := range committed {
if tp.Offset < 0 {
tp.Offset = kafka.OffsetBeginning
tp.LeaderEpoch = nil
}
err := c.Seek(tp, 1)
if err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -175,6 +341,11 @@ func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
fmt.Printf("%% %s rebalance: %d partition(s) revoked: %v\n",
c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions)

if c.IsClosed() {
// This is last revoke before closing, cannot commit here
// must be done before closing.
return nil
}
// Usually, the rebalance callback for `RevokedPartitions` is called
// just before the partitions are revoked. We can be certain that a
// partition being revoked is not yet owned by any other consumer.
Expand All @@ -183,28 +354,20 @@ func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
// However, there can be cases where the assignment is lost
// involuntarily. In this case, the partition might already be owned
// by another consumer, and operations including committing
// offsets may not work.
// offsets may not work. We abort user transaction too in this case.
if c.AssignmentLost() {
// Our consumer has been kicked out of the group and the
// entire assignment is thus lost.
fmt.Fprintln(os.Stderr, "Assignment lost involuntarily, commit may fail")
}

// Since enable.auto.commit is unset, we need to commit offsets manually
// before the partition is revoked.
commitedOffsets, err := c.Commit()

if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
fmt.Fprintf(os.Stderr, "Failed to commit offsets: %s\n", err)
return err
abort(c)
} else {
return commit(c)
}
fmt.Printf("%% Commited offsets to Kafka: %v\n", commitedOffsets)

// Similar to Assign, client automatically calls Unassign() unless the
// callback has already called that method. Here, we don't call it.

default:
fmt.Fprintf(os.Stderr, "Unxpected event type: %v\n", event)
fmt.Fprintf(os.Stderr, "Unexpected event type: %v\n", event)
}

return nil
Expand Down