
kafka-connect-transform-tojsonstring - A simple Record to JSON String SMT

This is a simple Kafka Connect SMT that takes the entire key or value record and transforms it into a new record containing exactly one field: a JSON representation of the original record.

A blog post describing how we ended up developing this SMT can be found here

Use Cases

This SMT was built to work around a known limitation of the JDBC Sink Connector: it cannot handle nested arrays. If your schema contains arrays, you cannot really use the JDBC Sink Connector, because it only supports primitive data types. But sometimes you also need those arrays from the schema in the RDBMS. If your RDBMS can handle JSON strings, this SMT might be the saviour: you can use it to transform the whole record into a single JSON string that the JDBC Sink Connector can map. Afterwards you can use the tools offered by the RDBMS to parse and process the JSON string.
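As an illustrative sketch of this use case, a JDBC Sink Connector configuration could chain the SMT like this (the connector name, topic, and connection settings are placeholders, not part of this project):

```json
{
  "name": "my-jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "my-topic",
    "connection.url": "jdbc:oracle:thin:@//dbhost:1521/mydb",
    "auto.create": true,
    "transforms": "tojson",
    "transforms.tojson.type": "com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter$Value",
    "transforms.tojson.json.string.field.name": "jsonstring"
  }
}
```

With this in place, the sink table ends up with a single string column holding the JSON document, which the database can then parse with its own JSON functions.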

There are certainly other use cases out there where this SMT might be helpful.

Restrictions

This SMT was built to transform a record with a schema into a new record with a schema that has only one field. It therefore does not work for schemaless records.

It was also only tested with Avro schemas backed by the Confluent Schema Registry, but it will most likely work for other schema variants too, because the deserializer has already converted the record into a Connect Record by the time the SMT runs, so the logic shouldn't be schema specific.

Configuration

{
  "transforms": "tojson",
  "transforms.tojson.type": "com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter$Value",
  "transforms.tojson.json.string.field.name": "myawesomejsonstring", // Optional
  "transforms.tojson.post.processing.to.xml": false, // Optional
  "transforms.tojson.json.writer.handle.logical.types": true, // Optional
  "transforms.tojson.json.writer.datetime.logical.types.as": "STRING", // Optional
  "transforms.tojson.json.writer.datetime.pattern": "", // Optional
  "transforms.tojson.json.writer.datetime.zoneid": "UTC" // Optional
}

Parameters

| Name | Description | Type | Default | Valid Values | Importance |
|------|-------------|------|---------|--------------|------------|
| json.string.field.name | Name of the output schema field that contains the JSON string | string | | non-empty string | high |
| json.writer.output.mode | Output mode of the BSON JSON writer | string | RELAXED | RELAXED, EXTENDED, STRICT or SHELL | high |
| post.processing.to.xml | Post-process the JSON to XML. Some older RDBMS like Oracle 11 are not the best at handling JSON; for such scenarios this option can be used to transform the generated JSON into a schemaless XML string | boolean | false | true/false | high |
| json.writer.handle.logical.types | In BSON serialization, logical types (dates, times, timestamps, decimal, bytes) are embedded inside a $ field. Setting this to true removes the embedding and adds the value to the parent field | boolean | false | true/false | high |
| json.writer.datetime.logical.types.as | Write the logical type field (of time, date or timestamp) either as a STRING or a LONG (epoch) value; only applicable if json.writer.handle.logical.types=true | string | LONG | LONG/STRING | high |
| json.writer.datetime.pattern | The pattern (either a predefined constant or pattern letters) used to format the date/time or timestamp as a string; only applicable if json.writer.datetime.logical.types.as=STRING | string | ISO_INSTANT | ISO_DATE, ISO_DATE_TIME, ISO_INSTANT, ISO_TIME, ISO_LOCAL_DATE, ISO_LOCAL_DATE_TIME, ISO_LOCAL_TIME, RFC_1123_DATE_TIME, ISO_ZONED_DATE_TIME, ISO_OFFSET_DATE, ISO_OFFSET_DATE_TIME, ISO_OFFSET_TIME, BASIC_ISO_DATE, ISO_ORDINAL_DATE, ISO_WEEK_DATE, "pattern" | high |
| json.writer.datetime.zoneid | The ZoneId used to format the date/time or timestamp as a string; only applicable if json.writer.datetime.logical.types.as=STRING | string | UTC | a valid ZoneId string, such as Europe/Zurich, CET or UTC | high |
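For example, to render all date/time logical types as ISO-formatted strings in the Europe/Zurich time zone, the writer options from the table above could be combined like this (a sketch; ISO_OFFSET_DATE_TIME is one of the predefined pattern constants):

```json
{
  "transforms.tojson.json.writer.handle.logical.types": true,
  "transforms.tojson.json.writer.datetime.logical.types.as": "STRING",
  "transforms.tojson.json.writer.datetime.pattern": "ISO_OFFSET_DATE_TIME",
  "transforms.tojson.json.writer.datetime.zoneid": "Europe/Zurich"
}
```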

Example

Input
  • Schema (avro syntax)
{
	"type": "record",
	"name": "MyEntity",
	"fields": [{
		"name": "id",
		"type": "string"
	},
	{
		"name": "name",
		"type": "string"
	},
	{
		"name": "subElements",
		"type": {
			"type": "array",
			"items": {
				"type": "record",
				"name": "element",
				"fields": [{
					"name": "id",
					"type": "string",
					
				}]
			}
		}
	}]
}
  • Value
- id: myobject
- name: awesomename
- subElements:
  - id: element1
  - id: element2
Output
  • Schema
{
	"type": "record",
	"name": "jsonStringSchema",
	"fields": [{
		"name": "jsonstring",
		"type": "string"
	}]
}
  • Value (of the schema field "jsonstring")
{
	"id": "myobject",
	"name": "awesomename",
	"subElements": [{"id": "element1"},
	                {"id": "element2"}]
}

Build, installation / deployment

You can build this project from sources via Maven.

Or download a pre-built release from Releases
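A minimal Maven build might look like this (assuming a standard Maven project layout; the exact artifact name under target/ and the plugin directory are examples, not fixed by this project):

```shell
# build the SMT jar (the package phase also runs the tests)
mvn clean package
# make the jar available to Kafka Connect by copying it into a directory
# that is listed in the worker's plugin.path (example path)
cp target/*.jar /usr/local/share/kafka/plugins/
```

Restart the Connect worker afterwards so it picks up the new plugin.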

Thanks and Acknowledgement

The basic structure of how to build an SMT was taken from kafka-connect-insert-uuid

The logic for transforming a Connect Record into a JSON document builds upon the awesome converter implemented in kafka-connect-mongodb, which saved me a lot of time and nerves :)

License Information

This project is licensed according to Apache License Version 2.0

Copyright (c) 2021. Christian Edelsbrunner ([email protected]) 

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.