Skip to content

Commit

Permalink
Fixed logging + Bumped versions
Browse files Browse the repository at this point in the history
  • Loading branch information
PabloL007 committed Oct 5, 2020
1 parent a38a6b6 commit f428a17
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 74 deletions.
6 changes: 4 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.8.0-alpine3.10
FROM python:3.8.2-alpine3.11

RUN mkdir -p /opt/docker

Expand All @@ -12,6 +12,8 @@ RUN apk add --no-cache gcc musl-dev && \

ADD . /opt/docker

ENV VERSION=0.9.1

EXPOSE 5000

CMD [ "gunicorn", "-c", "gunicorn.config.py", "app:app"]
ENTRYPOINT [ "gunicorn", "-c", "gunicorn.config.py", "app:app"]
45 changes: 24 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ For a simple deployment the following command can be used (filling in the broker
docker run -d -m 256m --log-driver json-file --log-opt max-size="1m" \
-p 5000:5000 \
-e "KAFKA_BOOTSTRAP_BROKERS=<list of bootstrap brokers>" \
--name kafka-explorer qvantel/kafka-explorer:0.9.0
--name kafka-explorer qvantel/kafka-explorer:0.9.1
```

Enabling one-way TLS (without server validation):
Enabling one-way auth TLS (without server validation):

```shell
docker run -d -m 256m --log-driver json-file --log-opt max-size="1m" \
Expand All @@ -29,10 +29,10 @@ docker run -d -m 256m --log-driver json-file --log-opt max-size="1m" \
-e "SECURITY_PROTOCOL=SSL" \
-e "SSL_CHECK_HOSTNAME=false" \
-e "SSL_CIPHERS=<list of ssl ciphers>" \
--name kafka-explorer qvantel/kafka-explorer:0.9.0
--name kafka-explorer qvantel/kafka-explorer:0.9.1
```

Enabling two-way TLS:
Enabling two-way auth TLS:

```shell
docker run -d -m 256m --log-driver json-file --log-opt max-size="1m" \
Expand All @@ -46,27 +46,30 @@ docker run -d -m 256m --log-driver json-file --log-opt max-size="1m" \
-e "SSL_CAFILE=<container path to pem ca>" \
-e "SSL_CERTFILE=<container path to pem cert>" \
-e "SSL_KEYFILE=<container path to client key>" \
--name kafka-explorer qvantel/kafka-explorer:0.9.0
--name kafka-explorer qvantel/kafka-explorer:0.9.1
```

The following environment variables are available:

- `KAFKA_BOOTSTRAP_BROKERS`: List of comma separated broker `host:port` pairs.
- `LOG_LEVEL`: Gunicorn logging level, INFO by default
- `WORKERS`: Number of gevent workers, min(10, (cpus * 2) + 1) by default
- `REPORT_INTERVAL`: Interval at which a metadata event with a refreshed total message count will be generated (every
100 consumed messages by default)
- `BROKER_API_VERSION`: Kafka API version to use, '1.1.0' by default
- `SECURITY_PROTOCOL`: Protocol used to communicate with brokers. Valid values are: PLAINTEXT (by default), SSL
- `SSL_CHECK_HOSTNAME`: Flag to configure whether ssl handshake should verify that the certificate matches the brokers
hostname ('true' by default, case insensitive, anything else will be evaluated to false)
- `SSL_CAFILE`: Optional filename of ca file to use in certificate verification (None by default)
- `SSL_CERTFILE`: Optional filename of file in pem format containing the client certificate, as well as any ca
certificates needed to establish the certificate’s authenticity (None by default)
- `SSL_KEYFILE`: Optional filename containing the client private key (None by default)
- `SSL_PASSWORD`: Optional password to be used when loading the certificate chain (None by default)
- `SSL_CIPHERS`: Optionally set the available ciphers for ssl connections. It should be a string in the OpenSSL cipher
list format.
| Variable | Required | Default | Description |
|---------------------------|----------|------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| KAFKA_BOOTSTRAP_BROKERS | YES | N/A | List of comma separated broker `host:port` pairs |
| LOG_LEVEL | NO | INFO | Application/root log level, supported values are `TRACE`, `DEBUG`, `INFO`, `WARNING`, `ERROR` and `CRITICAL` |
| GUNICORN_LOG_LEVEL | NO | INFO | Gunicorn log level, supported values are `DEBUG`, `INFO`, `WARNING`, `ERROR` and `CRITICAL` |
| KAFKA_LOG_LEVEL | NO | ERROR | Kafka log level, supported values are `DEBUG`, `INFO`, `WARNING`, `ERROR` and `CRITICAL` |
| MARATHON_APP_DOCKER_IMAGE | NO | qvantel/kafka-explorer:${VERSION}? | Included in the `artifact_id` field of log messages, gets filled in automatically when ran through Marathon |
| SERVICE_5000_NAME | NO | $SERVICE_NAME | Included in the `service_name` field of the log messages. If set, overrides whatever is defined in `$SERVICE_NAME` |
| SERVICE_NAME | NO | kafka-explorer | Included in the `service_name` field of the log messages |
| WORKERS | NO | min(10, (cpus * 2) + 1) | Number of gevent workers or in simpler terms, how many processes are started to handle client requests |
| REPORT_INTERVAL | NO | 100 | Interval at which a metadata event with a refreshed total message count will be generated, defined as number of consumed messages between reports |
| BROKER_API_VERSION | NO | 1.1.1 | Kafka API version to use |
| SECURITY_PROTOCOL | NO | PLAINTEXT | Protocol used to communicate with brokers. Valid values are: PLAINTEXT (by default), SSL |
| SSL_CHECK_HOSTNAME | NO | true | Flag to configure whether ssl handshake should verify that the certificate matches the broker's hostname (case insensitive, anything other than 'true' will be evaluated to false) |
| SSL_CAFILE | NO | None | Optional filename of ca file to use in certificate verification |
| SSL_CERTFILE | NO | None | Optional filename of file in pem format containing the client certificate, as well as any ca certificates needed to establish the certificate’s authenticity |
| SSL_KEYFILE | NO | None | Optional filename containing the client private key |
| SSL_PASSWORD | NO | None | Optional password to be used when loading the certificate chain |
| SSL_CIPHERS | NO | None | Optionally set the available ciphers for ssl connections. It should be a string in the OpenSSL cipher list format |

## Use

Expand Down
63 changes: 37 additions & 26 deletions app/routes.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
from flask import render_template, Response, request
from kafka import KafkaConsumer
from app import app
from app import utils

import logging
import json
import uuid
import os

if __name__ != '__main__':
gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(gunicorn_logger.level)
from flask import render_template, Response, request
from kafka import KafkaConsumer

from app import app
from app import utils

# Required vars

try:
servers = os.environ['KAFKA_BOOTSTRAP_BROKERS']
except KeyError:
print('Error: The env var KAFKA_BOOTSTRAP_BROKERS needs to be defined!')
app.logger.error('Error: The env var KAFKA_BOOTSTRAP_BROKERS needs to be defined!')
raise
app.logger.debug(f'Using {servers} as bootstrap brokers')

# Optional vars

try:
api_version = tuple(map(lambda x: int(x), os.environ.get('BROKER_API_VERSION', '1.1.1').split('.')))
except ValueError:
print('Error: The env var BROKER_API_VERSION has to be a string formed by numbers and dots!')
app.logger.error('Error: The env var BROKER_API_VERSION has to be a string formed by numbers and dots!')
raise
app.logger.debug(f'Using {api_version} as api version')

report_interval = int(os.environ.get('REPORT_INTERVAL', 100))
app.logger.debug(f'Using {report_interval} as report interval')

security_protocol = os.environ.get('SECURITY_PROTOCOL', 'PLAINTEXT')
ssl_args = {
'ssl_check_hostname': os.environ.get('SSL_CHECK_HOSTNAME', 'True').lower() == 'true',
Expand All @@ -39,7 +38,6 @@
'ssl_password': os.environ.get('SSL_PASSWORD', None),
'ssl_ciphers': os.environ.get('SSL_CIPHERS', None)
}

if security_protocol != 'PLAINTEXT':
app.logger.debug(json.dumps(ssl_args))

Expand All @@ -54,12 +52,14 @@ def index():
@app.route('/topics')
def topics():
consumer = KafkaConsumer(
group_id=f'kafka-explorer-{uuid.uuid4()}',
bootstrap_servers=servers,
auto_offset_reset='earliest',
enable_auto_commit=False,
api_version=api_version,
security_protocol=security_protocol,
ssl_context=utils.get_ssl_context(app.logger, ssl_args)
)
app.logger.debug(f'Consumer config: {consumer.config}')
results = consumer.topics()
consumer.close()
return json.dumps(sorted(results))
Expand All @@ -70,7 +70,7 @@ def count():
topic = request.args.get('topic')
consumer = KafkaConsumer(
topic,
group_id='kafka-explorer-' + str(uuid.uuid4()),
group_id=f'kafka-explorer-{uuid.uuid4()}',
bootstrap_servers=servers,
auto_offset_reset='earliest',
enable_auto_commit=False,
Expand All @@ -92,10 +92,11 @@ def generate(args):
exclude = utils.decode_search_pairs(args.get('exclude'))
include = utils.decode_search_pairs(args.get('include'))
start = int(args.get('start', '-1'))
app.logger.debug(f'Started {search_type} search in topic {topic} for {include} excluding {exclude}')

consumer = KafkaConsumer(
topic,
group_id='kafka-explorer-' + str(uuid.uuid4()),
group_id=f'kafka-explorer-{uuid.uuid4()}',
bootstrap_servers=servers,
auto_offset_reset='earliest',
enable_auto_commit=False,
Expand All @@ -114,25 +115,35 @@ def generate(args):
consumer.seek_to_beginning() # Reset offsets in case poll consumed any messages

for message in consumer:
key = 'None' if message.key is None else message.key.decode('utf-8')
value = message.value.decode('utf-8')
try:
key = 'None' if message.key is None else message.key.decode('utf-8')
except UnicodeDecodeError:
app.logger.warning(f'There was an error decoding the key for a message with offset {message.offset} in '
f'partition {message.partition} for the topic {topic}. Skipping...')
continue
try:
value = message.value.decode('utf-8')
except UnicodeDecodeError:
app.logger.warning(f'There was an error decoding the value for a message with offset {message.offset} '
f'in partition {message.partition} for the topic {topic}. Skipping...')
continue
consumed = consumed + 1
if consumed % report_interval == 0 or consumed >= last_count:
last_count = utils.get_message_count(consumer, start)
yield utils.consumer_meta_to_sse(consumer, last_count, consumed)
if search_type == 'json':
try:
jdata = json.loads(value)
if not any(utils.is_present(pair['key'], pair['value'], jdata) for pair in exclude):
if len(include) == 0 or \
all(utils.is_present(pair['key'], pair['value'], jdata) for pair in include):
yield utils.message_to_sse(message, key, value, consumed)
if not any(utils.is_present(pair['key'], pair['value'], jdata) for pair in exclude) and \
(len(include) == 0 or
all(utils.is_present(pair['key'], pair['value'], jdata) for pair in include)):
yield utils.message_to_sse(message, key, value, consumed)
except ValueError:
pass
elif not any((pair['value'] in value or (key != 'None' and pair['value'] in key)) for pair in exclude):
if len(include) == 0 or \
all((pair['value'] in value or (key != 'None' and pair['value'] in key)) for pair in include):
yield utils.message_to_sse(message, key, value, consumed)
elif not any((pair['value'] in value or (key != 'None' and pair['value'] in key)) for pair in exclude) and \
(len(include) == 0 or
all((pair['value'] in value or (key != 'None' and pair['value'] in key)) for pair in include)):
yield utils.message_to_sse(message, key, value, consumed)
consumer.close()

return Response(generate(request.args.copy()), mimetype='text/event-stream')
16 changes: 13 additions & 3 deletions app/static/app/search/search.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ angular.module('search')
searchCtrl.now = new Date();
searchCtrl.currentSearch = null;
searchCtrl.messages = [];
// Used for mitigating duplicates caused by https://github.com/dpkp/kafka-python/issues/1985
searchCtrl.ids = [];
searchCtrl.matched = 0;
searchCtrl.topics = [];
searchCtrl.error = false;
Expand Down Expand Up @@ -63,9 +65,9 @@ angular.module('search')
};

this.encodedPairs = function () {
result = '';
include = this.searchPairs.filter(function (pair) { return !pair.exclude; });
exclude = this.searchPairs.filter(function (pair) { return pair.exclude; });
var result = '';
var include = this.searchPairs.filter(function (pair) { return !pair.exclude; });
var exclude = this.searchPairs.filter(function (pair) { return pair.exclude; });
result += '&include=';
include.forEach(function (pair, index) {
result += pair.key + '<|,|>' + pair.value;
Expand All @@ -92,14 +94,20 @@ angular.module('search')
var onMessage = function (msg) {
$scope.$apply(function () {
var parsedMsg = JSON.parse(msg.data);
var mID = parsedMsg.partition + ';' + parsedMsg.offset + ';' + parsedMsg.timestamp;
if(searchCtrl.ids.includes(mID)){
return;
}
searchCtrl.matched += 1;
searchCtrl.searchMetadata.consumed = parsedMsg.consumed;
delete parsedMsg.consumed;
searchCtrl.messages.unshift(parsedMsg);
searchCtrl.ids.unshift(mID);
if(searchCtrl.messages.length >= searchCtrl.currentSearch.limit && searchCtrl.currentSearch.stop){
searchCtrl.stop();
} else if(searchCtrl.messages.length > searchCtrl.currentSearch.limit && !searchCtrl.currentSearch.stop) {
searchCtrl.messages.pop();
searchCtrl.ids.pop();
}
});
};
Expand All @@ -115,6 +123,7 @@ angular.module('search')
if(searchCtrl.error){
searchCtrl.error = false;
searchCtrl.messages = [];
searchCtrl.ids = [];
searchCtrl.matched = 0;
}
});
Expand All @@ -141,6 +150,7 @@ angular.module('search')
searchCtrl.stop();
searchCtrl.currentSearch = searchCtrl.searchParams.clone();
searchCtrl.messages = [];
searchCtrl.ids = [];
searchCtrl.matched = 0;
searchCtrl.searchMetadata = {
'partitions': -1,
Expand Down
18 changes: 9 additions & 9 deletions app/static/app/search/search.tmpl.html
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@
</md-card-title>
<div layout="row" layout-align="space-between center" ng-show="ctrl.searchMetadata.partitions != -1">
<div layot="column" class="md-body-1">
<p><b>Partitions:</b> {{ ctrl.searchMetadata.partitions }}</p>
<p><b>Total:</b> {{ ctrl.searchMetadata.total }}</p>
<p><b>Scanned:</b> {{ ctrl.searchMetadata.consumed }}</p>
<p><b>Matched:</b> {{ ctrl.matched }}</p>
<p><strong>Partitions:</strong> {{ ctrl.searchMetadata.partitions }}</p>
<p><strong>Total:</strong> {{ ctrl.searchMetadata.total }}</p>
<p><strong>Scanned:</strong> {{ ctrl.searchMetadata.consumed }}</p>
<p><strong>Matched:</strong> {{ ctrl.matched }}</p>
</div>
<div layout-margin>
<md-progress-circular
Expand All @@ -202,13 +202,13 @@

<md-card-content>
<md-list class="md-dense" flex>
<md-list-item class="md-3-line" ng-repeat="message in ctrl.messages track by message.partition + ';' + message.offset">
<md-list-item class="md-3-line" ng-repeat="message in ctrl.messages track by message.partition + ';' + message.offset + ';' + message.timestamp">
<md-checkbox ng-model="message.selected" ng-show="ctrl.showBoxes"></md-checkbox>
<div class="md-list-item-text" layout="column">
<h3><b>Match #{{ ctrl.messages.length - $index }}</b></h3>
<h4 class="md-accent md-hue-2"><b>Partition:</b> {{ message.partition }} | <b>Offset:</b> {{ message.offset }} | <b>Timestamp:</b> {{ message.timestamp | date:'medium' }}</h4>
<p ng-if="message.headers.length > 0"><b>Headers:</b> {{message.headers}}</p>
<p ng-if="message.key !== 'None'"><b>Key:</b> <span ng-bind-html="message.key | multiHighlight:ctrl.currentSearch.searchPairs"></span></p>
<h3><strong>Match #{{ ctrl.messages.length - $index }}</strong></h3>
<h4 class="md-accent md-hue-2"><strong>Partition:</strong> {{ message.partition }} | <strong>Offset:</strong> {{ message.offset }} | <strong>Timestamp:</strong> {{ message.timestamp | date:'medium' }}</h4>
<p ng-if="message.headers.length > 0"><strong>Headers:</strong> {{message.headers}}</p>
<p ng-if="message.key !== 'None'"><strong>Key:</strong> <span ng-bind-html="message.key | multiHighlight:ctrl.currentSearch.searchPairs"></span></p>
<p ng-bind-html="message.value | multiHighlight:ctrl.currentSearch.searchPairs"></p>
</div>
</md-list-item>
Expand Down
10 changes: 5 additions & 5 deletions app/static/style/main.css
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ md-list-item.md-3-line .md-list-item-text, md-list-item.md-3-line>.md-no-style .
}

.comp-select-header .select-header {
box-shadow: 0 1px 0 0 rgba(0, 0, 0, 0.1), 0 0 0 0 rgba(0, 0, 0, 0.14), 0 0 0 0 rgba(0, 0, 0, 0.12);
box-shadow: 0 1px 0 0 rgba(0, 0, 0, .1), 0 0 0 0 rgba(0, 0, 0, .14), 0 0 0 0 rgba(0, 0, 0, .12);
padding-left: 10.667px;
height: 48px;
cursor: pointer;
Expand All @@ -42,10 +42,10 @@ md-list-item.md-3-line .md-list-item-text, md-list-item.md-3-line>.md-no-style .
}

.fixed-top-progressbar {
position:fixed;
top:0px;
z-index:999999;
width:100%;
position: fixed;
top: 0;
z-index: 999999;
width: 100%;
}

.fixed-top-progressbar ._md-progress-linear-disabled {
Expand Down
Loading

0 comments on commit f428a17

Please sign in to comment.