Skip to content

Commit

Permalink
CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
idantavor committed Sep 5, 2024
1 parent 49d85f6 commit 099e6d6
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 10 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it.
| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |

#### Consumer-source options
| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | Decorates the internal poll function. when provided the decorator `poll!` fn will be called with the following params:<br/>consumer-context: {:ketu.source/consumer consumer}<br/>pool-fn: fn [] -> Iterable<ConsumerRecord> <br/>Returns an iterable collection of consumerRecord.<br/>The decorator should call the poll-fn on behalf of the consumer source.<br/> |
| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) |

#### Producer-sink options

Expand Down
6 changes: 3 additions & 3 deletions src/ketu/decorators/consumer/decorator.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
(ns ketu.decorators.consumer.decorator
(:require [ketu.decorators.consumer.protocol :as cdp]))

(defn- validate [consumer-decorator consumer-opts]
(when (not (cdp/validate consumer-decorator consumer-opts))
(defn- valid? [consumer-decorator consumer-opts]
(when (not (cdp/valid? consumer-decorator consumer-opts))
(throw (Exception. "Consumer decorator validation failed"))))

(defn decorate-poll-fn
[consumer-ctx poll-fn {:keys [ketu.source/consumer-decorator] :as consumer-opts}]
(validate consumer-decorator consumer-opts)
(valid? consumer-decorator consumer-opts)
#(cdp/poll! consumer-decorator consumer-ctx poll-fn))
16 changes: 14 additions & 2 deletions src/ketu/decorators/consumer/protocol.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
(ns ketu.decorators.consumer.protocol)

(defprotocol ConsumerDecorator
(poll! [this consumer-ctx poll-fn] "fn [consumer-context poll-fn] -> Iterable<ConsumerRecord>")
(validate [this consumer-opts] "Returns true if consumer-opts are valid according to the decorator logic"))
"Consumer decorator provides a way to extend the consumer source functionality.
The decorator runs in the context of the polling thread and allows custom control on the internal consumer instance"
(poll! [this consumer-ctx poll-fn]
"Decorates the internal consumer poll loop.
- Parameters:
- `consumer-ctx`: A map containing the consumer context, typically {:ketu.source/consumer consumer}.
- `poll-fn`: A function with no arguments that returns an Iterable of ConsumerRecord.
- Returns: An iterable collection of ConsumerRecord.
- The decorator should call the `poll-fn` on behalf of the consumer source.")
(valid? [this consumer-opts]
"Validates the consumer options according to the decorator logic.
- Parameters:
- `consumer-opts`: A map of consumer options to be validated.
- Returns: true if the consumer options are valid according to the decorator logic, false otherwise."))

0 comments on commit 099e6d6

Please sign in to comment.