diff --git a/README.md b/README.md index 62f32c8..11b0f81 100644 --- a/README.md +++ b/README.md @@ -28,23 +28,23 @@ Consume a name string from kafka and produce a greeting string for that name bac (:require [clojure.core.async :refer [chan close! !!]] [ketu.async.source :as source] [ketu.async.sink :as sink])) - -(let [greets (chan 10) - sink-opts {:name "greeter-producer" - :brokers "broker2:9091" - :topic "greetings" - :value-type :string - :shape :value} - sink (sink/sink >greets sink-opts)] + >greets (chan 10) + sink-opts {:name "greeter-producer" + :brokers "broker2:9091" + :topic "greetings" + :value-type :string + :shape :value} + sink (sink/sink >greets sink-opts)] ;; Consume a name and produce a greeting. You could also do this with e.g. clojure.core.async/pipeline. (->> (]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | -| :ketu.source/consumer-decorator | fucntion | optional | ..... | +| Key | Type | Req? | Notes | +|---------------------------------|-----------------------------------------------------------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| :ketu.source/consumer-decorator | `fn [consumer-context pool-fn] -> [ConsumerRecord]` | optional | Decorates the internal pool function. when provided the decorator will be called with the following params:
consumer-context: {:consumer-source/consumer consumer}
pool-fn: fn [] -> ConsumerRecords
Returns an iterable collection of consumerRecord.
The decorator should call the poll-fn on behalf of the consumer source.
| #### Producer-sink options -| Key | Type | Req? | Notes | -|-------------------|------------------------------------------------------------------------------------------------------------------|------------|------------------------------------------------------------------------------------------------| -| :shape | `:value`, `[:vector ]`,`[:map ]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#data-shapes) | -| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"` | optional | Default `"none"`, values are same as "compression.type" of the java producer | -| :workers | int | optional | Default `1`, number of threads that take from the channel and invoke the internal producer | + +| Key | Type | Req? | Notes | +|-------------------|------------------------------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------| +| :shape | `:value`, `[:vector ]`,`[:map ]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#data-shapes) | +| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"` | optional | Default `"none"`, values are same as "compression.type" of the java producer | +| :workers | int | optional | Default `1`, number of threads that take from the channel and invoke the internal producer | ## Data shapes You don't have to deal with ConsumerRecord or ProducerRecord objects.
To get a clojure data structure with any of the ConsumerRecord fields, configure the consumer shape: + ```clojure ; Value only: -{:topic "names" - :key-type :string +{:topic "names" + :key-type :string :value-type :string - :shape :value} + :shape :value} ( "v" @@ -113,12 +117,14 @@ To get a clojure data structure with any of the ConsumerRecord fields, configure ( {:key "k", :value "v", :topic "names"} ``` + Similarly, to put a clojure data structure on the producer channel: + ```clojure ; Value only: -{:key-type :string +{:key-type :string :value-type :string - :shape :value} + :shape :value} (>!! producer-chan "v") ; Vector: @@ -131,77 +137,84 @@ Similarly, to put a clojure data structure on the producer channel: (>!! producer-chan ["k2" "v2" "events"]) ``` -## Example of using the custom commands channel +## Consumer Decorator +The consumer decorator allows running custom logic on the consumer polling thread. +This allows custom control on the consumer behavior including manual offset management. +Custom decorator logic may require different consumer configurations. for example when managing the offset manually, auto-commit should usually set to false. In this example we demonstare how to enable pause/resume of the consumer: - ```clojure -(ns custom-commands-channel-example - (:require [clojure.core.async :as async] +(ns consumer-decorator-example + (:require [clojure.core.async :as async] [ketu.async.source :as source] [ketu.async.sink :as sink])) (let [commands-chan (async/chan 10) consumer-chan (async/chan 10) - consumer-opts {:name "consumer-example" - :brokers "broker1:9092" - :topic "example" - :group-id "example" - :value-type :string - :shape :value - :ketu.source/consumer-commands-chan commands-chan} - source (source/source consumer-chan consumer-opts) + consumer-opts {:name "consumer-example" + :brokers "broker1:9092" + :topic "example" + :group-id "example" + :value-type :string + :shape :value + :ketu.source/consumer-decorator (fn [consumer-ctx poll-fn] + (loop [] + (when-let [command (async/poll! commands-chan)] + (command consumer-ctx) + (recur))) + (poll-fn))} + source (source/source consumer-chan consumer-opts) producer-chan (async/chan 10) - sink-opts {:name "producer-example" - :brokers "broker1:9092" - :topic "example" - :value-type :string - :shape :value} - sink (sink/sink producer-chan sink-opts) + sink-opts {:name "producer-example" + :brokers "broker1:9092" + :topic "example" + :value-type :string + :shape :value} + sink (sink/sink producer-chan sink-opts) ; periodically produce data to the topic - producing (future - (dotimes [i 20] - (async/>!! producer-chan (str i)) - (Thread/sleep 300)) - (async/>!! producer-chan "done") - (async/close! producer-chan)) + producing (future + (dotimes [i 20] + (async/>!! producer-chan (str i)) + (Thread/sleep 300)) + (async/>!! producer-chan "done") + (async/close! producer-chan)) ; read from the consumer channel and print to the screen - processing (future - (loop [] - (let [message (async/!! commands-chan (fn [{consumer :ketu.source/consumer}] - (.pause consumer (.assignment consumer)) - (deliver paused true))) - - @paused - (println "consumer is paused") - (Thread/sleep 2000) - - ; Send the commands channel a function that will resume the consumer - (async/>!! commands-chan (fn [{consumer :ketu.source/consumer}] - (.resume consumer (.paused consumer)) - (deliver resumed true))) - - @resumed - (println "consumer is resumed") - - ; Wait for all futures to finish - @producing - @processing) - (finally - (source/stop! source)))) + processing (future + (loop [] + (let [message (async/!! commands-chan (fn [{consumer :ketu.source/consumer}] + (.pause consumer (.assignment consumer)) + (deliver paused true))) + + @paused + (println "consumer is paused") + (Thread/sleep 2000) + + ; Send the commands channel a function that will resume the consumer + (async/>!! commands-chan (fn [{consumer :ketu.source/consumer}] + (.resume consumer (.paused consumer)) + (deliver resumed true))) + + @resumed + (println "consumer is resumed") + + ; Wait for all futures to finish + @producing + @processing) + (finally + (source/stop! source)))) ``` ## Development & Contribution