This is a very simple Kafka Connect SMT which takes the entire key or value record and transforms it to a new record which contains exactly one field with a JSON representation of the origin record.
Blog Post describing how we ended up developing this SMT can be found here
The reason why this SMT was built is the known limitation of the JDBC Sink Connector tohandle nested arrays. If you have schema which contains arrays you cannot really use the JDBC Sink Connector because this connector only supports primitive Data Types. But sometimes you just need also some arrays from the schema in the RDBMS. If your RDBMS is able to handle JSON Strings this SMT might be the saviour. You can use it to transform the whole record into a single JSON String which can be mapped by the JDBC Sink connector. Afterwards you can use the tools offered by the RDBMS to parse and process the JSON String.
But for sure there are also other use cases out there where this SMT might be helpful.
This SMT was built to transform Records with a Schema to a new Record with a Schema but with only one field. So this SMT does not work for Schemaless records.
It also was only tested with Avro Schemas backed by Confluent Schema Registry (but most likely will work for other schema variants too because the deserializer already converted the record to a Connect Record before so it shouldn't be schema specific)
{
////
"transforms": "tojson",
"transforms.tojson.type": "com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter$Value",
"transforms.tojson.json.string.field.name" : "myawesomejsonstring", // Optional
"transforms.tojson.post.processing.to.xml" : false, // Optional
"transforms.tojson.json.writer.handle.logical.types" : true, // Optional
"transforms.tojson.json.writer.datetime.logical.types.as" : "STRING", // Optional
"transforms.tojson.json.writer.datetime.pattern" : "", // Optional
"transforms.tojson.json.writer.datetime.zoneid" : "UTC" // Optional
////
}
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
json.string.field.name | Output schema field name of field that contains the JSON String | string | non-empty string | high | |
json.writer.output.mode | Output mode of the BSON Json Writer | string | RELAXED | RELAXED, EXTENDED, STRICT or SHELL | high |
post.processing.to.xml | Post Process JSON to XML. Some old RBDMS like Oracle 11 are not the best in handling JSON - for such scenarios this option can be used to transform the generated JSON into a schemaless XML String | boolean | false | true/false | high |
json.writer.handle.logical.types | In BSON serialization, logical types (dates, times, timestamps, decimal, bytes) are embedded inside a $ field. Setting this configuration to true will remove the embeddings and add the value to the parent field. | boolean | false | true/false | high |
json.writer.datetime.logical.types.as | Write the logical type field (of time, date or timestamp) either as a STRING or a LONG (epoc) value, only applicable if json.writer.handle.logical.types=true | string | LONG | LONG/STRING | high |
json.writer.datetime.pattern | The pattern (either a predefined constant or pattern letters) to use to format the date/time or timestamp as string, only applicable if json.writer.datetime.logical.types.as=STRING | string | ISO_DATE,ISO_DATE_TIME,ISO_INSTANT,ISO_TIME,ISO_LOCAL_DATE,ISO_LOCAL_DATE_TIME,ISO_LOCAL_TIME,RFC_1123_DATE_TIME,ISO_ZONED_DATE_TIME,ISO_OFFSET_DATE,ISO_OFFSET_DATE_TIME,ISO_OFFSET_TIME,BASIC_ISO_DATE,ISO_ORDINAL_DATE,ISO_WEEK_DATE,"pattern" | high | |
json.writer.datetime.zoneid | The ZoneId to use to format the date/time or timestamp as string, only applicable if json.writer.datetime.logical.types.as=STRING | string | UTC | a valid ZoneId string, such as Europe/Zurich, CET or UTC | high |
- Schema (avro syntax)
{
"type": "record",
"name": "MyEntity",
"fields": [{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "subElements",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "element",
"fields": [{
"name": "id",
"type": "string",
}]
}
}
}]
}
- Value
-id:myobject
-name:awesomename
-subElements:
-id:element1
-id:element2
- Schema
{
"type": "record",
"name": "jsonStringSchema",
"fields": [{
"name": "jsonstring",
"type": "string"
}]
}
- Value (of the schema field "jsonstring")
{
"id": "record",
"name": "jsonStringSchema",
"subElements": [{"id": "element1"},
{"id": "element2"}]
}]
}
You can build this project from sources via Maven.
Or download a pre-build release from Releases
Basic structure of how to build a basic SMT was taken from kafka-connect-insert-uuid
Logic for transforming a Connect Record into a Json Document is build up on the awesome converter implemented in kafka-connect-mongodb which safed me a lot of time and nerves :)
This project is licensed according to Apache License Version 2.0
Copyright (c) 2021. Christian Edelsbrunner ([email protected])
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.