-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(adapter.confluent): ConsumerListener
- Loading branch information
Showing
2 changed files
with
150 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
package confluent | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"time" | ||
|
||
"github.com/alebabai/go-kafka" | ||
"github.com/alebabai/go-kafka/adapter" | ||
ckafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" | ||
) | ||
|
||
type consumer interface { | ||
ReadMessage(timeout time.Duration) (*ckafka.Message, error) | ||
CommitMessage(m *ckafka.Message) ([]ckafka.TopicPartition, error) | ||
io.Closer | ||
} | ||
|
||
// ConsumerListener is responsible for message consumption from [ckafka.Consumer] via an infinite loop. | ||
type ConsumerListener struct { | ||
consumer consumer | ||
handler kafka.Handler | ||
|
||
converter adapter.ToKafkaMessageConverterFunc[ckafka.Message] | ||
|
||
transportErrorHandler kafka.ErrorHandler | ||
manualCommit bool | ||
} | ||
|
||
// NewConsumerListener returns a pointer to the new instance of [ConsumerListener] or an error. | ||
func NewConsumerListener( | ||
c consumer, | ||
h kafka.Handler, | ||
opts ...ConsumerListenerOption, | ||
) (*ConsumerListener, error) { | ||
cl := &ConsumerListener{ | ||
consumer: c, | ||
handler: h, | ||
converter: ConvertMessageToKafkaMessage, | ||
transportErrorHandler: kafka.ErrorHandlerFunc(DefaultTransportErrorHandler), | ||
} | ||
|
||
for _, opt := range opts { | ||
opt(cl) | ||
} | ||
|
||
if err := cl.Validate(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return cl, nil | ||
} | ||
|
||
// ConsumerListenerOption is a function type for setting optional parameters for the [ConsumerListener]. | ||
type ConsumerListenerOption func(*ConsumerListener) | ||
|
||
// ConsumerListenerWithConverter is an option to set a custom message converter function. | ||
func ConsumerListenerWithConverter(convFunc adapter.ToKafkaMessageConverterFunc[ckafka.Message]) ConsumerListenerOption { | ||
return func(cl *ConsumerListener) { | ||
cl.converter = convFunc | ||
} | ||
} | ||
|
||
// ConsumerListenerWithTransportErrorHandler is an option to set a custom transport error handler. | ||
func ConsumerListenerWithTransportErrorHandler(eh kafka.ErrorHandler) ConsumerListenerOption { | ||
return func(cl *ConsumerListener) { | ||
cl.transportErrorHandler = eh | ||
} | ||
} | ||
|
||
// ConsumerListenerWithManualCommit is an option to set manual commit flag. | ||
func ConsumerListenerWithManualCommit(manualCommit bool) ConsumerListenerOption { | ||
return func(cl *ConsumerListener) { | ||
cl.manualCommit = manualCommit | ||
} | ||
} | ||
|
||
// Listen starts the message consumption from consumer with specified read timeout, using the provided [kafka.Handler] for processing messages. | ||
func (cl *ConsumerListener) Listen(ctx context.Context, timeout time.Duration) error { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
default: | ||
msg, err := cl.consumer.ReadMessage(timeout) | ||
if err != nil { | ||
if err := cl.transportErrorHandler.Handle(ctx, err); err != nil { | ||
return fmt.Errorf("failed to read message: %w", err) | ||
} | ||
} | ||
|
||
if err := cl.handler.Handle(ctx, cl.converter(*msg)); err != nil { | ||
return fmt.Errorf("failed to handle message: %w", err) | ||
} | ||
|
||
if cl.manualCommit { | ||
if _, err := cl.consumer.CommitMessage(msg); err != nil { | ||
return fmt.Errorf("failed to commit message: %w", err) | ||
} | ||
|
||
} | ||
} | ||
} | ||
} | ||
|
||
// Close shuts down the [ckafka.Consumer] and releases any associated resources. | ||
func (cl *ConsumerListener) Close() error { | ||
if err := cl.consumer.Close(); err != nil { | ||
return fmt.Errorf("failed to close consumer: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// DefaultTransportErrorHandler simply checks for an error and propagates it. | ||
func DefaultTransportErrorHandler(_ context.Context, err error) error { | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package confluent | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
// Validate validates [ConsumerListener] and returns an error if validation is failed. | ||
func (cl ConsumerListener) Validate() error { | ||
errs := make([]error, 0) | ||
if cl.consumer == nil { | ||
errs = append(errs, errors.New("consumer is required")) | ||
} | ||
|
||
if cl.handler == nil { | ||
errs = append(errs, errors.New("handler is required")) | ||
} | ||
|
||
if cl.converter == nil { | ||
errs = append(errs, errors.New("converter is required")) | ||
} | ||
|
||
if cl.transportErrorHandler == nil { | ||
errs = append(errs, errors.New("transportErrorHandler is required")) | ||
} | ||
|
||
return errors.Join(errs...) | ||
} |