Exasol Kafka Connector Extension allows you to connect to Apache Kafka and import Apache Avro, JSON or String formatted data from Kafka topics.
Using the connector you can import data from a Kafka topic into an Exasol table.
- Getting Started
- Deployment
- Record Format Configuration
- Preparing Exasol Table
- Avro Data Mapping
- JSON Data Mapping
- Importing Records
- Secure Connection to Kafka Cluster
- Importing Data From Azure Event Hubs
- Kafka Consumer Properties
We assume you already have a running Exasol cluster with version 6.0 or later and an Apache Kafka cluster with version 2.0.0 or later. If you use the Confluent Kafka distribution, it should be version 5.0.0 or later.
The connector accesses Kafka topic data in parallel. It starts one importer process per Kafka topic partition, so the data of all partitions of a topic is imported in parallel. Therefore, we recommend configuring your Kafka topics with several partitions.
The Kafka connector requires a Schema Registry when importing Avro messages that were serialized with a Confluent Schema Registry on the producer side.
Schema Registry is used to store, serve and manage Avro schemas for each Kafka topic. Thus, it allows you to obtain the latest schema for a given Kafka topic. As a result, you should set it up together with a Kafka cluster when you are importing Avro records.
This section describes how to deploy and prepare the user-defined functions (UDFs) for Kafka integration.
Please download and save the latest assembled jar file (with all dependencies included) from the GitHub Releases page.
Please ensure that the SHA256 sum of the downloaded jar is the same as the checksum provided together with the jar file.
To check the SHA256 sum of the downloaded jar, run the command:
sha256sum exasol-kafka-connector-extension-1.7.9.jar
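Where sha256sum is not available, the same check can be scripted. The following Python sketch computes the SHA256 digest of the downloaded jar with the standard hashlib module and compares it with the checksum published next to the jar. The helper names are illustrative, and <PUBLISHED_SHA256> is a placeholder you fill in yourself.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the hex-encoded SHA256 digest of a file, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_checksum(path: Path, expected_hex: str) -> bool:
    """Compare the file digest with the published checksum, ignoring case and whitespace."""
    return sha256_of(path) == expected_hex.strip().lower()


if __name__ == "__main__":
    jar = Path("exasol-kafka-connector-extension-1.7.9.jar")
    expected = "<PUBLISHED_SHA256>"  # placeholder: paste the checksum from the release page
    if jar.exists():
        print("OK" if verify_checksum(jar, expected) else "MISMATCH")
```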
Additionally, you can build the assembled jar from the source. This allows you to use the latest commits that may not be released yet.
Clone the repository:
git clone https://github.com/exasol/kafka-connector-extension
cd kafka-connector-extension
Create an assembled jar file:
sbt assembly
The packaged jar file should be located at target/scala-2.12/exasol-kafka-connector-extension-1.7.9.jar.
To store the connector jar, we need to create a bucket in the Exasol Bucket File System (BucketFS). This allows us to reference the jar file in the UDF scripts.
Please see the section "The synchronous cluster file system BucketFS" in the EXASolution User Manual for more details about BucketFS.
Now you can upload the jar file to the bucket. However, before uploading the jar, please make sure the BucketFS ports are open.
In this guide, we use port number 2580 for the HTTP protocol.
Upload the jar file using the curl command:
curl -X PUT -T exasol-kafka-connector-extension-1.7.9.jar \
http://w:<WRITE_PASSWORD>@<EXASOL_DATANODE>:2580/<BUCKET_NAME>/
Please ensure that the file was uploaded.
Check the bucket contents:
curl -X GET http://r:<READ_PASSWORD>@<EXASOL_DATANODE>:2580/<BUCKET_NAME>/
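The same bucket is addressed in two ways: over HTTP for uploads and listings, and through a local path (as in the %jar lines of the UDF scripts) from inside UDFs. This Python sketch derives both from the bucket and file names. The helper names are hypothetical, and the port 2580 and the service name bfsdefault are taken from this guide and may differ in your setup. Percent-encoding the password is a precaution for passwords that contain characters such as @ or :.

```python
from urllib.parse import quote

BUCKETFS_PORT = 2580             # HTTP port used in this guide
BUCKETFS_SERVICE = "bfsdefault"  # service name used in the UDF script paths of this guide


def upload_url(host: str, bucket: str, filename: str, write_password: str,
               port: int = BUCKETFS_PORT) -> str:
    """URL for `curl -X PUT -T <file> <url>`; the user `w` carries the write password."""
    return f"http://w:{quote(write_password, safe='')}@{host}:{port}/{bucket}/{filename}"


def list_url(host: str, bucket: str, read_password: str, port: int = BUCKETFS_PORT) -> str:
    """URL for `curl -X GET <url>`; the user `r` carries the read password."""
    return f"http://r:{quote(read_password, safe='')}@{host}:{port}/{bucket}/"


def udf_path(bucket: str, filename: str, service: str = BUCKETFS_SERVICE) -> str:
    """Path of the same file as seen from inside a UDF, e.g. in a %jar line."""
    return f"/buckets/{service}/{bucket}/{filename}"


jar = "exasol-kafka-connector-extension-1.7.9.jar"
print(upload_url("10.0.0.11", "kafka", jar, "s3cret"))
print(udf_path("kafka", jar))  # /buckets/bfsdefault/kafka/exasol-kafka-connector-extension-1.7.9.jar
```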
Create the UDF scripts that help with importing the data.
First, create a schema that will contain the UDF scripts.
CREATE SCHEMA KAFKA_EXTENSION;
Run the following SQL statements to create the Kafka extension UDF scripts.
OPEN SCHEMA KAFKA_EXTENSION;
CREATE OR REPLACE JAVA SET SCRIPT KAFKA_CONSUMER(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.kafka.KafkaConsumerQueryGenerator;
%jar /buckets/bfsdefault/<BUCKET>/exasol-kafka-connector-extension-1.7.9.jar;
/
CREATE OR REPLACE JAVA SET SCRIPT KAFKA_IMPORT(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.kafka.KafkaTopicDataImporter;
%jar /buckets/bfsdefault/<BUCKET>/exasol-kafka-connector-extension-1.7.9.jar;
/
CREATE OR REPLACE JAVA SET SCRIPT KAFKA_METADATA(
params VARCHAR(2000),
kafka_partition DECIMAL(18, 0),
kafka_offset DECIMAL(36, 0)
)
EMITS (partition_index DECIMAL(18, 0), max_offset DECIMAL(36,0)) AS
%scriptclass com.exasol.cloudetl.kafka.KafkaTopicMetadataReader;
%jar /buckets/bfsdefault/<BUCKET>/exasol-kafka-connector-extension-1.7.9.jar;
/
Please do not change the UDF script names.
However, do change the bucket name and the jar version in the scripts according to your deployment setup.
The connector extension currently supports the following Kafka record formats:
- Avro
- JSON
- String
The format of the records can be set for both key and value with the following configuration values:
RECORD_KEY_FORMAT=avro|json|string
RECORD_VALUE_FORMAT=avro|json|string
It should match the format of the records on the topic you are importing. The connector can extract fields from the key and value of the records and insert them into the target table the import is running against.
The configuration setting RECORD_FIELDS controls the list of values that are inserted into the table.
Please note that when using the Avro format, you are required to provide the Confluent Schema Registry URL.
The following table illustrates the possible values and support for the serialization formats.
Record Field Specification | Description | Avro | JSON | String |
---|---|---|---|---|
value.field1 | The field field1 of the record value | yes | yes | no |
key.field2 | The field field2 of the record key | yes | yes | no |
value.* | All fields from the record value | yes | no (order not deterministic) | no |
key.* | All fields from the record key | yes | no (order not deterministic) | no |
value | The record value as string | yes (as JSON) | yes (as JSON) | yes |
key | The record key as string | yes (as JSON) | yes (as JSON) | yes |
timestamp | The record timestamp | yes | yes | yes |
Given a record that has the following Avro value (JSON representation)
{
"firstName": "John",
"lastName": "Smith",
"isAlive": true,
"age": 27
}
and this key (also Avro, as JSON representation)
{
"id": 123
}
Then you can configure the connector with parameters as shown below:
RECORD_KEY_FORMAT=avro
RECORD_VALUE_FORMAT=avro
RECORD_FIELDS=key.id,timestamp,value.lastName,value.age
This imports the id field from the record key, the fields lastName and age from the record value, and the record timestamp metadata.
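To make the field specifications concrete, the following Python sketch resolves a RECORD_FIELDS value against a record whose key and value are already decoded into dictionaries. It is a hypothetical model, not the connector's implementation: it does not decode Avro or JSON itself, does not enforce the per-format restrictions from the table, and uses dictionary insertion order in place of the Avro schema field order for key.* and value.*.

```python
import json
from typing import Any, List


def resolve_fields(spec: str, key: Any, value: Any, timestamp: int) -> List[Any]:
    """Resolve a RECORD_FIELDS spec against a decoded record.

    key/value are dicts for Avro/JSON records and plain strings for String records.
    """
    row: List[Any] = []
    for item in (raw.strip() for raw in spec.split(",")):
        if item == "timestamp":
            row.append(timestamp)
        elif item in ("key", "value"):
            part = key if item == "key" else value
            # Whole key/value as a string; structured records are rendered as JSON.
            row.append(part if isinstance(part, str) else json.dumps(part))
        elif item in ("key.*", "value.*"):
            part = key if item.startswith("key") else value
            # Insertion order stands in for the Avro schema field order here.
            row.extend(part.values())
        elif item.startswith(("key.", "value.")):
            source, field = item.split(".", 1)
            part = key if source == "key" else value
            row.append(part[field])
        else:
            raise ValueError(f"unsupported field specification: {item}")
    return row


key = {"id": 123}
value = {"firstName": "John", "lastName": "Smith", "isAlive": True, "age": 27}
# 1700000000000 is a made-up record timestamp in epoch milliseconds.
print(resolve_fields("key.id,timestamp,value.lastName,value.age", key, value, 1700000000000))
# [123, 1700000000000, 'Smith', 27]
```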
You should create a corresponding table in Exasol that stores the data from a Kafka topic.
The table column names and types should match the fields specified in the RECORD_FIELDS parameter. Please make sure the number of columns and their order are correct; otherwise, the import will fail, because fields are mapped to columns by position, not by name.
Additionally, add two extra columns to the end of the table. These columns store the Kafka metadata and help to keep track of the already imported records.
For example, given the following Avro record schema,
{
"type": "record",
"name": "KafkaExasolAvroRecord",
"fields": [
{ "name": "product", "type": "string" },
{ "name": "price", "type": { "type": "bytes", "precision": 4, "scale": 2, "logicalType": "decimal" }},
{ "name": "sale_time", "type": { "type": "long", "logicalType": "timestamp-millis" }}
]
}
and the setting
RECORD_FIELDS=value.product,value.price,value.sale_time
then you should define the following Exasol table, with the column types mapped accordingly:
CREATE OR REPLACE TABLE <schema_name>.<table_name> (
PRODUCT VARCHAR(500),
PRICE DECIMAL(4, 2),
SALE_TIME TIMESTAMP,
KAFKA_PARTITION DECIMAL(18, 0),
KAFKA_OFFSET DECIMAL(36, 0)
);
The last two columns are used to store the metadata about Kafka topic partition and record offset inside a partition:
- KAFKA_PARTITION DECIMAL(18, 0)
- KAFKA_OFFSET DECIMAL(36, 0)
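These metadata columns are what make repeated imports incremental. The KAFKA_METADATA script created during deployment takes KAFKA_PARTITION and KAFKA_OFFSET values and emits a maximum offset per partition. The Python sketch below models that idea under a stated assumption: the next import resumes right after the highest offset stored for each partition. The function names are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def max_offsets(rows: Iterable[Tuple[int, int]]) -> Dict[int, int]:
    """Highest imported offset per partition, from (KAFKA_PARTITION, KAFKA_OFFSET) rows."""
    result: Dict[int, int] = {}
    for partition, offset in rows:
        result[partition] = max(offset, result.get(partition, -1))
    return result


def resume_offsets(rows: Iterable[Tuple[int, int]], partitions: List[int]) -> Dict[int, int]:
    """Offset at which the next import would start reading each partition.

    Assumption: reading resumes right after the highest stored offset, and a
    partition with no stored rows starts from its beginning (offset 0 here;
    a real consumer would use the earliest offset still retained by Kafka).
    """
    highest = max_offsets(rows)
    return {p: highest.get(p, -1) + 1 for p in partitions}


# Rows as they might be stored in the table after an earlier import.
imported = [(0, 10), (0, 12), (1, 3)]
print(resume_offsets(imported, partitions=[0, 1, 2]))  # {0: 13, 1: 4, 2: 0}
```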
Avro supports several primitive and complex types. The following table shows how they are mapped to Exasol types.
Avro Data Type | Avro Logical Attribute | Recommended Exasol Column Types |
---|---|---|
boolean | | BOOLEAN |
int | | INT, INTEGER, DECIMAL(18, 0) |
int | date | DATE |
long | | BIGINT, DECIMAL(36, 0) |
long | timestamp-millis | TIMESTAMP |
long | timestamp-micros | TIMESTAMP |
float | | FLOAT |
double | | DOUBLE, DOUBLE PRECISION |
bytes | | VARCHAR(n), CHAR(n) |
bytes | decimal(p, s) | DECIMAL(p, s) |
fixed | | VARCHAR(n), CHAR(n) |
fixed | decimal(p, s) | DECIMAL(p, s) |
string | | VARCHAR(n), CHAR(n) |
enum | | VARCHAR(n), CHAR(n) |
union | | Corresponding non-null type |
array | | VARCHAR(n), CHAR(n) |
map | | VARCHAR(n), CHAR(n) |
record | | VARCHAR(n), CHAR(n) |
You can also enrich regular Avro types with logical type attributes and use the suggested Exasol column types when preparing the table.
Please note that the connector converts Avro complex types to JSON strings. Use the Exasol VARCHAR(n) column type to store them, and set the number of characters n according to the size of the complex values.
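The mapping table can also be expressed as a lookup. This Python sketch picks one recommended Exasol type per row (for example DECIMAL(18, 0) for int) and generates a CREATE TABLE statement for RECORD_FIELDS=value.* from a record schema, appending the two Kafka metadata columns. It is an illustration under assumptions: the VARCHAR length n is your choice, unions resolve to their first non-null branch, and only the cases listed in the table are handled.

```python
from typing import Dict, Optional, Tuple

# (Avro type, logical type) -> one of the recommended Exasol types from the table.
# "{n}" is the VARCHAR length, which you choose based on your data.
AVRO_TO_EXASOL: Dict[Tuple[str, Optional[str]], str] = {
    ("boolean", None): "BOOLEAN",
    ("int", None): "DECIMAL(18, 0)",
    ("int", "date"): "DATE",
    ("long", None): "DECIMAL(36, 0)",
    ("long", "timestamp-millis"): "TIMESTAMP",
    ("long", "timestamp-micros"): "TIMESTAMP",
    ("float", None): "FLOAT",
    ("double", None): "DOUBLE PRECISION",
    ("bytes", None): "VARCHAR({n})",
    ("fixed", None): "VARCHAR({n})",
    ("string", None): "VARCHAR({n})",
    ("enum", None): "VARCHAR({n})",
}
# Complex types arrive as JSON strings, so they are stored in VARCHAR columns.
COMPLEX_TYPES = {"array", "map", "record"}


def exasol_type(avro_type, n: int = 2000) -> str:
    """Map an Avro field type (type name, union list or type object) to an Exasol type."""
    if isinstance(avro_type, list):
        # Union: use the first non-null branch, e.g. ["null", "long"] -> long.
        non_null = [t for t in avro_type if t != "null"]
        return exasol_type(non_null[0], n)
    if isinstance(avro_type, dict):
        base, logical = avro_type["type"], avro_type.get("logicalType")
        if logical == "decimal":
            return f"DECIMAL({avro_type['precision']}, {avro_type.get('scale', 0)})"
    else:
        base, logical = avro_type, None
    if base in COMPLEX_TYPES:
        return f"VARCHAR({n})"
    # Fall back to the plain type when a logical type is not listed in the table.
    mapped = AVRO_TO_EXASOL.get((base, logical)) or AVRO_TO_EXASOL.get((base, None))
    if mapped is None:
        raise ValueError(f"no mapping for Avro type {avro_type!r}")
    return mapped.format(n=n)


def create_table_ddl(table: str, record_schema: dict, n: int = 2000) -> str:
    """DDL for RECORD_FIELDS=value.* plus the two Kafka metadata columns."""
    columns = [f"{field['name'].upper()} {exasol_type(field['type'], n)}"
               for field in record_schema["fields"]]
    columns += ["KAFKA_PARTITION DECIMAL(18, 0)", "KAFKA_OFFSET DECIMAL(36, 0)"]
    return f"CREATE OR REPLACE TABLE {table} (\n    " + ",\n    ".join(columns) + "\n);"


sales_schema = {
    "type": "record",
    "name": "KafkaExasolAvroRecord",
    "fields": [
        {"name": "product", "type": "string"},
        {"name": "price", "type": {"type": "bytes", "precision": 4, "scale": 2, "logicalType": "decimal"}},
        {"name": "sale_time", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}
print(create_table_ddl("RETAIL.SALES_POSITIONS", sales_schema, n=500))
```

With n=500, this prints the same column list as the example table definition for the KafkaExasolAvroRecord schema.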
JSON supports a limited set of primitive and complex types. The following table shows how they are mapped to Exasol types.
JSON Data Type | Recommended Exasol Column Types |
---|---|
boolean | BOOLEAN |
number | INT, INTEGER, DECIMAL(p,s) |
string | VARCHAR(n), CHAR(n) |
object | VARCHAR(n) |
Similar to Avro, the connector emits any complex types as valid JSON strings, so you should define them as VARCHAR(n) columns in the Exasol table.
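As an illustration of what ends up in such a column, the Python sketch below extracts a top-level field from a JSON record value, keeps scalars as they are, and serializes objects and arrays back to JSON text. The compact formatting is an assumption; the connector may render whitespace differently, so size VARCHAR(n) with some headroom. The function names are hypothetical.

```python
import json
from typing import Any


def to_column_value(value: Any) -> Any:
    """Keep JSON scalars as they are; serialize objects and arrays to JSON strings."""
    if isinstance(value, (dict, list)):
        # Compact separators keep the string short for the VARCHAR(n) column.
        return json.dumps(value, separators=(",", ":"))
    return value


def json_field(record_json: str, field: str) -> Any:
    """Extract a top-level field of a JSON record value, as `value.<field>` selects it."""
    return to_column_value(json.loads(record_json)[field])


record = '{"id": 7, "ok": true, "tags": ["a", "b"], "address": {"city": "Berlin"}}'
print(json_field(record, "address"))  # {"city":"Berlin"}
```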
Several property values are required to access the Kafka cluster when importing data from Kafka topics using the connector.
You should provide these key-value parameters:
BOOTSTRAP_SERVERS
SCHEMA_REGISTRY_URL
TOPIC_NAME
TABLE_NAME
BOOTSTRAP_SERVERS is a comma-separated list of host:port pairs used to establish an initial connection to the Kafka cluster. The UDF connector contacts all servers in the Kafka cluster, irrespective of the servers specified in this parameter; the list only defines the initial hosts used to discover the full list of Kafka servers.
SCHEMA_REGISTRY_URL is a URL of the Schema Registry server. It is only required if you are importing Avro records, and it is used to retrieve the Avro schemas of Kafka topics. Note that Avro is the default record value format.
TOPIC_NAME is the name of the Kafka topic to import data from. Please note that only single-topic imports are supported.
TABLE_NAME is the name of the prepared Exasol table into which the Kafka topic data is imported.
For more information on Kafka import parameters, please refer to the Kafka consumer properties.
The import command has the following form:
IMPORT INTO <schema_name>.<table_name>
FROM SCRIPT KAFKA_CONSUMER WITH
BOOTSTRAP_SERVERS = '<kafka_bootstrap_servers>'
SCHEMA_REGISTRY_URL = '<schema_registry_url>'
TOPIC_NAME = '<kafka_topic>'
TABLE_NAME = '<schema_name>.<table_name>'
GROUP_ID = 'exasol-kafka-udf-consumers';
For example, given the Kafka topic named SALES-POSITIONS containing Avro-encoded values, we can import its data into the RETAIL.SALES_POSITIONS table in Exasol:
IMPORT INTO RETAIL.SALES_POSITIONS
FROM SCRIPT KAFKA_CONSUMER WITH
BOOTSTRAP_SERVERS = 'kafka01.internal:9092,kafka02.internal:9093,kafka03.internal:9094'
SCHEMA_REGISTRY_URL = 'http://schema-registry.internal:8081'
TOPIC_NAME = 'SALES-POSITIONS'
TABLE_NAME = 'RETAIL.SALES_POSITIONS'
GROUP_ID = 'exasol-kafka-udf-consumers';
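When import statements are generated by a script or scheduler, a small helper avoids quoting mistakes such as an unterminated string literal. The Python sketch below is hypothetical: it checks the required parameters described above (including SCHEMA_REGISTRY_URL whenever the key or value format is Avro, using the documented defaults string for keys and avro for values), validates the host:port list in BOOTSTRAP_SERVERS, and renders the statement with single quotes escaped.

```python
from typing import Dict, List, Tuple

REQUIRED = ("BOOTSTRAP_SERVERS", "TOPIC_NAME", "TABLE_NAME")


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a BOOTSTRAP_SERVERS value into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry.strip()!r}")
        servers.append((host, int(port)))
    return servers


def sql_literal(value: str) -> str:
    """Quote a value as an SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_import(target: str, params: Dict[str, str],
                 script: str = "KAFKA_EXTENSION.KAFKA_CONSUMER") -> str:
    """Render an IMPORT ... FROM SCRIPT statement after basic parameter checks."""
    missing = [name for name in REQUIRED if not params.get(name)]
    # Avro is the default value format, and Avro records need the Schema Registry.
    formats = (params.get("RECORD_KEY_FORMAT", "string").lower(),
               params.get("RECORD_VALUE_FORMAT", "avro").lower())
    if "avro" in formats and not params.get("SCHEMA_REGISTRY_URL"):
        missing.append("SCHEMA_REGISTRY_URL")
    if missing:
        raise ValueError("missing parameters: " + ", ".join(missing))
    parse_bootstrap_servers(params["BOOTSTRAP_SERVERS"])  # raises on malformed entries
    pairs = "\n".join(f"  {name} = {sql_literal(value)}" for name, value in params.items())
    return f"IMPORT INTO {target}\nFROM SCRIPT {script} WITH\n{pairs};"


print(build_import("RETAIL.SALES_POSITIONS", {
    "BOOTSTRAP_SERVERS": "kafka01.internal:9092,kafka02.internal:9093",
    "SCHEMA_REGISTRY_URL": "http://schema-registry.internal:8081",
    "TOPIC_NAME": "SALES-POSITIONS",
    "TABLE_NAME": "RETAIL.SALES_POSITIONS",
}))
```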
Apache Kafka supports secure connections from clients (producers and consumers) to Kafka brokers, using SSL/TLS for encryption and various SASL mechanisms for authentication.
To use encrypted connections to the Kafka cluster from the UDF, you need to upload the consumer truststore and keystore files to an Exasol BucketFS bucket, so that the Kafka import UDF can access them at runtime.
Upload the consumer Java Keystore format (JKS) files:
# Upload consumer client truststore JKS file
curl -X PUT -T certs/kafka.consumer.truststore.jks \
http://w:<WRITE_PASSWORD>@<EXASOL_DATANODE>:2580/<BUCKET>/kafka.consumer.truststore.jks
# Upload consumer client keystore JKS file
curl -X PUT -T certs/kafka.consumer.keystore.jks \
http://w:<WRITE_PASSWORD>@<EXASOL_DATANODE>:2580/<BUCKET>/kafka.consumer.keystore.jks
Please check out the Apache Kafka documentation on security and Kafka client configurations for more information.
Additionally, we have to provide extra parameters to the UDF in order to enable a secure connection to the Kafka cluster. Please check out the Kafka consumer properties for secure property descriptions.
First, create an Exasol named connection object and encode the JKS file credentials and locations as key-value pairs separated by semicolons (;).
CREATE OR REPLACE CONNECTION KAFKA_SSL_CONNECTION
TO ''
USER ''
IDENTIFIED BY 'SSL_KEY_PASSWORD=<PASSWORD>;SSL_KEYSTORE_PASSWORD=<SSLPASSWORD>;SSL_KEYSTORE_LOCATION=/buckets/bfsdefault/<BUCKET>/kafka.consumer.keystore.jks;SSL_TRUSTSTORE_PASSWORD=<TRUSTSTOREPASS>;SSL_TRUSTSTORE_LOCATION=/buckets/bfsdefault/<BUCKET>/kafka.consumer.truststore.jks';
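Because the IDENTIFIED BY string is itself an SQL string literal that is split on a separator, a value that contains the separator or a single quote can silently break it. This hypothetical Python helper builds the CREATE CONNECTION statement, refuses values that contain the separator, and doubles single quotes. The property names are the ones used above; the helper names and the sample password are illustrative only.

```python
from typing import Dict


def encode_properties(properties: Dict[str, str], separator: str = ";") -> str:
    """Join properties as key=value pairs for the IDENTIFIED BY string."""
    for key, value in properties.items():
        if separator in key or separator in value:
            # A separator inside a value would split it when the string is parsed.
            raise ValueError(f"{key} contains {separator!r}; use another separator")
    return separator.join(f"{key}={value}" for key, value in properties.items())


def create_connection_sql(name: str, properties: Dict[str, str], separator: str = ";") -> str:
    """Render CREATE OR REPLACE CONNECTION with the encoded properties."""
    secret = encode_properties(properties, separator).replace("'", "''")
    return f"CREATE OR REPLACE CONNECTION {name}\nTO ''\nUSER ''\nIDENTIFIED BY '{secret}';"


print(create_connection_sql("KAFKA_SSL_CONNECTION", {
    "SSL_KEYSTORE_PASSWORD": "example-password",
    "SSL_KEYSTORE_LOCATION": "/buckets/bfsdefault/kafka/kafka.consumer.keystore.jks",
}))
```

If a value legitimately contains a semicolon, choose a different separator and pass it as CONNECTION_SEPARATOR in the import statement.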
Then use the connection object with a Kafka import statement:
IMPORT INTO <schema_name>.<table_name>
FROM SCRIPT KAFKA_CONSUMER WITH
BOOTSTRAP_SERVERS = '<kafka_bootstrap_servers>'
SCHEMA_REGISTRY_URL = '<schema_registry_url>'
TOPIC_NAME = '<kafka_topic>'
TABLE_NAME = '<schema_name>.<table_name>'
GROUP_ID = 'exasol-kafka-udf-consumers'
-- Secure connection properties
SECURITY_PROTOCOL = 'SSL'
CONNECTION_NAME = 'KAFKA_SSL_CONNECTION';
To authenticate with SASL, create an Exasol named connection object and encode the credentials as key-value pairs separated by semicolons (;).
Note: Authentication can be used in conjunction with encryption. To do so, create a connection with combined authentication and encryption settings and set SECURITY_PROTOCOL to SASL_SSL.
CREATE OR REPLACE CONNECTION KAFKA_SASL_CONNECTION
TO ''
USER ''
IDENTIFIED BY 'SASL_MECHANISM=PLAIN;SASL_USERNAME=<SASL_USERNAME>;SASL_PASSWORD=<SASL_PASSWORD>';
Then use the connection object with a Kafka import statement:
IMPORT INTO <schema_name>.<table_name>
FROM SCRIPT KAFKA_CONSUMER WITH
BOOTSTRAP_SERVERS = '<kafka_bootstrap_servers>'
SCHEMA_REGISTRY_URL = '<schema_registry_url>'
TOPIC_NAME = '<kafka_topic>'
TABLE_NAME = '<schema_name>.<table_name>'
GROUP_ID = 'exasol-kafka-udf-consumers'
-- Secure connection properties
SECURITY_PROTOCOL = 'SASL_SSL'
CONNECTION_NAME = 'KAFKA_SASL_CONNECTION';
If you need a more complex SASL configuration, you can create a SASL JAAS configuration file, upload it to Exasol BucketFS and specify its path in SASL_JAAS_LOCATION.
Please note that the JAAS configuration file should contain only login module definitions. Example of a configuration file:
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useKeyTab=true
storeKey=true
keyTab="/buckets/bfsdefault/bucket1/kafka.keytab"
principal="principal@DOMAIN.COM";
In some complex setups, you might need to provide a custom krb5.conf file. This can be done by uploading it to BucketFS and providing its path in the SASL_KRB5CONF_LOCATION parameter, similar to SASL_JAAS_LOCATION.
To import data from Azure Event Hubs, we create a named connection that encodes the credentials for a secure SASL_SSL connection. Please note that this requires the Standard or Premium pricing tier.
CREATE OR REPLACE CONNECTION EVENT_HUBS_SASL_CONNECTION
TO ''
USER ''
IDENTIFIED BY 'SASL_MECHANISM=PLAIN#SASL_USERNAME=$ConnectionString#SASL_PASSWORD=<EVENT_HUBS_NAMESPACE_CONNECTION_STRING>';
Please don't forget to substitute EVENT_HUBS_NAMESPACE_CONNECTION_STRING above with your namespace connection string. You can follow the "Get an Event Hubs connection string" documentation to obtain it.
Notice that we use # as the separator instead of the usual semicolon (;). This is required because the Azure Event Hubs namespace connection string itself already contains ; characters. Our custom separator prevents it from being split up.
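The effect of the separator can be seen by splitting the connection string the same way in both variants. The Python sketch below assumes that each pair is split at its first =, which is an assumption about the parsing rather than documented connector behavior, and it uses a made-up namespace and key.

```python
from typing import Dict


def parse_connection(text: str, separator: str) -> Dict[str, str]:
    """Split on the separator, then split each pair at its first '='."""
    properties = {}
    for pair in text.split(separator):
        if pair:
            key, _, value = pair.partition("=")
            properties[key] = value
    return properties


# Shape of an Event Hubs namespace connection string (hypothetical values).
conn_str = ("Endpoint=sb://myns.servicebus.windows.net/;"
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc=")

with_hash = f"SASL_MECHANISM=PLAIN#SASL_USERNAME=$ConnectionString#SASL_PASSWORD={conn_str}"
with_semicolon = with_hash.replace("#", ";")

print(parse_connection(with_hash, "#")["SASL_PASSWORD"] == conn_str)  # True
print(parse_connection(with_semicolon, ";")["SASL_PASSWORD"])  # Endpoint=sb://myns.servicebus.windows.net/
```

With ;, the password is cut off after the endpoint, and the remaining parts of the connection string turn into unrelated properties.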
Let's also create an Exasol table that corresponds to the Azure Event Hub data.
For example, the following table:
CREATE OR REPLACE TABLE EVENT_HUBS_TOPIC (
BODY VARCHAR(20000),
KAFKA_PARTITION DECIMAL(18, 0),
KAFKA_OFFSET DECIMAL(36, 0)
);
Now we are ready to import data from Azure Event Hub.
Let's run the following SQL statement to import data:
IMPORT INTO EVENT_HUBS_TOPIC
FROM SCRIPT KAFKA_EXTENSION.KAFKA_CONSUMER WITH
BOOTSTRAP_SERVERS = '<EVENT_HUBS_NAMESPACE_HOST_NAME>.servicebus.windows.net:9093'
RECORD_VALUE_FORMAT = 'STRING'
SECURITY_PROTOCOL = 'SASL_SSL'
CONNECTION_NAME = 'EVENT_HUBS_SASL_CONNECTION'
CONNECTION_SEPARATOR = '#'
TABLE_NAME = 'EVENT_HUBS_TOPIC'
TOPIC_NAME = '<EVENT_HUB_NAME>';
Please do not forget to replace the placeholders with actual values:
- <EVENT_HUBS_NAMESPACE_HOST_NAME> is the name of your Event Hubs namespace. You can find it on the overview page of your Event Hubs namespace.
- <EVENT_HUB_NAME> is the name of the event hub (a topic in Apache Kafka terms) that holds the data.
The following properties are related to UDFs when importing data from Kafka clusters. Most of these properties are exactly the same as Kafka consumer configurations.
These are required parameters:
- BOOTSTRAP_SERVERS: A comma-separated list of host:port pairs of Kafka brokers. These addresses are used to establish the initial connection to the Kafka cluster.
- TOPIC_NAME: It defines the name of the Kafka topic to import data from. Only single-topic imports are supported; therefore, it must not contain a comma-separated list of topic names.
- TABLE_NAME: It defines the Exasol table into which the data is imported. It must be provided as a user parameter because the table name cannot be obtained from inside the UDF, even though the UDF imports data into that table.
These are optional parameters with their default values.
- SCHEMA_REGISTRY_URL: It specifies a URL to the Confluent Schema Registry, which stores Avro schemas as metadata. The Schema Registry is used to parse the Avro data schemas of the Kafka topic.
- RECORD_KEY_FORMAT: It specifies the record key format. It should be one of avro, json or string. The default value is string.
- RECORD_VALUE_FORMAT: It defines the record value format. It should be one of avro, json or string. The default value is avro.
- RECORD_FIELDS: A comma-separated list of fields to import from the source record. We recommend setting it when the structure of the Kafka records is not under your control and the order and/or number of fields in the record can change at any time. The options are outlined in the Record Format Configuration section. The default depends on the serialization format:
  - avro: value.* (all fields from the record are imported).
  - json: value (the record is imported as a single JSON string into one column).
  - string: value (the record is imported as a single string into one column).
- GROUP_ID: It defines the id for this type of consumer. The default value is EXASOL_KAFKA_UDFS_CONSUMERS. It is a unique string that identifies the consumer group this consumer belongs to.
- POLL_TIMEOUT_MS: It is the number of milliseconds to wait for the consumer poll function to return any data. The default value is 30000 milliseconds.
- MIN_RECORDS_PER_RUN: It is an upper bound on the minimum number of records to consume per UDF run. The default value is 100. That is, if the poll function returns fewer records than this number, we consume the returned records and finish the UDF process. Otherwise, we continue polling more data until the total number of records reaches a threshold such as MAX_RECORD_PER_RUN.
- MAX_RECORD_PER_RUN: It is a lower bound on the maximum number of records to consume per UDF run. The default value is 1000000. When the number of records returned by a poll is more than MIN_RECORDS_PER_RUN, we continue polling for more records until the total number reaches this number.
- MAX_POLL_RECORDS: It is the maximum number of records returned in a single call to the consumer poll method. The default value is 500.
- FETCH_MIN_BYTES: It is the minimum amount of data, in bytes, that the server should return for a fetch request. If insufficient data is available, the request waits for that much data to accumulate before answering. The default value is 1.
- FETCH_MAX_BYTES: It is the maximum amount of data, in bytes, that the server should return for a fetch request. The default value is 52428800 (50 MiB).
- MAX_PARTITION_FETCH_BYTES: It is the maximum amount of data, in bytes, per partition that the server returns. The default value is 1048576 (1 MiB).
- CONSUME_ALL_OFFSETS: It defines whether to consume all available offsets of the topic partitions. If it is set to 'true', the connector continues polling Kafka records up to the last offset in each partition that existed when the import started. It overrides any count thresholds and fully catches up on the topic from the last imported offset (or the initial start offset). The default value is 'false'.
- AS_JSON_DOC: (deprecated) It defines how the data is imported into the database. If set to 'true', the data is imported as one JSON document in one column. The default value is 'false'. When dealing with JSON, replace it by specifying RECORD_VALUE_FORMAT=json and RECORD_FIELDS=value.
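The interplay of MIN_RECORDS_PER_RUN, MAX_RECORD_PER_RUN and CONSUME_ALL_OFFSETS is easiest to see in a loop. The Python sketch below models one UDF run over a single partition as described above, with records represented by their offsets and a fake poll that returns up to MAX_POLL_RECORDS records. It is a simplified, hypothetical model rather than the connector's code, and it ignores POLL_TIMEOUT_MS.

```python
from itertools import islice
from typing import Callable, Iterable, List


def make_poller(offsets: Iterable[int], max_poll_records: int = 500) -> Callable[[], List[int]]:
    """Fake consumer poll: each call returns the next batch of up to max_poll_records offsets."""
    remaining = iter(offsets)
    return lambda: list(islice(remaining, max_poll_records))


def consume_run(poll: Callable[[], List[int]],
                min_records_per_run: int = 100,
                max_record_per_run: int = 1_000_000,
                consume_all_offsets: bool = False,
                last_offset: int = -1) -> List[int]:
    """Model of one UDF run over a single partition; records are their offsets."""
    consumed: List[int] = []
    while True:
        batch = poll()
        if consume_all_offsets:
            # Count thresholds are ignored: read up to the offset that was last
            # when the import started, then stop (also stop if nothing is left).
            consumed.extend(o for o in batch if o <= last_offset)
            if not batch or batch[-1] >= last_offset:
                break
            continue
        consumed.extend(batch)
        # Fewer records than the minimum means the partition is (nearly) drained.
        if len(batch) < min_records_per_run:
            break
        # Otherwise keep polling until the per-run maximum is reached.
        if len(consumed) >= max_record_per_run:
            break
    return consumed


print(len(consume_run(make_poller(range(1234)), 100, 1000)))  # 1000
print(len(consume_run(make_poller(range(1234)))))  # 1234
print(len(consume_run(make_poller(range(1234)), 100, 1000,
                      consume_all_offsets=True, last_offset=1099)))  # 1100
```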
The following properties are used to enable a secure connection to Kafka clusters. For safety, the credentials among them (passwords and keystore, truststore or configuration file locations) must be specified in an Exasol named connection, not in the import statement itself.
- SSL_ENABLED: (deprecated) It is a boolean property that should be set to true in order to use secure connections to the Kafka cluster. The default value is 'false'. Use SECURITY_PROTOCOL=SSL or SECURITY_PROTOCOL=SASL_SSL instead.
- SECURITY_PROTOCOL: It is the protocol used to communicate with Kafka servers. The default value is PLAINTEXT.
- SSL_KEY_PASSWORD: It is the password of the private key inside the keystore file.
- SSL_KEYSTORE_PASSWORD: It is the password for the keystore file.
- SSL_KEYSTORE_LOCATION: It is the location of the keystore file. It should point to a keystore file that is available in an Exasol bucket in BucketFS.
- SSL_TRUSTSTORE_PASSWORD: It is the password for the truststore file.
- SSL_TRUSTSTORE_LOCATION: It is the location of the truststore file. It should refer to the truststore file stored inside a bucket in Exasol BucketFS.
- SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: It is the endpoint identification algorithm used to validate the server hostname against the server certificate. The default value is https.
- SASL_MECHANISM: It is the SASL mechanism to use for authentication. The default value is GSSAPI.
- SASL_USERNAME / SASL_PASSWORD: These are SASL credentials. They can be used directly when SASL_MECHANISM is set to PLAIN, DIGEST-* or SCRAM-*.
- SASL_JAAS_LOCATION: It is the location of the JAAS configuration file for a more complex configuration of SASL authentication. It should refer to the file stored inside a bucket in Exasol BucketFS.
- SASL_KRB5CONF_LOCATION: It is the location of the custom krb5.conf file. It should refer to the file stored inside a bucket in Exasol BucketFS. In the default configuration, the path starts with /buckets/bfsdefault/<bucket_name>/.
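Before running a secure import, a few of the rules above can be checked mechanically. The hypothetical Python check below flags credentials passed in the import statement instead of the named connection, file locations outside /buckets/, unknown SECURITY_PROTOCOL values (Kafka defines PLAINTEXT, SSL, SASL_PLAINTEXT and SASL_SSL), and SASL username and password used with a mechanism other than PLAIN, DIGEST-* or SCRAM-*. Treat it as a sketch of a pre-flight check under these assumptions, not as the connector's own validation logic.

```python
from typing import Dict, List

PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}
SECRET_KEYS = {"SSL_KEY_PASSWORD", "SSL_KEYSTORE_PASSWORD", "SSL_TRUSTSTORE_PASSWORD",
               "SASL_USERNAME", "SASL_PASSWORD"}
FILE_KEYS = {"SSL_KEYSTORE_LOCATION", "SSL_TRUSTSTORE_LOCATION",
             "SASL_JAAS_LOCATION", "SASL_KRB5CONF_LOCATION"}
SIMPLE_SASL_PREFIXES = ("PLAIN", "DIGEST-", "SCRAM-")


def check_security_setup(import_params: Dict[str, str], connection: Dict[str, str]) -> List[str]:
    """Return human-readable problems; an empty list means no issue was found."""
    problems: List[str] = []
    protocol = import_params.get("SECURITY_PROTOCOL", "PLAINTEXT").upper()
    if protocol not in PROTOCOLS:
        problems.append(f"unknown SECURITY_PROTOCOL {protocol!r}")
    # Credentials belong in the named connection, not in the IMPORT statement.
    for key in sorted(SECRET_KEYS.intersection(import_params)):
        problems.append(f"{key} should be moved into the named connection")
    # The UDF can only read files that are stored in a BucketFS bucket.
    for key in sorted(FILE_KEYS.intersection(connection)):
        if not connection[key].startswith("/buckets/"):
            problems.append(f"{key} should point into BucketFS (/buckets/...)")
    mechanism = connection.get("SASL_MECHANISM", "GSSAPI").upper()
    has_credentials = "SASL_USERNAME" in connection or "SASL_PASSWORD" in connection
    if has_credentials and not mechanism.startswith(SIMPLE_SASL_PREFIXES):
        problems.append(f"SASL_USERNAME/SASL_PASSWORD apply to PLAIN, DIGEST-* or SCRAM-*, not {mechanism}")
    return problems


print(check_security_setup(
    {"SECURITY_PROTOCOL": "SASL_SSL", "SASL_PASSWORD": "example"},
    {"SASL_MECHANISM": "PLAIN", "SASL_USERNAME": "user", "SASL_PASSWORD": "example"},
))
```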