diff --git a/README.md b/README.md index ff651d7..4313097 100644 --- a/README.md +++ b/README.md @@ -78,11 +78,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it. | :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config | #### Consumer-source options -| Key | Type | Req? | Notes | -|---------------------------------|-----------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| :group-id | string | required | | -| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | -| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | Decorates the internal poll function. when provided the decorator `poll!` fn will be called with the following params:
consumer-context: {:ketu.source/consumer consumer}
pool-fn: fn [] -> Iterable
Returns an iterable collection of consumerRecord.
The decorator should call the poll-fn on behalf of the consumer source.
| +| Key | Type | Req? | Notes | +|---------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) | #### Producer-sink options diff --git a/src/ketu/decorators/consumer/decorator.clj b/src/ketu/decorators/consumer/decorator.clj index 6d0a4a3..25e8cff 100644 --- a/src/ketu/decorators/consumer/decorator.clj +++ b/src/ketu/decorators/consumer/decorator.clj @@ -1,11 +1,11 @@ (ns ketu.decorators.consumer.decorator (:require [ketu.decorators.consumer.protocol :as cdp])) -(defn- validate [consumer-decorator consumer-opts] - (when (not (cdp/validate consumer-decorator consumer-opts)) +(defn- valid? [consumer-decorator consumer-opts] + (when (not (cdp/valid? consumer-decorator consumer-opts)) (throw (Exception. "Consumer decorator validation failed")))) (defn decorate-poll-fn [consumer-ctx poll-fn {:keys [ketu.source/consumer-decorator] :as consumer-opts}] - (validate consumer-decorator consumer-opts) + (valid? consumer-decorator consumer-opts) #(cdp/poll! consumer-decorator consumer-ctx poll-fn)) diff --git a/src/ketu/decorators/consumer/protocol.clj b/src/ketu/decorators/consumer/protocol.clj index 472a2bb..95aad2e 100644 --- a/src/ketu/decorators/consumer/protocol.clj +++ b/src/ketu/decorators/consumer/protocol.clj @@ -1,5 +1,17 @@ (ns ketu.decorators.consumer.protocol) (defprotocol ConsumerDecorator - (poll! [this consumer-ctx poll-fn] "fn [consumer-context poll-fn] -> Iterable") - (validate [this consumer-opts] "Returns true if consumer-opts are valid according to the decorator logic")) + "Consumer decorator provides a way to extend the consumer source functionality. + The decorator runs in the context of the polling thread and allows custom control on the internal consumer instance" + (poll! [this consumer-ctx poll-fn] + "Decorates the internal consumer poll loop. + - Parameters: + - `consumer-ctx`: A map containing the consumer context, typically {:ketu.source/consumer consumer}. + - `poll-fn`: A function with no arguments that returns an Iterable of ConsumerRecord. + - Returns: An iterable collection of ConsumerRecord. + - The decorator should call the `poll-fn` on behalf of the consumer source.") + (valid? [this consumer-opts] + "Validates the consumer options according to the decorator logic. + - Parameters: + - `consumer-opts`: A map of consumer options to be validated. + - Returns: true if the consumer options are valid according to the decorator logic, false otherwise."))