bump kafka clients version to 3.3.1 (#16)
* - use kafka client version 3.3.1
- refresh dependencies versions
- use test-containers - based on @evg-tso work
- deprecate checksum from ConsumerRecord

* - bump major version - derives from the major kafka clients upgrade
- remove unused dependency
- update README.md

* add CHANGELOG.md

* formatting and wording of README.md and CHANGELOG.md

* changelog changes, java 8 -> 11 in build files

Co-authored-by: assaf.adato <[email protected]>
assafadato and assaf.adato authored Jan 23, 2023
1 parent 690e1f5 commit 8751556
Showing 7 changed files with 82 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dev-build.yml
@@ -19,7 +19,7 @@ jobs:
uses: actions/setup-java@v2
with:
distribution: adopt
- java-version: 8
+ java-version: 11
- name: Setup linter
uses: DeLaGuardo/setup-clj-kondo@master
with:
2 changes: 1 addition & 1 deletion .github/workflows/main-build.yml
@@ -19,7 +19,7 @@ jobs:
uses: actions/setup-java@v2
with:
distribution: adopt
- java-version: 8
+ java-version: 11
- name: Lint
uses: DeLaGuardo/clojure-lint-action@master
with:
44 changes: 44 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,44 @@

# Change Log
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [1.0.0] - 2023-01-10

### Added
- NA
### Changed
- Use Kafka clients version 3.3.1.
- **[Breaking changes](https://www.confluent.io/blog/apache-kafka-3-0-major-improvements-and-new-features/)**: the Java clients library `org.apache.kafka/kafka-clients` was upgraded from [2.5.1](https://kafka.apache.org/25/documentation.html) to [3.3.1](https://kafka.apache.org/33/documentation.html)
- Java 11 or higher is supported
- Refresh dependency versions:

| Dependency | From version | To version |
|-----------------------------------|--------------|------------|
| `org.clojure/clojure` | 1.10.1 | 1.11.1 |
| `org.clojure/core.async` | 1.3.610 | 1.6.673 |
| `expound` | 0.8.5 | 0.9.0 |
| `org.apache.kafka/kafka-clients` | 2.5.1 | 3.3.1 |
| `org.slf4j/slf4j-api` | 1.7.32 | 2.0.6 |

- Upgrade internal dependencies:

| Dependency | From version | To version |
|----------------------------------|--------------|------------|
| `lein-cloverage` | 1.2.2 | 1.2.4 |
| `org.clojure/tools.namespace` | 1.0.0 | 1.3.0 |
| `tortue/spy` | 2.0.0 | 2.13.0 |
| `commons-io/commons-io` | 2.6 | 2.11.0 |
| `ch.qos.logback/logback-classic` | 1.2.3 | 1.4.5 |
| `org.clojure/test.check` | 1.1.0 | 1.1.1 |
| `org.testcontainers/kafka` | 1.16.2 | 1.17.6 |
| `clj-test-containers` | 0.5.0 | 0.7.4 |

### Deprecated
- Deprecate `checksum` in the Ketu source shape schema, following its deprecation in [ConsumerRecord](https://github.com/apache/kafka/pull/10470) as of Kafka 3.0 (see the sketch below)
- Java 8 support has been deprecated since Apache Kafka 3.0; see the [Kafka documentation](https://kafka.apache.org/33/documentation.html#java)
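
For example, a source `:shape` that previously listed `:checksum` should simply drop that field. This is a hypothetical shape, shown only to illustrate the migration:

```clojure
;; Before (no longer accepted by the shape schema):
;; {:shape [:vector :value :timestamp :checksum]}

;; After:
{:shape [:vector :value :timestamp]}
```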

### Fixed
- NA
38 changes: 20 additions & 18 deletions README.md
@@ -8,7 +8,7 @@
A Clojure Apache Kafka client with core.async api

```clojure
[com.appsflyer/ketu "0.6.0"]
[com.appsflyer/ketu "1.0.0"]
```

## Features
@@ -61,30 +61,32 @@ Consume a name string from kafka and produce a greeting string for that name back into kafka

Anything that is not documented is not supported and might change.

+Read more about the default values used by the underlying Kafka clients v3.3.1 [here](https://kafka.apache.org/33/documentation.html).

Note: `int` is used for brevity but can also mean `long`. Don't worry about it.

#### Common options (both source and sink accept these)
-| Key | Type | Req? | Notes |
-|-----|------|------|-------|
-| :brokers | string | required | Comma separated `host:port` values e.g "broker1:9092,broker2:9092" |
-| :topic | string | required | |
-| :name | string | required | Simple human-readable identifier, used in logs and thread names |
-| :key-type | `:string`,`:byte-array` | optional | Default `:byte-array`, used in configuring key serializer/deserializer |
-| :value-type | `:string`,`:byte-array` | optional | Default `:byte-array`, used in configuring value serializer/deserializer |
-| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |
+| Key              | Type                    | Req?     | Notes                                                                             |
+|------------------|-------------------------|----------|-----------------------------------------------------------------------------------|
+| :brokers         | string                  | required | Comma separated `host:port` values e.g "broker1:9092,broker2:9092"                |
+| :topic           | string                  | required |                                                                                   |
+| :name            | string                  | required | Simple human-readable identifier, used in logs and thread names                   |
+| :key-type        | `:string`,`:byte-array` | optional | Default `:byte-array`, used in configuring key serializer/deserializer            |
+| :value-type      | `:string`,`:byte-array` | optional | Default `:byte-array`, used in configuring value serializer/deserializer          |
+| :internal-config | map                     | optional | A map of the underlying java client properties, for any extra lower level config  |
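
As an illustration, a source configured with these common options might look like the following. This is a minimal sketch, not canonical usage: the broker, topic, and channel names are placeholders, and it assumes the `ketu.async.source/source` entry point and a consumer-source `:group-id` (documented below).

```clojure
(require '[clojure.core.async :refer [chan]]
         '[ketu.async.source :as source])

;; Channel onto which consumed values will be put.
(def records-ch (chan 16))

(def source-opts
  {:name       "names-consumer"             ; shows up in logs and thread names
   :brokers    "broker1:9092,broker2:9092"  ; comma-separated host:port pairs
   :topic      "names"
   :group-id   "names-consumer-group"       ; consumer-source option, see below
   :value-type :string                      ; keys/values default to :byte-array
   ;; Extra low-level java client properties go under :internal-config,
   ;; e.g. the standard consumer property max.poll.records:
   :internal-config {"max.poll.records" "100"}})

(def source (source/source records-ch source-opts))
```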

#### Consumer-source options
-| Key | Type | Req? | Notes |
-|-----|------|------|-------|
-| :group-id | string | required | |
-| :shape | `:value`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
+| Key         | Type                                                                                          | Req?     | Notes                                                                                  |
+|-------------|-----------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------|
+| :group-id   | string                                                                                        | required |                                                                                        |
+| :shape      | `:value`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord`  | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes)  |
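
For example, the `:shape` option determines what actually lands on the channel; the values below are illustrative (they mirror the shapes exercised in the test diff further down):

```clojure
;; Without :shape, the channel receives raw ConsumerRecord objects.

{:shape :value}                    ; just the value, e.g. "v"
{:shape [:vector :value :headers]} ; e.g. ["v" headers]
{:shape [:map :key :value]}        ; e.g. {:key "k" :value "v"}

;; An arity-1 function of the ConsumerRecord also works:
{:shape (fn [^org.apache.kafka.clients.consumer.ConsumerRecord record]
          [(.topic record) (.value record)])}
```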

#### Producer-sink options
-| Key | Type | Req? | Notes |
-|-----|------|------|-------|
-| :shape | `:value`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#data-shapes) |
-| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"` | optional | Default `"none"`, values are same as "compression.type" of the java producer |
-| :workers | int | optional | Default `1`, number of threads that take from the channel and invoke the internal producer |
+| Key               | Type                                                                                                               | Req?     | Notes                                                                                            |
+|-------------------|--------------------------------------------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------------------|
+| :shape            | `:value`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of the input returning `ProducerRecord`  | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#data-shapes)     |
+| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"`                                                                      | optional | Default `"none"`, values are same as "compression.type" of the java producer                     |
+| :workers          | int                                                                                                                | optional | Default `1`, number of threads that take from the channel and invoke the internal producer       |
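
Putting the sink options together might look like this; again a sketch under the same assumptions, using the `ketu.async.sink/sink` entry point with placeholder names:

```clojure
(require '[clojure.core.async :refer [chan >!!]]
         '[ketu.async.sink :as sink])

(def greets-ch (chan 16))

(def sink-opts
  {:name             "greets-producer"
   :brokers          "broker1:9092"
   :topic            "greets"
   :value-type       :string
   :shape            :value   ; plain values go on the channel
   :compression-type "gzip"   ; same values as the java producer's compression.type
   :workers          2})      ; two threads drain the channel into the producer

(def sink (sink/sink greets-ch sink-opts))

;; With :shape :value, plain strings can be put on the channel:
(>!! greets-ch "hello world")
```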

## Data shapes

27 changes: 13 additions & 14 deletions project.clj
@@ -1,4 +1,4 @@
(defproject com.appsflyer/ketu "0.6.1-SNAPSHOT"
(defproject com.appsflyer/ketu "1.0.0-SNAPSHOT"
:description "Clojure Apache Kafka client with core.async api"
:url "https://github.com/AppsFlyer/ketu"
:license {:name "Apache License, Version 2.0"
@@ -12,26 +12,25 @@
:username :env/clojars_user
:password :env/clojars_pass}]]

-  :dependencies [[org.clojure/clojure "1.10.1"]
-                 [org.clojure/core.async "1.3.610"]
-                 [expound "0.8.5"]
-                 [org.apache.kafka/kafka-clients "2.5.1"]
-                 [org.slf4j/slf4j-api "1.7.32"]]
+  :dependencies [[org.clojure/clojure "1.11.1"]
+                 [org.clojure/core.async "1.6.673"]
+                 [expound "0.9.0"]
+                 [org.apache.kafka/kafka-clients "3.3.1"]
+                 [org.slf4j/slf4j-api "2.0.6"]]

:profiles {;; REPL, development and testing
:dev
{:source-paths ["dev"]
:plugins [[lein-cloverage "1.2.4"]]
:dependencies [[org.clojure/tools.namespace "1.3.0"] ;For repl refresh
[tortue/spy "2.0.0"]
[tortue/spy "2.13.0"]
[metosin/sieppari "0.0.0-alpha13"]
[commons-io/commons-io "2.6"]
[ch.qos.logback/logback-classic "1.2.3"]
[org.clojure/test.check "1.1.0"]

[commons-io/commons-io "2.11.0"]
[ch.qos.logback/logback-classic "1.4.5"]
[org.clojure/test.check "1.1.1"]
; Kafka (docker in docker)
- [org.testcontainers/kafka "1.16.2"]
- [clj-test-containers "0.5.0"]]
+ [org.testcontainers/kafka "1.17.6"]
+ [clj-test-containers "0.7.4"]]
:jvm-opts ["-Dlogback.configurationFile=dev-logback.xml"]}

;; Tests only, silent logs
2 changes: 1 addition & 1 deletion src/ketu/spec.clj
@@ -40,7 +40,7 @@
:schema (s/cat :type (s/and ::->keyword #{:vector :map})
:fields (s/+ (s/and ::->keyword
#{:key :value :topic :partition :offset :timestamp :headers
- :timestamp-type :leader-epoch :checksum
+ :timestamp-type :leader-epoch
:serialized-key-size :serialized-value-size})))
:custom (s/cat :type (s/and ::->keyword qualified-keyword?)
:fields (s/* any?))
5 changes: 2 additions & 3 deletions test/ketu/shape/consumer_test.clj
@@ -15,12 +15,11 @@
v "v"
timestamp (long 2)
timestamp-type (TimestampType/CREATE_TIME)
- checksum 3
ksize 4
vsize 5
headers (RecordHeaders. (Collections/singletonList (RecordHeader. "header-key" ^bytes (.getBytes "header-val"))))
epoch (Optional/of (long 6))
- r (ConsumerRecord. topic partition offset timestamp timestamp-type checksum ksize vsize k v headers epoch)]
+ r (ConsumerRecord. topic partition offset timestamp timestamp-type ksize vsize k v headers epoch)]

(testing "Convert ConsumerRecord to data"
(are [ks data] (= data (let [runtime-ks ks
@@ -38,7 +38,7 @@
[:vector :value :headers] [v headers]
[:vector :timestampType] [timestamp-type]
[:vector :timestamp-type] [timestamp-type]
- [:vector :timestamp :timestamp-type :checksum :serialized-key-size :serialized-value-size :leader-epoch] [timestamp timestamp-type checksum ksize vsize epoch]
+ [:vector :timestamp :timestamp-type :serialized-key-size :serialized-value-size :leader-epoch] [timestamp timestamp-type ksize vsize epoch]
[:map :key :value] {:key k :value v}
[:map :timestamp :timestamp-type] {:timestamp timestamp :timestamp-type timestamp-type}))))

