-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer_listener.go
123 lines (101 loc) · 3.36 KB
/
consumer_listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package confluent
import (
"context"
"fmt"
"io"
"time"
"github.com/alebabai/go-kafka"
"github.com/alebabai/go-kafka/adapter"
ckafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
type consumer interface {
ReadMessage(timeout time.Duration) (*ckafka.Message, error)
CommitMessage(m *ckafka.Message) ([]ckafka.TopicPartition, error)
io.Closer
}
// ConsumerListener is responsible for message consumption from [ckafka.Consumer] via an infinite loop.
type ConsumerListener struct {
consumer consumer
handler kafka.Handler
converter adapter.ToKafkaMessageConverterFunc[ckafka.Message]
transportErrorHandler kafka.ErrorHandler
manualCommit bool
}
// NewConsumerListener returns a pointer to the new instance of [ConsumerListener] or an error.
func NewConsumerListener(
c consumer,
h kafka.Handler,
opts ...ConsumerListenerOption,
) (*ConsumerListener, error) {
cl := &ConsumerListener{
consumer: c,
handler: h,
converter: ConvertMessageToKafkaMessage,
transportErrorHandler: kafka.ErrorHandlerFunc(DefaultTransportErrorHandler),
}
for _, opt := range opts {
opt(cl)
}
if err := cl.Validate(); err != nil {
return nil, err
}
return cl, nil
}
// ConsumerListenerOption is a function type for setting optional parameters for the [ConsumerListener].
type ConsumerListenerOption func(*ConsumerListener)
// ConsumerListenerWithConverter is an option to set a custom message converter function.
func ConsumerListenerWithConverter(convFunc adapter.ToKafkaMessageConverterFunc[ckafka.Message]) ConsumerListenerOption {
return func(cl *ConsumerListener) {
cl.converter = convFunc
}
}
// ConsumerListenerWithTransportErrorHandler is an option to set a custom transport error handler.
func ConsumerListenerWithTransportErrorHandler(eh kafka.ErrorHandler) ConsumerListenerOption {
return func(cl *ConsumerListener) {
cl.transportErrorHandler = eh
}
}
// ConsumerListenerWithManualCommit is an option to set manual commit flag.
func ConsumerListenerWithManualCommit(manualCommit bool) ConsumerListenerOption {
return func(cl *ConsumerListener) {
cl.manualCommit = manualCommit
}
}
// Listen starts the message consumption from consumer with specified read timeout, using the provided [kafka.Handler] for processing messages.
func (cl *ConsumerListener) Listen(ctx context.Context, timeout time.Duration) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
msg, err := cl.consumer.ReadMessage(timeout)
if err != nil {
if err := cl.transportErrorHandler.Handle(ctx, err); err != nil {
return fmt.Errorf("failed to read message: %w", err)
}
}
if err := cl.handler.Handle(ctx, cl.converter(*msg)); err != nil {
return fmt.Errorf("failed to handle message: %w", err)
}
if cl.manualCommit {
if _, err := cl.consumer.CommitMessage(msg); err != nil {
return fmt.Errorf("failed to commit message: %w", err)
}
}
}
}
}
// Close shuts down the [ckafka.Consumer] and releases any associated resources.
func (cl *ConsumerListener) Close() error {
if err := cl.consumer.Close(); err != nil {
return fmt.Errorf("failed to close consumer: %w", err)
}
return nil
}
// DefaultTransportErrorHandler simply checks for an error and propagates it.
func DefaultTransportErrorHandler(_ context.Context, err error) error {
if err != nil {
return err
}
return nil
}