Skip to content

Commit

Permalink
Added one and two way auth TLS support + Bumped versions + Changed pr…
Browse files Browse the repository at this point in the history
…ogress bar placement

Co-authored-by: Juans94 <[email protected]>
  • Loading branch information
PabloL007 and Juans94 committed Jan 20, 2020
1 parent b7bb801 commit a38a6b6
Show file tree
Hide file tree
Showing 26 changed files with 742 additions and 506 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
__pycache__/
.DS_Store
kafkaexp
.idea
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.7.2-alpine3.9
FROM python:3.8.0-alpine3.10

RUN mkdir -p /opt/docker

Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2019, Qvantel
Copyright (c) 2020, Qvantel
All rights reserved.

Redistribution and use in source and binary forms, with or without
Expand Down
162 changes: 158 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,42 @@ limited functionality with 0.9.x and 0.10.0 (as headers weren't introduced till

## Deployment

To deploy simply use the following command filling in the broker list:
For a simple deployment the following command can be used (filling in the broker list):

```shell
docker run -d -m 512m --log-driver json-file --log-opt max-size="1m" \
docker run -d -m 256m --log-driver json-file --log-opt max-size="1m" \
-p 5000:5000 \
-e "KAFKA_BOOTSTRAP_BROKERS=<list of bootstrap brokers>" \
--name kafka-explorer qvantel/kafka-explorer:0.8.0
--name kafka-explorer qvantel/kafka-explorer:0.9.0
```

Enabling one-way TLS (without server validation):

```shell
docker run -d -m 256m --log-driver json-file --log-opt max-size="1m" \
-p 5000:5000 \
-e "KAFKA_BOOTSTRAP_BROKERS=<list of bootstrap brokers>" \
-e "SECURITY_PROTOCOL=SSL" \
-e "SSL_CHECK_HOSTNAME=false" \
-e "SSL_CIPHERS=<list of ssl ciphers>" \
--name kafka-explorer qvantel/kafka-explorer:0.9.0
```

Enabling two-way TLS:

```shell
docker run -d -m 256m --log-driver json-file --log-opt max-size="1m" \
-p 5000:5000 \
-v <local directory containing client certificate, client key and CARoot>:<container path> \
-e "LOG_LEVEL=DEBUG" \
-e "KAFKA_BOOTSTRAP_BROKERS=<list of bootstrap brokers>" \
-e "SECURITY_PROTOCOL=SSL" \
-e "SSL_CHECK_HOSTNAME=false" \
-e "SSL_CIPHERS=<list of ssl ciphers>" \
-e "SSL_CAFILE=<container path to pem ca>" \
-e "SSL_CERTFILE=<container path to pem cert>" \
-e "SSL_KEYFILE=<container path to client key>" \
--name kafka-explorer qvantel/kafka-explorer:0.9.0
```

The following environment variables are available:
Expand All @@ -28,6 +57,16 @@ The following environment variables are available:
- `REPORT_INTERVAL`: Interval at which a metadata event with a refreshed total message count will be generated (every
100 consumed messages by default)
- `BROKER_API_VERSION`: Kafka API version to use, '1.1.0' by default
- `SECURITY_PROTOCOL`: Protocol used to communicate with brokers. Valid values are: PLAINTEXT (by default), SSL
- `SSL_CHECK_HOSTNAME`: Flag to configure whether ssl handshake should verify that the certificate matches the brokers
hostname ('true' by default, case insensitive, anything else will be evaluated to false)
- `SSL_CAFILE`: Optional filename of ca file to use in certificate verification (None by default)
- `SSL_CERTFILE`: Optional filename of file in pem format containing the client certificate, as well as any ca
certificates needed to establish the certificate’s authenticity (None by default)
- `SSL_KEYFILE`: Optional filename containing the client private key (None by default)
- `SSL_PASSWORD`: Optional password to be used when loading the certificate chain (None by default)
- `SSL_CIPHERS`: Optionally set the available ciphers for ssl connections. It should be a string in the OpenSSL cipher
list format.

## Use

Expand Down Expand Up @@ -73,4 +112,119 @@ Once at least one message is selected, the export button will turn blue. At this
select a mode. After picking, your browser will start the download (the file will be named `<topic>_<browser time>` with
an extension dependant on the export mode).

Note that headers and key will only be included in the csv export.
Note that headers and key will only be included in the csv export.

## Testing TLS

In order to test the TLS functionality, we will need to create a CA and client and server certificates. To do so, we
can follow this steps:

### In the server
##### We create server keystore

```shell
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 999 -genkey
```
##### We create CA certificate and key (to sign other certificates)
```shell
openssl req -new -x509 -keyout ca-key -out ca-cert -days 999
```
##### We add ca certificate to client truststore
```shell
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
```
##### We add CA certificate to client truststore
```shell
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
```
##### We create a certificate for the server
```shell
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
```
##### We sign the server certificate with the CA created before
```shell
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 999 -CAcreateserial -passin pass:password
```
##### We store the CA certificate in the server keystore
```shell
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
```
##### We store the signed certificate in the server keystore
```shell
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
```

### In the client
##### We create the client keystore
```shell
keytool -genkey -keystore kafka.client.keystore.jks -validity 365 -storepass "MyClientPassword123" -keypass "MyClientPassword123" -dname "CN=mylaptop1" -alias my-local-pc1 -storetype pkcs12
```
##### We create the client certificate (that we want to sign with the CA)
```shell
keytool -keystore kafka.client.keystore.jks -certreq -file client-cert-sign-request -alias my-local-pc1 -storepass "MyClientPassword123" -keypass "MyClientPassword123"
```
##### We sign the certificate with the CA (client certificate must be copied to CA's location in order to do this step)
```shell
openssl x509 -req -CA ca-cert -CAkey ca-key -in /tmp1/client-cert-sign-request -out /tmp1/client-cert-signed -days 365 -CAcreateserial -passin pass:server_password
```
##### We import the CA cert to client's truststore
```shell
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass "MyClientPassword123" -keypass "MyClientPassword123" -noprompt
```
##### We import the CA cert to client's keystore
```shell
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert -storepass "MyClientPassword123" -keypass "MyClientPassword123" -noprompt
```
##### We import the signed client certificate to the client's keystore
```shell
keytool -keystore kafka.client.keystore.jks -import -file client-cert-signed -alias my-local-pc1 -storepass "MyClientPassword123" -keypass "MyClientPassword123" -noprompt
```

##### We can use this command to list the content of a JKS
```shell
keytool -list -rfc -keystore kafka.client.keystore.jks
```
##### Now we extract the client's certificate from that JKS
```shell
keytool -exportcert -alias my-local-pc1 -keystore kafka.client.keystore.jks -rfc -file certificate.pem
```
##### We extract the client's key from the JKS
```shell
keytool -v -importkeystore -srckeystore kafka.client.keystore.jks -srcalias localhost -destkeystore cert_and_key.p12 -deststoretype PKCS12
```
##### And store it in key.pem
```shell
openssl pkcs12 -in cert_and_key.p12 -nocerts -nodes > key.pem.temp
(cat key.pem.temp | sed -n '/-----BEGIN PRIVATE KEY-----/,$p') > key.pem```
rm key.pem.temp
```
##### Then we get CARoot cert with this command
```shell
keytool -exportcert -alias CARoot -keystore kafka.client.keystore.jks -rfc \
-file `CARoot.pem`
```
After completing this process, we have all the necessary files for 2-way TLS connection between kafka-explorer and the kafka server:
* kafka.server.keystore.jks: The server's keystore in jks format
* kafka.server.truststore.jks: The server's truststore in jks format
* kafka.client.keystore.jks: The client's keystore in jks format
* kafka.client.truststore.jks: The client's truststore in jks format
* key.pem: Client's private key
* certificate.pem: Client's signed certificate
* CARoot.pem: The CARoot certificate

With all these files, we can run kafka-explorer as follows:
```shell
docker run -d -m 256m --log-driver json-file --log-opt max-size="1m" \
-p 5000:5000 \
-v <local directory containing client certificate, client key and CARoot>:/certs/kafka \
-e "LOG_LEVEL=DEBUG" \
-e "KAFKA_BOOTSTRAP_BROKERS=<list of bootstrap brokers>" \
-e "SECURITY_PROTOCOL=SSL" \
-e "SSL_CHECK_HOSTNAME=false" \
-e "SSL_CIPHERS=ALL" \
-e "SSL_CAFILE=/certs/kafka/CARoot.pem" \
-e "SSL_CERTFILE=/certs/kafka/certificate.pem" \
-e "SSL_KEYFILE=/certs/kafka/key.pem" \
--name kafka-explorer kafka-explorer:local-tls-build
```

46 changes: 40 additions & 6 deletions app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,45 @@
from app import app
from app import utils

import logging
import json
import uuid
import os

if __name__ != '__main__':
gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level)

# Required vars

try:
servers = os.environ['KAFKA_BOOTSTRAP_BROKERS']
except KeyError:
print('Error: The env var KAFKA_BOOTSTRAP_BROKERS needs to be defined!')
raise

# Optional vars

try:
api_version = tuple(map(lambda x: int(x), os.environ.get('BROKER_API_VERSION', '1.1.0').split('.')))
api_version = tuple(map(lambda x: int(x), os.environ.get('BROKER_API_VERSION', '1.1.1').split('.')))
except ValueError:
print('Error: The env var BROKER_API_VERSION has to be a string formed by numbers and dots!')
raise

report_interval = int(os.environ.get('REPORT_INTERVAL', 100))
security_protocol = os.environ.get('SECURITY_PROTOCOL', 'PLAINTEXT')
ssl_args = {
'ssl_check_hostname': os.environ.get('SSL_CHECK_HOSTNAME', 'True').lower() == 'true',
'ssl_cafile': os.environ.get('SSL_CAFILE', None),
'ssl_certfile': os.environ.get('SSL_CERTFILE', None),
'ssl_keyfile': os.environ.get('SSL_KEYFILE', None),
'ssl_password': os.environ.get('SSL_PASSWORD', None),
'ssl_ciphers': os.environ.get('SSL_CIPHERS', None)
}

if security_protocol != 'PLAINTEXT':
app.logger.debug(json.dumps(ssl_args))


@app.route('/')
Expand All @@ -31,7 +53,13 @@ def index():

@app.route('/topics')
def topics():
consumer = KafkaConsumer(bootstrap_servers=servers)
consumer = KafkaConsumer(
bootstrap_servers=servers,
api_version=api_version,
security_protocol=security_protocol,
ssl_context=utils.get_ssl_context(app.logger, ssl_args)
)
app.logger.debug(f'Consumer config: {consumer.config}')
results = consumer.topics()
consumer.close()
return json.dumps(sorted(results))
Expand All @@ -46,7 +74,9 @@ def count():
bootstrap_servers=servers,
auto_offset_reset='earliest',
enable_auto_commit=False,
api_version=api_version
api_version=api_version,
security_protocol=security_protocol,
ssl_context=utils.get_ssl_context(app.logger, ssl_args)
)
consumer.poll(1)
result = utils.get_message_count(consumer, -1)
Expand All @@ -69,7 +99,9 @@ def generate(args):
bootstrap_servers=servers,
auto_offset_reset='earliest',
enable_auto_commit=False,
api_version=api_version
api_version=api_version,
security_protocol=security_protocol,
ssl_context=utils.get_ssl_context(app.logger, ssl_args)
)

consumed = 0
Expand All @@ -92,12 +124,14 @@ def generate(args):
try:
jdata = json.loads(value)
if not any(utils.is_present(pair['key'], pair['value'], jdata) for pair in exclude):
if len(include) == 0 or all(utils.is_present(pair['key'], pair['value'], jdata) for pair in include):
if len(include) == 0 or \
all(utils.is_present(pair['key'], pair['value'], jdata) for pair in include):
yield utils.message_to_sse(message, key, value, consumed)
except ValueError:
pass
elif not any((pair['value'] in value or (key != 'None' and pair['value'] in key)) for pair in exclude):
if len(include) == 0 or all((pair['value'] in value or (key != 'None' and pair['value'] in key)) for pair in include):
if len(include) == 0 or \
all((pair['value'] in value or (key != 'None' and pair['value'] in key)) for pair in include):
yield utils.message_to_sse(message, key, value, consumed)
consumer.close()

Expand Down
6 changes: 4 additions & 2 deletions app/static/app/search/search.tmpl.html
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
<!-- Status bar -->
<md-progress-linear md-mode="indeterminate" ng-disabled="!ctrl.source || ctrl.error"></md-progress-linear>
<md-progress-linear class="md-warn" md-mode="query" ng-disabled="!ctrl.source || !ctrl.error"></md-progress-linear>
<div class="fixed-top-progressbar">
<md-progress-linear md-mode="indeterminate" ng-disabled="!ctrl.source || ctrl.error"></md-progress-linear>
<md-progress-linear class="md-warn" md-mode="query" ng-disabled="!ctrl.source || !ctrl.error"></md-progress-linear>
</div>

<md-content class="md-padding">

Expand Down
Loading

0 comments on commit a38a6b6

Please sign in to comment.