From cf14bf6a9966baf4b581526a2778e51b086ce574 Mon Sep 17 00:00:00 2001 From: Idan Tavor Date: Mon, 2 Sep 2024 22:23:52 +0300 Subject: [PATCH 1/6] rebase --- CHANGELOG.md | 4 + README.md | 30 ++++---- project.clj | 2 +- src/ketu/async/source.clj | 9 +-- src/ketu/decorators/consumer/decorator.clj | 11 +++ src/ketu/decorators/consumer/protocol.clj | 5 ++ src/ketu/spec.clj | 6 +- test/ketu/async/integration_test.clj | 86 +++++++++++++--------- 8 files changed, 97 insertions(+), 56 deletions(-) create mode 100644 src/ketu/decorators/consumer/decorator.clj create mode 100644 src/ketu/decorators/consumer/protocol.clj diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fe0331..261716c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [[2.0.0]] () - 2024-09-03 +### Changed +- consumer decorator API breaking change - use ConsumerDecorator protocol instead of `consumer-decorator` function. + ## [[1.1.0]](https://github.com/AppsFlyer/ketu/pull/18) - 2024-07-29 ### Added diff --git a/README.md b/README.md index 8899605..ff651d7 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ A Clojure Apache Kafka client with core.async api ```clojure -[com.appsflyer/ketu "1.1.0"] +[com.appsflyer/ketu "2.0.0"] ``` ## Features @@ -78,12 +78,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it. | :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config | #### Consumer-source options - -| Key | Type | Req? | Notes | -|---------------------------------|-----------------------------------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| :group-id | string | required | | -| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | -| :ketu.source/consumer-decorator | `fn [consumer-context poll-fn] -> Iterable` | optional | Decorates the internal poll function. when provided the decorator will be called with the following params:
consumer-context: {:ketu.source/consumer consumer}
pool-fn: fn [] -> Iterable
Returns an iterable collection of consumerRecord.
The decorator should call the poll-fn on behalf of the consumer source.
| +| Key | Type | Req? | Notes | +|---------------------------------|-----------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | Decorates the internal poll function. when provided the decorator `poll!` fn will be called with the following params:
consumer-context: {:ketu.source/consumer consumer}
pool-fn: fn [] -> Iterable
Returns an iterable collection of consumerRecord.
The decorator should call the poll-fn on behalf of the consumer source.
| #### Producer-sink options @@ -151,6 +150,7 @@ The decorator processes all immediately available commands in the commands-chan, (ns consumer-decorator-example (:require [clojure.core.async :as async] [ketu.async.source :as source] + [ketu.decorators.consumer.protocol :refer [ConsumerDecorator]] [ketu.async.sink :as sink])) (let [commands-chan (async/chan 10) @@ -161,12 +161,16 @@ The decorator processes all immediately available commands in the commands-chan, :group-id "example" :value-type :string :shape :value - :ketu.source/consumer-decorator (fn [consumer-ctx poll-fn] - (loop [] - (when-let [command (async/poll! commands-chan)] - (command consumer-ctx) - (recur))) - (poll-fn))} + :ketu.source/consumer-decorator (reify ConsumerDecorator + (poll! [consumer-ctx poll-fn] + (loop [] + (when-let [command (async/poll! commands-chan)] + (command consumer-ctx) + (recur))) + (poll-fn)) + (validate [this opts] + ;custom validation logic of the consumer options can be added here + true))} source (source/source consumer-chan consumer-opts) producer-chan (async/chan 10) diff --git a/project.clj b/project.clj index 65604aa..439b94f 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.appsflyer/ketu "1.1.0" +(defproject com.appsflyer/ketu "2.0.0-SNAPSHOT" :description "Clojure Apache Kafka client with core.async api" :url "https://github.com/AppsFlyer/ketu" :license {:name "Apache License, Version 2.0" diff --git a/src/ketu/async/source.clj b/src/ketu/async/source.clj index 5918b6b..051db46 100644 --- a/src/ketu/async/source.clj +++ b/src/ketu/async/source.clj @@ -4,6 +4,7 @@ [ketu.clients.consumer :as consumer] [ketu.shape.consumer :as shape] [ketu.spec] + [ketu.decorators.consumer.decorator :as consumer-decorator] [ketu.util.log :as log]) (:import (java.time Duration) (org.apache.kafka.clients.consumer Consumer) @@ -104,16 +105,12 @@ ^long close-consumer? (:ketu.source/close-consumer? opts) consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts) should-poll? (volatile! true) - decorator-fn (some-> (:ketu.source/consumer-decorator opts) - (partial {:ketu.source/consumer consumer})) - abort-pending-put (async/chan) done-putting (async/chan) - subscribe! (or (subscribe-fn opts) (assign-fn opts)) poll-impl (poll-fn consumer should-poll? opts) - poll! (if (some? decorator-fn) - (partial decorator-fn poll-impl) + poll! (if (some? (:ketu.source/consumer-decorator opts)) + (consumer-decorator/decorate-poll-fn {:ketu.source/consumer consumer} poll-impl opts) poll-impl) ->data (->data-fn opts) put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put)) diff --git a/src/ketu/decorators/consumer/decorator.clj b/src/ketu/decorators/consumer/decorator.clj new file mode 100644 index 0000000..6d0a4a3 --- /dev/null +++ b/src/ketu/decorators/consumer/decorator.clj @@ -0,0 +1,11 @@ +(ns ketu.decorators.consumer.decorator + (:require [ketu.decorators.consumer.protocol :as cdp])) + +(defn- validate [consumer-decorator consumer-opts] + (when (not (cdp/validate consumer-decorator consumer-opts)) + (throw (Exception. "Consumer decorator validation failed")))) + +(defn decorate-poll-fn + [consumer-ctx poll-fn {:keys [ketu.source/consumer-decorator] :as consumer-opts}] + (validate consumer-decorator consumer-opts) + #(cdp/poll! consumer-decorator consumer-ctx poll-fn)) diff --git a/src/ketu/decorators/consumer/protocol.clj b/src/ketu/decorators/consumer/protocol.clj new file mode 100644 index 0000000..472a2bb --- /dev/null +++ b/src/ketu/decorators/consumer/protocol.clj @@ -0,0 +1,5 @@ +(ns ketu.decorators.consumer.protocol) + +(defprotocol ConsumerDecorator + (poll! [this consumer-ctx poll-fn] "fn [consumer-context poll-fn] -> Iterable") + (validate [this consumer-opts] "Returns true if consumer-opts are valid according to the decorator logic")) diff --git a/src/ketu/spec.clj b/src/ketu/spec.clj index abe78a3..0f707a9 100644 --- a/src/ketu/spec.clj +++ b/src/ketu/spec.clj @@ -2,9 +2,9 @@ (:require [clojure.set] [clojure.spec.alpha :as s] [clojure.string] - [expound.alpha :as expound] - [clojure.core.async.impl.protocols]) + [expound.alpha :as expound]) (:import (java.util.regex Pattern) + (ketu.decorators.consumer.protocol ConsumerDecorator) (org.apache.kafka.clients.producer Callback) (org.apache.kafka.common.serialization Deserializer Serializer))) @@ -28,7 +28,7 @@ (s/def :ketu.source/close-out-chan? boolean?) (s/def :ketu.source/close-consumer? boolean?) (s/def :ketu.source/create-rebalance-listener-obj fn?) -(s/def :ketu.source/consumer-decorator fn?) +(s/def :ketu.source/consumer-decorator #(instance? ConsumerDecorator %)) (s/def :ketu.source.assign/topic :ketu/topic) (s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?)) (s/def :ketu.source/assign-single-topic-partitions diff --git a/test/ketu/async/integration_test.clj b/test/ketu/async/integration_test.clj index bec34a5..0c853b8 100644 --- a/test/ketu/async/integration_test.clj +++ b/test/ketu/async/integration_test.clj @@ -6,6 +6,7 @@ [clojure.core.async :as async] [ketu.clients.consumer :as consumer] [ketu.clients.producer :as producer] + [ketu.decorators.consumer.protocol :as cdp] [ketu.async.source :as source] [ketu.async.sink :as sink] [ketu.test.kafka-setup :as kafka-setup]) @@ -213,37 +214,56 @@ (.close ^AdminClient admin-client))))) (deftest consumer-decorator - (let [consumer-chan (async/chan 10) - result-chan (async/chan 100) - clicks-consumer-opts {:name "clicks-consumer" - :brokers (kafka-setup/get-bootstrap-servers) - :topic "clicks" - :group-id "clicks-test-consumer" - :auto-offset-reset "earliest" - :shape :value - :ketu.source/consumer-decorator (fn [{_consumer :ketu.source/consumer} poll-fn] - (let [records (poll-fn)] - (doseq [^ConsumerRecord record records] - (async/>!! result-chan (String. ^"[B" (.value record)))) - records))} - source (source/source consumer-chan clicks-consumer-opts) - clicks-producer-opts {:name "clicks-producer" - :brokers (kafka-setup/get-bootstrap-servers) - :topic "clicks" - :key-type :string - :internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"} - :shape [:vector :key :value]} - producer-chan (async/chan 10) - sink (sink/sink producer-chan clicks-producer-opts) - input-values #{"1" "2" "3"}] - (try - (doseq [value input-values] - (async/>!! producer-chan ["1" value])) - (is (= input-values (into #{} (repeatedly 3 #(u/try-take! result-chan))))) - (is (= input-values (into #{} (map #(String. ^"[B" %)) (repeatedly 3 #(u/try-take! consumer-chan))))) - (finally - (Thread/sleep 2000) - (source/stop! source) - (async/close! producer-chan) - (sink/stop! sink))))) + (testing "consumer decorator functionality" + (let [consumer-chan (async/chan 10) + result-chan (async/chan 100) + clicks-consumer-opts {:name "clicks-consumer" + :brokers (kafka-setup/get-bootstrap-servers) + :topic "clicks" + :group-id "clicks-test-consumer" + :auto-offset-reset "earliest" + :shape :value + :ketu.source/consumer-decorator (reify cdp/ConsumerDecorator + (poll! [_ {_consumer :ketu.source/consumer} poll-fn] + (let [records (poll-fn)] + (doseq [^ConsumerRecord record records] + (async/>!! result-chan (String. ^"[B" (.value record)))) + records)) + (validate [_ opts] + true))} + source (source/source consumer-chan clicks-consumer-opts) + clicks-producer-opts {:name "clicks-producer" + :brokers (kafka-setup/get-bootstrap-servers) + :topic "clicks" + :key-type :string + :internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"} + :shape [:vector :key :value]} + producer-chan (async/chan 10) + sink (sink/sink producer-chan clicks-producer-opts) + input-values #{"1" "2" "3"}] + (try + (doseq [value input-values] + (async/>!! producer-chan ["1" value])) + (is (= input-values (into #{} (repeatedly 3 #(u/try-take! result-chan))))) + (is (= input-values (into #{} (map #(String. ^"[B" %)) (repeatedly 3 #(u/try-take! consumer-chan))))) + (finally + (Thread/sleep 2000) + (source/stop! source) + (async/close! producer-chan) + (sink/stop! sink)))) + (testing "consumer decorator validation failure" + (let [consumer-chan (async/chan 10) + clicks-consumer-opts {:name "clicks-consumer" + :brokers (kafka-setup/get-bootstrap-servers) + :topic "clicks" + :group-id "clicks-test-consumer" + :auto-offset-reset "earliest" + :shape :value + :ketu.source/consumer-decorator (reify cdp/ConsumerDecorator + (poll! [_ _ _] + nil) + (validate [_ opts] + false))}] + (is (thrown-with-msg? Exception #"Consumer decorator validation failed" + (source/source consumer-chan clicks-consumer-opts))))))) From dd0c9dc1f32668b381ebb6cca46331f3dcda9796 Mon Sep 17 00:00:00 2001 From: Idan Tavor Date: Tue, 3 Sep 2024 10:22:29 +0300 Subject: [PATCH 2/6] require --- src/ketu/spec.clj | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ketu/spec.clj b/src/ketu/spec.clj index 0f707a9..2311364 100644 --- a/src/ketu/spec.clj +++ b/src/ketu/spec.clj @@ -2,6 +2,7 @@ (:require [clojure.set] [clojure.spec.alpha :as s] [clojure.string] + [ketu.decorators.consumer.protocol] [expound.alpha :as expound]) (:import (java.util.regex Pattern) (ketu.decorators.consumer.protocol ConsumerDecorator) From 1c0ed6de2c22161721cf73ed7d91ad23c12f560f Mon Sep 17 00:00:00 2001 From: Idan Tavor Date: Tue, 3 Sep 2024 10:26:32 +0300 Subject: [PATCH 3/6] linting --- test/ketu/async/integration_test.clj | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/ketu/async/integration_test.clj b/test/ketu/async/integration_test.clj index 0c853b8..df58b4d 100644 --- a/test/ketu/async/integration_test.clj +++ b/test/ketu/async/integration_test.clj @@ -6,7 +6,7 @@ [clojure.core.async :as async] [ketu.clients.consumer :as consumer] [ketu.clients.producer :as producer] - [ketu.decorators.consumer.protocol :as cdp] + [ketu.decorators.consumer.protocol :refer [ConsumerDecorator]] [ketu.async.source :as source] [ketu.async.sink :as sink] [ketu.test.kafka-setup :as kafka-setup]) @@ -223,13 +223,13 @@ :group-id "clicks-test-consumer" :auto-offset-reset "earliest" :shape :value - :ketu.source/consumer-decorator (reify cdp/ConsumerDecorator + :ketu.source/consumer-decorator (reify ConsumerDecorator (poll! [_ {_consumer :ketu.source/consumer} poll-fn] (let [records (poll-fn)] (doseq [^ConsumerRecord record records] (async/>!! result-chan (String. ^"[B" (.value record)))) records)) - (validate [_ opts] + (validate [_ _] true))} source (source/source consumer-chan clicks-consumer-opts) clicks-producer-opts {:name "clicks-producer" @@ -260,10 +260,10 @@ :group-id "clicks-test-consumer" :auto-offset-reset "earliest" :shape :value - :ketu.source/consumer-decorator (reify cdp/ConsumerDecorator + :ketu.source/consumer-decorator (reify ConsumerDecorator (poll! [_ _ _] nil) - (validate [_ opts] + (validate [_ _] false))}] (is (thrown-with-msg? Exception #"Consumer decorator validation failed" (source/source consumer-chan clicks-consumer-opts))))))) From 49d85f600eed47c05e37c05458acee5e7531a162 Mon Sep 17 00:00:00 2001 From: Idan Tavor Date: Tue, 3 Sep 2024 10:28:23 +0300 Subject: [PATCH 4/6] [skip ci] add PR to changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 261716c..9873af7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). -## [[2.0.0]] () - 2024-09-03 +## [[2.0.0]] (https://github.com/AppsFlyer/ketu/pull/19) - 2024-09-03 ### Changed - consumer decorator API breaking change - use ConsumerDecorator protocol instead of `consumer-decorator` function. From 099e6d6e379983149ec5164992a26e1e44588f00 Mon Sep 17 00:00:00 2001 From: Idan Tavor Date: Thu, 5 Sep 2024 14:40:24 +0300 Subject: [PATCH 5/6] CR comments --- README.md | 10 +++++----- src/ketu/decorators/consumer/decorator.clj | 6 +++--- src/ketu/decorators/consumer/protocol.clj | 16 ++++++++++++++-- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index ff651d7..4313097 100644 --- a/README.md +++ b/README.md @@ -78,11 +78,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it. | :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config | #### Consumer-source options -| Key | Type | Req? | Notes | -|---------------------------------|-----------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| :group-id | string | required | | -| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | -| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | Decorates the internal poll function. when provided the decorator `poll!` fn will be called with the following params:
consumer-context: {:ketu.source/consumer consumer}
pool-fn: fn [] -> Iterable
Returns an iterable collection of consumerRecord.
The decorator should call the poll-fn on behalf of the consumer source.
| +| Key | Type | Req? | Notes | +|---------------------------------|-----------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| :ketu.source/consumer-decorator | `ConsumerDecorator` | optional | [Protocol](#ketu.decorators.consumer.protocol) | #### Producer-sink options diff --git a/src/ketu/decorators/consumer/decorator.clj b/src/ketu/decorators/consumer/decorator.clj index 6d0a4a3..25e8cff 100644 --- a/src/ketu/decorators/consumer/decorator.clj +++ b/src/ketu/decorators/consumer/decorator.clj @@ -1,11 +1,11 @@ (ns ketu.decorators.consumer.decorator (:require [ketu.decorators.consumer.protocol :as cdp])) -(defn- validate [consumer-decorator consumer-opts] - (when (not (cdp/validate consumer-decorator consumer-opts)) +(defn- valid? [consumer-decorator consumer-opts] + (when (not (cdp/valid? consumer-decorator consumer-opts)) (throw (Exception. "Consumer decorator validation failed")))) (defn decorate-poll-fn [consumer-ctx poll-fn {:keys [ketu.source/consumer-decorator] :as consumer-opts}] - (validate consumer-decorator consumer-opts) + (valid? consumer-decorator consumer-opts) #(cdp/poll! consumer-decorator consumer-ctx poll-fn)) diff --git a/src/ketu/decorators/consumer/protocol.clj b/src/ketu/decorators/consumer/protocol.clj index 472a2bb..95aad2e 100644 --- a/src/ketu/decorators/consumer/protocol.clj +++ b/src/ketu/decorators/consumer/protocol.clj @@ -1,5 +1,17 @@ (ns ketu.decorators.consumer.protocol) (defprotocol ConsumerDecorator - (poll! [this consumer-ctx poll-fn] "fn [consumer-context poll-fn] -> Iterable") - (validate [this consumer-opts] "Returns true if consumer-opts are valid according to the decorator logic")) + "Consumer decorator provides a way to extend the consumer source functionality. + The decorator runs in the context of the polling thread and allows custom control on the internal consumer instance" + (poll! [this consumer-ctx poll-fn] + "Decorates the internal consumer poll loop. + - Parameters: + - `consumer-ctx`: A map containing the consumer context, typically {:ketu.source/consumer consumer}. + - `poll-fn`: A function with no arguments that returns an Iterable of ConsumerRecord. + - Returns: An iterable collection of ConsumerRecord. + - The decorator should call the `poll-fn` on behalf of the consumer source.") + (valid? [this consumer-opts] + "Validates the consumer options according to the decorator logic. + - Parameters: + - `consumer-opts`: A map of consumer options to be validated. + - Returns: true if the consumer options are valid according to the decorator logic, false otherwise.")) From 110d21f731ec05bfc98b0652ae0a1408e8a1cd75 Mon Sep 17 00:00:00 2001 From: Idan Tavor Date: Thu, 5 Sep 2024 14:45:51 +0300 Subject: [PATCH 6/6] fix tests --- test/ketu/async/integration_test.clj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/ketu/async/integration_test.clj b/test/ketu/async/integration_test.clj index df58b4d..99f294e 100644 --- a/test/ketu/async/integration_test.clj +++ b/test/ketu/async/integration_test.clj @@ -229,7 +229,7 @@ (doseq [^ConsumerRecord record records] (async/>!! result-chan (String. ^"[B" (.value record)))) records)) - (validate [_ _] + (valid? [_ _] true))} source (source/source consumer-chan clicks-consumer-opts) clicks-producer-opts {:name "clicks-producer" @@ -263,7 +263,7 @@ :ketu.source/consumer-decorator (reify ConsumerDecorator (poll! [_ _ _] nil) - (validate [_ _] + (valid? [_ _] false))}] (is (thrown-with-msg? Exception #"Consumer decorator validation failed" (source/source consumer-chan clicks-consumer-opts)))))))