Add drop field and convert SMT code (#1)
Showing 30 changed files with 3,658 additions and 2 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
# | ||
# https://help.github.com/articles/dealing-with-line-endings/ | ||
# | ||
# These are explicitly windows files and should use crlf | ||
*.bat text eol=crlf | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
.gradle | ||
build | ||
.vscode | ||
.idea | ||
bin | ||
.DS_STORE |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,158 @@ | ||
# kafka-connect-drop-field-smt | ||
A Kafka Connect SMT to drop (nested) fields | ||
[![Build Status](https://dev.azure.com/bakdata/public/_apis/build/status/bakdata.kafka-connect-plugins?branchName=main)](https://dev.azure.com/bakdata/public/_build/latest?definitionId=35&branchName=main) | ||
[![Sonarcloud status](https://sonarcloud.io/api/project_badges/measure?project=com.bakdata.kafka%3Akafka-connect-plugins&metric=alert_status)](https://sonarcloud.io/project/overview?id=com.bakdata.kafka:kafka-connect-plugins) | ||
[![Code coverage](https://sonarcloud.io/api/project_badges/measure?project=com.bakdata.kafka%3Akafka-connect-plugins&metric=coverage)](https://sonarcloud.io/project/overview?id=com.bakdata.kafka:kafka-connect-plugins) | ||
[![Maven](https://img.shields.io/maven-central/v/com.bakdata.kafka-connect-plugins/kafka-connect-plugins.svg)](https://search.maven.org/search?q=g:com.bakdata.kafka-connect-plugins%20AND%20a:kafka-connect-plugins&core=gav) | ||
|
||
# Kafka Connect plugins | ||
|
||
A collection of Kafka Connect plugins. | ||
|
||
## Single Message Transforms (SMTs) | ||
|
||
### Convert | ||
|
||
#### Description | ||
|
||
Converts a byte record using the given converter class. | ||
The [MirrorMaker](https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md) | ||
connector uses byte array records. | ||
To apply other SMTs to these records, | ||
we need to convert them to the appropriate format first. | ||
|
||
Use the concrete transformation type designed for the record key (`com.bakdata.kafka.Convert$Key`) | ||
or value (`com.bakdata.kafka.Convert$Value`). | ||
|
||
#### Example | ||
|
||
This configuration snippet shows how to use `Convert`. | ||
It converts the value to a string schema. | ||
|
||
```yaml | ||
"transforms": "convert", | ||
"transforms.convert.type": "com.bakdata.kafka.Convert$Value", | ||
"transforms.convert.converter": "org.apache.kafka.connect.storage.StringConverter" | ||
``` | ||
#### Properties | ||
| Name | Description | Type | Default | Valid Values | Importance | | ||
|-------------|------------------------------|-------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|------------| | ||
| `converter` | Converter to apply to input. | class | ByteArrayConverter.class | All classes that implement the [Kafka Converter interface](https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/storage/Converter.html) | high | | ||
|
||
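The idea behind `Convert` can be illustrated without any Kafka dependency. The following is a minimal sketch, not the plugin's implementation: `ConvertSketch`, `STRING_CONVERTER`, and `convert` are hypothetical names, and the converter is modeled as a plain function from raw bytes to a typed value. It assumes UTF-8 encoded input.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class ConvertSketch {

    // Hypothetical stand-in for a string converter: decodes raw bytes as UTF-8 text.
    static final Function<byte[], Object> STRING_CONVERTER =
            bytes -> bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);

    // Applies the configured converter to a raw byte value, so that later
    // processing steps see a typed value instead of a byte array.
    static Object convert(byte[] rawValue, Function<byte[], Object> converter) {
        return converter.apply(rawValue);
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        Object converted = convert(raw, STRING_CONVERTER);
        // prints "String: hello"
        System.out.println(converted.getClass().getSimpleName() + ": " + converted);
    }
}
```

In the actual SMT, the `converter` property selects a class implementing the Kafka Converter interface; the function above only mirrors that role.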
### Drop field | ||
|
||
#### Description | ||
|
||
Drop any (nested) field for a given path. | ||
|
||
Use the concrete transformation type designed for the record key (`com.bakdata.kafka.DropField$Key`) | ||
or value (`com.bakdata.kafka.DropField$Value`). | ||
|
||
#### Example | ||
|
||
This example shows how to configure and use `DropField`. | ||
|
||
Imagine you have the following record value: | ||
|
||
```json | ||
{ | ||
"collections": [ | ||
{ | ||
"complex_field": { | ||
"dropped_field": "This field will be dropped.", | ||
"kept_field": 1234 | ||
}, | ||
"boolean_field": true | ||
}, | ||
{ | ||
"complex_field": { | ||
"dropped_field": "This field will also be dropped.", | ||
"kept_field": 5678 | ||
}, | ||
"boolean_field": false | ||
} | ||
], | ||
"primitive_field": 9876 | ||
} | ||
``` | ||
|
||
This configuration snippet shows how to use `DropField` to exclude the field `dropped_field`. | ||
|
||
```yaml | ||
"transforms": "dropfield", | ||
"transforms.dropfield.type": "com.bakdata.kafka.DropField$Value", | ||
"transforms.dropfield.exclude": "collections.complex_field.dropped_field" | ||
``` | ||
|
||
The value would transform into this: | ||
|
||
```json | ||
{ | ||
"collections": [ | ||
{ | ||
"complex_field": { | ||
"kept_field": 1234 | ||
}, | ||
"boolean_field": true | ||
}, | ||
{ | ||
"complex_field": { | ||
"kept_field": 5678 | ||
}, | ||
"boolean_field": false | ||
} | ||
], | ||
"primitive_field": 9876 | ||
} | ||
``` | ||
|
||
#### Properties | ||
|
||
| Name | Description | Type | Default | Valid Values | Importance | | ||
|-----------|------------------------------------------|--------|---------|-----------------------------------------------------------|------------| | ||
| `exclude` | Path of the field to drop from the resulting Struct. | string | - | The path is separated by the "." character. Example: `a.b.c`. | high | | ||
|
||
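The path semantics from the `DropField` example can be sketched in plain Java. This is an illustration under stated assumptions, not the plugin's code: the record is modeled with nested `Map` and `List` values instead of Connect `Struct` objects, `DropFieldSketch` and `drop` are hypothetical names, and arrays are assumed to be transparent to the path, as the example with `collections` suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DropFieldSketch {

    // Returns a copy of `value` without the field addressed by `path`.
    static Object drop(Object value, List<String> path) {
        if (value instanceof List) {
            // Lists do not consume a path segment: apply the same path to every element.
            List<Object> result = new ArrayList<>();
            for (Object element : (List<?>) value) {
                result.add(drop(element, path));
            }
            return result;
        }
        if (value instanceof Map) {
            Map<String, Object> result = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                String name = String.valueOf(entry.getKey());
                if (!name.equals(path.get(0))) {
                    result.put(name, entry.getValue()); // not on the path: keep as-is
                } else if (path.size() > 1) {
                    // on the path, but not the last segment: descend
                    result.put(name, drop(entry.getValue(), path.subList(1, path.size())));
                }
                // otherwise the last segment matched and the field is dropped
            }
            return result;
        }
        return value; // primitives cannot contain the path
    }

    // Convenience overload taking the dot-separated path, e.g. "a.b.c".
    static Object drop(Object value, String exclude) {
        return drop(value, Arrays.asList(exclude.split("\\.")));
    }

    public static void main(String[] args) {
        Map<String, Object> complex = new LinkedHashMap<>();
        complex.put("dropped_field", "This field will be dropped.");
        complex.put("kept_field", 1234);
        Map<String, Object> element = new LinkedHashMap<>();
        element.put("complex_field", complex);
        element.put("boolean_field", true);
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("collections", List.of(element));
        record.put("primitive_field", 9876);

        // prints {collections=[{complex_field={kept_field=1234}, boolean_field=true}], primitive_field=9876}
        System.out.println(drop(record, "collections.complex_field.dropped_field"));
    }
}
```

The sketch copies instead of mutating, which matches the general expectation that a transformation returns a new record rather than modifying its input.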
## Installation | ||
|
||
If you are using Docker to run Kafka Connect, | ||
you can install the SMT by adding the JAR file to your Kafka Connect image. | ||
For example: | ||
|
||
```dockerfile | ||
FROM confluentinc/cp-kafka-connect:latest | ||
# Install your source/sink connector(s) | ||
# ... | ||
ENV CONNECT_PLUGIN_PATH="/connect-plugins,/usr/share/java" | ||
# Clone the repo and build the project first. | ||
# Or download the JAR file from Sonatype. | ||
COPY ./build/libs/*.jar /connect-plugins/kafka-connect-transformations/ | ||
``` | ||
|
||
## Development | ||
|
||
If you want to contribute to this project, you can clone the repository and build it with Gradle. | ||
All dependencies should be included in the Gradle files; there are no external prerequisites. | ||
|
||
```bash | ||
git clone git@github.com:bakdata/kafka-connect-plugins.git | ||
cd kafka-connect-plugins && ./gradlew build | ||
``` | ||
|
||
Please note that we have [code styles](https://github.com/bakdata/bakdata-code-styles) for Java. | ||
They are basically the Google style guide, with some small modifications. | ||
|
||
## Contributing | ||
|
||
We are happy if you want to contribute to this project. | ||
If you find any bugs or have suggestions for improvements, please open an issue. | ||
We are also happy to accept your PRs. | ||
Just open an issue beforehand and let us know what you want to do and why. | ||
|
||
## License | ||
|
||
This project is licensed under the MIT license. | ||
Have a look at the [LICENSE](https://github.com/bakdata/kafka-connect-plugins/blob/master/LICENSE) for more details. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
trigger: | ||
branches: | ||
include: | ||
- refs/heads/* | ||
- refs/tags/* | ||
pr: | ||
branches: | ||
include: | ||
- main | ||
|
||
variables: | ||
- group: sonarqube | ||
- group: sign | ||
- group: ossrh | ||
- group: changelog | ||
|
||
resources: | ||
repositories: | ||
- repository: templates | ||
type: github | ||
name: bakdata/bakdata-project-templates | ||
endpoint: bot | ||
|
||
jobs: | ||
- template: azure/gradle/build.yml@templates | ||
- template: azure/gradle/create_tag_version.yml@templates | ||
- template: azure/gradle/upload_release.yml@templates | ||
- template: azure/gradle/upload_snapshot.yml@templates |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
description = "A Kafka Connect SMT for removing nested fields in keys and values." | ||
|
||
plugins { | ||
`java-library` | ||
id("net.researchgate.release") version "3.0.2" | ||
id("com.bakdata.sonar") version "1.1.7" | ||
id("com.bakdata.sonatype") version "1.1.7" | ||
id("org.hildan.github.changelog") version "1.12.1" | ||
id("com.github.davidmc24.gradle.plugin.avro") version "1.6.0" | ||
id("io.freefair.lombok") version "6.6.1" | ||
} | ||
|
||
group = "com.bakdata.kafka" | ||
repositories { | ||
// Use Maven Central for resolving dependencies. | ||
mavenCentral() | ||
maven(url = "https://packages.confluent.io/maven/") | ||
} | ||
|
||
dependencies { | ||
val kafkaVersion: String by project | ||
compileOnly(group = "org.apache.kafka", name = "connect-transforms", version = kafkaVersion) | ||
compileOnly(group = "org.apache.kafka", name = "connect-runtime", version = kafkaVersion) { | ||
exclude(group = "org.slf4j", module = "slf4j-log4j12") | ||
} | ||
|
||
val log4jVersion: String by project | ||
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j-impl", version = log4jVersion) | ||
testImplementation(group = "org.apache.kafka", name = "connect-api", version = kafkaVersion) | ||
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.3.0") { | ||
exclude(group = "org.slf4j", module = "slf4j-log4j12") | ||
} | ||
testImplementation(group = "org.apache.kafka", name = "connect-file", version = kafkaVersion) | ||
|
||
val confluentVersion: String by project | ||
testImplementation(group = "io.confluent", name = "kafka-connect-avro-converter", version = confluentVersion) { | ||
exclude(group = "org.slf4j", module = "slf4j-log4j12") | ||
} | ||
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) { | ||
exclude(group = "org.slf4j", module = "slf4j-log4j12") | ||
} | ||
|
||
val avroVersion: String by project | ||
testImplementation(group = "org.apache.avro", name = "avro", version = avroVersion) | ||
|
||
val junitVersion: String by project | ||
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion) | ||
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion) | ||
testImplementation(group = "org.assertj", name = "assertj-core", version = "3.24.2") | ||
|
||
testImplementation( | ||
group = "com.bakdata.fluent-kafka-streams-tests", | ||
name = "schema-registry-mock-junit5", | ||
version = "2.8.1" | ||
) | ||
} | ||
|
||
configure<JavaPluginExtension> { | ||
sourceCompatibility = JavaVersion.VERSION_11 | ||
targetCompatibility = JavaVersion.VERSION_11 | ||
} | ||
|
||
configure<com.bakdata.gradle.SonatypeSettings> { | ||
developers { | ||
developer { | ||
name.set("Ramin Gharib") | ||
id.set("raminqaf") | ||
} | ||
} | ||
} | ||
|
||
configure<org.hildan.github.changelog.plugin.GitHubChangelogExtension> { | ||
githubUser = "bakdata" | ||
futureVersionTag = findProperty("changelog.releaseVersion")?.toString() | ||
sinceTag = findProperty("changelog.sinceTag")?.toString() | ||
} | ||
|
||
tasks { | ||
compileJava { | ||
options.encoding = "UTF-8" | ||
} | ||
compileTestJava { | ||
options.encoding = "UTF-8" | ||
} | ||
test { | ||
useJUnitPlatform() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
version=1.0.0-SNAPSHOT | ||
org.gradle.caching=true | ||
org.gradle.parallel=true | ||
org.gradle.jvmargs=-Xmx2048m | ||
junitVersion=5.9.2 | ||
kafkaVersion=3.3.2 | ||
avroVersion=1.11.1 | ||
confluentVersion=7.3.1 | ||
log4jVersion=2.19.0 |
Binary file not shown.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
distributionBase=GRADLE_USER_HOME | ||
distributionPath=wrapper/dists | ||
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip | ||
networkTimeout=10000 | ||
zipStoreBase=GRADLE_USER_HOME | ||
zipStorePath=wrapper/dists |