Skip to content

Commit

Permalink
Merge pull request #3 from quintoandar/kafka_consumer
Browse files Browse the repository at this point in the history
Kafka consumer
  • Loading branch information
thspinto authored Jun 27, 2018
2 parents 1fe5e25 + 24bc57b commit 65e3edf
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 19 deletions.
2 changes: 2 additions & 0 deletions clients/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from clients.idempotence_client import IdempotenceClient # noqa: F401
from clients.kafka_consumer import KafkaConsumerClient # noqa: F401
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@


class IdempotenceClient:
def __init__(self, host, port, groupId, db=0, expire=14 * 24 * 3600, key_extractor=None):
def __init__(self, host, port, groupId, db=0,
expire=14 * 24 * 3600, key_extractor=None):
self.redis = redis.StrictRedis(host=host, port=port, db=db)
self.groupId = groupId
self.expire = expire # default 2 weeks
Expand Down
55 changes: 55 additions & 0 deletions clients/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import json
import logging

from retrying import retry
from kafka.consumer.group import KafkaConsumer


class KafkaConsumerClient:

def __init__(self, group_id, bootstrap_servers, topic,
processor, idempotenceClient=None, deserializer=None):
self.log = logging.getLogger(__name__)
self.topic = topic
self.processor = processor
self.deserializer = deserializer or self.defaultDeserializer
self.idempotenceClient = idempotenceClient or\
DefaultIdempotenceClient()
self.consumer = self.connect(group_id, bootstrap_servers)
self.consumer.subscribe(topic)

@retry(stop_max_attempt_number=10, wait_fixed=3000)
def connect(self, group_id, bootstrap_servers):
return KafkaConsumer(
value_deserializer=self.deserializer,
group_id=group_id,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='latest')

def start(self):
for message in self.consumer:
if not message.value:
continue

if not self.idempotenceClient.isUnique(self.topic, message):
continue

self.processor(message)

self.idempotenceClient.markConsumedMessage(self.topic, message)

def defaultDeserializer(self, m):
try:
return json.loads(m.decode('utf8'))
except Exception as ex:
self.log.error("Failed to decode message: %s", ex, exc_info=True)
return {}


class DefaultIdempotenceClient:

def isUnique(self, topic, message):
return True

def markConsumedMessage(self, topic, message):
pass
48 changes: 48 additions & 0 deletions clients/tests/features/kafka_consumer.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
Feature: KafkaConsumer

Scenario: Consumer configuration
Given A default KafkaConsumerClient is instanciated
Then The KafkaConsumer should be instanciated with the correct params

Scenario: Custom consumer configuration
Given KafkaConsumerClient with custom deserializer and idempotenceClinet
Then The KafkaConsumer should be instanciated with the correct params
And The idempotenceClient should be the provided client

Scenario: Default serializer exception
Given A default KafkaConsumerClient is instanciated
When An invalid value is deserialized
Then The deserialized value should be an empty object

Scenario: Default serializer
Given A default KafkaConsumerClient is instanciated
When A valid value is deserialized
Then The deserialized value should be the expected object

Scenario: Default idempotenceClient unique message
Given A default KafkaConsumerClient is instanciated
And The consumer receives a message
When The message is processed
Then The processor should be called

Scenario: Default idempotenceClient repeated message
Given A default KafkaConsumerClient is instanciated
And The consumer receives repeated messages
When The message is processed
Then The processor should be called for every message

Scenario: Custom idempotenceClient unique message
Given A default KafkaConsumerClient is instanciated
And The consumer receives a message
And The idempotenceClient marks the message as unique
When The message is processed
Then The processor should be called
And The idempotenceClient should mark the message as consumed


Scenario: Custom idempotenceClient repeated message
Given A default KafkaConsumerClient is instanciated
And The consumer receives a message
And The idempotenceClient marks the message as repeated
When The message is processed
Then The processor should not be called
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from unittest.mock import MagicMock
from behave import given, when, then # pylint: disable=E0611
from hamcrest import assert_that
from idempotence_client import IdempotenceClient
from clients import IdempotenceClient


message = MagicMock()
message.value = {'test1': 'test2'}
message.topic = 'test3'


@given('IdempotenceClient is instanciated')
Expand Down Expand Up @@ -39,9 +44,7 @@ def step_impl_when_redis_error(context):

@when('markConsumedMessage is called')
def step_impl_when_markConsumedMessage_called(context):
context.message = MagicMock()
context.message.topic = 'Test'
context.message.value = '{"test": "test"}'
context.message = message
context.message.__str__.return_value = 'Topic="{}", Value="{}"'.format(
context.message.topic, context.message.value)
key = context.IdempotenceClient.key_extractor(context.message)
Expand All @@ -54,9 +57,7 @@ def step_impl_when_markConsumedMessage_called(context):

@when('isUnique is called')
def step_impl_when_isUnique_called(context):
context.message = MagicMock()
context.message.topic = 'Test'
context.message.value = '{"test": "test"}'
context.message = message
context.message.__str__.return_value = 'Topic="{}", Value="{}"'.format(
context.message.topic, context.message.value)
key = context.IdempotenceClient.key_extractor(context.message)
Expand All @@ -82,26 +83,17 @@ def step_impl_then_verify_correct_get_params(context):

@then('isUnique should return true')
def step_impl_then_verify_true(context):
message = MagicMock()
message.value = {'test': 'test'}
message.topic = 'test'
assert_that(context.IdempotenceClient.isUnique(
message.topic, message) is True)


@then('isUnique should return false')
def step_impl_then_verify_false(context):
message = MagicMock()
message.value = {'test': 'test'}
message.topic = 'test'
assert_that(context.IdempotenceClient.isUnique(
message.topic, message) is False)


@then('markConsumedMessage not rase an error')
def step_impl_then_verify_no_errors(context):
message = MagicMock()
message.value = {'test': 'test'}
message.topic = 'test'
assert_that(context.IdempotenceClient.markConsumedMessage(
message.topic, message) is None)
126 changes: 126 additions & 0 deletions clients/tests/steps/step_kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from unittest.mock import MagicMock, patch
from behave import given, when, then # pylint: disable=E0611
from hamcrest import assert_that, equal_to
from clients import KafkaConsumerClient

message = MagicMock()
message.value = {'test1': 'test2'}
message.topic = 'test3'


@given('A default KafkaConsumerClient is instanciated')
@patch('clients.kafka_consumer.KafkaConsumer')
def step_impl_given_default_KafkaConsumer_instance(context, kafkaConsumerMock):
context.processor = MagicMock()
context.group_id = 'test1'
context.bootstrap_servers = 'test2'
context.topic = 'test3'
context.kafkaConsumerMock = kafkaConsumerMock
context.kafkaConsumerClient = KafkaConsumerClient(
context.group_id, context.bootstrap_servers,
context.topic, context.processor)
context.deserializer = context.kafkaConsumerClient.deserializer


@given('KafkaConsumerClient with custom deserializer and idempotenceClinet')
@patch('clients.kafka_consumer.KafkaConsumer')
def step_impl_given_custom_KafkaConsumer_instance(context, kafkaConsumerMock):
context.processor = MagicMock()
context.group_id = 'test1'
context.bootstrap_servers = 'test2'
context.topic = 'test3'
context.kafkaConsumerMock = kafkaConsumerMock
context.deserializer = 'bla'
context.idempotenceClient = 'bla2'
context.kafkaConsumerClient = KafkaConsumerClient(
context.group_id, context.bootstrap_servers,
context.topic, context.processor, deserializer=context.deserializer,
idempotenceClient=context.idempotenceClient)


@given('The consumer receives a message')
def step_impl_given_message(context):
context.kafkaConsumerClient.consumer = [message]


@given('The idempotenceClient marks the message as unique')
def step_impl_given_flaged_unique_message(context):
idempotenceClient = MagicMock()
idempotenceClient.isUnique = MagicMock(return_value=True)
context.kafkaConsumerClient.idempotenceClient = idempotenceClient


@given('The idempotenceClient marks the message as repeated')
def step_impl_given_flagged_repeated_message(context):
idempotenceClient = MagicMock()
idempotenceClient.isUnique = MagicMock(return_value=False)
context.kafkaConsumerClient.idempotenceClient = idempotenceClient


@given('The consumer receives repeated messages')
def step_impl_given_repeated_message(context):
context.kafkaConsumerClient.consumer = [message, message]


@when('The message is processed')
def step_impl_when_msg_processed(context):
context.kafkaConsumerClient.start()


@when('An invalid value is deserialized')
def step_impl_when_invalid_message(context):
context.result = context.kafkaConsumerClient.deserializer(
bytes('not json', 'utf-8'))


@when('A valid value is deserialized')
def step_impl_when_valid_message(context):
context.expected = {'test1': 'test2'}
context.result = context.kafkaConsumerClient.deserializer(
bytes('{"test1": "test2"}', 'utf-8'))


@then('The processor should be called')
def step_impl_then_processor(context):
context.processor.assert_called_once_with(message)


@then('The idempotenceClient should mark the message as consumed')
def step_impl_then_markConsumed(context):
context.kafkaConsumerClient.idempotenceClient.markConsumedMessage.\
assert_called_once_with(message.topic, message)


@then('The processor should not be called')
def step_impl_then_not_processor(context):
context.processor.call_count == 0


@then('The processor should be called for every message')
def step_impl_then_processor_called_all_messages(context):
context.processor.call_count == len(context.kafkaConsumerClient.consumer)


@then('The deserialized value should be an empty object')
def step_impl_then_empty_object(context):
assert_that(context.result, equal_to({}))


@then('The deserialized value should be the expected object')
def step_impl_then_expected_object(context):
assert_that(context.result, equal_to(context.expected))


@then('The KafkaConsumer should be instanciated with the correct params')
def step_impl_then_correct_consumer_configs(context):
context.kafkaConsumerMock.assert_called_with(
group_id=context.group_id,
bootstrap_servers=context.bootstrap_servers,
auto_offset_reset='latest',
value_deserializer=context.deserializer)


@then('The idempotenceClient should be the provided client')
def step_impl_then_correct_idempotence_client(context):
assert_that(context.kafkaConsumerClient.idempotenceClient,
equal_to(context.idempotenceClient))
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
PyYAML==3.12
raven==6.2.1
redis==2.10.6
redis==2.10.6
retrying==1.3.3
kafka-python==1.4.2
2 changes: 1 addition & 1 deletion unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
print("------------------ TESTS ----------------")
print("-----------------------------------------")
print()
result = behave_main("idempotence_client/tests")
result = behave_main("clients/tests")
cov.stop()
print()
print("-----------------------------------------")
Expand Down

0 comments on commit 65e3edf

Please sign in to comment.