From c341ce17bb5c239964d61859ff2d1c1ffedc2049 Mon Sep 17 00:00:00 2001 From: Muhammet Orazov Date: Fri, 15 Oct 2021 10:35:33 +0200 Subject: [PATCH] #56: Updated userguide with Azure Event Hubs usage (#60) Co-authored-by: Pieterjan Spoelders --- .editorconfig | 2 +- build.sbt | 2 +- doc/changes/changes_1.5.0.md | 10 ++- doc/user_guide/user_guide.md | 139 ++++++++++++++++++++++++++--------- 4 files changed, 114 insertions(+), 39 deletions(-) diff --git a/.editorconfig b/.editorconfig index 3478628..ba5b292 100644 --- a/.editorconfig +++ b/.editorconfig @@ -12,7 +12,7 @@ max_line_length = 120 trim_trailing_whitespace = true [*.md] -max_line_length = 120 +max_line_length = 80 trim_trailing_whitespace = false [Makefile] diff --git a/build.sbt b/build.sbt index 27bb8e2..66b35c7 100644 --- a/build.sbt +++ b/build.sbt @@ -17,7 +17,7 @@ lazy val root = project .in(file(".")) .settings(moduleName := "exasol-kafka-connector-extension") - .settings(version := "1.4.0") + .settings(version := "1.5.0") .settings(orgSettings) .settings(buildSettings) .settings(Settings.projectSettings(scalaVersion)) diff --git a/doc/changes/changes_1.5.0.md b/doc/changes/changes_1.5.0.md index 9ad6228..80b470d 100644 --- a/doc/changes/changes_1.5.0.md +++ b/doc/changes/changes_1.5.0.md @@ -1,13 +1,19 @@ -# Kafka Connector Extension 1.5.0, released 2021-10-?? +# Kafka Connector Extension 1.5.0, released 2021-10-15 -Code name: +Code name: Azure Event Hubs Support ## Summary +In this release, we added documentation for using the connector to import data from Azure Event Hubs. Please check out the user guide for more information. + ## Bug Fixes * #54: Fixed issue with setting empty values to property keys +## Documentation + +* #56: Updated user guide with Azure Event Hubs usage information + ## Refactoring * #55: Added optional check for keystore or truststore files when using `SASL_SSL` protocol diff --git a/doc/user_guide/user_guide.md b/doc/user_guide/user_guide.md index 849910b..4a213cf 100644 --- a/doc/user_guide/user_guide.md +++ b/doc/user_guide/user_guide.md @@ -15,6 +15,7 @@ Using the connector you can import data from a Kafka topic into an Exasol table. - [JSON Data Mapping](#json-data-mapping) - [Importing Records](#importing-records) - [Secure Connection to Kafka Cluster](#secure-connection-to-kafka-cluster) +- [Importing Data From Azure Event Hubs](#importing-data-from-azure-event-hubs) - [Kafka Consumer Properties](#kafka-consumer-properties) ## Getting Started @@ -109,7 +110,7 @@ curl -X PUT -T exasol-kafka-connector-extension-.jar \ http://w:@:2580// ``` -Please ensure that the file was uploaded. +Please ensure that the file was uploaded. Check the bucket contents: @@ -143,8 +144,8 @@ CREATE OR REPLACE JAVA SET SCRIPT KAFKA_IMPORT(...) EMITS (...) AS / CREATE OR REPLACE JAVA SET SCRIPT KAFKA_METADATA( - params VARCHAR(2000), - kafka_partition DECIMAL(18, 0), + params VARCHAR(2000), + kafka_partition DECIMAL(18, 0), kafka_offset DECIMAL(36, 0) ) EMITS (partition_index DECIMAL(18, 0), max_offset DECIMAL(36,0)) AS @@ -173,7 +174,7 @@ configuration values: `RECORD_VALUE_FORMAT=avro|json|string` -It should match the format of the records on the topic you are importing. The +It should match the format of the records on the topic you are importing. The connector can extract fields from the key and value of the records and insert them into the target table the import is running against. 
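To make the record format setting concrete, here is a minimal, illustrative import that reads JSON-formatted record values (the schema, table, topic, and broker names are placeholders; `KAFKA_CONSUMER` is the UDF script deployed above):

```sql
-- Illustrative only: replace the names and the broker address with your own.
IMPORT INTO RETAIL.SALES_POSITIONS
FROM SCRIPT KAFKA_CONSUMER WITH
  BOOTSTRAP_SERVERS   = 'kafka01.example.com:9092'
  TOPIC_NAME          = 'SALES-POSITIONS'
  TABLE_NAME          = 'RETAIL.SALES_POSITIONS'
  GROUP_ID            = 'exasol-kafka-udf-consumers'
  RECORD_VALUE_FORMAT = 'json';
```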
@@ -255,7 +256,7 @@ For example, given the following Avro record schema,
 }
 ```
 
-and the setting 
+and the setting
 
 `RECORD_FIELDS=value.product,value.price,value.sale_time`
 
@@ -317,13 +318,13 @@ set the number of characters in the VARCHAR type accordingly.
 [JSON][json-spec] supports a limited set of primitive and complex types. The
 following table shows how they are mapped to the Exasol types.
 
-| JSON Data Type | Recommended Exasol Column Types | 
+| JSON Data Type | Recommended Exasol Column Types |
 |:---------------|:--------------------------------|
 | boolean        | BOOLEAN                         |
 | number         | INT, INTEGER, DECIMAL(p,s)      |
 | string         | VARCHAR(n), CHAR(n)             |
 | object         | VARCHAR(n)                      |
- 
+
 Similar to Avro, the connector will emit any complex types as valid JSON strings,
 so you should define them as `VARCHAR(n)` columns in the Exasol table.
 
@@ -386,9 +387,9 @@ FROM SCRIPT KAFKA_CONSUMER WITH
 
 ## Secure Connection to Kafka Cluster
 
-Since the recent releases, Apache Kafka supports secure connections
-to Kafka brokers from clients (producers and consumers) using encryption with SSL/TLS
-and authentication with various SASL mechanisms.
+Recent Apache Kafka releases support secure connections to Kafka brokers from
+clients (producers and consumers) using encryption with SSL/TLS and
+authentication with various SASL mechanisms.
 
 In order to use the encrypted connections to the Kafka cluster from the UDF,
 you need to upload the consumer Truststore and Keystore files to the Exasol BucketFS
@@ -447,9 +448,9 @@ FROM SCRIPT KAFKA_CONSUMER WITH
 Create an Exasol named connection object and encode credentials with key-value
 pairs separated by a semicolon (`;`).
 
-Note: Authentication can be used in conjunction with encryption.
-For this you need create connection with combined authentication & encryption settings
-and set ``SECURITY_PROTOCOL`` to **SASL_SSL**.
+Note: Authentication can be used in conjunction with encryption. For this you
+need to create a connection with combined authentication & encryption settings
+and set ``SECURITY_PROTOCOL`` to **SASL_SSL**.
 
 ```sql
 CREATE OR REPLACE CONNECTION KAFKA_SASL_CONNECTION
@@ -469,12 +470,76 @@ FROM SCRIPT KAFKA_CONSUMER WITH
   TABLE_NAME = '.'
   GROUP_ID = 'exasol-kafka-udf-consumers';
 -- Secure connection properties
-  SECURITY_PROTOCOL = 'SASL_PLAINTEXT'
+  SECURITY_PROTOCOL = 'SASL_SSL'
   CONNECTION_NAME = 'KAFKA_SASL_CONNECTION';
 ```
 
-If you need more complex SASL configuration, you can create [SASL JAAS configuration][kafka-sasl-jaas]
-file, upload it to Exasol BucketFS and specify its path into ``SASL_JAAS_LOCATION``.
+If you need a more complex SASL configuration, you can create a [SASL JAAS
+configuration][kafka-sasl-jaas] file, upload it to Exasol BucketFS and specify
+its path in ``SASL_JAAS_LOCATION``.
+
+## Importing Data From Azure Event Hubs
+
+To import data from [Azure Event Hubs][azure-event-hubs], we are going to create
+a secure **SASL_SSL** connection that encodes the credentials. Please note that
+for this you need the **Standard** or **Premium** pricing tier.
+
+```sql
+CREATE OR REPLACE CONNECTION EVENT_HUBS_SASL_CONNECTION
+TO ''
+USER ''
+IDENTIFIED BY 'SASL_MECHANISM=PLAIN#SASL_USERNAME=$ConnectionString#SASL_PASSWORD=<EVENT_HUBS_NAMESPACE_CONNECTION_STRING>'
+```
+
+Please don't forget to replace `EVENT_HUBS_NAMESPACE_CONNECTION_STRING` with
+your namespace connection string above. You can follow the [Get an Event Hubs
+connection string][event-hubs-get-connection-string] documentation to obtain it.
+
+_Note that we use `#` as the separator instead of the usual `;`. This is
+required because the Azure Event Hubs namespace connection string itself
+contains `;` characters, and a custom separator prevents it from being split._
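For illustration, a filled-in connection could look like the following (the namespace, key name, and key below are made up); note how the `;` characters that belong to the connection string stay safely inside the `#`-separated `SASL_PASSWORD` value:

```sql
-- Illustrative values only; use your own namespace connection string.
CREATE OR REPLACE CONNECTION EVENT_HUBS_SASL_CONNECTION
TO ''
USER ''
IDENTIFIED BY 'SASL_MECHANISM=PLAIN#SASL_USERNAME=$ConnectionString#SASL_PASSWORD=Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxx'
```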
+
+### Prepare Table for Azure Event Hubs Topic
+
+Let's also create an Exasol table that corresponds to the Azure Event Hub data.
+
+For example, the following table:
+
+```sql
+CREATE OR REPLACE TABLE EVENT_HUBS_TOPIC (
+    BODY VARCHAR(20000),
+    KAFKA_PARTITION DECIMAL(18, 0),
+    KAFKA_OFFSET DECIMAL(36, 0)
+);
+```
+
+### Import Data From Azure Event Hub
+
+Now we are ready to import data from the Azure Event Hub.
+
+Let's run the following SQL statement to import data:
+
+```sql
+IMPORT INTO EVENT_HUBS_TOPIC
+FROM SCRIPT KAFKA_EXTENSION.KAFKA_CONSUMER WITH
+  BOOTSTRAP_SERVERS = '<EVENT_HUBS_NAMESPACE>.servicebus.windows.net:9093'
+  RECORD_VALUE_FORMAT = 'STRING'
+  SECURITY_PROTOCOL = 'SASL_SSL'
+  CONNECTION_NAME = 'EVENT_HUBS_SASL_CONNECTION'
+  CONNECTION_SEPARATOR = '#'
+  TABLE_NAME = 'EVENT_HUBS_TOPIC'
+  TOPIC_NAME = '<EVENT_HUB_NAME>';
+```
+
+Please do not forget to replace the placeholders with actual values.
+
+* `<EVENT_HUBS_NAMESPACE>` is the name of your Event Hubs Namespace. You can
+  find it on the overview page of your Event Hubs Namespace.
+* `<EVENT_HUB_NAME>` is the name of the Event Hub (a topic in Apache Kafka
+  terms) that holds the data.
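Once the import has finished, a quick sanity check on the example table above is to look at how far each partition has been read (a purely illustrative query that only uses the table and columns defined earlier):

```sql
-- Highest imported offset and number of records per partition
SELECT KAFKA_PARTITION,
       MAX(KAFKA_OFFSET) AS LAST_IMPORTED_OFFSET,
       COUNT(*)          AS IMPORTED_RECORDS
FROM EVENT_HUBS_TOPIC
GROUP BY KAFKA_PARTITION;
```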
+
+[azure-event-hubs]: https://azure.microsoft.com/en-us/services/event-hubs/
+[event-hubs-get-connection-string]: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string
 
 ## Kafka Consumer Properties
 
@@ -510,26 +575,27 @@ These are optional parameters with their default values.
 
 * ``RECORD_VALUE_FORMAT`` - It defines the record value format. It should be
   one of `avro`, `json` or `string` values. The default value is **avro**.
 
-* ``RECORD_FIELDS`` - A comma separated list of fields to import from the
-  source record. It is recommended to set this when the structure of the Kafka
-  records is not under your control and the order and/or the number of fields in
-  the record can change at any time. The options are outlined
-  [Record Format Configuration](#record-format-configuration) section.
+* ``RECORD_FIELDS`` - A comma-separated list of fields to import from the source
+  record. It is recommended to set this when the structure of the Kafka records
+  is not under your control and the order and/or the number of fields in the
+  record can change at any time. The options are outlined in the [Record Format
+  Configuration](#record-format-configuration) section.
 
   The default is dependent on the serialization format:
-  - avro: **`value.*`** — All fields from the record will be imported.
-  - json: **`value`** — The record will be imported as single JSON string
-    into a column.
-  - string: **`value`** — The record will be imported as single string
-    into a column.
+  - avro: **`value.*`** — All fields from the record will be
+    imported.
+  - json: **`value`** — The record will be imported as
+    a single JSON string into a column.
+  - string: **`value`** — The record will be imported as
+    a single string into a column.
 
 * ``GROUP_ID`` - It defines the id for this type of consumer. The default value
   is **EXASOL_KAFKA_UDFS_CONSUMERS**. It is a unique string that identifies the
   consumer group this consumer belongs to.
 
-* ``POLL_TIMEOUT_MS`` - It defines the timeout value that is the number of
-  milliseconds to wait for the consumer poll function to return any data.
-  The default value is **30000** milliseconds.
+* ``POLL_TIMEOUT_MS`` - It defines the timeout value, that is, the number of
+  milliseconds to wait for the consumer poll function to return any data. The
  default value is **30000** milliseconds.
 
 * ``MIN_RECORDS_PER_RUN`` - It is an upper bound on the minimum number of
   records to the consumer per UDF run. The default value is **100**. That is, if
@@ -575,9 +641,10 @@ The following properties should be provided to enable a secure connection
 to the Kafka clusters. For safety, they must be specified in an Exasol named
 connection, not in the import statement itself.
 
-* ``SSL_ENABLED`` - (_deprecated_) It is a boolean property that should be set to `true`
-  in order to use the secure connections to the Kafka cluster. Default value is
-  **'false'**. Use `SECURITY_PROTOCOL=SSL` or `SECURITY_PROTOCOL=SASL_SSL` instead.
+* ``SSL_ENABLED`` - (_deprecated_) It is a boolean property that should be set
+  to `true` in order to use secure connections to the Kafka cluster. The default
+  value is **'false'**. Use `SECURITY_PROTOCOL=SSL` or
+  `SECURITY_PROTOCOL=SASL_SSL` instead.
 
 * ``SECURITY_PROTOCOL`` - It is the protocol used to communicate with Kafka
   servers. Default value is **PLAINTEXT**.
@@ -604,11 +671,13 @@ not in import statement itself.
 * ``SASL_MECHANISM`` - It is the SASL mechanism to use for authentication.
   Default value is **GSSAPI**.
 
-* ``SASL_USERNAME``/``SASL_PASSWORD`` - These are SASL credentials.
-  They can be simply used when `SASL_MECHANISM` is set to **PLAIN**, __DIGEST-*__ or __SCRAM-*__.
+* ``SASL_USERNAME``/``SASL_PASSWORD`` - These are the SASL credentials. They can
+  be used when `SASL_MECHANISM` is set to **PLAIN**, __DIGEST-*__ or
+  __SCRAM-*__.
 
-* ``SASL_JAAS_LOCATION`` - It is the location of the JAAS configuration file for more complex configuration
-  of SASL authentication. It should refer to the file stored inside a bucket in Exasol BucketFS.
+* ``SASL_JAAS_LOCATION`` - It is the location of the JAAS configuration file for
+  more complex configuration of SASL authentication. It should refer to a file
+  stored inside a bucket in Exasol BucketFS (see the sketch below).
 
 [gh-releases]: https://github.com/exasol/kafka-connector-extension/releases
 [schema-registry]: https://docs.confluent.io/current/schema-registry/index.html
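Relating to ``SASL_JAAS_LOCATION``: as an illustrative sketch, a named connection that points the consumer at a JAAS file uploaded to BucketFS could look like the following. The connection name and the bucket path are assumptions and depend on your own BucketFS setup:

```sql
-- Sketch only: the path depends on where you uploaded the JAAS file in BucketFS.
CREATE OR REPLACE CONNECTION KAFKA_SASL_JAAS_CONNECTION
TO ''
USER ''
IDENTIFIED BY 'SASL_MECHANISM=GSSAPI;SASL_JAAS_LOCATION=/buckets/bfsdefault/kafka/kafka_client_jaas.conf'
```

The import statement would then reference it with `SECURITY_PROTOCOL = 'SASL_SSL'` and `CONNECTION_NAME = 'KAFKA_SASL_JAAS_CONNECTION'`, following the SASL example earlier in this guide.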