Commit
#56: Updated userguide with Azure Event Hubs usage (#60)
Co-authored-by: Pieterjan Spoelders <[email protected]>
morazow and pj-spoelders authored Oct 15, 2021
1 parent b6ab2e4 commit c341ce1
Showing 4 changed files with 114 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
@@ -12,7 +12,7 @@ max_line_length = 120
trim_trailing_whitespace = true

[*.md]
max_line_length = 120
max_line_length = 80
trim_trailing_whitespace = false

[Makefile]
2 changes: 1 addition & 1 deletion build.sbt
@@ -17,7 +17,7 @@ lazy val root =
project
.in(file("."))
.settings(moduleName := "exasol-kafka-connector-extension")
.settings(version := "1.4.0")
.settings(version := "1.5.0")
.settings(orgSettings)
.settings(buildSettings)
.settings(Settings.projectSettings(scalaVersion))
10 changes: 8 additions & 2 deletions doc/changes/changes_1.5.0.md
@@ -1,13 +1,19 @@
# Kafka Connector Extension 1.5.0, released 2021-10-??
# Kafka Connector Extension 1.5.0, released 2021-10-15

Code name:
Code name: Azure Event Hubs Support

## Summary

In this release, we added documentation for using the connector to import data from Azure Event Hubs. Please check out the user guide for more information.

## Bug Fixes

* #54: Fixed issue with setting empty values to property keys

## Documentation

* #56: Updated user guide with Azure Event Hubs usage information

## Refactoring

* #55: Added optional check for keystore or truststore files when using `SASL_SSL` protocol
139 changes: 104 additions & 35 deletions doc/user_guide/user_guide.md
@@ -15,6 +15,7 @@ Using the connector you can import data from a Kafka topic into an Exasol table.
- [JSON Data Mapping](#json-data-mapping)
- [Importing Records](#importing-records)
- [Secure Connection to Kafka Cluster](#secure-connection-to-kafka-cluster)
- [Importing Data From Azure Event Hubs](#importing-data-from-azure-event-hubs)
- [Kafka Consumer Properties](#kafka-consumer-properties)

## Getting Started
@@ -109,7 +110,7 @@ curl -X PUT -T exasol-kafka-connector-extension-<VERSION>.jar \
http://w:<WRITE_PASSWORD>@<EXASOL_DATANODE>:2580/<BUCKET_NAME>/
```

Please ensure that the file was uploaded.

Check the bucket contents:

@@ -143,8 +144,8 @@ CREATE OR REPLACE JAVA SET SCRIPT KAFKA_IMPORT(...) EMITS (...) AS
/

CREATE OR REPLACE JAVA SET SCRIPT KAFKA_METADATA(
  params VARCHAR(2000),
  kafka_partition DECIMAL(18, 0),
kafka_offset DECIMAL(36, 0)
)
EMITS (partition_index DECIMAL(18, 0), max_offset DECIMAL(36,0)) AS
@@ -173,7 +174,7 @@ configuration values:

`RECORD_VALUE_FORMAT=avro|json|string`

It should match the format of the records on the topic you are importing. The
connector can extract fields from the key and value of the records and insert
them into the target table the import is running against.

@@ -255,7 +256,7 @@ For example, given the following Avro record schema,
}
```

and the setting

`RECORD_FIELDS=value.product,value.price,value.sale_time`

@@ -317,13 +318,13 @@ set the number of characters in the VARCHAR type accordingly.
[JSON][json-spec] supports a limited set of primitive and complex types. The
following table shows how they are mapped to the Exasol types.

| JSON Data Type | Recommended Exasol Column Types |
|:---------------|:--------------------------------|
| boolean | BOOLEAN |
| number | INT, INTEGER, DECIMAL(p,s) |
| string | VARCHAR(n), CHAR(n) |
| object | VARCHAR(n) |

Similar to Avro, the connector will emit any complex types as valid JSON
strings, so you should define them as `VARCHAR(n)` columns in the Exasol table.
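
As an illustration, a hypothetical target table for JSON records that contain
a nested object could be defined like this — the record shape, column names
and sizes are made up for the example:

```sql
-- Example JSON record:
--   {"id": 42, "name": "screwdriver", "in_stock": true,
--    "dimensions": {"w": 3, "h": 20}}
CREATE OR REPLACE TABLE JSON_PRODUCTS_EXAMPLE (
    ID         DECIMAL(18, 0),
    NAME       VARCHAR(500),
    IN_STOCK   BOOLEAN,
    DIMENSIONS VARCHAR(2000)  -- the nested object arrives as a JSON string
);
```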

@@ -386,9 +387,9 @@ FROM SCRIPT KAFKA_CONSUMER WITH
## Secure Connection to Kafka Cluster
Since recent releases, Apache Kafka supports secure connections to Kafka
brokers from clients (producers and consumers) using encryption with SSL/TLS
and authentication with various SASL mechanisms.
In order to use encrypted connections to the Kafka cluster from the UDF, you
need to upload the consumer Truststore and Keystore files to the Exasol BucketFS
@@ -447,9 +448,9 @@ FROM SCRIPT KAFKA_CONSUMER WITH
Create an Exasol named connection object and encode credentials with key-value
pairs separated by a semicolon (`;`).
Note: Authentication can be used in conjunction with encryption. For this you
need to create a connection with combined authentication & encryption settings
and set ``SECURITY_PROTOCOL`` to **SASL_SSL**.
```sql
CREATE OR REPLACE CONNECTION KAFKA_SASL_CONNECTION
@@ -469,12 +470,76 @@ FROM SCRIPT KAFKA_CONSUMER WITH
TABLE_NAME = '<schema_name>.<table_name>'
GROUP_ID = 'exasol-kafka-udf-consumers';
-- Secure connection properties
SECURITY_PROTOCOL = 'SASL_PLAINTEXT'
SECURITY_PROTOCOL = 'SASL_SSL'
CONNECTION_NAME = 'KAFKA_SASL_CONNECTION';
```
If you need a more complex SASL configuration, you can create a [SASL JAAS
configuration][kafka-sasl-jaas] file, upload it to Exasol BucketFS and specify
its path in ``SASL_JAAS_LOCATION``.
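
One possible shape for this, shown only as a sketch — here the property is
packed into a named connection together with the other secure-connection
settings, and both the connection name and the BucketFS path are hypothetical:

```sql
-- The BucketFS path below is hypothetical; point it at the JAAS file you uploaded.
CREATE OR REPLACE CONNECTION KAFKA_SASL_JAAS_CONNECTION
TO ''
USER ''
IDENTIFIED BY 'SASL_JAAS_LOCATION=/buckets/bfsdefault/<BUCKET_NAME>/kafka_client_jaas.conf';
```

The import statement would then reference this connection through
`CONNECTION_NAME`, just like the `SASL_SSL` example above.
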
## Importing Data From Azure Event Hubs
To import data from [Azure Event Hubs][azure-event-hubs], we are going to
create a secure **SASL_SSL** connection that encodes the credentials. Please
note that this requires the **Standard** or **Premium** pricing tier.
```sql
CREATE OR REPLACE CONNECTION EVENT_HUBS_SASL_CONNECTION
TO ''
USER ''
IDENTIFIED BY 'SASL_MECHANISM=PLAIN#SASL_USERNAME=$ConnectionString#SASL_PASSWORD=<EVENT_HUBS_NAMESPACE_CONNECTION_STRING>'
```

Please don't forget to substitute `EVENT_HUBS_NAMESPACE_CONNECTION_STRING` with
your namespace connection string above. You can follow the [Get an Event Hubs
connection string][event-hubs-get-connection-string] documentation to obtain it.

_Note that we use `#` as the separator instead of the usual `;`. This is
required because the Azure Event Hubs namespace connection string itself
contains `;` characters; the custom separator prevents it from being split
apart._
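
To make this concrete, here is the same connection again with a purely
fictitious connection string filled in — note the `;` characters inside the
`SASL_PASSWORD` value, which is why the key-value pairs themselves are
separated by `#`:

```sql
-- Illustrative only; the Event Hubs connection string below is made up.
CREATE OR REPLACE CONNECTION EVENT_HUBS_SASL_CONNECTION
TO ''
USER ''
IDENTIFIED BY 'SASL_MECHANISM=PLAIN#SASL_USERNAME=$ConnectionString#SASL_PASSWORD=Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<ACCESS_KEY>';
```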

### Prepare Table for Azure Event Hubs Topic

Let's also create an Exasol table that corresponds to the Azure Event Hub data.

For example, the following table:

```sql
CREATE OR REPLACE TABLE EVENT_HUBS_TOPIC (
BODY VARCHAR(20000),
KAFKA_PARTITION DECIMAL(18, 0),
KAFKA_OFFSET DECIMAL(36, 0)
);
```

### Import Data From Azure Event Hub

Now we are ready to import data from Azure Event Hub.

Let's run the following SQL statement to import data:

```sql
IMPORT INTO EVENT_HUBS_TOPIC
FROM SCRIPT KAFKA_EXTENSION.KAFKA_CONSUMER WITH
BOOTSTRAP_SERVERS = '<EVENT_HUBS_NAMESPACE_HOST_NAME>.servicebus.windows.net:9093'
RECORD_VALUE_FORMAT = 'STRING'
SECURITY_PROTOCOL = 'SASL_SSL'
CONNECTION_NAME = 'EVENT_HUBS_SASL_CONNECTION'
CONNECTION_SEPARATOR = '#'
TABLE_NAME = 'EVENT_HUBS_TOPIC'
TOPIC_NAME = '<EVENT_HUB_NAME>';
```

Please do not forget to replace the placeholders with actual values.

* `<EVENT_HUBS_NAMESPACE_HOST_NAME>` is the name of your Event Hubs Namespace.
You can find it on the overview page of the namespace.
* `<EVENT_HUB_NAME>` is the name of the Event Hub (a topic in Apache Kafka
terms) that holds the data.
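
Once the import finishes, a quick sanity check against the table created
earlier could look like this sketch:

```sql
-- Count imported records and show the latest consumed offset per partition.
SELECT KAFKA_PARTITION,
       COUNT(*)          AS IMPORTED_RECORDS,
       MAX(KAFKA_OFFSET) AS LATEST_OFFSET
FROM EVENT_HUBS_TOPIC
GROUP BY KAFKA_PARTITION;
```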

[azure-event-hubs]: https://azure.microsoft.com/en-us/services/event-hubs/
[event-hubs-get-connection-string]: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string

## Kafka Consumer Properties

@@ -510,26 +575,27 @@ These are optional parameters with their default values.
* ``RECORD_VALUE_FORMAT`` - It defines the record value format. It should be one
of `avro`, `json` or `string` values. The default value is **avro**.

* ``RECORD_FIELDS`` - A comma-separated list of fields to import from the source
record. It is recommended to set this when the structure of the Kafka records
is not under your control and the order and/or the number of fields in the
record can change at any time. The options are outlined in the [Record Format
Configuration](#record-format-configuration) section.

The default is dependent on the serialization format:
- avro: **`value.*`** &mdash; All fields from the record will be imported.
- json: **`value`** &mdash; The record will be imported as a single JSON string
  into a column.
- string: **`value`** &mdash; The record will be imported as a single string
  into a column.

* ``GROUP_ID`` - It defines the id for this type of consumer. The default value
is **EXASOL_KAFKA_UDFS_CONSUMERS**. It is a unique string that identifies the
consumer group this consumer belongs to.

* ``POLL_TIMEOUT_MS`` - It defines the timeout value that is the number of
milliseconds to wait for the consumer poll function to return any data. The
default value is **30000** milliseconds.

* ``MIN_RECORDS_PER_RUN`` - It is an upper bound on the minimum number of
records to the consumer per UDF run. The default value is **100**. That is, if
@@ -575,9 +641,10 @@ The following properties should be provided to enable a secure connection to the
Kafka cluster. For safety, they must be specified in an Exasol named
connection, not in the import statement itself.

* ``SSL_ENABLED`` - (_deprecated_) It is a boolean property that should be set
to `true` in order to use secure connections to the Kafka cluster. Default
value is **'false'**. Use `SECURITY_PROTOCOL=SSL` or
`SECURITY_PROTOCOL=SASL_SSL` instead.

* ``SECURITY_PROTOCOL`` - It is the protocol used to communicate with Kafka
servers. Default value is **PLAINTEXT**.
@@ -604,11 +671,13 @@ not in import statement itself.
* ``SASL_MECHANISM`` - It is the SASL mechanism to use for authentication.
Default value is **GSSAPI**.

* ``SASL_USERNAME``/``SASL_PASSWORD`` - These are SASL credentials. They can be
used when `SASL_MECHANISM` is set to **PLAIN**, __DIGEST-*__ or __SCRAM-*__
(see the sketch after this list for a **PLAIN** example).

* ``SASL_JAAS_LOCATION`` - It is the location of the JAAS configuration file for
more complex configuration of SASL authentication. It should refer to the file
stored inside a bucket in Exasol BucketFS.

[gh-releases]: https://github.com/exasol/kafka-connector-extension/releases
[schema-registry]: https://docs.confluent.io/current/schema-registry/index.html
