From 875155606bc49ed0d97cae87ccd9ba5b28b98170 Mon Sep 17 00:00:00 2001 From: assafadato Date: Mon, 23 Jan 2023 13:50:46 +0200 Subject: [PATCH] bump kafka clients version to 3.3.1 (#16) * - use kafka client version 3.3.1 - refresh dependencies versions - use test-containers - based on @evg-tso work - deprecate checksum from ConsumerRecord * - bump major version - derives from the major kafka clients upgrade - remove unused dependency - update README.md * add CHANGELOG.md * formatting and wording of README.md and CHANGELOG.md * changelog changes, java 8 -> 11 in build files Co-authored-by: assaf.adato --- .github/workflows/dev-build.yml | 2 +- .github/workflows/main-build.yml | 2 +- CHANGELOG.md | 44 +++++++++++++++++++++++++++++++ README.md | 38 +++++++++++++------------- project.clj | 27 +++++++++---------- src/ketu/spec.clj | 2 +- test/ketu/shape/consumer_test.clj | 5 ++-- 7 files changed, 82 insertions(+), 38 deletions(-) create mode 100644 CHANGELOG.md diff --git a/.github/workflows/dev-build.yml b/.github/workflows/dev-build.yml index 9ecbc03..0787ffb 100644 --- a/.github/workflows/dev-build.yml +++ b/.github/workflows/dev-build.yml @@ -19,7 +19,7 @@ jobs: uses: actions/setup-java@v2 with: distribution: adopt - java-version: 8 + java-version: 11 - name: Setup linter uses: DeLaGuardo/setup-clj-kondo@master with: diff --git a/.github/workflows/main-build.yml b/.github/workflows/main-build.yml index 28e9add..1fcb055 100644 --- a/.github/workflows/main-build.yml +++ b/.github/workflows/main-build.yml @@ -19,7 +19,7 @@ jobs: uses: actions/setup-java@v2 with: distribution: adopt - java-version: 8 + java-version: 11 - name: Lint uses: DeLaGuardo/clojure-lint-action@master with: diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..84f3595 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,44 @@ + +# Change Log +All notable changes to this project will be documented in this file. 
+ +The format is based on [Keep a Changelog](http://keepachangelog.com/) +and this project adheres to [Semantic Versioning](http://semver.org/). + +## [1.0.0] - 2023-01-10 + +### Added +- NA +### Changed +- Use Kafka clients version 3.3.1. + - **[Breaking changes](https://www.confluent.io/blog/apache-kafka-3-0-major-improvements-and-new-features/)** - Java clients library `org.apache.kafka/kafka-clients` upgraded from [2.5.1](https://kafka.apache.org/25/documentation.html) to [3.3.1](https://kafka.apache.org/33/documentation.html) + - Java 11 or higher is supported +- Refresh dependencies versions: + +| Dependency | From version | To version | |-----------------------------------|--------------|------------| | `org.clojure/clojure` | 1.10.1 | 1.11.1 | | `org.clojure/core.async` | 1.3.610 | 1.6.673 | | `expound` | 0.8.5 | 0.9.0 | | `org.apache.kafka/kafka-clients` | 2.5.1 | 3.3.1 | | `org.slf4j/slf4j-api` | 1.7.32 | 2.0.6 | + +- Upgrade internal dependencies + +| Dependency | From version | To version | |----------------------------------|--------------|------------| | `lein-cloverage` | 1.2.2 | 1.2.4 | | `org.clojure/tools.namespace` | 1.0.0 | 1.3.0 | | `tortue/spy` | 2.0.0 | 2.13.0 | | `commons-io/commons-io` | 2.6 | 2.11.0 | | `ch.qos.logback/logback-classic` | 1.2.3 | 1.4.5 | | `org.clojure/test.check` | 1.1.0 | 1.1.1 | | `org.testcontainers/kafka` | 1.16.2 | 1.17.6 | | `clj-test-containers` | 0.5.0 | 0.7.4 | + +### Deprecated +- Deprecate checksum from Ketu source shape schema following deprecation notice from [ConsumerRecord](https://github.com/apache/kafka/pull/10470) version 3.0 +- Java 8 support has been deprecated since Apache Kafka 3 - [here](https://kafka.apache.org/33/documentation.html#java) + +### Fixed +- NA \ No newline at end of file diff --git a/README.md b/README.md index 8e0fe23..0067a26 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ A Clojure Apache Kafka client with core.async api ```clojure -[com.appsflyer/ketu 
"0.6.0"] +[com.appsflyer/ketu "1.0.0"] ``` ## Features @@ -61,30 +61,32 @@ Consume a name string from kafka and produce a greeting string for that name bac Anything that is not documented is not supported and might change. +Read more about the default values used by the underlying Kafka clients v3.3.1 [here](https://kafka.apache.org/33/documentation.html) + Note: `int` is used for brevity but can also mean `long`. Don't worry about it. #### Common options (both source and sink accept these) -| Key | Type | Req? | Notes | -|-----|------|------|-------| -| :brokers | string | required | Comma separated `host:port` values e.g "broker1:9092,broker2:9092" | -| :topic | string | required | | -| :name | string | required | Simple human-readable identifier, used in logs and thread names | -| :key-type | `:string`,`:byte-array` | optional | Default `:byte-array`, used in configuring key serializer/deserializer | -| :value-type | `:string`,`:byte-array` | optional | Default `:byte-array`, used in configuring value serializer/deserializer | -| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config | +| Key | Type | Req? 
| Notes | +|------------------|-------------------------|----------|----------------------------------------------------------------------------------| +| :brokers | string | required | Comma separated `host:port` values e.g "broker1:9092,broker2:9092" | +| :topic | string | required | | +| :name | string | required | Simple human-readable identifier, used in logs and thread names | +| :key-type | `:string`,`:byte-array` | optional | Default `:byte-array`, used in configuring key serializer/deserializer | +| :value-type | `:string`,`:byte-array` | optional | Default `:byte-array`, used in configuring value serializer/deserializer | +| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config | #### Consumer-source options -| Key | Type | Req? | Notes | -|-----|------|------|-------| -| :group-id | string | required | | -| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| Key | Type | Req? | Notes | +|-------------|-----------------------------------------------------------------------------------------------|------------|---------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | #### Producer-sink options -| Key | Type | Req? | Notes | -|-----|------|------|-------| -| :shape | `:value`, `[:vector ]`,`[:map ]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. 
[Examples](#data-shapes) | -| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"` | optional | Default `"none"`, values are same as "compression.type" of the java producer | -| :workers | int | optional | Default `1`, number of threads that take from the channel and invoke the internal producer | +| Key | Type | Req? | Notes | +|-------------------|------------------------------------------------------------------------------------------------------------------|------------|------------------------------------------------------------------------------------------------| +| :shape | `:value`, `[:vector ]`,`[:map ]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#data-shapes) | +| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"` | optional | Default `"none"`, values are same as "compression.type" of the java producer | +| :workers | int | optional | Default `1`, number of threads that take from the channel and invoke the internal producer | ## Data shapes diff --git a/project.clj b/project.clj index 271cd88..d8148a9 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.appsflyer/ketu "0.6.1-SNAPSHOT" +(defproject com.appsflyer/ketu "1.0.0-SNAPSHOT" :description "Clojure Apache Kafka client with core.async api" :url "https://github.com/AppsFlyer/ketu" :license {:name "Apache License, Version 2.0" @@ -12,26 +12,25 @@ :username :env/clojars_user :password :env/clojars_pass}]] - :dependencies [[org.clojure/clojure "1.10.1"] - [org.clojure/core.async "1.3.610"] - [expound "0.8.5"] - [org.apache.kafka/kafka-clients "2.5.1"] - [org.slf4j/slf4j-api "1.7.32"]] + :dependencies [[org.clojure/clojure "1.11.1"] + [org.clojure/core.async "1.6.673"] + [expound "0.9.0"] + [org.apache.kafka/kafka-clients "3.3.1"] + [org.slf4j/slf4j-api "2.0.6"]] :profiles {;; REPL, development and testing :dev {:source-paths ["dev"] - 
:plugins [[lein-cloverage "1.2.4"]] + :plugins [[lein-cloverage "1.2.4"]] :dependencies [[org.clojure/tools.namespace "1.3.0"] ;For repl refresh - [tortue/spy "2.0.0"] + [tortue/spy "2.13.0"] [metosin/sieppari "0.0.0-alpha13"] - [commons-io/commons-io "2.6"] - [ch.qos.logback/logback-classic "1.2.3"] - [org.clojure/test.check "1.1.0"] - + [commons-io/commons-io "2.11.0"] + [ch.qos.logback/logback-classic "1.4.5"] + [org.clojure/test.check "1.1.1"] ; Kafka (docker in docker) - [org.testcontainers/kafka "1.16.2"] - [clj-test-containers "0.5.0"]] + [org.testcontainers/kafka "1.17.6"] + [clj-test-containers "0.7.4"]] :jvm-opts ["-Dlogback.configurationFile=dev-logback.xml"]} ;; Tests only, silent logs diff --git a/src/ketu/spec.clj b/src/ketu/spec.clj index d33221a..4e1db2a 100644 --- a/src/ketu/spec.clj +++ b/src/ketu/spec.clj @@ -40,7 +40,7 @@ :schema (s/cat :type (s/and ::->keyword #{:vector :map}) :fields (s/+ (s/and ::->keyword #{:key :value :topic :partition :offset :timestamp :headers - :timestamp-type :leader-epoch :checksum + :timestamp-type :leader-epoch :serialized-key-size :serialized-value-size}))) :custom (s/cat :type (s/and ::->keyword qualified-keyword?) :fields (s/* any?)) diff --git a/test/ketu/shape/consumer_test.clj b/test/ketu/shape/consumer_test.clj index ecf2689..8759ed3 100644 --- a/test/ketu/shape/consumer_test.clj +++ b/test/ketu/shape/consumer_test.clj @@ -15,12 +15,11 @@ v "v" timestamp (long 2) timestamp-type (TimestampType/CREATE_TIME) - checksum 3 ksize 4 vsize 5 headers (RecordHeaders. (Collections/singletonList (RecordHeader. "header-key" ^bytes (.getBytes "header-val")))) epoch (Optional/of (long 6)) - r (ConsumerRecord. topic partition offset timestamp timestamp-type checksum ksize vsize k v headers epoch)] + r (ConsumerRecord. 
topic partition offset timestamp timestamp-type ksize vsize k v headers epoch)] (testing "Convert ConsumerRecord to data" (are [ks data] (= data (let [runtime-ks ks @@ -38,7 +37,7 @@ [:vector :value :headers] [v headers] [:vector :timestampType] [timestamp-type] [:vector :timestamp-type] [timestamp-type] - [:vector :timestamp :timestamp-type :checksum :serialized-key-size :serialized-value-size :leader-epoch] [timestamp timestamp-type checksum ksize vsize epoch] + [:vector :timestamp :timestamp-type :serialized-key-size :serialized-value-size :leader-epoch] [timestamp timestamp-type ksize vsize epoch] [:map :key :value] {:key k :value v} [:map :timestamp :timestamp-type] {:timestamp timestamp :timestamp-type timestamp-type}))))