## Installation

If you are using Docker to run Kafka Connect,
you can install the SMT by adding the JAR file to your Kafka Connect image.
For example:

```dockerfile
FROM confluentinc/cp-kafka-connect:latest

# Install your source/sink connector(s)
# ...

ENV CONNECT_PLUGIN_PATH="/connect-plugins,/usr/share/java"

# Clone the repo and build the project first.
# Or download the JAR file from Sonatype.
COPY ./build/libs/*.jar /connect-plugins/kafka-connect-transformations/
```

## Development

If you want to contribute to this project, you can simply clone the repository and build it via Gradle.
All dependencies should be included in the Gradle files, there are no external prerequisites.

```bash
clone >git git@github.com:bakdata/kafka-connect-plugins.git
kafka-connect-plugins >cd && ./gradlew build

```

Please note, that we have [code styles](https://github.com/bakdata/bakdata-code-styles) for Java. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.converters.ByteArrayConverter; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +/** + * Converts a byte array schema using a given {@link Converter}. + * + * @param Record type + */ +public abstract class Convert> implements Transformation { + public static final String CONVERTER_FIELD = "converter"; + private static final String FIELD_DOCUMENTATION = "Converter to apply to input."; + + private static final Set BYTE_ARRAY_SCHEMAS = Set.of(Schema.OPTIONAL_BYTES_SCHEMA, Schema.BYTES_SCHEMA); + private static final ConfigDef CONFIG_DEF = + new ConfigDef().define(CONVERTER_FIELD, Type.CLASS, ByteArrayConverter.class, Importance.HIGH, + FIELD_DOCUMENTATION); + private Converter converter; + + @Override + public void configure(final Map configs) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + final Map converterConfigs = new HashMap<>(configs); + converterConfigs.put(ConverterConfig.TYPE_CONFIG, this.converterType().getName()); + this.converter = config.getConfiguredInstance(CONVERTER_FIELD, Converter.class, converterConfigs); + } + + @Override + public R apply(final R inputRecord) { + if (this.operatingValue(inputRecord) == null) { + return inputRecord; + } + final Schema schema = this.operatingSchema(inputRecord); + if (schema == null) { + throw new ConnectException("Schema should not be null."); + } + if (BYTE_ARRAY_SCHEMAS.contains(schema)) { + final byte[] value = (byte[]) this.operatingValue(inputRecord); + final SchemaAndValue schemaAndValue = this.converter.toConnectData(inputRecord.topic(), value); + return this.newRecord(inputRecord, schemaAndValue.schema(), schemaAndValue.value()); + } else { + throw new ConnectException("Unsupported Schema " + schema); + } + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + this.converter = null; + } + + protected abstract Schema operatingSchema(R inputRecord); + + protected abstract Object operatingValue(R inputRecord); + + protected abstract ConverterType converterType(); + + protected abstract R newRecord(R inputRecord, Schema updatedSchema, Object updatedValue); + + /** + * Implements the method for applying the SMT to the record key. + * + * @param Record type + */ + public static class Key> extends Convert { + @Override + protected Schema operatingSchema(final R inputRecord) { + return inputRecord.keySchema(); + } + + @Override + protected ConverterType converterType() { + return ConverterType.KEY; + } + + @Override + protected Object operatingValue(final R inputRecord) { + return inputRecord.key(); + } + + @Override + protected R newRecord(final R inputRecord, final Schema updatedSchema, final Object updatedValue) { + return inputRecord.newRecord(inputRecord.topic(), inputRecord.kafkaPartition(), updatedSchema, updatedValue, + inputRecord.valueSchema(), inputRecord.value(), inputRecord.timestamp()); + } + } + + /** + * Implements the method for applying the SMT to the record value. + * + * @param Record type + */ + public static class Value> extends Convert { + @Override + protected Schema operatingSchema(final R inputRecord) { + return inputRecord.valueSchema(); + } + + @Override + protected Object operatingValue(final R inputRecord) { + return inputRecord.value(); + } + + @Override + protected ConverterType converterType() { + return ConverterType.VALUE; + } + + @Override + protected R newRecord(final R inputRecord, final Schema updatedSchema, final Object updatedValue) { + return inputRecord.newRecord(inputRecord.topic(), inputRecord.kafkaPartition(), inputRecord.keySchema(), + inputRecord.key(), + updatedSchema, updatedValue, inputRecord.timestamp()); + } + } +} diff --git a/src/main/java/com/bakdata/kafka/DropField.java b/src/main/java/com/bakdata/kafka/DropField.java new file mode 100644 index 0000000..ba14575 --- /dev/null +++ b/src/main/java/com/bakdata/kafka/DropField.java @@ -0,0 +1,170 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static com.bakdata.kafka.StructFieldDropper.createStructFieldDropper; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +/** + * Drop any (nested) field for a given path. + * + * @param Record type + */ +public abstract class DropField> implements Transformation { + public static final String EXCLUDE_FIELD = "exclude"; + private static final String PURPOSE = "field deletion"; + private static final String FIELD_DOCUMENTATION = "Fields to exclude from the resulting Struct."; + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(EXCLUDE_FIELD, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, FIELD_DOCUMENTATION); + private static final Set STRING_SCHEMA = Set.of(Schema.OPTIONAL_STRING_SCHEMA, Schema.STRING_SCHEMA); + private Path excludePath; + + @Override + public void configure(final Map configs) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + final String exclude = config.getString(EXCLUDE_FIELD); + this.excludePath = Path.split(exclude); + } + + @Override + public R apply(final R inputRecord) { + if (this.operatingValue(inputRecord) == null) { + return inputRecord; + } else if (this.operatingSchema(inputRecord) != null) { + return this.applyWithSchema(inputRecord); + } else { + throw new ConnectException("This SMT can be applied only to records with a schema."); + } + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + } + + protected abstract Schema operatingSchema(R inputRecord); + + protected abstract Object operatingValue(R inputRecord); + + protected abstract R newRecord(R inputRecord, Schema updatedSchema, Object updatedValue); + + private R applyWithSchema(final R inputRecord) { + final Schema schema = this.operatingSchema(inputRecord); + if (STRING_SCHEMA.contains(schema)) { + return this.applyToStringSchema(inputRecord, schema); + } + return this.applyToStruct(inputRecord); + } + + private R applyToStringSchema(final R inputRecord, final Schema schema) { + final JsonFieldDropper jsonFieldDropper = JsonFieldDropper.createJsonFieldDropper(this.excludePath); + final String value = (String) this.operatingValue(inputRecord); + final ObjectMapper objectMapper = new ObjectMapper(); + try { + final JsonNode jsonNode = objectMapper.readTree(value); + final ObjectNode dropped = jsonFieldDropper.processObject((ObjectNode) jsonNode); + final String writeValueAsString = objectMapper.writeValueAsString(dropped); + return this.newRecord(inputRecord, schema, writeValueAsString); + } catch (final JsonProcessingException e) { + throw new ConnectException(String.format("Could not process the input JSON: %s", e)); + } + } + + private R applyToStruct(final R inputRecord) { + final StructFieldDropper structFieldDropper = createStructFieldDropper(this.excludePath); + final Struct value = requireStruct(this.operatingValue(inputRecord), PURPOSE); + + final Struct updatedValue = structFieldDropper.updateStruct(value); + return this.newRecord(inputRecord, updatedValue.schema(), updatedValue); + } + + /** + * Implements the method for applying the SMT to the record key. + * + * @param Record type + */ + public static class Key> extends DropField { + + @Override + protected Schema operatingSchema(final R inputRecord) { + return inputRecord.keySchema(); + } + + @Override + protected Object operatingValue(final R inputRecord) { + return inputRecord.key(); + } + + @Override + protected R newRecord(final R inputRecord, final Schema updatedSchema, final Object updatedValue) { + return inputRecord.newRecord(inputRecord.topic(), inputRecord.kafkaPartition(), updatedSchema, updatedValue, + inputRecord.valueSchema(), inputRecord.value(), inputRecord.timestamp()); + } + } + + /** + * Implements the method for applying the SMT to the record value. + * + * @param Record type + */ + public static class Value> extends DropField { + @Override + protected Schema operatingSchema(final R inputRecord) { + return inputRecord.valueSchema(); + } + + @Override + protected Object operatingValue(final R inputRecord) { + return inputRecord.value(); + } + + @Override + protected R newRecord(final R inputRecord, final Schema updatedSchema, final Object updatedValue) { + return inputRecord.newRecord(inputRecord.topic(), inputRecord.kafkaPartition(), inputRecord.keySchema(), + inputRecord.key(), + updatedSchema, updatedValue, inputRecord.timestamp()); + } + } +} diff --git a/src/main/java/com/bakdata/kafka/JsonFieldDropper.java b/src/main/java/com/bakdata/kafka/JsonFieldDropper.java new file mode 100644 index 0000000..bc274ce --- /dev/null +++ b/src/main/java/com/bakdata/kafka/JsonFieldDropper.java @@ -0,0 +1,79 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import edu.umd.cs.findbugs.annotations.NonNull; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +class JsonFieldDropper { + @NonNull + private final PathTraverser pathTraverser; + + static JsonFieldDropper createJsonFieldDropper(final Path excludePath) { + return new JsonFieldDropper(PathTraverser.initialize(excludePath)); + } + + ObjectNode processObject(final ObjectNode value) { + final ObjectNode objectCopy = JsonNodeFactory.instance.objectNode(); + value.fields().forEachRemaining(field -> { + final String fieldName = field.getKey(); + final PathTraverser subPath = this.pathTraverser.getSubPath(fieldName); + if (subPath.isIncluded()) { + JsonNode fieldValue = field.getValue(); + if (subPath.isPrefix()) { + final JsonFieldDropper jsonFieldDropper = new JsonFieldDropper(subPath); + fieldValue = jsonFieldDropper.transform(fieldValue); + } + objectCopy.set(fieldName, fieldValue); + } + } + ); + return objectCopy; + } + + private JsonNode transform(final JsonNode value) { + switch (value.getNodeType()) { + case ARRAY: + return this.processArray(value); + case OBJECT: + return this.processObject((ObjectNode) value); + default: + return value; + } + } + + private ArrayNode processArray(final Iterable value) { + final ArrayNode arrayCopy = JsonNodeFactory.instance.arrayNode(); + for (final JsonNode jsonNode : value) { + arrayCopy.add(this.transform(jsonNode)); + } + return arrayCopy; + } +} diff --git a/src/main/java/com/bakdata/kafka/Path.java b/src/main/java/com/bakdata/kafka/Path.java new file mode 100644 index 0000000..fef77ee --- /dev/null +++ b/src/main/java/com/bakdata/kafka/Path.java @@ -0,0 +1,56 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Pattern; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +class Path { + private static final Pattern DOT_REGEX = Pattern.compile("\\."); + private final List splitPath; + + static Path split(final CharSequence exclude) { + return new Path(Arrays.asList(DOT_REGEX.split(exclude))); + } + + Path getSubPath(final String fieldName) { + final List strings = new ArrayList<>(this.splitPath); + strings.add(fieldName); + return new Path(strings); + } + + boolean isIncluded(final Path otherPath) { + return !otherPath.splitPath.equals(this.splitPath); + } + + boolean isPrefix(final Path otherPath) { + return this.splitPath.size() <= otherPath.splitPath.size() && + otherPath.splitPath.subList(0, this.splitPath.size()).equals(this.splitPath); + } +} diff --git a/src/main/java/com/bakdata/kafka/PathTraverser.java b/src/main/java/com/bakdata/kafka/PathTraverser.java new file mode 100644 index 0000000..3b33a0e --- /dev/null +++ b/src/main/java/com/bakdata/kafka/PathTraverser.java @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.util.Collections; +import lombok.AccessLevel; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +final class PathTraverser { + private final @NonNull Path excludePath; + private final @NonNull Path currentPath; + + static PathTraverser initialize(final Path excludePath) { + return new PathTraverser(excludePath, new Path(Collections.emptyList())); + } + + PathTraverser getSubPath(final String fieldName) { + return new PathTraverser(this.excludePath, this.currentPath.getSubPath(fieldName)); + } + + boolean isIncluded() { + return this.currentPath.isIncluded(this.excludePath); + } + + boolean isPrefix() { + return this.currentPath.isPrefix(this.excludePath); + } +} diff --git a/src/main/java/com/bakdata/kafka/SchemaDropper.java b/src/main/java/com/bakdata/kafka/SchemaDropper.java new file mode 100644 index 0000000..ca063fc --- /dev/null +++ b/src/main/java/com/bakdata/kafka/SchemaDropper.java @@ -0,0 +1,91 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.transforms.util.SchemaUtil; + +@RequiredArgsConstructor +class SchemaDropper { + private final @NonNull PathTraverser pathTraverser; + + static SchemaDropper createSchemaDropper(final Path excludePath) { + return new SchemaDropper(PathTraverser.initialize(excludePath)); + } + + Schema processSchema(final Schema schema) { + final SchemaBuilder schemaCopy = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + this.addFields(schema, schemaCopy); + return schemaCopy.build(); + } + + private void addFields(final Schema schema, final SchemaBuilder schemaCopy) { + schema.fields().forEach(field -> { + final String fieldName = field.name(); + final PathTraverser subPath = this.pathTraverser.getSubPath(fieldName); + if (subPath.isIncluded()) { + Schema fieldSchema = field.schema(); + if (subPath.isPrefix()) { + final SchemaDropper deleteSchema = new SchemaDropper(subPath); + fieldSchema = deleteSchema.transform(field.schema()); + } + schemaCopy.field(fieldName, fieldSchema); + } + }); + } + + private Schema transform(final Schema schema) { + switch (schema.type()) { + case MAP: + return this.processMap(schema); + case ARRAY: + return this.processArray(schema); + case STRUCT: + return this.processSchema(schema); + default: + return schema; + } + } + + private Schema processMap(final Schema schema) { + final Schema updatedValueSchema = this.transform(schema.valueSchema()); + final Schema updatedKeySchema = this.transform(schema.keySchema()); + return SchemaBuilder + .map(updatedKeySchema, updatedValueSchema) + .name(schema.name()) + .build(); + } + + private Schema processArray(final Schema schema) { + final Schema updatedSchema = this.transform(schema.valueSchema()); + return SchemaBuilder + .array(updatedSchema) + .name(schema.name()) + .build(); + } +} diff --git a/src/main/java/com/bakdata/kafka/StructDropper.java b/src/main/java/com/bakdata/kafka/StructDropper.java new file mode 100644 index 0000000..6ee0422 --- /dev/null +++ b/src/main/java/com/bakdata/kafka/StructDropper.java @@ -0,0 +1,95 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +@RequiredArgsConstructor +class StructDropper { + private final @NonNull PathTraverser pathTraverser; + + static StructDropper createStructDropper(final Path excludePath) { + return new StructDropper(PathTraverser.initialize(excludePath)); + } + + Struct processStruct(final Struct struct, final Schema updatedSchema) { + final Struct structCopy = new Struct(updatedSchema); + structCopy.schema().fields().forEach(field -> { + final String fieldName = field.name(); + final PathTraverser subPath = this.pathTraverser.getSubPath(fieldName); + if (subPath.isIncluded()) { + Object fieldValue = struct.get(fieldName); + if (subPath.isPrefix()) { + final StructDropper structDropper = new StructDropper(subPath); + fieldValue = structDropper.transform(struct.get(fieldName), field.schema()); + } + structCopy.put(fieldName, fieldValue); + } + } + ); + return structCopy; + } + + // Cast is safe because the types are checked in the switch + @SuppressWarnings("unchecked") + private Object transform(final Object struct, final Schema schema) { + switch (schema.type()) { + case ARRAY: + return this.processArray((List) struct, schema.valueSchema()); + case MAP: + return this.processMap((Map) struct, schema.valueSchema()); + case STRUCT: + return this.processStruct((Struct) struct, schema); + default: + return struct; + } + } + + private Map processMap(final Map value, final Schema schema) { + final Map mapValues = new HashMap<>(); + + for (final Entry entry : value.entrySet()) { + mapValues.put(entry.getKey(), this.transform(entry.getValue(), schema)); + } + return mapValues; + } + + private List processArray(final Iterable value, final Schema schema) { + final List arrayValues = new ArrayList<>(); + + for (final Object arrayValue : value) { + arrayValues.add(this.transform(arrayValue, schema)); + } + return arrayValues; + } +} diff --git a/src/main/java/com/bakdata/kafka/StructFieldDropper.java b/src/main/java/com/bakdata/kafka/StructFieldDropper.java new file mode 100644 index 0000000..a1601d3 --- /dev/null +++ b/src/main/java/com/bakdata/kafka/StructFieldDropper.java @@ -0,0 +1,60 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static com.bakdata.kafka.SchemaDropper.createSchemaDropper; +import static com.bakdata.kafka.StructDropper.createStructDropper; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +@RequiredArgsConstructor +final class StructFieldDropper { + private static final int CACHE_SIZE = 16; + private final Cache schemaUpdateCache; + private final SchemaDropper schemaDropper; + private final StructDropper structDropper; + + static StructFieldDropper createStructFieldDropper(final Path excludePath) { + final SchemaDropper schemaDropper = createSchemaDropper(excludePath); + final StructDropper structDropper = createStructDropper(excludePath); + return new StructFieldDropper(new SynchronizedCache<>(new LRUCache<>(CACHE_SIZE)), schemaDropper, + structDropper); + } + + Struct updateStruct(final Struct value) { + Schema updatedSchema = this.schemaUpdateCache.get(value.schema()); + if (updatedSchema == null) { + updatedSchema = this.schemaDropper.processSchema(value.schema()); + this.schemaUpdateCache.put(value.schema(), updatedSchema); + } + + return this.structDropper.processStruct(value, updatedSchema); + } +} diff --git a/src/test/avro/NestedObject.avsc b/src/test/avro/NestedObject.avsc new file mode 100644 index 0000000..0b0b5fc --- /dev/null +++ b/src/test/avro/NestedObject.avsc @@ -0,0 +1,15 @@ +{ + "type": "record", + "name": "NestedObject", + "namespace": "com.bakdata.test.smt", + "fields": [ + { + "name": "complex_object", + "type": "PrimitiveObject" + }, + { + "name": "boolean_field", + "type": "boolean" + } + ] +} diff --git a/src/test/avro/PrimitiveObject.avsc b/src/test/avro/PrimitiveObject.avsc new file mode 100644 index 0000000..de51430 --- /dev/null +++ b/src/test/avro/PrimitiveObject.avsc @@ -0,0 +1,15 @@ +{ + "type": "record", + "name": "PrimitiveObject", + "namespace": "com.bakdata.test.smt", + "fields": [ + { + "name": "dropped_field", + "type": "string" + }, + { + "name": "kept_field", + "type": "int" + } + ] +} diff --git a/src/test/avro/RecordCollection.avsc b/src/test/avro/RecordCollection.avsc new file mode 100644 index 0000000..cd5c870 --- /dev/null +++ b/src/test/avro/RecordCollection.avsc @@ -0,0 +1,15 @@ +{ + "type": "record", + "name": "RecordCollection", + "namespace": "com.bakdata.test.smt", + "fields": [ + { + "name": "collections", + "type": { + "type": "array", + "items": "NestedObject" + }, + "default": [] + } + ] +} diff --git a/src/test/java/com/bakdata/kafka/ConvertIntegrationTest.java b/src/test/java/com/bakdata/kafka/ConvertIntegrationTest.java new file mode 100644 index 0000000..69d2200 --- /dev/null +++ b/src/test/java/com/bakdata/kafka/ConvertIntegrationTest.java @@ -0,0 +1,134 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static java.util.Collections.singletonList; +import static net.mguenther.kafka.junit.EmbeddedConnectConfig.kafkaConnect; +import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig; +import static net.mguenther.kafka.junit.SendKeyValues.to; +import static net.mguenther.kafka.junit.TopicConfig.withName; +import static net.mguenther.kafka.junit.Wait.delay; +import static org.assertj.core.api.Assertions.assertThat; + +import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import net.mguenther.kafka.junit.EmbeddedKafkaCluster; +import net.mguenther.kafka.junit.KeyValue; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.converters.ByteArrayConverter; +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.storage.StringConverter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class ConvertIntegrationTest { + private static final String CONVERTER = "Converter"; + private static final String TOPIC = "input"; + @RegisterExtension + final SchemaRegistryMockExtension schemaRegistryMock = new SchemaRegistryMockExtension(); + private EmbeddedKafkaCluster kafkaCluster; + private Path outputFile; + + @BeforeEach + void setUp() throws IOException { + this.outputFile = Files.createTempFile("test", "temp"); + this.kafkaCluster = this.createCluster(); + this.kafkaCluster.start(); + this.kafkaCluster.createTopic(withName(TOPIC).build()); + } + + @AfterEach + void tearDown() throws IOException { + this.kafkaCluster.stop(); + Files.deleteIfExists(this.outputFile); + } + + @Test + void shouldConvert() throws InterruptedException, IOException { + final byte[] value = "foo".getBytes(); + + final List> records = singletonList(new KeyValue<>("k1", value)); + this.kafkaCluster.send(to(TOPIC, records) + .withAll(this.createProducerProperties()) + .build()); + + // makes sure that both records are processed + delay(2, TimeUnit.SECONDS); + final List output = Files.readAllLines(this.outputFile); + assertThat(output).containsExactly("foo"); + } + + private EmbeddedKafkaCluster createCluster() { + return EmbeddedKafkaCluster.provisionWith( + newClusterConfig() + .configure( + kafkaConnect() + .deployConnector(this.config()) + .build()) + .build()); + } + + private Properties config() { + final Properties properties = new Properties(); + properties.put(ConnectorConfig.NAME_CONFIG, "test"); + properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink"); + properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ByteArrayConverter.class.getName()); + properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMock.getUrl()); + + // SMT config + properties.put(ConnectorConfig.TRANSFORMS_CONFIG, CONVERTER); + properties.put(ConnectorConfig.TRANSFORMS_CONFIG + "." + CONVERTER + ".type", + Convert.Value.class.getName()); + properties.put(ConnectorConfig.TRANSFORMS_CONFIG + "." + CONVERTER + "." + Convert.CONVERTER_FIELD, + StringConverter.class.getName()); + + properties.put(SinkConnector.TOPICS_CONFIG, TOPIC); + properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString()); + return properties; + } + + private Properties createProducerProperties() { + final Properties properties = new Properties(); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList()); + properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMock.getUrl()); + return properties; + } +} diff --git a/src/test/java/com/bakdata/kafka/ConvertTest.java b/src/test/java/com/bakdata/kafka/ConvertTest.java new file mode 100644 index 0000000..41a369d --- /dev/null +++ b/src/test/java/com/bakdata/kafka/ConvertTest.java @@ -0,0 +1,117 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static com.bakdata.kafka.Convert.CONVERTER_FIELD; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.bakdata.kafka.Convert.Key; +import com.bakdata.kafka.Convert.Value; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.storage.StringConverter; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +class ConvertTest { + + private static final String TEST_TOPIC = "test-topic"; + + @InjectSoftAssertions + private SoftAssertions softly; + + private static SinkRecord getSinkRecord(final Schema keySchema, final Object keyValue, final Schema valueSchema, + final Object valueValue) { + return new SinkRecord(TEST_TOPIC, 0, keySchema, keyValue, valueSchema, valueValue, 0); + } + + @Test + void shouldReturnInputRecordWhenValueIsNull() { + final SinkRecord sinkRecord = getSinkRecord(null, "testKey".getBytes(StandardCharsets.UTF_8), + Schema.BYTES_SCHEMA, + null); + try (final Convert convert = new Value<>()) { + convert.configure(Map.of(CONVERTER_FIELD, StringConverter.class)); + final SinkRecord newRecord = convert.apply(sinkRecord); + this.softly.assertThat(newRecord.key()).isEqualTo("testKey".getBytes(StandardCharsets.UTF_8)); + this.softly.assertThat(newRecord.value()).isNull(); + } + } + + @Test + void shouldConvertValue() { + final SinkRecord sinkRecord = getSinkRecord(null, "testKey".getBytes(StandardCharsets.UTF_8), + Schema.BYTES_SCHEMA, "testValue".getBytes(StandardCharsets.UTF_8)); + try (final Convert convert = new Value<>()) { + convert.configure(Map.of(CONVERTER_FIELD, StringConverter.class)); + final SinkRecord converted = convert.apply(sinkRecord); + this.softly.assertThat(converted.key()).isEqualTo("testKey".getBytes(StandardCharsets.UTF_8)); + this.softly.assertThat(converted.value()).isEqualTo("testValue"); + } + } + + @Test + void shouldConvertKey() { + final SinkRecord sinkRecord = + getSinkRecord(Schema.OPTIONAL_BYTES_SCHEMA, "testKey".getBytes(StandardCharsets.UTF_8), + Schema.BYTES_SCHEMA, "testValue".getBytes(StandardCharsets.UTF_8)); + + try (final Convert convert = new Key<>()) { + convert.configure(Map.of(CONVERTER_FIELD, StringConverter.class)); + final SinkRecord converted = convert.apply(sinkRecord); + this.softly.assertThat(converted.key()).isEqualTo("testKey"); + this.softly.assertThat(converted.value()).isEqualTo("testValue".getBytes(StandardCharsets.UTF_8)); + } + } + + @Test + void shouldRaiseExceptionIfSchemaRecordDoesNotHaveSchema() { + final SinkRecord sinkRecord = getSinkRecord(null, "testKey".getBytes(StandardCharsets.UTF_8), + null, "testValue".getBytes(StandardCharsets.UTF_8)); + try (final Convert convert = new Value<>()) { + convert.configure(Map.of(CONVERTER_FIELD, StringConverter.class)); + assertThatThrownBy(() -> convert.apply(sinkRecord)).isInstanceOf(ConnectException.class) + .hasMessage("Schema should not be null."); + } + } + + @Test + void shouldRaiseExceptionIfSchemaTypeIsNotBytes() { + final SinkRecord sinkRecord = getSinkRecord(null, "testKey".getBytes(StandardCharsets.UTF_8), + Schema.INT32_SCHEMA, "testValue".getBytes(StandardCharsets.UTF_8)); + try (final Convert convert = new Value<>()) { + convert.configure(Map.of(CONVERTER_FIELD, StringConverter.class)); + assertThatThrownBy(() -> convert.apply(sinkRecord)).isInstanceOf(ConnectException.class) + .hasMessage("Unsupported Schema " + Schema.INT32_SCHEMA); + } + } +} diff --git a/src/test/java/com/bakdata/kafka/DropFieldIntegrationTest.java b/src/test/java/com/bakdata/kafka/DropFieldIntegrationTest.java new file mode 100644 index 0000000..cf7ff5a --- /dev/null +++ b/src/test/java/com/bakdata/kafka/DropFieldIntegrationTest.java @@ -0,0 +1,155 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static java.util.Collections.singletonList; +import static net.mguenther.kafka.junit.EmbeddedConnectConfig.kafkaConnect; +import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig; +import static net.mguenther.kafka.junit.SendKeyValues.to; +import static net.mguenther.kafka.junit.TopicConfig.withName; +import static net.mguenther.kafka.junit.Wait.delay; +import static org.assertj.core.api.Assertions.assertThat; + +import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; +import com.bakdata.test.smt.NestedObject; +import com.bakdata.test.smt.PrimitiveObject; +import com.bakdata.test.smt.RecordCollection; +import io.confluent.connect.avro.AvroConverter; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import net.mguenther.kafka.junit.EmbeddedKafkaCluster; +import net.mguenther.kafka.junit.KeyValue; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.storage.StringConverter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class DropFieldIntegrationTest { + private static final String EXCLUDE_PATH = "collections.complex_object.dropped_field"; + private static final String DROP_NESTED_FIELD = "DropField"; + private static final String TOPIC = "input"; + @RegisterExtension + final SchemaRegistryMockExtension schemaRegistryMock = new SchemaRegistryMockExtension(); + private EmbeddedKafkaCluster kafkaCluster; + private Path outputFile; + + private static RecordCollection createValue() { + final PrimitiveObject primitiveObject = PrimitiveObject.newBuilder() + .setDroppedField("This field will also be dropped.") + .setKeptField(1234) + .build(); + final NestedObject nestedObject = new NestedObject(primitiveObject, true); + + final PrimitiveObject primitiveObject2 = PrimitiveObject.newBuilder() + .setDroppedField("This field will also be dropped.") + .setKeptField(5678) + .build(); + final NestedObject nestedObject2 = new NestedObject(primitiveObject2, false); + return new RecordCollection(List.of(nestedObject, nestedObject2)); + } + + @BeforeEach + void setUp() throws IOException { + this.outputFile = Files.createTempFile("test", "temp"); + this.kafkaCluster = this.createCluster(); + this.kafkaCluster.start(); + this.kafkaCluster.createTopic(withName(TOPIC).build()); + } + + @AfterEach + void tearDown() throws IOException { + this.kafkaCluster.stop(); + Files.deleteIfExists(this.outputFile); + } + + @Test + void shouldDeleteNestedField() throws InterruptedException, IOException { + final RecordCollection value = createValue(); + + final List> records = singletonList(new KeyValue<>("k1", value)); + this.kafkaCluster.send(to(TOPIC, records) + .withAll(this.createProducerProperties()) + .build()); + + // makes sure that both records are processed + delay(2, TimeUnit.SECONDS); + final List output = Files.readAllLines(this.outputFile); + assertThat(output).containsExactly( + "Struct{collections=[Struct{complex_object=Struct{kept_field=1234},boolean_field=true}, " + + "Struct{complex_object=Struct{kept_field=5678},boolean_field=false}]}"); + } + + private EmbeddedKafkaCluster createCluster() { + return EmbeddedKafkaCluster.provisionWith( + newClusterConfig() + .configure( + kafkaConnect() + .deployConnector(this.config()) + .build()) + .build()); + } + + private Properties config() { + final Properties properties = new Properties(); + properties.put(ConnectorConfig.NAME_CONFIG, "test"); + properties.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "FileStreamSink"); + properties.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, AvroConverter.class.getName()); + properties.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG + "." + + AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMock.getUrl()); + + // SMT config + properties.put(ConnectorConfig.TRANSFORMS_CONFIG, DROP_NESTED_FIELD); + properties.put(ConnectorConfig.TRANSFORMS_CONFIG + "." + DROP_NESTED_FIELD + ".type", + DropField.Value.class.getName()); + properties.put(ConnectorConfig.TRANSFORMS_CONFIG + "." + DROP_NESTED_FIELD + "." + DropField.EXCLUDE_FIELD, + EXCLUDE_PATH); + + properties.put(SinkConnector.TOPICS_CONFIG, TOPIC); + properties.put(FileStreamSinkConnector.FILE_CONFIG, this.outputFile.toString()); + return properties; + } + + private Properties createProducerProperties() { + final Properties properties = new Properties(); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList()); + properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMock.getUrl()); + return properties; + } +} diff --git a/src/test/java/com/bakdata/kafka/DropFieldTest.java b/src/test/java/com/bakdata/kafka/DropFieldTest.java new file mode 100644 index 0000000..7c2db86 --- /dev/null +++ b/src/test/java/com/bakdata/kafka/DropFieldTest.java @@ -0,0 +1,231 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static com.bakdata.kafka.DropField.EXCLUDE_FIELD; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.bakdata.kafka.DropField.Key; +import com.bakdata.kafka.DropField.Value; +import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; +import com.bakdata.test.smt.NestedObject; +import com.bakdata.test.smt.PrimitiveObject; +import com.bakdata.test.smt.RecordCollection; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.connect.avro.AvroConverter; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.storage.Converter; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; + +@ExtendWith(SoftAssertionsExtension.class) +class DropFieldTest { + private static final String TEST_TOPIC = "test-topic"; + private static final String MESSAGE = "This SMT can be applied only to records with a schema."; + private final ObjectMapper mapper = new ObjectMapper(); + @RegisterExtension + final SchemaRegistryMockExtension schemaRegistryMock = new SchemaRegistryMockExtension(); + @InjectSoftAssertions + private SoftAssertions softly; + + private static SinkRecord getSinkRecord(final Schema keySchema, final Object keyValue, final Schema valueSchema, + final Object valueValue) { + return new SinkRecord(TEST_TOPIC, 0, keySchema, keyValue, valueSchema, valueValue, 0); + } + + private static RecordCollection createComplexKey() { + final PrimitiveObject primitiveObject = PrimitiveObject.newBuilder() + .setDroppedField("This field will also be dropped.") + .setKeptField(1234) + .build(); + final NestedObject nestedObject = new NestedObject(primitiveObject, true); + + final PrimitiveObject primitiveObject2 = PrimitiveObject.newBuilder() + .setDroppedField("This field will also be dropped.") + .setKeptField(5678) + .build(); + final NestedObject nestedObject2 = new NestedObject(primitiveObject2, false); + return new RecordCollection(List.of(nestedObject, nestedObject2)); + } + + @Test + void shouldReturnInputRecordWhenValueIsNull() { + final SchemaAndValue schemaAndValue = this.getSchemaAndValue(false, null); + final SinkRecord sinkRecord = getSinkRecord(null, "testKey".getBytes(StandardCharsets.UTF_8), + schemaAndValue.schema(), + schemaAndValue.value()); + try (final DropField dropField = new Value<>()) { + dropField.configure(Map.of(EXCLUDE_FIELD, "some.random.field")); + final SinkRecord newRecord = dropField.apply(sinkRecord); + this.softly.assertThat(newRecord.key()).isEqualTo("testKey".getBytes(StandardCharsets.UTF_8)); + this.softly.assertThat(newRecord.value()).isNull(); + } + } + + @Test + void shouldReturnInputRecordWhenKeyIsNull() { + final SchemaAndValue schemaAndValue = this.getSchemaAndValue(true, null); + final SinkRecord sinkRecord = getSinkRecord(schemaAndValue.schema(), + schemaAndValue.value(), null, "testKey".getBytes(StandardCharsets.UTF_8)); + try (final DropField dropField = new Key<>()) { + dropField.configure(Map.of(EXCLUDE_FIELD, "some.random.field")); + final SinkRecord newRecord = dropField.apply(sinkRecord); + this.softly.assertThat(newRecord.key()).isNull(); + this.softly.assertThat(newRecord.value()).isEqualTo("testKey".getBytes(StandardCharsets.UTF_8)); + } + } + + @Test + void shouldDropFieldIfValueIsJsonString() throws JsonProcessingException { + final String value = this.createValue(); + final SinkRecord sinkRecord = getSinkRecord(null, "test", Schema.STRING_SCHEMA, value); + try (final DropField dropField = new Value<>()) { + dropField.configure(Map.of(EXCLUDE_FIELD, "lastName")); + final SinkRecord newRecord = dropField.apply(sinkRecord); + this.softly.assertThat(newRecord.key()).isEqualTo("test"); + this.softly.assertThat(newRecord.value()).isEqualTo("{\"firstName\":\"Jack\",\"age\":25}"); + } + } + + @Test + void shouldThrowExceptionWhenValueDoesNotHaveSchema() { + final PrimitiveObject keyObject = new PrimitiveObject("test", 1234); + final SchemaAndValue schemaAndValue = this.getSchemaAndValue(true, keyObject); + final SinkRecord sinkRecord = getSinkRecord(schemaAndValue.schema(), + schemaAndValue.value(), null, "testKey".getBytes(StandardCharsets.UTF_8)); + try (final DropField dropField = new Value<>()) { + dropField.configure(Map.of(EXCLUDE_FIELD, "some.random.field")); + assertThatThrownBy(() -> dropField.apply(sinkRecord)).isInstanceOf(ConnectException.class) + .hasMessage(MESSAGE); + } + } + + @Test + void shouldThrowExceptionWhenKeyDoesNotHaveSchema() { + final PrimitiveObject valueObject = new PrimitiveObject("test", 1234); + final SchemaAndValue schemaAndValue = this.getSchemaAndValue(false, valueObject); + final SinkRecord sinkRecord = getSinkRecord(null, "testKey".getBytes(StandardCharsets.UTF_8), + schemaAndValue.schema(), + schemaAndValue.value()); + try (final DropField dropField = new Key<>()) { + dropField.configure(Map.of(EXCLUDE_FIELD, "some.random.field")); + assertThatThrownBy(() -> dropField.apply(sinkRecord)) + .isInstanceOf(ConnectException.class) + .hasMessage(MESSAGE); + } + } + + @Test + void shouldDropNestedValueFromKey() { + final RecordCollection complexKey = createComplexKey(); + final SchemaAndValue schemaAndValue = this.getSchemaAndValue(true, complexKey); + final SinkRecord sinkRecord = getSinkRecord(schemaAndValue.schema(), + schemaAndValue.value(), null, "testKey".getBytes(StandardCharsets.UTF_8)); + try (final DropField dropField = new Key<>()) { + dropField.configure(Map.of(EXCLUDE_FIELD, "collections.complex_object.dropped_field")); + final SinkRecord newRecord = dropField.apply(sinkRecord); + this.softly.assertThat(newRecord.key()) + .isInstanceOfSatisfying(Struct.class, + newKey -> this.softly.assertThat(newKey.getArray("collections")) + .hasSize(2) + .satisfies(array -> { + this.softly.assertThat(array) + .first() + .isInstanceOfSatisfying(Struct.class, struct -> { + final Integer fieldValue = struct + .getStruct("complex_object") + .getInt32("kept_field"); + this.softly.assertThat(fieldValue).isEqualTo(1234); + this.softly.assertThat(struct.getBoolean("boolean_field")).isTrue(); + }); + this.softly.assertThat(array.get(1)) + .isInstanceOfSatisfying(Struct.class, struct -> { + final Integer fieldValue = struct + .getStruct("complex_object") + .getInt32("kept_field"); + this.softly.assertThat(fieldValue).isEqualTo(5678); + + this.softly.assertThat(struct.getBoolean("boolean_field")) + .isFalse(); + }); + }).allSatisfy(array -> this.softly.assertThat(array) + .isInstanceOfSatisfying(Struct.class, struct -> { + final Field field = struct + .getStruct("complex_object") + .schema() + .field("dropped_field"); + this.softly.assertThat(field).isNull(); + }))); + this.softly.assertThat(newRecord.value()).isEqualTo("testKey".getBytes(StandardCharsets.UTF_8)); + } + } + + private SchemaAndValue getSchemaAndValue(final boolean isKey, final T primitiveObject) { + final Map schemaRegistryUrlConfig = Map + .of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMock.getUrl()); + try (final Serializer serializer = new SpecificAvroSerializer<>()) { + final Converter avroConverter = new AvroConverter(); + avroConverter.configure(schemaRegistryUrlConfig, isKey); + serializer.configure(schemaRegistryUrlConfig, isKey); + final byte[] valueBytes = serializer.serialize(TEST_TOPIC, primitiveObject); + return avroConverter.toConnectData(TEST_TOPIC, valueBytes); + } + } + + private String createValue() throws JsonProcessingException { + final User user = new User("Jack", "Java", 25); + final JsonNode newNode = this.mapper.convertValue(user, JsonNode.class); + + return this.mapper.writeValueAsString(newNode); + } + + @Data + @AllArgsConstructor + private static class User { + private String firstName; + private String lastName; + private int age; + } +} diff --git a/src/test/java/com/bakdata/kafka/JsonFieldDropperTest.java b/src/test/java/com/bakdata/kafka/JsonFieldDropperTest.java new file mode 100644 index 0000000..92baddc --- /dev/null +++ b/src/test/java/com/bakdata/kafka/JsonFieldDropperTest.java @@ -0,0 +1,573 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +class JsonFieldDropperTest { + @InjectSoftAssertions + private SoftAssertions softly; + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "boolean_field": true,
+     *              "complex_field": {
+     *                  "kept_field": 1234
+     *              }
+     *          }
+     *     }
+     * 
+ * + * Exclude path: boolean_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "complex_field": {
+     *                  "kept_field": 1234
+     *              },
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropNotNestedField() { + final JsonFieldDropper computerStruct = JsonFieldDropper.createJsonFieldDropper(Path.split("boolean_field")); + + final ObjectNode primitiveObject = JsonNodeFactory.instance.objectNode(); + primitiveObject.set("kept_field", JsonNodeFactory.instance.numberNode(1234)); + + final ObjectNode complexObject = JsonNodeFactory.instance.objectNode(); + complexObject.set("boolean_field", JsonNodeFactory.instance.booleanNode(true)); + complexObject.set("complex_field", primitiveObject); + + final ObjectNode newJson = computerStruct.processObject(complexObject); + + this.softly.assertThat(newJson.get("complex_field").get("kept_field").intValue()).isEqualTo(1234); + this.softly.assertThat(newJson.get("boolean_field")).isNull(); + + } + + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "dropped_field": "This field should stay here",
+     *              "complex_field": {
+     *                  "dropped_field": "This field will be dropped",
+     *                  "kept_field": 1234
+     *              }
+     *          }
+     *     }
+     * 
+ * + * Exclude path: complex_field.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "dropped_field": "This field should stay here",
+     *              "complex_field": {
+     *                  "kept_field": 1234
+     *              },
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropCorrectFieldIfNamesAreDuplicate() { + final JsonFieldDropper computerStruct = + JsonFieldDropper.createJsonFieldDropper(Path.split("complex_field.dropped_field")); + + final ObjectNode primitiveObject = JsonNodeFactory.instance.objectNode(); + primitiveObject.set("dropped_field", JsonNodeFactory.instance.textNode("This field will be dropped")); + primitiveObject.set("kept_field", JsonNodeFactory.instance.numberNode(1234)); + + final ObjectNode complexObject = JsonNodeFactory.instance.objectNode(); + complexObject.set("dropped_field", JsonNodeFactory.instance.textNode("This field should stay here")); + complexObject.set("complex_field", primitiveObject); + + final ObjectNode newJson = computerStruct.processObject(complexObject); + + final JsonNode complexField = newJson.get("complex_field"); + this.softly.assertThat(complexField.get("dropped_field")).isNull(); + this.softly.assertThat(complexField.get("kept_field").intValue()).isEqualTo(1234); + this.softly.assertThat(newJson.get("dropped_field").textValue()).isEqualTo("This field should stay here"); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "complex_field": {
+     *                  "dropped_field": "This field will be dropped."
+     *                  "kept_field": 1234
+     *              },
+     *              "boolean_field": true
+     *          }
+     *     }
+     * 
+ * + * Exclude path: complex_field.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *             "complex_field": {
+     *                  "kept_field": 1234
+     *              },
+     *              "boolean_field": true
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropNestedFieldInStruct() { + final JsonFieldDropper computerStruct = + JsonFieldDropper.createJsonFieldDropper(Path.split("complex_field.dropped_field")); + + final ObjectNode primitiveObject = JsonNodeFactory.instance.objectNode(); + primitiveObject.set("dropped_field", JsonNodeFactory.instance.textNode("This value will be dropped.")); + primitiveObject.set("kept_field", JsonNodeFactory.instance.numberNode(1234)); + + final ObjectNode complexObject = JsonNodeFactory.instance.objectNode(); + complexObject.set("complex_field", primitiveObject); + complexObject.set("boolean_field", JsonNodeFactory.instance.booleanNode(true)); + + final ObjectNode newJson = computerStruct.processObject(complexObject); + + final JsonNode complexField = newJson.get("complex_field"); + + this.softly.assertThat(complexField.size()).isEqualTo(1); + this.softly.assertThat(complexField.get("kept_field").intValue()).isEqualTo(1234); + this.softly.assertThat(complexField.get("dropped_field")).isNull(); + this.softly.assertThat(newJson.get("boolean_field").booleanValue()).isTrue(); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "complex_field": {
+     *                  "dropped_field": "This field will be dropped."
+     *              },
+     *              "boolean_field": true
+     *          }
+     *     }
+     * 
+ * + * Exclude path: complex_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "boolean_field": true
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropStruct() { + final JsonFieldDropper computerStruct = JsonFieldDropper.createJsonFieldDropper(Path.split("complex_field")); + + final ObjectNode primitiveObject = JsonNodeFactory.instance.objectNode(); + primitiveObject.set("dropped_field", JsonNodeFactory.instance.textNode("This field will be dropped")); + + final ObjectNode complexObject = JsonNodeFactory.instance.objectNode(); + complexObject.set("complex_field", primitiveObject); + complexObject.set("boolean_field", JsonNodeFactory.instance.booleanNode(true)); + + final ObjectNode newJson = computerStruct.processObject(complexObject); + + this.softly.assertThat(newJson.get("complex_field")).isNull(); + this.softly.assertThat(newJson.get("boolean_field").booleanValue()).isTrue(); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "collections": [
+     *                {
+     *                  "dropped_field": "This field will be dropped.",
+     *                  "kept_field": 1234
+     *                },
+     *                {
+     *                  "dropped_field": "This field will also be dropped.",
+     *                  "kept_field": 5678
+     *                }
+     *              ],
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ * + * Exclude path: collections.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "collections": [
+     *                {
+     *                  "kept_field": 1234
+     *                },
+     *                {
+     *                  "kept_field": 5678
+     *                }
+     *              ],
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropFieldInArray() { + final JsonFieldDropper computerStruct = + JsonFieldDropper.createJsonFieldDropper(Path.split("collections.dropped_field")); + + final ObjectNode primitiveObject = JsonNodeFactory.instance.objectNode(); + primitiveObject.set("dropped_field", JsonNodeFactory.instance.textNode("This value will be dropped.")); + primitiveObject.set("kept_field", JsonNodeFactory.instance.numberNode(1234)); + + final ObjectNode primitiveObject2 = JsonNodeFactory.instance.objectNode(); + primitiveObject2.set("dropped_field", JsonNodeFactory.instance.textNode("This field will also be dropped.")); + primitiveObject2.set("kept_field", JsonNodeFactory.instance.numberNode(5678)); + + final ArrayNode collections = JsonNodeFactory.instance.arrayNode(); + collections.add(primitiveObject); + collections.add(primitiveObject2); + + final ObjectNode complexObject = JsonNodeFactory.instance.objectNode(); + complexObject.set("collections", collections); + complexObject.set("primitive_field", JsonNodeFactory.instance.numberNode(9876)); + + final ObjectNode newJson = computerStruct.processObject(complexObject); + + this.softly.assertThat(newJson.get("collections")) + .hasSize(2) + .isInstanceOfSatisfying(ArrayNode.class, array -> { + this.softly.assertThat(array) + .first() + .isInstanceOfSatisfying(ObjectNode.class, primitiveStruct -> { + final int keptField = primitiveStruct.get("kept_field").intValue(); + this.softly.assertThat(keptField).isEqualTo(1234); + } + ); + this.softly.assertThat(array.get(1)) + .isInstanceOfSatisfying(ObjectNode.class, primitiveStruct -> { + final int keptField = primitiveStruct.get("kept_field").intValue(); + this.softly.assertThat(keptField).isEqualTo(5678); + } + ); + this.softly.assertThat(array).allSatisfy( + arrayNode -> this.softly.assertThat(arrayNode) + .isInstanceOfSatisfying(ObjectNode.class, objectNode -> { + final JsonNode droppedField = objectNode.get("dropped_field"); + this.softly.assertThat(droppedField).isNull(); + } + ) + ); + } + ); + this.softly.assertThat(newJson.get("primitive_field").intValue()).isEqualTo(9876); + } + + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "collections": [
+     *                {
+     *                  "dropped_field": "This field will be dropped.",
+     *                  "kept_field": 1234
+     *                },
+     *                {
+     *                  "dropped_field": "This field will also be dropped.",
+     *                  "kept_field": 5678
+     *                }
+     *              ],
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ * + * Exclude path: collections + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropArray() { + final JsonFieldDropper computerStruct = JsonFieldDropper.createJsonFieldDropper(Path.split("collections")); + + final ObjectNode primitiveObject = JsonNodeFactory.instance.objectNode(); + primitiveObject.set("dropped_field", JsonNodeFactory.instance.textNode("This value will be dropped.")); + primitiveObject.set("kept_field", JsonNodeFactory.instance.numberNode(1234)); + + final ObjectNode primitiveObject2 = JsonNodeFactory.instance.objectNode(); + primitiveObject2.set("dropped_field", JsonNodeFactory.instance.textNode("This field will also be dropped.")); + primitiveObject2.set("kept_field", JsonNodeFactory.instance.numberNode(5678)); + + final ArrayNode collections = JsonNodeFactory.instance.arrayNode(); + collections.add(primitiveObject); + collections.add(primitiveObject2); + + final ObjectNode complexObject = JsonNodeFactory.instance.objectNode(); + complexObject.set("collections", collections); + complexObject.set("primitive_field", JsonNodeFactory.instance.numberNode(9876)); + + final ObjectNode newJson = computerStruct.processObject(complexObject); + + this.softly.assertThat(newJson.get("collections")).isNull(); + this.softly.assertThat(newJson.get("primitive_field").intValue()).isEqualTo(9876); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "collections": [
+     *                {
+     *                  "complex_field": {
+     *                    "dropped_field": "This field will be dropped.",
+     *                    "kept_field": 1234
+     *                  },
+     *                  "boolean_field": true
+     *                },
+     *                {
+     *                  "complex_field": {
+     *                    "dropped_field": "This field will also be dropped.",
+     *                    "kept_field": 5678
+     *                  },
+     *                  "boolean_field": false
+     *                }
+     *              ],
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ * + * Exclude path: collections.complex_field.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "collections": [
+     *                {
+     *                  "complex_field": {
+     *                    "kept_field": 1234
+     *                  },
+     *                  "boolean_field": true
+     *                },
+     *                {
+     *                  "complex_field": {
+     *                    "kept_field": 5678
+     *                  },
+     *                  "boolean_field": false
+     *                }
+     *              ],
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropFieldInStructArray() { + final JsonFieldDropper computerStruct = JsonFieldDropper + .createJsonFieldDropper(Path.split("collections.complex_field.dropped_field")); + + final ObjectNode primitiveObject = JsonNodeFactory.instance.objectNode(); + primitiveObject.set("dropped_field", JsonNodeFactory.instance.textNode("This value will be dropped.")); + primitiveObject.set("kept_field", JsonNodeFactory.instance.numberNode(1234)); + + final ObjectNode primitiveObject2 = JsonNodeFactory.instance.objectNode(); + primitiveObject2.set("dropped_field", JsonNodeFactory.instance.textNode("This field will also be dropped.")); + primitiveObject2.set("kept_field", JsonNodeFactory.instance.numberNode(5678)); + + final ObjectNode complexObject = JsonNodeFactory.instance.objectNode(); + complexObject.set("complex_field", primitiveObject); + complexObject.set("boolean_field", JsonNodeFactory.instance.booleanNode(true)); + + final ObjectNode complexObject2 = JsonNodeFactory.instance.objectNode(); + complexObject2.set("complex_field", primitiveObject2); + complexObject2.set("boolean_field", JsonNodeFactory.instance.booleanNode(false)); + + final ArrayNode collections = JsonNodeFactory.instance.arrayNode(); + collections.add(complexObject); + collections.add(complexObject2); + + final ObjectNode recordCollection = JsonNodeFactory.instance.objectNode(); + recordCollection.set("collections", collections); + recordCollection.set("primitive_field", JsonNodeFactory.instance.numberNode(9876)); + + final ObjectNode newJson = computerStruct.processObject(recordCollection); + + this.softly.assertThat(newJson.get("primitive_field").intValue()).isEqualTo(9876); + this.softly.assertThat(newJson.get("collections")) + .hasSize(2) + .isInstanceOfSatisfying(ArrayNode.class, array -> { + this.softly.assertThat(array) + .first() + .isInstanceOfSatisfying(ObjectNode.class, struct -> { + final int fieldValue = struct + .get("complex_field") + .get("kept_field").intValue(); + this.softly.assertThat(fieldValue).isEqualTo(1234); + this.softly.assertThat(struct.get("boolean_field").booleanValue()).isTrue(); + } + ); + this.softly.assertThat(array.get(1)) + .isInstanceOfSatisfying(ObjectNode.class, struct -> { + final int fieldValue = struct + .get("complex_field") + .get("kept_field").intValue(); + this.softly.assertThat(fieldValue).isEqualTo(5678); + this.softly.assertThat(struct.get("boolean_field").booleanValue()).isFalse(); + } + ); + this.softly.assertThat(array).allSatisfy(arrayNode -> { + final JsonNode field = arrayNode + .get("complex_field") + .get("dropped_field"); + this.softly.assertThat(field).isNull(); + }); + } + ); + this.softly.assertThat(newJson.get("primitive_field").intValue()).isEqualTo(9876); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *             "collections": [
+     *                 [
+     *                      "complex_field": {
+     *                         "dropped_field": "This field will also be dropped.",
+     *                         "kept_field": 1234
+     *                     }
+     *                ],
+     *                [
+     *                    "complex_field": {
+     *                         "dropped_field": "This field will also be dropped.",
+     *                         "kept_field": 5678
+     *                    }
+     *                ]
+     *             ]
+     *           }
+     *     }
+     * 
+ * + * Exclude path: collections.complex_field.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *             "collections": [
+     *                 [
+     *                     "complex_field": {
+     *                         "kept_field": 1234
+     *                     }
+     *                 ],
+     *                 [
+     *                    "complex_field": {
+     *                        "kept_field": 5678
+     *                    }
+     *                 ]
+     *             ]
+     *           }
+     *     }
+     * 
+ */ + @Test + void shouldDropFieldInMultipleNestedArray() { + final JsonFieldDropper computerStruct = JsonFieldDropper + .createJsonFieldDropper(Path.split("collections.dropped_field")); + + final ObjectNode primitiveObject = JsonNodeFactory.instance.objectNode(); + primitiveObject.set("dropped_field", JsonNodeFactory.instance.textNode("This value will be dropped.")); + primitiveObject.set("kept_field", JsonNodeFactory.instance.numberNode(1234)); + + final ArrayNode deeperCollections = JsonNodeFactory.instance.arrayNode(); + deeperCollections.add(primitiveObject); + + final ArrayNode deeperCollections2 = JsonNodeFactory.instance.arrayNode(); + deeperCollections2.add(primitiveObject); + + final ArrayNode collection = JsonNodeFactory.instance.arrayNode(); + collection.add(deeperCollections); + collection.add(deeperCollections2); + + final ObjectNode complexObject = JsonNodeFactory.instance.objectNode(); + complexObject.set("collections", collection); + complexObject.set("primitive_field", JsonNodeFactory.instance.numberNode(9876)); + + final ObjectNode newJson = computerStruct.processObject(complexObject); + + final ArrayNode arrayNode = (ArrayNode) newJson.get("collections"); + this.softly.assertThat(arrayNode) + .hasSize(2) + .allSatisfy(innerArray -> { + this.softly.assertThat(innerArray) + .hasSize(1) + .first() + .satisfies(field -> this.softly.assertThat(field.get("dropped_field")).isNull()); + }); + } +} diff --git a/src/test/java/com/bakdata/kafka/PathTest.java b/src/test/java/com/bakdata/kafka/PathTest.java new file mode 100644 index 0000000..125278d --- /dev/null +++ b/src/test/java/com/bakdata/kafka/PathTest.java @@ -0,0 +1,57 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +class PathTest { + @InjectSoftAssertions + private SoftAssertions softly; + + @Test + void shouldBeIncluded() { + final Path path = Path.split("a.b.c"); + final Path otherPath = path.getSubPath("d"); + + this.softly.assertThat(otherPath.isIncluded(path)).isTrue(); + + final Path excludePath = Path.split("a.b.c"); + this.softly.assertThat(excludePath.isIncluded(path)).isFalse(); + } + + @Test + void shouldBePrefixOfPath() { + final Path path = Path.split("a.b.c"); + final Path otherPath = path.getSubPath("d"); + + this.softly.assertThat(path.isPrefix(otherPath)).isTrue(); + this.softly.assertThat(otherPath.isPrefix(path)).isFalse(); + } +} diff --git a/src/test/java/com/bakdata/kafka/StructFieldDropperTest.java b/src/test/java/com/bakdata/kafka/StructFieldDropperTest.java new file mode 100644 index 0000000..c2438bf --- /dev/null +++ b/src/test/java/com/bakdata/kafka/StructFieldDropperTest.java @@ -0,0 +1,926 @@ +/* + * MIT License + * + * Copyright (c) 2023 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.util.List; +import java.util.Map; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +class StructFieldDropperTest { + @InjectSoftAssertions + private SoftAssertions softly; + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "boolean_field": true,
+     *              "complex_field": {
+     *                  "kept_field": 1234
+     *              }
+     *          }
+     *     }
+     * 
+ * + * Exclude path: boolean_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "complex_field": {
+     *                  "kept_field": 1234
+     *              },
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropNotNestedField() { + final StructFieldDropper computerStruct = + StructFieldDropper.createStructFieldDropper(Path.split("boolean_field")); + final Schema primitiveSchema = SchemaBuilder + .struct() + .field("kept_field", Schema.INT32_SCHEMA) + .build(); + + final Struct primitiveObject = new Struct(primitiveSchema); + primitiveObject.put("kept_field", 1234); + + final Schema complexSchema = SchemaBuilder + .struct() + .field("boolean_field", Schema.BOOLEAN_SCHEMA) + .field("complex_field", primitiveSchema) + .build(); + + final Struct complexObject = new Struct(complexSchema); + complexObject.put("boolean_field", true); + complexObject.put("complex_field", primitiveObject); + + final Struct newStruct = computerStruct.updateStruct(complexObject); + + final Field complexField = newStruct.schema().field("complex_field"); + this.softly.assertThat(complexField).isNotNull(); + this.softly.assertThat(newStruct.getStruct("complex_field").getInt32("kept_field")).isEqualTo(1234); + this.softly.assertThat(newStruct.schema().field("boolean_field")).isNull(); + + } + + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "dropped_field": "This field should stay here",
+     *              "complex_field": {
+     *                  "dropped_field": "This field will be dropped",
+     *                  "kept_field": 1234
+     *              }
+     *          }
+     *     }
+     * 
+ * + * Exclude path: complex_field.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "dropped_field": "This field should stay here",
+     *              "complex_field": {
+     *                  "kept_field": 1234
+     *              },
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropCorrectFieldIfNamesAreDuplicate() { + final StructFieldDropper computerStruct = + StructFieldDropper.createStructFieldDropper(Path.split("complex_field.dropped_field")); + final Schema primitiveSchema = SchemaBuilder + .struct() + .field("dropped_field", Schema.STRING_SCHEMA) + .field("kept_field", Schema.INT32_SCHEMA) + .build(); + + final Struct primitiveObject = new Struct(primitiveSchema); + primitiveObject.put("dropped_field", "This field will be dropped"); + primitiveObject.put("kept_field", 1234); + + final Schema complexSchema = SchemaBuilder + .struct() + .field("dropped_field", Schema.STRING_SCHEMA) + .field("complex_field", primitiveSchema) + .build(); + + final Struct complexObject = new Struct(complexSchema); + complexObject.put("dropped_field", "This field should stay here"); + complexObject.put("complex_field", primitiveObject); + + final Struct newStruct = computerStruct.updateStruct(complexObject); + + final Field complexField = newStruct.schema().field("complex_field"); + this.softly.assertThat(complexField).isNotNull(); + this.softly.assertThat(complexField.schema().field("dropped_field")).isNull(); + this.softly.assertThat(newStruct.getStruct("complex_field").getInt32("kept_field")).isEqualTo(1234); + this.softly.assertThat(newStruct.schema().field("dropped_field")).isNotNull(); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "complex_field": {
+     *                  "dropped_field": "This field will be dropped."
+     *                  "kept_field": 1234
+     *              },
+     *              "boolean_field": true
+     *          }
+     *     }
+     * 
+ * + * Exclude path: complex_field.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *             "complex_field": {
+     *                  "kept_field": 1234
+     *              },
+     *              "boolean_field": true
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropNestedFieldInStruct() { + final StructFieldDropper computerStruct = + StructFieldDropper.createStructFieldDropper(Path.split("complex_field.dropped_field")); + final Schema innerSchema = SchemaBuilder + .struct() + .field("dropped_field", Schema.STRING_SCHEMA) + .field("kept_field", Schema.INT32_SCHEMA) + .build(); + + final Struct innerStruct = new Struct(innerSchema); + innerStruct.put("dropped_field", "This value will be dropped."); + innerStruct.put("kept_field", 1234); + + final Schema complexSchema = SchemaBuilder.struct() + .field("complex_field", innerSchema) + .field("boolean_field", Schema.BOOLEAN_SCHEMA) + .build(); + + final Struct complexObject = new Struct(complexSchema); + complexObject.put("complex_field", innerStruct); + complexObject.put("boolean_field", true); + + final Struct newStruct = computerStruct.updateStruct(complexObject); + + final Struct complexField = newStruct.getStruct("complex_field"); + this.softly.assertThat(complexField.schema().fields()).hasSize(1); + this.softly.assertThat(complexField.schema().field("kept_field")).isNotNull(); + this.softly.assertThat(complexField.schema().field("dropped_field")).isNull(); + this.softly.assertThat(complexField.getInt32("kept_field")).isEqualTo(1234); + this.softly.assertThat(newStruct.get("boolean_field")).isEqualTo(true); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "complex_field": {
+     *                  "dropped_field": "This field will be dropped."
+     *              },
+     *              "boolean_field": true
+     *          }
+     *     }
+     * 
+ * + * Exclude path: complex_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "boolean_field": true
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropStruct() { + final StructFieldDropper computerStruct = + StructFieldDropper.createStructFieldDropper(Path.split("complex_field")); + final Schema primitiveSchema = SchemaBuilder + .struct() + .field("dropped_field", Schema.STRING_SCHEMA) + .build(); + + final Struct primitiveObject = new Struct(primitiveSchema); + primitiveObject.put("dropped_field", "This value will be dropped."); + + final Schema complexSchema = SchemaBuilder.struct() + .field("complex_field", primitiveSchema) + .field("boolean_field", Schema.BOOLEAN_SCHEMA) + .build(); + + final Struct complexObject = new Struct(complexSchema); + complexObject.put("complex_field", primitiveObject); + complexObject.put("boolean_field", true); + + final Struct newStruct = computerStruct.updateStruct(complexObject); + + final Field complexField = newStruct.schema().field("complex_field"); + this.softly.assertThat(complexField).isNull(); + this.softly.assertThat(newStruct.getBoolean("boolean_field")).isTrue(); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "collections": [
+     *                {
+     *                  "dropped_field": "This field will be dropped.",
+     *                  "kept_field": 1234
+     *                },
+     *                {
+     *                  "dropped_field": "This field will also be dropped.",
+     *                  "kept_field": 5678
+     *                }
+     *              ],
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ * + * Exclude path: collections.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "collections": [
+     *                {
+     *                  "kept_field": 1234
+     *                },
+     *                {
+     *                  "kept_field": 5678
+     *                }
+     *              ],
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropFieldInArray() { + final StructFieldDropper computerStruct = + StructFieldDropper.createStructFieldDropper(Path.split("collections.dropped_field")); + + final Schema primitiveSchema = SchemaBuilder + .struct() + .field("dropped_field", Schema.STRING_SCHEMA) + .field("kept_field", Schema.INT32_SCHEMA) + .build(); + + final Struct primitiveObject = new Struct(primitiveSchema); + primitiveObject.put("dropped_field", "This field will be dropped."); + primitiveObject.put("kept_field", 1234); + + final Struct primitiveObject2 = new Struct(primitiveSchema); + primitiveObject2.put("dropped_field", "This field will also be dropped."); + primitiveObject2.put("kept_field", 5678); + + final Schema collectionsSchema = SchemaBuilder + .array(primitiveSchema) + .build(); + final Schema complexSchema = SchemaBuilder + .struct() + .field("collections", collectionsSchema) + .field("primitive_field", Schema.INT32_SCHEMA) + .build(); + + final List structList = List.of(primitiveObject, primitiveObject2); + final Struct complexObject = new Struct(complexSchema); + complexObject.put("collections", structList); + complexObject.put("primitive_field", 9876); + + final Struct newStruct = computerStruct.updateStruct(complexObject); + + final Field collectionField = newStruct.schema().field("collections"); + this.softly.assertThat(collectionField).isNotNull(); + this.softly.assertThat(collectionField.schema().valueSchema().fields()).hasSize(1); + this.softly.assertThat(newStruct.getArray("collections")) + .hasSize(2) + .satisfies(array -> { + this.softly.assertThat(array) + .first() + .isInstanceOfSatisfying(Struct.class, primitiveStruct -> { + final Integer keptField = primitiveStruct.getInt32("kept_field"); + this.softly.assertThat(keptField).isEqualTo(1234); + final Field droppedField = primitiveStruct.schema().field("dropped_field"); + this.softly.assertThat(droppedField).isNull(); + }); + this.softly.assertThat(array.get(1)).isInstanceOfSatisfying(Struct.class, primitiveStruct -> { + final Integer keptField = primitiveStruct.getInt32("kept_field"); + this.softly.assertThat(keptField).isEqualTo(5678); + final Field droppedField = primitiveStruct.schema().field("dropped_field"); + this.softly.assertThat(droppedField).isNull(); + }); + }); + this.softly.assertThat(newStruct.get("primitive_field")).isEqualTo(9876); + } + + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "collections": [
+     *                {
+     *                  "dropped_field": "This field will be dropped.",
+     *                  "kept_field": 1234
+     *                },
+     *                {
+     *                  "dropped_field": "This field will also be dropped.",
+     *                  "kept_field": 5678
+     *                }
+     *              ],
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ * + * Exclude path: collections + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropArray() { + final StructFieldDropper computerStruct = + StructFieldDropper.createStructFieldDropper(Path.split("collections")); + + final Schema primitiveSchema = SchemaBuilder + .struct() + .field("dropped_field", Schema.STRING_SCHEMA) + .field("kept_field", Schema.INT32_SCHEMA) + .build(); + + final Struct primitiveStruct = new Struct(primitiveSchema); + primitiveStruct.put("dropped_field", "This field will be dropped."); + primitiveStruct.put("kept_field", 1234); + + final Struct primitiveStruct2 = new Struct(primitiveSchema); + primitiveStruct2.put("dropped_field", "This field will also be dropped."); + primitiveStruct2.put("kept_field", 5678); + + final Schema collectionSchema = SchemaBuilder + .array(primitiveSchema) + .build(); + final Schema complexSchema = SchemaBuilder + .struct() + .field("collections", collectionSchema) + .field("primitive_field", Schema.INT32_SCHEMA) + .build(); + + final List structList = List.of(primitiveStruct, primitiveStruct2); + final Struct complexObject = new Struct(complexSchema); + complexObject.put("collections", structList); + complexObject.put("primitive_field", 9876); + + final Struct newStruct = computerStruct.updateStruct(complexObject); + + this.softly.assertThat(newStruct.schema().field("collections")).isNull(); + this.softly.assertThat(newStruct.get("primitive_field")).isEqualTo(9876); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "collections": [
+     *                {
+     *                  "complex_field": {
+     *                    "dropped_field": "This field will be dropped.",
+     *                    "kept_field": 1234
+     *                  },
+     *                  "boolean_field": true
+     *                },
+     *                {
+     *                  "complex_field": {
+     *                    "dropped_field": "This field will also be dropped.",
+     *                    "kept_field": 5678
+     *                  },
+     *                  "boolean_field": false
+     *                }
+     *              ],
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ * + * Exclude path: collections.complex_field.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "collections": [
+     *                {
+     *                  "complex_field": {
+     *                    "kept_field": 1234
+     *                  },
+     *                  "boolean_field": true
+     *                },
+     *                {
+     *                  "complex_field": {
+     *                    "kept_field": 5678
+     *                  },
+     *                  "boolean_field": false
+     *                }
+     *              ],
+     *              "primitive_field": 9876
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropFieldInStructArray() { + final StructFieldDropper computerStruct = + StructFieldDropper.createStructFieldDropper(Path.split("collections.complex_field.dropped_field")); + + final Schema primitiveSchema = SchemaBuilder + .struct() + .field("dropped_field", Schema.STRING_SCHEMA) + .field("kept_field", Schema.INT32_SCHEMA) + .build(); + + final Struct primitiveStruct = new Struct(primitiveSchema); + primitiveStruct.put("dropped_field", "This field will be dropped."); + primitiveStruct.put("kept_field", 1234); + + final Struct primitiveStruct2 = new Struct(primitiveSchema); + primitiveStruct2.put("dropped_field", "This field will also be dropped."); + primitiveStruct2.put("kept_field", 5678); + + final Schema complexSchema = SchemaBuilder.struct() + .field("complex_field", primitiveSchema) + .field("boolean_field", Schema.BOOLEAN_SCHEMA) + .build(); + + final Struct complexObject = new Struct(complexSchema); + complexObject.put("complex_field", primitiveStruct); + complexObject.put("boolean_field", true); + + final Struct complexObject2 = new Struct(complexSchema); + complexObject2.put("complex_field", primitiveStruct2); + complexObject2.put("boolean_field", false); + + final Schema collectionsSchema = SchemaBuilder + .array(complexSchema) + .build(); + + final Schema recordCollectionsSchema = SchemaBuilder + .struct() + .field("collections", collectionsSchema) + .field("primitive_field", Schema.INT32_SCHEMA) + .build(); + + final List structList = List.of(complexObject, complexObject2); + final Struct recordCollection = new Struct(recordCollectionsSchema); + recordCollection.put("collections", structList); + recordCollection.put("primitive_field", 9876); + + final Struct newStruct = computerStruct.updateStruct(recordCollection); + + final Field collectionField = newStruct.schema().field("collections"); + + this.softly.assertThat(newStruct.getInt32("primitive_field")).isEqualTo(9876); + this.softly.assertThat(collectionField).isNotNull(); + this.softly.assertThat(newStruct.getArray("collections")) + .hasSize(2) + .satisfies(array -> { + this.softly.assertThat(array) + .first() + .isInstanceOfSatisfying(Struct.class, struct -> { + final Integer fieldValue = struct + .getStruct("complex_field") + .getInt32("kept_field"); + this.softly.assertThat(fieldValue).isEqualTo(1234); + this.softly.assertThat(struct.getBoolean("boolean_field")).isTrue(); + } + ); + this.softly.assertThat(array.get(1)) + .isInstanceOfSatisfying(Struct.class, struct -> { + final Integer fieldValue = struct + .getStruct("complex_field") + .getInt32("kept_field"); + this.softly.assertThat(fieldValue).isEqualTo(5678); + this.softly.assertThat(struct.getBoolean("boolean_field")).isFalse(); + } + ); + } + ).allSatisfy(array -> this.softly.assertThat(array) + .isInstanceOfSatisfying(Struct.class, struct -> { + final Field field = struct + .getStruct("complex_field") + .schema() + .field("dropped_field"); + this.softly.assertThat(field).isNull(); + } + ) + ); + this.softly.assertThat(newStruct.get("primitive_field")).isEqualTo(9876); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *             "collections": [
+     *               {
+     *                 "deeper_collections": [
+     *                   {
+     *                      "complex_field": {
+     *                          "dropped_field": "This field will also be dropped.",
+     *                          "kept_field": 1234
+     *                      }
+     *                   }
+     *                 ]
+     *               },
+     *               {
+     *                 "deeper_collections": [
+     *                   {
+     *                      "complex_field": {
+     *                          "dropped_field": "This field will also be dropped.",
+     *                          "kept_field": 5678
+     *                      }
+     *                   }
+     *                 ]
+     *               }
+     *             ],
+     *             "primitive_field": true
+     *           }
+     *     }
+     * 
+ * + * Exclude path: collections.deeper_collections.complex_field.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *             "collections": [
+     *               {
+     *                 "deeper_collections": [
+     *                   {
+     *                      "complex_field": {
+     *                          "kept_field": 1234
+     *                      }
+     *                   }
+     *                 ]
+     *               },
+     *               {
+     *                 "deeper_collections": [
+     *                   {
+     *                      "complex_field": {
+     *                          "kept_field": 5678
+     *                      }
+     *                   }
+     *                 ]
+     *               }
+     *             ],
+     *             "primitive_field": true
+     *           }
+     *     }
+     * 
+ */ + @Test + void shouldDropFieldInMultipleNestedStructArray() { + final StructFieldDropper computerStruct = + StructFieldDropper.createStructFieldDropper( + Path.split("collections.deeper_collections.complex_field.dropped_field")); + + final Schema primitiveSchema = SchemaBuilder + .struct() + .field("dropped_field", Schema.STRING_SCHEMA) + .field("kept_field", Schema.INT32_SCHEMA) + .build(); + + final Struct primitiveStruct = new Struct(primitiveSchema); + primitiveStruct.put("dropped_field", "This field will be dropped."); + primitiveStruct.put("kept_field", 1234); + + final Struct primitiveStruct2 = new Struct(primitiveSchema); + primitiveStruct2.put("dropped_field", "This field will also be dropped."); + primitiveStruct2.put("kept_field", 5678); + + final Schema complexSchema = SchemaBuilder.struct() + .field("complex_field", primitiveSchema) + .build(); + + final Struct complexObject = new Struct(complexSchema); + complexObject.put("complex_field", primitiveStruct); + + final Struct complexObject2 = new Struct(complexSchema); + complexObject2.put("complex_field", primitiveStruct2); + + final Schema deeperCollectionsSchema = SchemaBuilder + .array(complexSchema) + .build(); + + final Schema deeperCollectionsRecords = SchemaBuilder + .struct() + .field("deeper_collections", deeperCollectionsSchema) + .build(); + + final Struct deeperCollection1 = new Struct(deeperCollectionsRecords); // <- inner schema + deeperCollection1.put("deeper_collections", List.of(complexObject)); + + final Struct deeperCollection2 = new Struct(deeperCollectionsRecords); + deeperCollection2.put("deeper_collections", List.of(complexObject2)); + + final List deeperCollectionList = List.of(deeperCollection1, deeperCollection2); + + final Schema collectionsSchema = SchemaBuilder + .array(deeperCollectionsRecords) + .build(); + + final Schema recordCollectionsSchema = SchemaBuilder + .struct() + .field("collections", collectionsSchema) + .field("primitive_field", Schema.INT32_SCHEMA) + .build(); + + final Struct recordCollection = new Struct(recordCollectionsSchema); + recordCollection.put("collections", deeperCollectionList); + recordCollection.put("primitive_field", 9876); + + final Struct newStruct = computerStruct.updateStruct(recordCollection); + + final List newCollections = newStruct.getArray("collections"); + this.softly.assertThat(newCollections) + .hasSize(2) + .first() + .satisfies(collectionValues -> { + final List deeperCollections = collectionValues.getArray("deeper_collections"); + this.softly.assertThat(deeperCollections) + .hasSize(1) + .first() + .satisfies(deeperCollectionsValues -> { + final Integer keptFieldValue = deeperCollectionsValues + .getStruct("complex_field") + .getInt32("kept_field"); + this.softly.assertThat(keptFieldValue).isEqualTo(1234); + } + ); + } + ); + this.softly.assertThat(newCollections.get(1)) + .satisfies(collectionValues -> { + final List deeperCollections = collectionValues.getArray("deeper_collections"); + this.softly.assertThat(deeperCollections) + .hasSize(1) + .first() + .satisfies(deeperCollectionsValues -> { + final Integer keptFieldValue = deeperCollectionsValues + .getStruct("complex_field") + .getInt32("kept_field"); + this.softly.assertThat(keptFieldValue).isEqualTo(5678); + } + ); + } + ); + this.softly.assertThat(newStruct.schema().field("dropped_field")).isNull(); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *             "collections": [
+     *                 [
+     *                         "complex_field": {
+     *                         "dropped_field": "This field will also be dropped.",
+     *                         "kept_field": 1234
+     *                     }
+     *                ],
+     *                [
+     *                    "complex_field": {
+     *                         "dropped_field": "This field will also be dropped.",
+     *                         "kept_field": 5678
+     *                    }
+     *                ]
+     *             ]
+     *           }
+     *     }
+     * 
+ * + * Exclude path: collections.complex_field.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *             "collections": [
+     *                 [
+     *                     "complex_field": {
+     *                         "kept_field": 1234
+     *                     }
+     *                 ],
+     *                 [
+     *                    "complex_field": {
+     *                        "kept_field": 5678
+     *                    }
+     *                 ]
+     *             ]
+     *           }
+     *     }
+     * 
+ */ + @Test + void shouldDropFieldInMultipleNestedArrays() { + final StructFieldDropper computerStruct = + StructFieldDropper.createStructFieldDropper(Path.split("collections.complex_field.dropped_field")); + + final Schema primitiveSchema = SchemaBuilder + .struct() + .field("dropped_field", Schema.STRING_SCHEMA) + .field("kept_field", Schema.INT32_SCHEMA) + .build(); + + final Struct primitiveStruct = new Struct(primitiveSchema); + primitiveStruct.put("dropped_field", "This field will be dropped."); + primitiveStruct.put("kept_field", 1234); + + final Struct primitiveStruct2 = new Struct(primitiveSchema); + primitiveStruct2.put("dropped_field", "This field will also be dropped."); + primitiveStruct2.put("kept_field", 5678); + + final Schema complexSchema = SchemaBuilder.struct() + .field("complex_field", primitiveSchema) + .build(); + + final Struct complexObject = new Struct(complexSchema); + complexObject.put("complex_field", primitiveStruct); + + final Struct complexObject2 = new Struct(complexSchema); + complexObject2.put("complex_field", primitiveStruct2); + + final Schema deeperCollectionsSchema = SchemaBuilder + .array(complexSchema) + .build(); + + final List deeperCollectionList = List.of(complexObject); + final List deeperCollectionList2 = List.of(complexObject2); + + final Schema collectionsSchema = SchemaBuilder + .array(deeperCollectionsSchema) + .build(); + + final Schema recordCollectionsSchema = SchemaBuilder + .struct() + .field("collections", collectionsSchema) + .build(); + + final Struct recordCollection = new Struct(recordCollectionsSchema); + recordCollection.put("collections", List.of(deeperCollectionList, deeperCollectionList2)); + + final Struct newStruct = computerStruct.updateStruct(recordCollection); + + final List> newCollections = newStruct.getArray("collections"); + this.softly.assertThat(newCollections) + .hasSize(2) + .first() + .satisfies(collectionValues -> { + final int keptFieldValue = collectionValues + .get(0) + .getStruct("complex_field") + .getInt32("kept_field"); + this.softly.assertThat(keptFieldValue).isEqualTo(1234); + } + ); + this.softly.assertThat(newCollections.get(1)) + .satisfies(collectionValues -> { + final int keptFieldValue = collectionValues + .get(0) + .getStruct("complex_field") + .getInt32("kept_field"); + this.softly.assertThat(keptFieldValue).isEqualTo(5678); + } + ); + this.softly.assertThat(newStruct.schema().field("dropped_field")).isNull(); + } + + /** + * Before: + *
+     *     {@code
+     *          {
+     *              "map": "key1": {
+     *                  "dropped_field": "This field will be dropped.",
+     *                  "kept_field": 1234
+     *                }
+     *          }
+     *     }
+     * 
+ * + * Exclude path: map.dropped_field + *

+ * After: + *

+     *     {@code
+     *          {
+     *              "map": "key1": {
+     *                  "kept_field": 1234
+     *                }
+     *          }
+     *     }
+     * 
+ */ + @Test + void shouldDropMap() { + final StructFieldDropper computerStruct = + StructFieldDropper.createStructFieldDropper(Path.split("map.dropped_field")); + + final Schema primitiveSchema = SchemaBuilder + .struct() + .field("dropped_field", Schema.STRING_SCHEMA) + .field("kept_field", Schema.INT32_SCHEMA) + .build(); + + final Struct primitiveStruct = new Struct(primitiveSchema); + primitiveStruct.put("dropped_field", "This field will be dropped."); + primitiveStruct.put("kept_field", 1234); + + final Schema collectionSchema = SchemaBuilder + .map(Schema.STRING_SCHEMA, primitiveSchema) + .build(); + + final Schema complexSchema = SchemaBuilder + .struct() + .field("map", collectionSchema) + .build(); + + final Struct complexObject = new Struct(complexSchema); + complexObject.put("map", Map.of("key1", primitiveStruct)); + + final Struct newStruct = computerStruct.updateStruct(complexObject); + + final Map newStructMap = newStruct.getMap("map"); + final Struct struct = newStructMap.get("key1"); + this.softly.assertThat(struct.schema().field("dropped_field")).isNull(); + this.softly.assertThat(struct.get("kept_field")).isEqualTo(1234); + } +} diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml new file mode 100644 index 0000000..ff426ac --- /dev/null +++ b/src/test/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + +