diff --git a/doc/changes/changelog.md b/doc/changes/changelog.md
index 5b071be..d84800d 100644
--- a/doc/changes/changelog.md
+++ b/doc/changes/changelog.md
@@ -1,3 +1,4 @@
 # Releases
 
-* [v0.1.0](changes_0.1.0.md)
+* [0.2.0](changes_0.2.0.md)
+* [0.1.0](changes_0.1.0.md)
diff --git a/doc/changes/changes_0.2.0.md b/doc/changes/changes_0.2.0.md
new file mode 100644
index 0000000..de33ad1
--- /dev/null
+++ b/doc/changes/changes_0.2.0.md
@@ -0,0 +1,27 @@
+# Kafka Connector Extension 0.2.0, released 2020-10-DD
+
+## Summary
+
+This release adds support for mapping Avro complex (Array, Map, Nested
+Record) and logical (BigDecimal, Date, Timestamp) data types. In addition,
+it fixes a bug related to a missing logging dependency.
+
+## Bug Fixes
+
+* #13: Fixed logging issue (PR #15)
+
+## Features
+
+* #17: Added Support for Avro Complex and Logical Types (PR #15)
+
+## Dependency Updates
+
+### Test Dependency Updates
+
+* Updated `org.mockito:mockito-core` from `3.5.10` to `3.6.0`.
+
+### Plugin Updates
+
+* Updated `com.github.cb372:sbt-explicit-dependencies` from `0.2.13` to `0.2.15`.
+* Updated `org.wartremover:sbt-wartremover` from `2.4.10` to `2.4.12`.
+* Updated `org.wartremover:sbt-wartremover-contrib` from `1.3.8` to `1.3.10`.
diff --git a/doc/user_guide/user_guide.md b/doc/user_guide/user_guide.md
index fc30239..d78b3d4 100644
--- a/doc/user_guide/user_guide.md
+++ b/doc/user_guide/user_guide.md
@@ -5,6 +5,16 @@
 import Avro formatted data from Kafka topics. Using the connector you can
 import data from a Kafka topic into an Exasol table.
 
+## Table of Contents
+
+- [Getting Started](#getting-started)
+- [Deployment](#deployment)
+- [Prepare Exasol Table](#prepare-exasol-table)
+- [Avro Data Mapping](#avro-data-mapping)
+- [Import From Kafka Cluster](#import-from-kafka-cluster)
+- [Secure Connection to Kafka Cluster](#secure-connection-to-kafka-cluster)
+- [Kafka Consumer Properties](#kafka-consumer-properties)
+
 ## Getting Started
 
 We assume you already have running Exasol cluster with version `6.0` or later
@@ -178,6 +188,66 @@ and record offset inside a partition:
 - KAFKA_PARTITION DECIMAL(18,0)
 - KAFKA_OFFSET DECIMAL(36, 0)
 
+## Avro Data Mapping
+
+[Avro][avro-spec] supports several primitive and complex types. The following
+table shows how they are mapped to Exasol column types.
+
+| Avro Data Type | Avro Logical Attribute | Recommended Exasol Column Types |
+|:--------------:|:----------------------:|:-------------------------------:|
+| boolean        |                        | BOOLEAN                         |
+| int            |                        | INT, INTEGER, DECIMAL(18, 0)    |
+| int            | date                   | DATE                            |
+| long           |                        | BIGINT, DECIMAL(36, 0)          |
+| long           | timestamp-millis       | TIMESTAMP                       |
+| long           | timestamp-micros       | TIMESTAMP                       |
+| float          |                        | FLOAT                           |
+| double         |                        | DOUBLE, DOUBLE PRECISION        |
+| bytes          |                        | VARCHAR(n), CHAR(n)             |
+| bytes          | decimal(p, s)          | DECIMAL(p, s)                   |
+| fixed          |                        | VARCHAR(n), CHAR(n)             |
+| fixed          | decimal(p, s)          | DECIMAL(p, s)                   |
+| string         |                        | VARCHAR(n), CHAR(n)             |
+| enum           |                        | VARCHAR(n), CHAR(n)             |
+| union          |                        | Corresponding non-null type     |
+| array          |                        | VARCHAR(n), CHAR(n)             |
+| map            |                        | VARCHAR(n), CHAR(n)             |
+| record         |                        | VARCHAR(n), CHAR(n)             |
+
+You can also enrich regular Avro types with logical type attributes and use the
+suggested [Exasol column types][exasol-types] when preparing the table.
+
+For example, given the following Avro record schema,
+
+```json
+{
+  "type": "record",
+  "name": "KafkaExasolAvroRecord",
+  "fields": [
+    { "name": "product", "type": "string" },
+    { "name": "price", "type": { "type": "bytes", "precision": 4, "scale": 2, "logicalType": "decimal" }},
+    { "name": "sale_time", "type": { "type": "long", "logicalType": "timestamp-millis" }}
+  ]
+}
+```
+
+you can define the following Exasol table with the corresponding column types.
+
+```sql
+CREATE OR REPLACE TABLE <schema_name>.<table_name> (
+    PRODUCT VARCHAR(500),
+    PRICE DECIMAL(4, 2),
+    SALE_TIME TIMESTAMP,
+
+    KAFKA_PARTITION DECIMAL(18, 0),
+    KAFKA_OFFSET DECIMAL(36, 0)
+);
+```
+
+Please note that Avro complex types (array, map and nested record) are
+converted to JSON strings. Use an Exasol `VARCHAR(n)` column to store them,
+and choose the number of characters `n` according to the expected value size.
+
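+As a purely illustrative sketch (the `prices` field and `PRICES` column below
+are made-up names, not part of the schema above), an Avro `array` field such
+as `{ "name": "prices", "type": { "type": "array", "items": "double" } }`
+would arrive as a JSON string value, for example `[3.99, 4.49, 10.0]`, and
+could be stored in a column declared as `PRICES VARCHAR(100)`.
+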
 ## Import From Kafka Cluster
 
 Several property values are required to access the Kafka
@@ -389,3 +459,5 @@ Kafka clusters.
 [kafka-security]: https://kafka.apache.org/documentation/#security
 [kafka-secure-clients]: https://kafka.apache.org/documentation/#security_configclients
 [kafka-consumer-configs]: https://kafka.apache.org/documentation/#consumerconfigs
+[avro-spec]: https://avro.apache.org/docs/current/spec.html
+[exasol-types]: https://docs.exasol.com/sql_references/data_types/datatypesoverview.htm
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index f38f536..becc679 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -7,16 +7,14 @@ import sbt.librarymanagement.InclExclRule
 object Dependencies {
 
   // Runtime dependencies versions
-  private val ExasolVersion = "6.1.7"
-  private val ImportExportUDFVersion = "0.1.0"
+  private val ImportExportUDFVersion = "0.2.0"
   private val KafkaClientsVersion = "2.5.0"
   private val KafkaAvroSerializerVersion = "5.4.0"
-  private val TypesafeLoggingVersion = "3.9.2"
 
   // Test dependencies versions
   private val ScalaTestVersion = "3.2.2"
   private val ScalaTestPlusVersion = "1.0.0-M2"
-  private val MockitoCoreVersion = "3.5.10"
+  private val MockitoCoreVersion = "3.6.0"
   private val KafkaSchemaRegistryVersion = "5.4.0"
 
   val Resolvers: Seq[Resolver] = Seq(
@@ -25,12 +23,7 @@ object Dependencies {
   )
 
   lazy val RuntimeDependencies: Seq[ModuleID] = Seq(
-    "com.exasol" % "exasol-script-api" % ExasolVersion,
     "com.exasol" %% "import-export-udf-common-scala" % ImportExportUDFVersion,
-    "com.typesafe.scala-logging" %% "scala-logging" % TypesafeLoggingVersion
-      exclude ("org.slf4j", "slf4j-api")
-      exclude ("org.scala-lang", "scala-library")
-      exclude ("org.scala-lang", "scala-reflect"),
     "org.apache.kafka" % "kafka-clients" % KafkaClientsVersion,
     "io.confluent" % "kafka-avro-serializer" % KafkaAvroSerializerVersion
       exclude ("org.slf4j", "slf4j-api")
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 957e429..fa316c3 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,10 +1,10 @@
 // Adds a `wartremover` a flexible Scala code linting tool
 // http://github.com/puffnfresh/wartremover
-addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.10")
+addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.12")
 
 // Adds Contrib Warts
 // http://github.com/wartremover/wartremover-contrib/
-addSbtPlugin("org.wartremover" % "sbt-wartremover-contrib" % "1.3.8")
+addSbtPlugin("org.wartremover" % "sbt-wartremover-contrib" % "1.3.10")
 
 // Adds a `assembly` task to create a fat JAR with all of its
 // dependencies
@@ -47,7 +47,7 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
 
 // Adds a `sbt-explicit-dependencies` plugin
 // https://github.com/cb372/sbt-explicit-dependencies
-addSbtPlugin("com.github.cb372" % "sbt-explicit-dependencies" % "0.2.13")
+addSbtPlugin("com.github.cb372" % "sbt-explicit-dependencies" % "0.2.15")
 
 // Setup this and project/project/plugins.sbt for formatting
 // project/*.scala files with scalafmt
diff --git a/src/main/scala/com/exasol/cloudetl/kafka/KafkaConsumerQueryGenerator.scala b/src/main/scala/com/exasol/cloudetl/kafka/KafkaConsumerQueryGenerator.scala
index 4a4134e..b437e1f 100644
--- a/src/main/scala/com/exasol/cloudetl/kafka/KafkaConsumerQueryGenerator.scala
+++ b/src/main/scala/com/exasol/cloudetl/kafka/KafkaConsumerQueryGenerator.scala
@@ -5,6 +5,8 @@ import scala.collection.JavaConverters._
 import com.exasol.ExaImportSpecification
 import com.exasol.ExaMetadata
 
+import com.typesafe.scalalogging.LazyLogging
+
 /**
  * This object is referenced from the UDF script that is called from the
  * Exasol's {@code IMPORT FROM SCRIPT} SQL statement.
@@ -12,7 +14,7 @@
  * It returns an SQL query that is run to import data from a Kafka
  * topic.
  */
-object KafkaConsumerQueryGenerator {
+object KafkaConsumerQueryGenerator extends LazyLogging {
 
   /**
    * An entry point for the Exasol IMPORT FROM SCRIPT user-defined
@@ -35,6 +37,7 @@
         "from multiple topics is not supported."
       )
     }
+    logger.info(s"Generating a SQL query to import from '$topic'.")
     val kvPairs = kafkaProperties.mkString()
     val scriptSchema = metadata.getScriptSchema
 
diff --git a/src/main/scala/com/exasol/cloudetl/kafka/KafkaTopicMetadataReader.scala b/src/main/scala/com/exasol/cloudetl/kafka/KafkaTopicMetadataReader.scala
index 7e5790c..313f344 100644
--- a/src/main/scala/com/exasol/cloudetl/kafka/KafkaTopicMetadataReader.scala
+++ b/src/main/scala/com/exasol/cloudetl/kafka/KafkaTopicMetadataReader.scala
@@ -34,6 +34,7 @@ object KafkaTopicMetadataReader extends LazyLogging {
     val kafkaConsumer = KafkaConsumerFactory(kafkaProperties, metadata)
     val topicPartitions = kafkaConsumer.partitionsFor(topic).asScala.toList.map(_.partition())
 
+    logger.info(s"Reading metadata for '${topicPartitions.mkString(",")}' topic partitions")
     try {
       topicPartitions.foreach { partitionId =>
         val offset: JLong = seenPartitionOffsets.getOrElse(partitionId, -1L)
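For context on the logging changes above: `com.typesafe.scalalogging.LazyLogging` mixes a lazily initialized, SLF4J-backed `logger` member into the object that extends it, which is what the added `logger.info(...)` calls rely on. A minimal, self-contained sketch of that mixin (the object and method names here are illustrative only, not taken from the project):

```scala
import com.typesafe.scalalogging.LazyLogging

// Illustrative only: extending LazyLogging provides a `logger` member that is
// created lazily on first use and delegates to SLF4J.
object LoggingSketch extends LazyLogging {

  def describeImport(topic: String): Unit =
    logger.info(s"Generating a SQL query to import from '$topic'.")
}
```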