From 15a7365e37a451df93c1de96b479ac1f4a62e34b Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Thu, 21 Sep 2023 11:52:18 -0700
Subject: [PATCH 1/7] Whitespace change to trigger CI

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index 6a5c374cb5..34a11a9071 100644
--- a/README.md
+++ b/README.md
@@ -39,3 +39,4 @@ NVIDIA Morpheus is an open AI application framework that provides cybersecurity
 
 Full documentation for the latest official release is available at [https://docs.nvidia.com/morpheus/](https://docs.nvidia.com/morpheus/).
 
+

From 85557dcbb632c9363a9383ca415eb774bc84be52 Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Thu, 21 Sep 2023 15:54:10 -0700
Subject: [PATCH 2/7] Revert "Whitespace change to trigger CI"

This reverts commit 15a7365e37a451df93c1de96b479ac1f4a62e34b.
---
 README.md | 1 -
 1 file changed, 1 deletion(-)

diff --git a/README.md b/README.md
index 34a11a9071..6a5c374cb5 100644
--- a/README.md
+++ b/README.md
@@ -39,4 +39,3 @@ NVIDIA Morpheus is an open AI application framework that provides cybersecurity
 
 Full documentation for the latest official release is available at [https://docs.nvidia.com/morpheus/](https://docs.nvidia.com/morpheus/).
 
-

From 948f4ef9c2a9d1e5fe64c39ce2e0eae371884639 Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Thu, 21 Sep 2023 15:55:35 -0700
Subject: [PATCH 3/7] Declare OffsetChecker as an MRC component so that it
 executes prior to the source's yield returns.

Adjust the offset checker to pass if a new partition is introduced.
---
 tests/test_kafka_source_stage_pipe.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/tests/test_kafka_source_stage_pipe.py b/tests/test_kafka_source_stage_pipe.py
index 35b332e4c7..28d5cd43c1 100644
--- a/tests/test_kafka_source_stage_pipe.py
+++ b/tests/test_kafka_source_stage_pipe.py
@@ -134,13 +134,15 @@ def _offset_checker(self, x):
         new_offsets = self._client.list_consumer_group_offsets(self._group_id)
 
         if self._offsets is not None:
-            for (topic_partition, prev_offset) in self._offsets.items():
-                new_offset = new_offsets[topic_partition]
+            at_least_one_gt = len(new_offsets) > len(self._offsets)
+            if not at_least_one_gt:
+                for (topic_partition, prev_offset) in self._offsets.items():
+                    new_offset = new_offsets[topic_partition]
 
-                assert new_offset.offset >= prev_offset.offset
+                    assert new_offset.offset >= prev_offset.offset
 
-                if new_offset.offset > prev_offset.offset:
-                    at_least_one_gt = True
+                    if new_offset.offset > prev_offset.offset:
+                        at_least_one_gt = True
 
         assert at_least_one_gt
 
@@ -149,7 +151,7 @@ def _offset_checker(self, x):
         return x
 
     def _build_single(self, builder: mrc.Builder, input_stream):
-        node = builder.make_node(self.unique_name, ops.map(self._offset_checker))
+        node = builder.make_node_component(self.unique_name, ops.map(self._offset_checker))
         builder.make_edge(input_stream[0], node)
 
         return node, input_stream[1]

From b3d73ae8d443094fb50c38eba45dc305feb33ccd Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Thu, 21 Sep 2023 16:21:08 -0700
Subject: [PATCH 4/7] Revert "Declare OffsetChecker as an MRC component so
 that it executes prior to the source's yield returns."

This reverts commit 948f4ef9c2a9d1e5fe64c39ce2e0eae371884639.
---
 tests/test_kafka_source_stage_pipe.py | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/tests/test_kafka_source_stage_pipe.py b/tests/test_kafka_source_stage_pipe.py
index 28d5cd43c1..35b332e4c7 100644
--- a/tests/test_kafka_source_stage_pipe.py
+++ b/tests/test_kafka_source_stage_pipe.py
@@ -134,15 +134,13 @@ def _offset_checker(self, x):
         new_offsets = self._client.list_consumer_group_offsets(self._group_id)
 
         if self._offsets is not None:
-            at_least_one_gt = len(new_offsets) > len(self._offsets)
-            if not at_least_one_gt:
-                for (topic_partition, prev_offset) in self._offsets.items():
-                    new_offset = new_offsets[topic_partition]
+            for (topic_partition, prev_offset) in self._offsets.items():
+                new_offset = new_offsets[topic_partition]
 
-                    assert new_offset.offset >= prev_offset.offset
+                assert new_offset.offset >= prev_offset.offset
 
-                    if new_offset.offset > prev_offset.offset:
-                        at_least_one_gt = True
+                if new_offset.offset > prev_offset.offset:
+                    at_least_one_gt = True
 
         assert at_least_one_gt
 
@@ -151,7 +149,7 @@ def _offset_checker(self, x):
         return x
 
     def _build_single(self, builder: mrc.Builder, input_stream):
-        node = builder.make_node_component(self.unique_name, ops.map(self._offset_checker))
+        node = builder.make_node(self.unique_name, ops.map(self._offset_checker))
         builder.make_edge(input_stream[0], node)
 
         return node, input_stream[1]

From 577bd5bf66c976ddc8e1a0636bc5faa23f2a71ec Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Fri, 22 Sep 2023 09:21:24 -0700
Subject: [PATCH 5/7] Expose seek to beginning logic as a function, create
 kafka_consumer with a group_id

---
 tests/_utils/kafka.py | 27 +++++++++++++++++----------
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git a/tests/_utils/kafka.py b/tests/_utils/kafka.py
index 2bb859e93f..1921e9e289 100644
--- a/tests/_utils/kafka.py
+++ b/tests/_utils/kafka.py
@@ -53,23 +53,28 @@ def kafka_bootstrap_servers_fixture(kafka_server: (subprocess.Popen, int)):  # p
     yield f"localhost:{kafka_port}"
 
 
-@pytest.fixture(name='kafka_consumer', scope='function')
-def kafka_consumer_fixture(kafka_topics: KafkaTopics, _kafka_consumer: "KafkaConsumer"):
-    _kafka_consumer.subscribe([kafka_topics.output_topic])
-
-    # Wait until we have assigned partitions
+def seek_to_beginning(kafka_consumer: "KafkaConsumer", timeout: int = PARTITION_ASSIGNMENT_TIMEOUT):
+    """
+    Seeks to the beginning of the Kafka topic
+    """
     start = time.time()
-    end = start + PARTITION_ASSIGNMENT_TIMEOUT
+    end = start + timeout
     partitions_assigned = False
 
     while not partitions_assigned and time.time() <= end:
-        _kafka_consumer.poll(timeout_ms=20)
-        partitions_assigned = len(_kafka_consumer.assignment()) > 0
+        kafka_consumer.poll(timeout_ms=20)
+        partitions_assigned = len(kafka_consumer.assignment()) > 0
         if not partitions_assigned:
             time.sleep(0.1)
 
     assert partitions_assigned
 
-    _kafka_consumer.seek_to_beginning()
+    kafka_consumer.seek_to_beginning()
+
+
+@pytest.fixture(name='kafka_consumer', scope='function')
+def kafka_consumer_fixture(kafka_topics: KafkaTopics, _kafka_consumer: "KafkaConsumer"):
+    _kafka_consumer.subscribe([kafka_topics.output_topic])
+    seek_to_beginning(_kafka_consumer)
 
     yield _kafka_consumer
 
@@ -103,7 +108,9 @@ def _init_pytest_kafka() -> (bool, Exception):
                                                      'zookeeper_proc',
                                                      teardown_fn=teardown_fn,
                                                      scope='session')
-        _kafka_consumer = pytest_kafka.make_kafka_consumer('kafka_server', scope='function')
+        _kafka_consumer = pytest_kafka.make_kafka_consumer('kafka_server',
+                                                           scope='function',
+                                                           group_id='morpheus_unittest_consumer')
         return (True, None)
     except Exception as e:

From 3b5d9b98fe35f3b1854b36f74999d5ce29e9a09d Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Fri, 22 Sep 2023 09:25:47 -0700
Subject: [PATCH 6/7] Move the offset checking from a stage to the test body
 to be executed after the pipeline completes.

The reason is that the C++ impl commits the offsets after calling on_next.
This also adds the ability to check async commits.
---
 tests/test_kafka_source_stage_pipe.py | 94 ++++++++-------------------
 1 file changed, 27 insertions(+), 67 deletions(-)

diff --git a/tests/test_kafka_source_stage_pipe.py b/tests/test_kafka_source_stage_pipe.py
index 35b332e4c7..83b635ff0d 100644
--- a/tests/test_kafka_source_stage_pipe.py
+++ b/tests/test_kafka_source_stage_pipe.py
@@ -24,6 +24,7 @@
 
 from _utils import TEST_DIRS
 from _utils import assert_results
+from _utils.kafka import seek_to_beginning
 from _utils.kafka import write_data_to_kafka
 from _utils.kafka import write_file_to_kafka
 from _utils.stages.dfp_length_checker import DFPLengthChecker
@@ -93,77 +94,27 @@ def test_multi_topic_kafka_source_stage_pipe(config, kafka_bootstrap_servers: st
     assert_results(comp_stage.get_results())
 
 
-class OffsetChecker(SinglePortStage):
-    """
-    Verifies that the kafka offsets are being updated as a way of verifying that the
-    consumer is performing a commit.
-    """
-
-    def __init__(self, c: Config, bootstrap_servers: str, group_id: str):
-        super().__init__(c)
-
-        # Importing here so that running without the --run_kafka flag won't fail due
-        # to not having the kafka libs installed
-        from kafka import KafkaAdminClient
-
-        self._client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
-        self._group_id = group_id
-        self._offsets = None
-
-    @property
-    def name(self) -> str:
-        return "morpheus_offset_checker"
-
-    def accepted_types(self) -> typing.Tuple:
-        """
-        Accepted input types for this stage are returned.
-
-        Returns
-        -------
-        typing.Tuple
-            Accepted input types.
-
-        """
-        return (typing.Any, )
-
-    def supports_cpp_node(self):
-        return False
-
-    def _offset_checker(self, x):
-        at_least_one_gt = False
-        new_offsets = self._client.list_consumer_group_offsets(self._group_id)
-
-        if self._offsets is not None:
-            for (topic_partition, prev_offset) in self._offsets.items():
-                new_offset = new_offsets[topic_partition]
-
-                assert new_offset.offset >= prev_offset.offset
-
-                if new_offset.offset > prev_offset.offset:
-                    at_least_one_gt = True
-
-        assert at_least_one_gt
-
-        self._offsets = new_offsets
-
-        return x
-
-    def _build_single(self, builder: mrc.Builder, input_stream):
-        node = builder.make_node(self.unique_name, ops.map(self._offset_checker))
-        builder.make_edge(input_stream[0], node)
-
-        return node, input_stream[1]
-
-
 @pytest.mark.kafka
 @pytest.mark.parametrize('num_records', [10, 100, 1000])
-def test_kafka_source_commit(num_records, config, kafka_bootstrap_servers: str,
-                             kafka_topics: typing.Tuple[str, str]) -> None:
+def test_kafka_source_commit(num_records,
+                             config,
+                             kafka_bootstrap_servers: str,
+                             kafka_topics: typing.Tuple[str, str],
+                             kafka_consumer: "KafkaConsumer") -> None:
+    group_id = 'morpheus'
 
     data = [{'v': i} for i in range(num_records)]
     num_written = write_data_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, data)
     assert num_written == num_records
 
+    kafka_consumer.subscribe([kafka_topics.input_topic])
+    seek_to_beginning(kafka_consumer)
+    partitions = kafka_consumer.assignment()
+
+    # This method does not advance the consumer, and even if it did, this consumer has a different group_id than the
+    # source stage
+    expected_offsets = kafka_consumer.end_offsets(partitions)
+
     pipe = LinearPipeline(config)
     pipe.set_source(
         KafkaSourceStage(config,
@@ -171,12 +122,10 @@ def test_kafka_source_commit(num_records, config, kafka_bootstrap_servers: str,
                          input_topic=kafka_topics.input_topic,
                          auto_offset_reset="earliest",
                          poll_interval="1seconds",
-                         group_id='morpheus',
+                         group_id=group_id,
                          client_id='morpheus_kafka_source_commit',
                          stop_after=num_records,
                          async_commits=False))
-
-    pipe.add_stage(OffsetChecker(config, bootstrap_servers=kafka_bootstrap_servers, group_id='morpheus'))
 
     pipe.add_stage(TriggerStage(config))
     pipe.add_stage(DeserializeStage(config))
@@ -187,6 +136,17 @@ def test_kafka_source_commit(num_records, config, kafka_bootstrap_servers: str,
 
     assert_results(comp_stage.get_results())
 
+    from kafka import KafkaAdminClient
+    admin_client = KafkaAdminClient(bootstrap_servers=kafka_bootstrap_servers, client_id='offset_checker')
+    offsets = admin_client.list_consumer_group_offsets(group_id)
+
+    # The broker may have created additional partitions, offsets should be a superset of expected_offsets
+    for (topic_partition, expected_offset) in expected_offsets.items():
+        # The value of the offsets dict being returned is a tuple of (offset, metadata), while the value of the
+        # expected_offsets is just the offset.
+        actual_offset = offsets[topic_partition][0]
+        assert actual_offset == expected_offset
+
 
 @pytest.mark.kafka
 @pytest.mark.parametrize('num_records', [1000])

From 56332b189b8b82ae53f9f4c798d9568f1b563ed1 Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Fri, 22 Sep 2023 09:31:11 -0700
Subject: [PATCH 7/7] Parametrize on async_commits

---
 tests/test_kafka_source_stage_pipe.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/tests/test_kafka_source_stage_pipe.py b/tests/test_kafka_source_stage_pipe.py
index 83b635ff0d..ea45cca35c 100644
--- a/tests/test_kafka_source_stage_pipe.py
+++ b/tests/test_kafka_source_stage_pipe.py
@@ -17,10 +17,8 @@
 import os
 import typing
 
-import mrc
 import pandas as pd
 import pytest
-from mrc.core import operators as ops
 
 from _utils import TEST_DIRS
 from _utils import assert_results
@@ -30,13 +28,15 @@
 from _utils.stages.dfp_length_checker import DFPLengthChecker
 from morpheus.config import Config
 from morpheus.pipeline.linear_pipeline import LinearPipeline
-from morpheus.pipeline.single_port_stage import SinglePortStage
 from morpheus.stages.general.trigger_stage import TriggerStage
 from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
 from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
 from morpheus.stages.postprocess.serialize_stage import SerializeStage
 from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
 
+if (typing.TYPE_CHECKING):
+    from kafka import KafkaConsumer
+
 
 @pytest.mark.kafka
 def test_kafka_source_stage_pipe(config, kafka_bootstrap_servers: str, kafka_topics: typing.Tuple[str, str]) -> None:
@@ -93,9 +95,11 @@ def test_multi_topic_kafka_source_stage_pipe(config, kafka_bootstrap_servers: st
     assert_results(comp_stage.get_results())
 
 @pytest.mark.kafka
+@pytest.mark.parametrize('async_commits', [True, False])
 @pytest.mark.parametrize('num_records', [10, 100, 1000])
-def test_kafka_source_commit(num_records,
-                             config,
+def test_kafka_source_commit(num_records: int,
+                             async_commits: bool,
+                             config: Config,
                              kafka_bootstrap_servers: str,
                              kafka_topics: typing.Tuple[str, str],
                              kafka_consumer: "KafkaConsumer") -> None:
@@ -125,7 +127,7 @@ def test_kafka_source_commit(num_records,
                          group_id=group_id,
                          client_id='morpheus_kafka_source_commit',
                          stop_after=num_records,
-                         async_commits=False))
+                         async_commits=async_commits))
 
     pipe.add_stage(TriggerStage(config))
     pipe.add_stage(DeserializeStage(config))
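
The offset check that patch 6 moves into the test body leans on two kafka-python calls whose return values are shaped differently: KafkaConsumer.end_offsets() maps each TopicPartition to a plain integer, while KafkaAdminClient.list_consumer_group_offsets() maps each TopicPartition to an OffsetAndMetadata tuple, which is why the test compares against offsets[topic_partition][0]. A minimal standalone sketch of the same check, assuming a broker at localhost:9092 and illustrative topic/group names that are not taken from the patches:

    import time

    from kafka import KafkaAdminClient, KafkaConsumer

    bootstrap = 'localhost:9092'  # assumed broker address
    group_id = 'morpheus'         # group the pipeline's consumer committed under
    topic = 'test_topic'          # hypothetical topic name

    # A consumer in a *different* group, used only to read the end offsets.
    consumer = KafkaConsumer(bootstrap_servers=bootstrap, group_id='offset_note_consumer')
    consumer.subscribe([topic])
    while not consumer.assignment():  # poll until partitions are assigned
        consumer.poll(timeout_ms=20)
        time.sleep(0.1)

    # end_offsets() returns {TopicPartition: int} and does not advance the consumer.
    expected_offsets = consumer.end_offsets(consumer.assignment())

    # list_consumer_group_offsets() returns {TopicPartition: OffsetAndMetadata},
    # an (offset, metadata) tuple -- hence the [0] when comparing.
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap)
    committed = admin_client.list_consumer_group_offsets(group_id)

    for (topic_partition, expected_offset) in expected_offsets.items():
        assert committed[topic_partition][0] == expected_offset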
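
Patch 7 stacks a second @pytest.mark.parametrize marker onto test_kafka_source_commit; pytest expands stacked markers into their cross product, so the commit test now runs for all 2 x 3 = 6 (async_commits, num_records) combinations. A short sketch of that expansion, with a hypothetical test name:

    import pytest

    @pytest.mark.parametrize('async_commits', [True, False])
    @pytest.mark.parametrize('num_records', [10, 100, 1000])
    def test_example(num_records: int, async_commits: bool) -> None:
        # pytest generates one test case per (num_records, async_commits) pair
        assert num_records in (10, 100, 1000)
        assert isinstance(async_commits, bool)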