diff --git a/adapter/sarama/consumer_group_handler.go b/adapter/sarama/consumer_group_handler.go new file mode 100644 index 0000000..7d58dff --- /dev/null +++ b/adapter/sarama/consumer_group_handler.go @@ -0,0 +1,102 @@ +package sarama + +import ( + "fmt" + + "github.com/IBM/sarama" + "github.com/alebabai/go-kafka" + "github.com/alebabai/go-kafka/adapter" +) + +// ConsumerGroupHandler is an implementation of [sarama.ConsumerGroupHandler] using [kafka.Handler] for message processing. +type ConsumerGroupHandler struct { + handler kafka.Handler + + converter adapter.ToKafkaMessageConverterFunc[sarama.ConsumerMessage] + + setupHook ConsumerGroupHandlerHookFunc + cleanupHook ConsumerGroupHandlerHookFunc +} + +// ConsumerGroupHandlerHookFunc is a function for custom hooks across [sarama.ConsumerGroupSession] lifecycle events. +type ConsumerGroupHandlerHookFunc func(sarama.ConsumerGroupSession) error + +// ConsumerGroupHandlerEmptyHook is a no-op hook for [sarama.ConsumerGroupSession] lifecycle events. +func ConsumerGroupHandlerEmptyHook(_ sarama.ConsumerGroupSession) error { + return nil +} + +// NewConsumerGroupHandler returns a pointer to the new instance of [ConsumerGroupHandler] or an error. +func NewConsumerGroupHandler(h kafka.Handler, opts ...ConsumerGroupHandlerOption) (*ConsumerGroupHandler, error) { + cgh := &ConsumerGroupHandler{ + handler: h, + converter: ConvertConsumerMessageToKafkaMessage, + setupHook: ConsumerGroupHandlerEmptyHook, + cleanupHook: ConsumerGroupHandlerEmptyHook, + } + + for _, opt := range opts { + opt(cgh) + } + + if err := cgh.Validate(); err != nil { + return nil, err + } + + return cgh, nil +} + +// ConsumerGroupHandlerOption is a function type for setting optional parameters for the [ConsumerGroupHandler]. +type ConsumerGroupHandlerOption func(*ConsumerGroupHandler) + +// ConsumerGroupHandlerWithConverter is an option to set a customer message converter function. +func ConsumerGroupHandlerWithConverter(convFunc adapter.ToKafkaMessageConverterFunc[sarama.ConsumerMessage]) ConsumerGroupHandlerOption { + return func(cgh *ConsumerGroupHandler) { + cgh.converter = convFunc + } +} + +// ConsumerGroupHandlerWithSetupHook is an option to set a custom hook for the setup phase in [sarama.ConsumerGroupSession] lifecycle events. +func ConsumerGroupHandlerWithSetupHook(hookFunc ConsumerGroupHandlerHookFunc) ConsumerGroupHandlerOption { + return func(cgh *ConsumerGroupHandler) { + cgh.setupHook = hookFunc + } +} + +// ConsumerGroupHandlerWithCleanupHook is an option to set a custom hook for the cleanup phase in [sarama.ConsumerGroupSession] lifecycle events. +func ConsumerGroupHandlerWithCleanupHook(hookFunc ConsumerGroupHandlerHookFunc) ConsumerGroupHandlerOption { + return func(cgh *ConsumerGroupHandler) { + cgh.cleanupHook = hookFunc + } +} + +// Setup implements [sarama.ConsumerGroupHandler]. +func (cgh *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { + return cgh.setupHook(session) +} + +// Cleanup implements [sarama.ConsumerGroupHandler]. +func (cgh *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error { + return cgh.cleanupHook(session) +} + +// ConsumeClaim implements [sarama.ConsumerGroupHandler]. +func (cgh *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + ctx := session.Context() + for { + select { + case <-ctx.Done(): + return nil + case msg, ok := <-claim.Messages(): + if !ok { + return nil + } + + if err := cgh.handler.Handle(ctx, cgh.converter(*msg)); err != nil { + return fmt.Errorf("failed to handle message: %w", err) + } + + session.MarkMessage(msg, "") + } + } +} diff --git a/adapter/sarama/consumer_group_handler_validation.go b/adapter/sarama/consumer_group_handler_validation.go new file mode 100644 index 0000000..bfaf9c6 --- /dev/null +++ b/adapter/sarama/consumer_group_handler_validation.go @@ -0,0 +1,27 @@ +package sarama + +import ( + "errors" +) + +// Validate validates [ConsumerGroupHandler] and returns an error if validation is failed. +func (cgh ConsumerGroupHandler) Validate() error { + errs := make([]error, 0) + if cgh.handler == nil { + errs = append(errs, errors.New("handler is required")) + } + + if cgh.converter == nil { + errs = append(errs, errors.New("converter is required")) + } + + if cgh.setupHook == nil { + errs = append(errs, errors.New("setupHook is required")) + } + + if cgh.cleanupHook == nil { + errs = append(errs, errors.New("cleanupHook is required")) + } + + return errors.Join(errs...) +} diff --git a/adapter/sarama/consumer_group_listener.go b/adapter/sarama/consumer_group_listener.go new file mode 100644 index 0000000..4229362 --- /dev/null +++ b/adapter/sarama/consumer_group_listener.go @@ -0,0 +1,67 @@ +package sarama + +import ( + "context" + "fmt" + + "github.com/IBM/sarama" +) + +// ConsumerGroupListener is responsible for message consumption of a [sarama.ConsumerGroup] via an infinite loop. +type ConsumerGroupListener struct { + consumerGroup sarama.ConsumerGroup + consumerGroupHandler sarama.ConsumerGroupHandler +} + +// NewConsumerGroupHandler returns a pointer to the new instance of [ConsumerGroupListener] or an error. +func NewConsumerGroupListener( + cg sarama.ConsumerGroup, + cgh sarama.ConsumerGroupHandler, + opts ...ConsumerGroupListenerOption, +) (*ConsumerGroupListener, error) { + cgl := &ConsumerGroupListener{ + consumerGroup: cg, + consumerGroupHandler: cgh, + } + + for _, opt := range opts { + opt(cgl) + } + + if err := cgl.Validate(); err != nil { + return nil, err + } + + return cgl, nil +} + +// ConsumerGroupListenerOption is a function type for setting optional parameters for the [ConsumerGroupListener]. +type ConsumerGroupListenerOption func(*ConsumerGroupListener) + +// Listen starts the message consumption process on the specified topics, using the provided [sarama.ConsumerGroupHandler] for processing messages. +func (cgl *ConsumerGroupListener) Listen(ctx context.Context, topics ...string) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if err := cgl.consumerGroup.Consume(ctx, topics, cgl.consumerGroupHandler); err != nil { + return fmt.Errorf("failed to consume messages: %w", err) + } + } + } +} + +// Errors returns a channel for receiving errors from the [sarama.ConsumerGroup]. +func (cgl *ConsumerGroupListener) Errors() <-chan error { + return cgl.consumerGroup.Errors() +} + +// Close shuts down the [sarama.ConsumerGroup] and releases any associated resources. +func (cgl *ConsumerGroupListener) Close() error { + if err := cgl.consumerGroup.Close(); err != nil { + return fmt.Errorf("failed to close consumer group: %w", err) + } + + return nil +} diff --git a/adapter/sarama/consumer_group_listener_validation.go b/adapter/sarama/consumer_group_listener_validation.go new file mode 100644 index 0000000..eef56d5 --- /dev/null +++ b/adapter/sarama/consumer_group_listener_validation.go @@ -0,0 +1,19 @@ +package sarama + +import ( + "errors" +) + +// Validate validates [ConsumerGroupListener] and returns an error if validation is failed. +func (cgh ConsumerGroupListener) Validate() error { + errs := make([]error, 0) + if cgh.consumerGroup == nil { + errs = append(errs, errors.New("consumerGroup is required")) + } + + if cgh.consumerGroupHandler == nil { + errs = append(errs, errors.New("consumerGroupHandler is required")) + } + + return errors.Join(errs...) +} diff --git a/adapter/sarama/producer.go b/adapter/sarama/producer.go new file mode 100644 index 0000000..a00b026 --- /dev/null +++ b/adapter/sarama/producer.go @@ -0,0 +1,18 @@ +package sarama + +import ( + "github.com/IBM/sarama" + "github.com/alebabai/go-kafka" +) + +// SyncProducer is an adapter interface. +type SyncProducer interface { + sarama.SyncProducer + kafka.Handler +} + +// AsyncProducer is an adapter interface. +type AsyncProducer interface { + sarama.AsyncProducer + kafka.Handler +}