Skip to content

Commit

Permalink
Merge pull request #10 from quintoandar/release_updates
Browse files Browse the repository at this point in the history
update kafka-python and readme
  • Loading branch information
thspinto authored Mar 16, 2020
2 parents b68585c + 684da49 commit 5ebb3fc
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 13 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![Build Status](https://travis-ci.org/quintoandar/python-kafka.svg?branch=master)](https://travis-ci.org/quintoandar/python-kafka)
[![Build Status](https://travis-ci.org/quintoandar/kafka-python.svg?branch=master)](https://travis-ci.org/github/quintoandar/kafka-python)

# QuintoAndar Kafka Python Library

Expand All @@ -14,7 +14,7 @@ A simple wrapper for kafka-python lib that uses redis to check duplicate events.
| ----------------- | -------------------------------------------- |
| group_id | The consumer group id |
| bootstrap_servers | The bootstrap servers |
| redis_host | The topic to consume from |
| redis_host | The redis host |
| redis_port | The function that processes the event |
| idempotent_key | Function which extract an unique identifier from the event |

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
redis==2.10.6
retrying==1.3.3
kafka-python==1.4.6
kafka-python==2.0.1
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

setup(
name="quintoandar_kafka",
version="2.0.0",
version="2.0.1",
author="QuintoAndar",
author_email="[email protected]",
description="Checks messages to avoid reprocessing events.",
Expand Down
15 changes: 6 additions & 9 deletions tests/steps/step_kafka_idempotent_consumer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from unittest.mock import MagicMock
from behave import given, when, then # pylint: disable=E0611
from hamcrest import assert_that, equal_to
from quintoandar_kafka import KafkaIdempotentConsumer
from quintoandar_kafka import KafkaIdempotentConsumer, KafkaSimpleConsumer

message = MagicMock()
message.value = {"test1": "test2"}
Expand All @@ -18,13 +18,12 @@ def step_impl_given_idempotent_kafka_consumer(context):
context.topic = "test3"
context.consumer = KafkaIdempotentConsumer.__new__(KafkaIdempotentConsumer)
context.consumer.idempotence_client = MagicMock()
context.consumer.config = {"consumer_timeout_ms": 0}
context.consumer._iterator = MagicMock()

context.consumer.config = {"consumer_timeout_ms": 0, "legacy_iterator": False}
context.consumer._closed = False

@when("The consumer receives an unique message")
def step_impl_when_message(context):
context.consumer._iterator.__next__ = MagicMock(return_value=message)
KafkaSimpleConsumer.__next__ = MagicMock(return_value=message)
context.consumer.idempotence_client.is_unique = MagicMock(return_value=True)
for m in context.consumer:
print(m)
Expand All @@ -37,10 +36,8 @@ def step_impl_when_repeated_message(context):
repeated_msg = MagicMock()
repeated_msg.topic = "repeated"
repeated_msg.value = "repeated"
context.consumer._iterator.__next__ = MagicMock()
context.consumer._iterator.__next__.side_effect = [repeated_msg, message]
context.consumer.idempotence_client.is_unique = MagicMock()
context.consumer.idempotence_client.is_unique.side_effect = [False, True]
KafkaSimpleConsumer.__next__ = MagicMock(side_effect=[repeated_msg, message])
context.consumer.idempotence_client.is_unique = MagicMock(side_effect = [False, True])
for m in context.consumer:
context.msg = m
break
Expand Down

0 comments on commit 5ebb3fc

Please sign in to comment.