import json
import logging

from retrying import retry
from kafka.consumer.group import KafkaConsumer


class KafkaConsumerClient:
    """Consume a Kafka topic and pass every new message to a processor.

    Each record read from the consumer is dropped when its deserialized
    value is falsy, dropped when the idempotence client reports it as
    already seen, and otherwise handed to ``processor`` and then marked
    as consumed.

    Args:
        group_id: Consumer group id forwarded to ``KafkaConsumer``.
        bootstrap_servers: Broker address(es) forwarded to
            ``KafkaConsumer``.
        topic: Topic name (or an iterable of topic names) to subscribe to.
            The value is also passed unchanged to the idempotence client.
        processor: Callable invoked with each accepted message.
        idempotenceClient: Object exposing ``isUnique(topic, message)`` and
            ``markConsumedMessage(topic, message)``. When ``None`` a no-op
            ``DefaultIdempotenceClient`` is used.
        deserializer: Callable applied to the raw message value. When
            ``None`` ``defaultDeserializer`` (UTF-8 JSON) is used.
    """

    def __init__(self, group_id, bootstrap_servers, topic,
                 processor, idempotenceClient=None, deserializer=None):
        self.log = logging.getLogger(__name__)
        self.topic = topic
        self.processor = processor

        # Compare against None instead of relying on truthiness: a
        # caller-supplied collaborator that happens to be falsy (e.g. it
        # defines __len__ or __bool__) must not be silently replaced.
        if deserializer is None:
            deserializer = self.defaultDeserializer
        self.deserializer = deserializer

        if idempotenceClient is None:
            idempotenceClient = DefaultIdempotenceClient()
        self.idempotenceClient = idempotenceClient

        self.consumer = self.connect(group_id, bootstrap_servers)
        # KafkaConsumer.subscribe() is documented to take a list of topics,
        # so wrap a single topic name instead of passing a bare string.
        topics = [topic] if isinstance(topic, str) else topic
        self.consumer.subscribe(topics)

    @retry(stop_max_attempt_number=10, wait_fixed=3000)
    def connect(self, group_id, bootstrap_servers):
        """Create the underlying ``KafkaConsumer``.

        The ``retry`` decorator re-invokes this method when it raises: up
        to 10 attempts with a fixed 3000 ms wait between them.

        Returns:
            The ``KafkaConsumer`` instance, configured with this client's
            deserializer and ``auto_offset_reset='latest'``.
        """
        return KafkaConsumer(
            value_deserializer=self.deserializer,
            group_id=group_id,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='latest')

    def start(self):
        """Iterate over the consumer and process every unique message.

        A message is marked as consumed only after ``processor`` returns.
        Exceptions raised by ``processor`` are not caught here, so they
        propagate to the caller and end the loop with the message left
        unmarked.
        """
        for message in self.consumer:
            # Falsy values cover both empty payloads and the {} that
            # defaultDeserializer returns for undecodable input.
            if not message.value:
                continue

            if not self.idempotenceClient.isUnique(self.topic, message):
                continue

            self.processor(message)

            self.idempotenceClient.markConsumedMessage(self.topic, message)

    def defaultDeserializer(self, m):
        """Decode ``m`` as UTF-8 JSON.

        Args:
            m: Raw message value (bytes), or ``None`` for a record that
                carries no value.

        Returns:
            The decoded JSON document, or ``{}`` when ``m`` is ``None`` or
            cannot be decoded. ``start`` skips such falsy values.
        """
        if m is None:
            # A record without a value is not a decoding failure; skip it
            # quietly instead of logging an AttributeError traceback.
            return {}
        try:
            return json.loads(m.decode('utf8'))
        except Exception as ex:
            # Best effort on purpose: one malformed payload must not stop
            # the consumer loop.
            self.log.error("Failed to decode message: %s", ex, exc_info=True)
            return {}


class DefaultIdempotenceClient:
    """No-op idempotence client used when the caller supplies none.

    Every message is reported as unique and nothing is recorded.
    """

    def isUnique(self, topic, message):
        """Return ``True`` unconditionally."""
        return True

    def markConsumedMessage(self, topic, message):
        """Do nothing; this client keeps no state."""
        pass
receives a message + When The message is processed + Then The processor should be called + + Scenario: Default idempotenceClient repeated message + Given A default KafkaConsumerClient is instanciated + And The consumer receives repeated messages + When The message is processed + Then The processor should be called for every message + + Scenario: Custom idempotenceClient unique message + Given A default KafkaConsumerClient is instanciated + And The consumer receives a message + And The idempotenceClient marks the message as unique + When The message is processed + Then The processor should be called + And The idempotenceClient should mark the message as consumed + + + Scenario: Custom idempotenceClient repeated message + Given A default KafkaConsumerClient is instanciated + And The consumer receives a message + And The idempotenceClient marks the message as repeated + When The message is processed + Then The processor should not be called \ No newline at end of file diff --git a/idempotence_client/tests/steps/step_idempotent_client.py b/clients/tests/steps/step_idempotent_client.py similarity index 86% rename from idempotence_client/tests/steps/step_idempotent_client.py rename to clients/tests/steps/step_idempotent_client.py index b904762..1dd518a 100644 --- a/idempotence_client/tests/steps/step_idempotent_client.py +++ b/clients/tests/steps/step_idempotent_client.py @@ -1,7 +1,12 @@ from unittest.mock import MagicMock from behave import given, when, then # pylint: disable=E0611 from hamcrest import assert_that -from idempotence_client import IdempotenceClient +from clients import IdempotenceClient + + +message = MagicMock() +message.value = {'test1': 'test2'} +message.topic = 'test3' @given('IdempotenceClient is instanciated') @@ -39,9 +44,7 @@ def step_impl_when_redis_error(context): @when('markConsumedMessage is called') def step_impl_when_markConsumedMessage_called(context): - context.message = MagicMock() - context.message.topic = 'Test' - context.message.value 
= '{"test": "test"}' + context.message = message context.message.__str__.return_value = 'Topic="{}", Value="{}"'.format( context.message.topic, context.message.value) key = context.IdempotenceClient.key_extractor(context.message) @@ -54,9 +57,7 @@ def step_impl_when_markConsumedMessage_called(context): @when('isUnique is called') def step_impl_when_isUnique_called(context): - context.message = MagicMock() - context.message.topic = 'Test' - context.message.value = '{"test": "test"}' + context.message = message context.message.__str__.return_value = 'Topic="{}", Value="{}"'.format( context.message.topic, context.message.value) key = context.IdempotenceClient.key_extractor(context.message) @@ -82,26 +83,17 @@ def step_impl_then_verify_correct_get_params(context): @then('isUnique should return true') def step_impl_then_verify_true(context): - message = MagicMock() - message.value = {'test': 'test'} - message.topic = 'test' assert_that(context.IdempotenceClient.isUnique( message.topic, message) is True) @then('isUnique should return false') def step_impl_then_verify_false(context): - message = MagicMock() - message.value = {'test': 'test'} - message.topic = 'test' assert_that(context.IdempotenceClient.isUnique( message.topic, message) is False) @then('markConsumedMessage not rase an error') def step_impl_then_verify_no_errors(context): - message = MagicMock() - message.value = {'test': 'test'} - message.topic = 'test' assert_that(context.IdempotenceClient.markConsumedMessage( message.topic, message) is None) diff --git a/clients/tests/steps/step_kafka_consumer.py b/clients/tests/steps/step_kafka_consumer.py new file mode 100644 index 0000000..0e980d2 --- /dev/null +++ b/clients/tests/steps/step_kafka_consumer.py @@ -0,0 +1,126 @@ +from unittest.mock import MagicMock, patch +from behave import given, when, then # pylint: disable=E0611 +from hamcrest import assert_that, equal_to +from clients import KafkaConsumerClient + +message = MagicMock() +message.value = {'test1': 
'test2'} +message.topic = 'test3' + + +@given('A default KafkaConsumerClient is instanciated') +@patch('clients.kafka_consumer.KafkaConsumer') +def step_impl_given_default_KafkaConsumer_instance(context, kafkaConsumerMock): + context.processor = MagicMock() + context.group_id = 'test1' + context.bootstrap_servers = 'test2' + context.topic = 'test3' + context.kafkaConsumerMock = kafkaConsumerMock + context.kafkaConsumerClient = KafkaConsumerClient( + context.group_id, context.bootstrap_servers, + context.topic, context.processor) + context.deserializer = context.kafkaConsumerClient.deserializer + + +@given('KafkaConsumerClient with custom deserializer and idempotenceClinet') +@patch('clients.kafka_consumer.KafkaConsumer') +def step_impl_given_custom_KafkaConsumer_instance(context, kafkaConsumerMock): + context.processor = MagicMock() + context.group_id = 'test1' + context.bootstrap_servers = 'test2' + context.topic = 'test3' + context.kafkaConsumerMock = kafkaConsumerMock + context.deserializer = 'bla' + context.idempotenceClient = 'bla2' + context.kafkaConsumerClient = KafkaConsumerClient( + context.group_id, context.bootstrap_servers, + context.topic, context.processor, deserializer=context.deserializer, + idempotenceClient=context.idempotenceClient) + + +@given('The consumer receives a message') +def step_impl_given_message(context): + context.kafkaConsumerClient.consumer = [message] + + +@given('The idempotenceClient marks the message as unique') +def step_impl_given_flaged_unique_message(context): + idempotenceClient = MagicMock() + idempotenceClient.isUnique = MagicMock(return_value=True) + context.kafkaConsumerClient.idempotenceClient = idempotenceClient + + +@given('The idempotenceClient marks the message as repeated') +def step_impl_given_flagged_repeated_message(context): + idempotenceClient = MagicMock() + idempotenceClient.isUnique = MagicMock(return_value=False) + context.kafkaConsumerClient.idempotenceClient = idempotenceClient + + +@given('The 
consumer receives repeated messages') +def step_impl_given_repeated_message(context): + context.kafkaConsumerClient.consumer = [message, message] + + +@when('The message is processed') +def step_impl_when_msg_processed(context): + context.kafkaConsumerClient.start() + + +@when('An invalid value is deserialized') +def step_impl_when_invalid_message(context): + context.result = context.kafkaConsumerClient.deserializer( + bytes('not json', 'utf-8')) + + +@when('A valid value is deserialized') +def step_impl_when_valid_message(context): + context.expected = {'test1': 'test2'} + context.result = context.kafkaConsumerClient.deserializer( + bytes('{"test1": "test2"}', 'utf-8')) + + +@then('The processor should be called') +def step_impl_then_processor(context): + context.processor.assert_called_once_with(message) + + +@then('The idempotenceClient should mark the message as consumed') +def step_impl_then_markConsumed(context): + context.kafkaConsumerClient.idempotenceClient.markConsumedMessage.\ + assert_called_once_with(message.topic, message) + + +@then('The processor should not be called') +def step_impl_then_not_processor(context): + context.processor.call_count == 0 + + +@then('The processor should be called for every message') +def step_impl_then_processor_called_all_messages(context): + context.processor.call_count == len(context.kafkaConsumerClient.consumer) + + +@then('The deserialized value should be an empty object') +def step_impl_then_empty_object(context): + assert_that(context.result, equal_to({})) + + +@then('The deserialized value should be the expected object') +def step_impl_then_expected_object(context): + assert_that(context.result, equal_to(context.expected)) + + +@then('The KafkaConsumer should be instanciated with the correct params') +def step_impl_then_correct_consumer_configs(context): + context.kafkaConsumerMock.assert_called_with( + group_id=context.group_id, + bootstrap_servers=context.bootstrap_servers, + auto_offset_reset='latest', + 
value_deserializer=context.deserializer) + + +@then('The idempotenceClient should be the provided client') +def step_impl_then_correct_idempotence_client(context): + assert_that(context.kafkaConsumerClient.idempotenceClient, + equal_to(context.idempotenceClient)) diff --git a/requirements.txt b/requirements.txt index 2d9e90a..c48fa41 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ PyYAML==3.12 raven==6.2.1 -redis==2.10.6 \ No newline at end of file +redis==2.10.6 +retrying==1.3.3 +kafka-python==1.4.2 diff --git a/unit_tests.py b/unit_tests.py index 11edfbe..4f821de 100644 --- a/unit_tests.py +++ b/unit_tests.py @@ -20,7 +20,7 @@ print("------------------ TESTS ----------------") print("-----------------------------------------") print() -result = behave_main("idempotence_client/tests") +result = behave_main("clients/tests") cov.stop() print() print("-----------------------------------------")