From d86fe036c2c121f3ad094d1390910117fe6fc92b Mon Sep 17 00:00:00 2001 From: Thiago Pinto Date: Mon, 16 Mar 2020 09:47:07 -0300 Subject: [PATCH 1/3] update kafka-python and readme --- README.md | 4 ++-- requirements.txt | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 71702cd..9dfbadd 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![Build Status](https://travis-ci.org/quintoandar/python-kafka.svg?branch=master)](https://travis-ci.org/quintoandar/python-kafka) +[![Build Status](https://travis-ci.org/quintoandar/kafka-python.svg?branch=master)](https://travis-ci.org/github/quintoandar/kafka-python) # QuintoAndar Kafka Python Library @@ -14,7 +14,7 @@ A simple wrapper for kafka-python lib that uses redis to check duplicate events. | ----------------- | -------------------------------------------- | | group_id | The consumer group id | | bootstrap_servers | The bootstrap servers | -| redis_host | The topic to consume from | +| redis_host | The redis host | | redis_port | The function that processes the event | | idempotent_key | Function which extract an unique identifier from the event | diff --git a/requirements.txt b/requirements.txt index 16d9031..3261b2e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ redis==2.10.6 retrying==1.3.3 -kafka-python==1.4.6 +kafka-python==1.4.7 From 66b6c1a6905b67aeaf4944ae28315e3587acd673 Mon Sep 17 00:00:00 2001 From: Thiago Pinto Date: Mon, 16 Mar 2020 10:09:36 -0300 Subject: [PATCH 2/3] add new config --- quintoandar_kafka/consumer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/quintoandar_kafka/consumer.py b/quintoandar_kafka/consumer.py index ddde036..b269566 100644 --- a/quintoandar_kafka/consumer.py +++ b/quintoandar_kafka/consumer.py @@ -13,6 +13,7 @@ def __init__(self, *topics, **kwargs): kwargs["value_deserializer"] = lambda m: m.decode("utf8") if kwargs.get("auto_offset_reset") is None: kwargs["auto_offset_reset"] = "latest" + kwargs['legacy_iterator'] = False super().__init__(*topics, **kwargs) From 684da49dcc1341bdfab40b9692a360035cd93a75 Mon Sep 17 00:00:00 2001 From: Thiago Pinto Date: Mon, 16 Mar 2020 15:24:47 -0300 Subject: [PATCH 3/3] fix unit tests --- quintoandar_kafka/consumer.py | 1 - requirements.txt | 2 +- setup.py | 2 +- tests/steps/step_kafka_idempotent_consumer.py | 15 ++++++--------- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/quintoandar_kafka/consumer.py b/quintoandar_kafka/consumer.py index b269566..ddde036 100644 --- a/quintoandar_kafka/consumer.py +++ b/quintoandar_kafka/consumer.py @@ -13,7 +13,6 @@ def __init__(self, *topics, **kwargs): kwargs["value_deserializer"] = lambda m: m.decode("utf8") if kwargs.get("auto_offset_reset") is None: kwargs["auto_offset_reset"] = "latest" - kwargs['legacy_iterator'] = False super().__init__(*topics, **kwargs) diff --git a/requirements.txt b/requirements.txt index 3261b2e..d341e63 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ redis==2.10.6 retrying==1.3.3 -kafka-python==1.4.7 +kafka-python==2.0.1 diff --git a/setup.py b/setup.py index 3fcec6c..4ab4fb9 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ setup( name="quintoandar_kafka", - version="2.0.0", + version="2.0.1", author="QuintoAndar", author_email="rodrigo.oliveira@quintoandar.com.br", description="Checks messages to avoid reprocessing events.", diff --git a/tests/steps/step_kafka_idempotent_consumer.py b/tests/steps/step_kafka_idempotent_consumer.py index 1b25adb..ac62f25 100644 --- a/tests/steps/step_kafka_idempotent_consumer.py +++ b/tests/steps/step_kafka_idempotent_consumer.py @@ -1,7 +1,7 @@ from unittest.mock import MagicMock from behave import given, when, then # pylint: disable=E0611 from hamcrest import assert_that, equal_to -from quintoandar_kafka import KafkaIdempotentConsumer +from quintoandar_kafka import KafkaIdempotentConsumer, KafkaSimpleConsumer message = MagicMock() message.value = {"test1": "test2"} @@ -18,13 +18,12 @@ def step_impl_given_idempotent_kafka_consumer(context): context.topic = "test3" context.consumer = KafkaIdempotentConsumer.__new__(KafkaIdempotentConsumer) context.consumer.idempotence_client = MagicMock() - context.consumer.config = {"consumer_timeout_ms": 0} - context.consumer._iterator = MagicMock() - + context.consumer.config = {"consumer_timeout_ms": 0, "legacy_iterator": False} + context.consumer._closed = False @when("The consumer receives an unique message") def step_impl_when_message(context): - context.consumer._iterator.__next__ = MagicMock(return_value=message) + KafkaSimpleConsumer.__next__ = MagicMock(return_value=message) context.consumer.idempotence_client.is_unique = MagicMock(return_value=True) for m in context.consumer: print(m) @@ -37,10 +36,8 @@ def step_impl_when_repeated_message(context): repeated_msg = MagicMock() repeated_msg.topic = "repeated" repeated_msg.value = "repeated" - context.consumer._iterator.__next__ = MagicMock() - context.consumer._iterator.__next__.side_effect = [repeated_msg, message] - context.consumer.idempotence_client.is_unique = MagicMock() - context.consumer.idempotence_client.is_unique.side_effect = [False, True] + KafkaSimpleConsumer.__next__ = MagicMock(side_effect=[repeated_msg, message]) + context.consumer.idempotence_client.is_unique = MagicMock(side_effect = [False, True]) for m in context.consumer: context.msg = m break