#17: Added latest import export common library (#15)
Added a new version of the common library that provides mapping for Avro complex and logical types and the previously missing SLF4J logging dependency.

Fixes #13.
Fixes #17.
morazow authored Oct 30, 2020
1 parent 5fe35ce commit cde914f
Showing 7 changed files with 111 additions and 14 deletions.
3 changes: 2 additions & 1 deletion doc/changes/changelog.md
@@ -1,3 +1,4 @@
# Releases

* [v0.1.0](changes_0.1.0.md)
* [0.2.0](changes_0.2.0.md)
* [0.1.0](changes_0.1.0.md)
27 changes: 27 additions & 0 deletions doc/changes/changes_0.2.0.md
@@ -0,0 +1,27 @@
# Kafka Connector Extension 0.2.0, released 2020-10-DD

## Summary

This release adds data type mapping support for Avro complex (Array, Map, Nested
Record) and logical (BigDecimal, Date, Timestamp) types. In addition, it fixes a
bug caused by a missing logging dependency.

## Bug Fixes

* #13: Fixed logging issue (PR #15)

## Features

* #17: Added Support for Avro Complex and Logical Types (PR #15)

## Dependency Updates

### Test Dependency Updates

* Updated `org.mockito:mockito-core` from `3.5.10` to `3.6.0`.

### Plugin Updates

* Updated `com.github.cb372:sbt-explicit-dependencies` from `0.2.13` to `0.2.15`.
* Updated `org.wartremover:sbt-wartremover` from `2.4.10` to `2.4.12`.
* Updated `org.wartremover:sbt-wartremover-contrib` from `1.3.8` to `1.3.10`.
72 changes: 72 additions & 0 deletions doc/user_guide/user_guide.md
Expand Up @@ -5,6 +5,16 @@ import Avro formatted data from Kafka topics.

Using the connector you can import data from a Kafka topic into an Exasol table.

## Table of Contents

- [Getting Started](#getting-started)
- [Deployment](#deployment)
- [Prepare Exasol Table](#prepare-exasol-table)
- [Avro Data Mapping](#avro-data-mapping)
- [Import From Kafka Cluster](#import-from-kafka-cluster)
- [Secure Connection to Kafka Cluster](#secure-connection-to-kafka-cluster)
- [Kafka Consumer Properties](#kafka-consumer-properties)

## Getting Started

We assume you already have a running Exasol cluster with version `6.0` or later
Expand Down Expand Up @@ -178,6 +188,66 @@ and record offset inside a partition:
- KAFKA_PARTITION DECIMAL(18,0)
- KAFKA_OFFSET DECIMAL(36, 0)

## Avro Data Mapping

[Avro][avro-spec] supports several primitive and complex types. The following
table shows how they are mapped to Exasol types.

| Avro Data Type | Avro Logical Attribute | Recommended Exasol Column Types |
|:--------------:|:----------------------:|:-------------------------------:|
| boolean | | BOOLEAN |
| int | | INT, INTEGER, DECIMAL(18, 0) |
| int | date | DATE |
| long | | BIGINT, DECIMAL(36, 0) |
| long | timestamp-millis | TIMESTAMP |
| long | timestamp-micros | TIMESTAMP |
| float | | FLOAT |
| double | | DOUBLE, DOUBLE PRECISION |
| bytes | | VARCHAR(n), CHAR(n) |
| bytes | decimal(p, s) | DECIMAL(p, s) |
| fixed | | VARCHAR(n), CHAR(n) |
| fixed | decimal(p, s) | DECIMAL(p, s) |
| string | | VARCHAR(n), CHAR(n) |
| enum | | VARCHAR(n), CHAR(n) |
| union | | Corresponding Non Null Type |
| array | | VARCHAR(n), CHAR(n) |
| map | | VARCHAR(n), CHAR(n) |
| record | | VARCHAR(n), CHAR(n) |

You can also enrich regular Avro types with logical type attributes, and use the
suggested [Exasol column types][exasol-types] when preparing the table.

For example, given the following Avro record schema,

```json
{
  "type": "record",
  "name": "KafkaExasolAvroRecord",
  "fields": [
    { "name": "product", "type": "string" },
    { "name": "price", "type": { "type": "bytes", "precision": 4, "scale": 2, "logicalType": "decimal" }},
    { "name": "sale_time", "type": { "type": "long", "logicalType": "timestamp-millis" }}
  ]
}
```

you can define the following Exasol table with the corresponding column types.

```sql
CREATE OR REPLACE TABLE <schema_name>.<table_name> (
    PRODUCT         VARCHAR(500),
    PRICE           DECIMAL(4, 2),
    SALE_TIME       TIMESTAMP,

    KAFKA_PARTITION DECIMAL(18, 0),
    KAFKA_OFFSET    DECIMAL(36, 0)
);
```

Please note that Avro complex types are converted to JSON strings. Use the Exasol
`VARCHAR(n)` column type to store them, and choose the number of characters `n`
according to the expected size of the complex values.
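
To get a feel for what such a JSON string looks like, the following minimal sketch
uses the Avro Java API from Scala. The schema, field names, and values here are
hypothetical and only illustrate the general shape of the stored value; the exact
serialization applied by the connector may differ in detail.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

object ComplexTypeJsonExample {

  def main(args: Array[String]): Unit = {
    // Hypothetical record schema with a complex (array) field, for illustration only.
    val schema = new Schema.Parser().parse(
      """{
        |  "type": "record",
        |  "name": "Order",
        |  "fields": [
        |    { "name": "product", "type": "string" },
        |    { "name": "tags", "type": { "type": "array", "items": "string" } }
        |  ]
        |}""".stripMargin
    )

    val record = new GenericData.Record(schema)
    record.put("product", "Cheese")
    record.put("tags", java.util.Arrays.asList("dairy", "fresh"))

    // Avro generic records render as JSON text, for example:
    // {"product": "Cheese", "tags": ["dairy", "fresh"]}
    // A string of roughly this shape is what a VARCHAR(n) column would hold
    // for a complex field.
    println(record.toString)
  }
}
```

Since nested records and arrays can grow much larger than their individual fields,
leave generous headroom when choosing `n`.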

## Import From Kafka Cluster

Several property values are required to access the Kafka
Expand Down Expand Up @@ -389,3 +459,5 @@ Kafka clusters.
[kafka-security]: https://kafka.apache.org/documentation/#security
[kafka-secure-clients]: https://kafka.apache.org/documentation/#security_configclients
[kafka-consumer-configs]: https://kafka.apache.org/documentation/#consumerconfigs
[avro-spec]: https://avro.apache.org/docs/current/spec.html
[exasol-types]: https://docs.exasol.com/sql_references/data_types/datatypesoverview.htm
11 changes: 2 additions & 9 deletions project/Dependencies.scala
Expand Up @@ -7,16 +7,14 @@ import sbt.librarymanagement.InclExclRule
object Dependencies {

// Runtime dependencies versions
private val ExasolVersion = "6.1.7"
private val ImportExportUDFVersion = "0.1.0"
private val ImportExportUDFVersion = "0.2.0"
private val KafkaClientsVersion = "2.5.0"
private val KafkaAvroSerializerVersion = "5.4.0"
private val TypesafeLoggingVersion = "3.9.2"

// Test dependencies versions
private val ScalaTestVersion = "3.2.2"
private val ScalaTestPlusVersion = "1.0.0-M2"
private val MockitoCoreVersion = "3.5.10"
private val MockitoCoreVersion = "3.6.0"
private val KafkaSchemaRegistryVersion = "5.4.0"

val Resolvers: Seq[Resolver] = Seq(
Expand All @@ -25,12 +23,7 @@ object Dependencies {
)

lazy val RuntimeDependencies: Seq[ModuleID] = Seq(
"com.exasol" % "exasol-script-api" % ExasolVersion,
"com.exasol" %% "import-export-udf-common-scala" % ImportExportUDFVersion,
"com.typesafe.scala-logging" %% "scala-logging" % TypesafeLoggingVersion
exclude ("org.slf4j", "slf4j-api")
exclude ("org.scala-lang", "scala-library")
exclude ("org.scala-lang", "scala-reflect"),
"org.apache.kafka" % "kafka-clients" % KafkaClientsVersion,
"io.confluent" % "kafka-avro-serializer" % KafkaAvroSerializerVersion
exclude ("org.slf4j", "slf4j-api")
Expand Down
6 changes: 3 additions & 3 deletions project/plugins.sbt
@@ -1,10 +1,10 @@
// Adds `wartremover`, a flexible Scala code linting tool
// http://github.com/puffnfresh/wartremover
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.10")
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.12")

// Adds Contrib Warts
// http://github.com/wartremover/wartremover-contrib/
addSbtPlugin("org.wartremover" % "sbt-wartremover-contrib" % "1.3.8")
addSbtPlugin("org.wartremover" % "sbt-wartremover-contrib" % "1.3.10")

// Adds an `assembly` task to create a fat JAR with all of its
// dependencies
Expand Down Expand Up @@ -47,7 +47,7 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")

// Adds the `sbt-explicit-dependencies` plugin
// https://github.com/cb372/sbt-explicit-dependencies
addSbtPlugin("com.github.cb372" % "sbt-explicit-dependencies" % "0.2.13")
addSbtPlugin("com.github.cb372" % "sbt-explicit-dependencies" % "0.2.15")

// Setup this and project/project/plugins.sbt for formatting
// project/*.scala files with scalafmt
Expand Down
Expand Up @@ -5,14 +5,16 @@ import scala.collection.JavaConverters._
import com.exasol.ExaImportSpecification
import com.exasol.ExaMetadata

import com.typesafe.scalalogging.LazyLogging

/**
* This object is referenced from the UDF script that is called from
* Exasol's {@code IMPORT FROM SCRIPT} SQL statement.
*
* It returns an SQL query that is run to import data from a Kafka
* topic.
*/
object KafkaConsumerQueryGenerator {
object KafkaConsumerQueryGenerator extends LazyLogging {

/**
* An entry point for the Exasol IMPORT FROM SCRIPT user-defined
Expand All @@ -35,6 +37,7 @@ object KafkaConsumerQueryGenerator {
"from multiple topics is not supported."
)
}
logger.info(s"Generating a SQL query to import from '$topic'.")
val kvPairs = kafkaProperties.mkString()
val scriptSchema = metadata.getScriptSchema

Expand Down
Expand Up @@ -34,6 +34,7 @@ object KafkaTopicMetadataReader extends LazyLogging {

val kafkaConsumer = KafkaConsumerFactory(kafkaProperties, metadata)
val topicPartitions = kafkaConsumer.partitionsFor(topic).asScala.toList.map(_.partition())
logger.info(s"Reading metadata for '${topicPartitions.mkString(",")}' topic partitions")
try {
topicPartitions.foreach { partitionId =>
val offset: JLong = seenPartitionOffsets.getOrElse(partitionId, -1L)
Expand Down
