diff --git a/lc_classification_step/scripts/run_step.py b/lc_classification_step/scripts/run_step.py index 1c1c97876..89331c5cc 100644 --- a/lc_classification_step/scripts/run_step.py +++ b/lc_classification_step/scripts/run_step.py @@ -6,7 +6,9 @@ PACKAGE_PATH = os.path.abspath(os.path.join(SCRIPT_PATH, "..")) sys.path.append(PACKAGE_PATH) -from settings import STEP_CONFIG +from settings import settings_creator + +STEP_CONFIG = settings_creator() level = logging.INFO if os.getenv("LOGGING_DEBUG"): diff --git a/lc_classification_step/settings.py b/lc_classification_step/settings.py index 0d260961d..76be964b2 100644 --- a/lc_classification_step/settings.py +++ b/lc_classification_step/settings.py @@ -7,146 +7,155 @@ from fastavro.schema import load_schema from fastavro.repository.base import SchemaRepositoryError -CONSUMER_CONFIG = { - "CLASS": os.getenv("CONSUMER_CLASS", "apf.consumers.KafkaConsumer"), - "TOPICS": os.environ["CONSUMER_TOPICS"].strip().split(","), - "PARAMS": { - "bootstrap.servers": os.environ["CONSUMER_SERVER"], - "group.id": os.environ["CONSUMER_GROUP_ID"], - "auto.offset.reset": "beginning", - "enable.partition.eof": bool(os.getenv("ENABLE_PARTITION_EOF", None)), - }, - "consume.timeout": int(os.getenv("CONSUME_TIMEOUT", 0)), - "consume.messages": int(os.getenv("CONSUME_MESSAGES", 1000)), -} -try: - ELASTICC_SCHEMA = load_schema("schemas/output_elasticc.avsc") -except SchemaRepositoryError: - ELASTICC_SCHEMA = load_schema( - "lc_classification_step/schemas/output_elasticc.avsc" - ) -PRODUCER_CONFIG = { - "TOPIC_STRATEGY": { +def model_config_factory(): + modelclass = os.getenv("MODEL_CLASS") + config = configurator(modelclass) + return config + + +def settings_creator(): + CONSUMER_CONFIG = { + "CLASS": os.getenv("CONSUMER_CLASS", "apf.consumers.KafkaConsumer"), + "TOPICS": os.environ["CONSUMER_TOPICS"].strip().split(","), "PARAMS": { - "topic_format": os.environ["PRODUCER_TOPIC_FORMAT"], - "date_format": os.environ["PRODUCER_DATE_FORMAT"], - "change_hour": int(os.environ["PRODUCER_CHANGE_HOUR"]), - "retention_days": int(os.environ["PRODUCER_RETENTION_DAYS"]), + "bootstrap.servers": os.environ["CONSUMER_SERVER"], + "group.id": os.environ["CONSUMER_GROUP_ID"], + "auto.offset.reset": "beginning", + "enable.partition.eof": bool( + os.getenv("ENABLE_PARTITION_EOF", None) + ), + }, + "consume.timeout": int(os.getenv("CONSUME_TIMEOUT", 0)), + "consume.messages": int(os.getenv("CONSUME_MESSAGES", 1000)), + } + + try: + ELASTICC_SCHEMA = load_schema("schemas/output_elasticc.avsc") + except SchemaRepositoryError: + ELASTICC_SCHEMA = load_schema( + "lc_classification_step/schemas/output_elasticc.avsc" + ) + PRODUCER_CONFIG = { + "TOPIC_STRATEGY": { + "PARAMS": { + "topic_format": os.environ["PRODUCER_TOPIC_FORMAT"], + "date_format": os.environ["PRODUCER_DATE_FORMAT"], + "change_hour": int(os.environ["PRODUCER_CHANGE_HOUR"]), + "retention_days": int(os.environ["PRODUCER_RETENTION_DAYS"]), + }, + "CLASS": os.getenv( + "PRODUCER_TOPIC_STRATEGY_CLASS", + "apf.core.topic_management.DailyTopicStrategy", + ), + }, + "PARAMS": { + "bootstrap.servers": os.environ["PRODUCER_SERVER"], }, "CLASS": os.getenv( - "PRODUCER_TOPIC_STRATEGY_CLASS", - "apf.core.topic_management.DailyTopicStrategy", + "PRODUCER_CLASS", "apf.producers.kafka.KafkaProducer" ), - }, - "PARAMS": { - "bootstrap.servers": os.environ["PRODUCER_SERVER"], - }, - "CLASS": os.getenv("PRODUCER_CLASS", "apf.producers.kafka.KafkaProducer"), - "SCHEMA": SCHEMA - if os.getenv("STREAM", "ztf") == "ztf" - else ELASTICC_SCHEMA, -} - 
-SCRIBE_PRODUCER_CONFIG = { - "CLASS": "apf.producers.KafkaProducer", - "PARAMS": { - "bootstrap.servers": os.environ["SCRIBE_SERVER"], - }, - "TOPIC": os.environ["SCRIBE_TOPIC"], - "SCHEMA": SCRIBE_SCHEMA, -} + "SCHEMA": SCHEMA + if os.getenv("STREAM", "ztf") == "ztf" + else ELASTICC_SCHEMA, + } -METRICS_CONFIG = { - "CLASS": "apf.metrics.KafkaMetricsProducer", - "EXTRA_METRICS": [{"key": "aid", "alias": "aid"}, {"key": "candid"}], - "PARAMS": { + SCRIBE_PRODUCER_CONFIG = { + "CLASS": "apf.producers.KafkaProducer", "PARAMS": { - "bootstrap.servers": os.environ["METRICS_HOST"], + "bootstrap.servers": os.environ["SCRIBE_SERVER"], }, - "TOPIC": os.environ["METRICS_TOPIC"], - "SCHEMA": { - "type": "object", - "required": ["timestamp_sent", "timestamp_received"], - "properties": { - "timestamp_sent": { - "type": "string", - "description": "Timestamp sent refers to the time at which a message is sent.", - "default": "", - "examples": ["2020-09-01"], - }, - "timestamp_received": { - "type": "string", - "description": "Timestamp received refers to the time at which a message is received.", - "default": "", - "examples": ["2020-09-01"], + "TOPIC": os.environ["SCRIBE_TOPIC"], + "SCHEMA": SCRIBE_SCHEMA, + } + + METRICS_CONFIG = { + "CLASS": "apf.metrics.KafkaMetricsProducer", + "EXTRA_METRICS": [{"key": "aid", "alias": "aid"}, {"key": "candid"}], + "PARAMS": { + "PARAMS": { + "bootstrap.servers": os.environ["METRICS_HOST"], + }, + "TOPIC": os.environ["METRICS_TOPIC"], + "SCHEMA": { + "type": "object", + "required": ["timestamp_sent", "timestamp_received"], + "properties": { + "timestamp_sent": { + "type": "string", + "description": "Timestamp sent refers to the time at which a message is sent.", + "default": "", + "examples": ["2020-09-01"], + }, + "timestamp_received": { + "type": "string", + "description": "Timestamp received refers to the time at which a message is received.", + "default": "", + "examples": ["2020-09-01"], + }, }, + "additionalProperties": True, }, - "additionalProperties": True, }, - }, -} + } -if os.getenv("CONSUMER_KAFKA_USERNAME") and os.getenv( - "CONSUMER_KAFKA_PASSWORD" -): - CONSUMER_CONFIG["PARAMS"]["security.protocol"] = "SASL_SSL" - CONSUMER_CONFIG["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512" - CONSUMER_CONFIG["PARAMS"]["sasl.username"] = os.getenv( - "CONSUMER_KAFKA_USERNAME" - ) - CONSUMER_CONFIG["PARAMS"]["sasl.password"] = os.getenv( + if os.getenv("CONSUMER_KAFKA_USERNAME") and os.getenv( "CONSUMER_KAFKA_PASSWORD" - ) -if os.getenv("PRODUCER_KAFKA_USERNAME") and os.getenv( - "PRODUCER_KAFKA_PASSWORD" -): - PRODUCER_CONFIG["PARAMS"]["security.protocol"] = os.getenv( - "PRODUCER_SECURITY_PROTOCOL", "SASL_PLAINTEXT" - ) - PRODUCER_CONFIG["PARAMS"]["sasl.mechanism"] = os.getenv( - "PRODUCER_SASL_MECHANISM", "SCRAM-SHA-256" - ) - PRODUCER_CONFIG["PARAMS"]["sasl.username"] = os.getenv( - "PRODUCER_KAFKA_USERNAME" - ) - PRODUCER_CONFIG["PARAMS"]["sasl.password"] = os.getenv( + ): + CONSUMER_CONFIG["PARAMS"]["security.protocol"] = "SASL_SSL" + CONSUMER_CONFIG["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512" + CONSUMER_CONFIG["PARAMS"]["sasl.username"] = os.getenv( + "CONSUMER_KAFKA_USERNAME" + ) + CONSUMER_CONFIG["PARAMS"]["sasl.password"] = os.getenv( + "CONSUMER_KAFKA_PASSWORD" + ) + if os.getenv("PRODUCER_KAFKA_USERNAME") and os.getenv( "PRODUCER_KAFKA_PASSWORD" - ) -if os.getenv("SCRIBE_KAFKA_USERNAME") and os.getenv("SCRIBE_KAFKA_PASSWORD"): - SCRIBE_PRODUCER_CONFIG["PARAMS"]["security.protocol"] = "SASL_SSL" - SCRIBE_PRODUCER_CONFIG["PARAMS"]["sasl.mechanism"] = 
"SCRAM-SHA-512" - SCRIBE_PRODUCER_CONFIG["PARAMS"]["sasl.username"] = os.getenv( - "SCRIBE_KAFKA_USERNAME" - ) - SCRIBE_PRODUCER_CONFIG["PARAMS"]["sasl.password"] = os.getenv( + ): + PRODUCER_CONFIG["PARAMS"]["security.protocol"] = os.getenv( + "PRODUCER_SECURITY_PROTOCOL", "SASL_PLAINTEXT" + ) + PRODUCER_CONFIG["PARAMS"]["sasl.mechanism"] = os.getenv( + "PRODUCER_SASL_MECHANISM", "SCRAM-SHA-256" + ) + PRODUCER_CONFIG["PARAMS"]["sasl.username"] = os.getenv( + "PRODUCER_KAFKA_USERNAME" + ) + PRODUCER_CONFIG["PARAMS"]["sasl.password"] = os.getenv( + "PRODUCER_KAFKA_PASSWORD" + ) + if os.getenv("SCRIBE_KAFKA_USERNAME") and os.getenv( "SCRIBE_KAFKA_PASSWORD" - ) -if os.getenv("METRICS_KAFKA_USERNAME") and os.getenv("METRICS_KAFKA_PASSWORD"): - METRICS_CONFIG["PARAMS"]["PARAMS"]["security.protocol"] = "SASL_SSL" - METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512" - METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.username"] = os.getenv( - "METRICS_KAFKA_USERNAME" - ) - METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.password"] = os.getenv( + ): + SCRIBE_PRODUCER_CONFIG["PARAMS"]["security.protocol"] = "SASL_SSL" + SCRIBE_PRODUCER_CONFIG["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512" + SCRIBE_PRODUCER_CONFIG["PARAMS"]["sasl.username"] = os.getenv( + "SCRIBE_KAFKA_USERNAME" + ) + SCRIBE_PRODUCER_CONFIG["PARAMS"]["sasl.password"] = os.getenv( + "SCRIBE_KAFKA_PASSWORD" + ) + if os.getenv("METRICS_KAFKA_USERNAME") and os.getenv( "METRICS_KAFKA_PASSWORD" - ) - - -def model_config_factory(): - modelclass = os.getenv("MODEL_CLASS") - config = configurator(modelclass) - return config - - -STEP_CONFIG = { - "PROMETHEUS": bool(os.getenv("USE_PROMETHEUS", True)), - "SCRIBE_PRODUCER_CONFIG": SCRIBE_PRODUCER_CONFIG, - "CONSUMER_CONFIG": CONSUMER_CONFIG, - "PRODUCER_CONFIG": PRODUCER_CONFIG, - "METRICS_CONFIG": METRICS_CONFIG, - "MODEL_VERSION": os.getenv("MODEL_VERSION", "dev"), - "MODEL_CONFIG": model_config_factory(), - "SCRIBE_PARSER_CLASS": os.getenv("SCRIBE_PARSER_CLASS"), - "STEP_PARSER_CLASS": os.getenv("STEP_PARSER_CLASS"), -} + ): + METRICS_CONFIG["PARAMS"]["PARAMS"]["security.protocol"] = "SASL_SSL" + METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.mechanism"] = "SCRAM-SHA-512" + METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.username"] = os.getenv( + "METRICS_KAFKA_USERNAME" + ) + METRICS_CONFIG["PARAMS"]["PARAMS"]["sasl.password"] = os.getenv( + "METRICS_KAFKA_PASSWORD" + ) + STEP_CONFIG = { + "PROMETHEUS": bool(os.getenv("USE_PROMETHEUS", True)), + "SCRIBE_PRODUCER_CONFIG": SCRIBE_PRODUCER_CONFIG, + "CONSUMER_CONFIG": CONSUMER_CONFIG, + "PRODUCER_CONFIG": PRODUCER_CONFIG, + "METRICS_CONFIG": METRICS_CONFIG, + "MODEL_VERSION": os.getenv("MODEL_VERSION", "dev"), + "MODEL_CONFIG": model_config_factory(), + "SCRIBE_PARSER_CLASS": os.getenv("SCRIBE_PARSER_CLASS"), + "STEP_PARSER_CLASS": os.getenv("STEP_PARSER_CLASS"), + } + return STEP_CONFIG diff --git a/lc_classification_step/tests/integration/conftest.py b/lc_classification_step/tests/integration/conftest.py index 13d120d73..04aea85e1 100644 --- a/lc_classification_step/tests/integration/conftest.py +++ b/lc_classification_step/tests/integration/conftest.py @@ -87,6 +87,8 @@ def kafka_service(docker_ip, docker_services): @pytest.fixture def env_variables_ztf(): + envcopy = os.environ.copy() + def set_env_variables(): random_string = uuid.uuid4().hex env_variables_dict = { @@ -112,11 +114,14 @@ def set_env_variables(): for key, value in env_variables_dict.items(): os.environ[key] = value - return set_env_variables + yield set_env_variables + os.environ = 
envcopy @pytest.fixture def env_variables_elasticc(): + envcopy = os.environ.copy() + def set_env_variables( model: str, model_class: str, @@ -147,7 +152,8 @@ def set_env_variables( for key, value in env_variables_dict.items(): os.environ[key] = value - return set_env_variables + yield set_env_variables + os.environ = envcopy @pytest.fixture @@ -185,7 +191,7 @@ def _produce_messages(topic, SCHEMA): producer.produce(message) -@pytest.fixture(scope="session") +@pytest.fixture def kafka_consumer(): def factory( stream: str, @@ -212,14 +218,14 @@ def factory( return factory -@pytest.fixture(scope="session") +@pytest.fixture def scribe_consumer(): def factory(): consumer = KafkaConsumer( { "PARAMS": { "bootstrap.servers": "localhost:9092", - "group.id": "test_step_", + "group.id": f"test_step_{time.time()}", "auto.offset.reset": "beginning", "enable.partition.eof": True, }, diff --git a/lc_classification_step/tests/integration/test_step_balto.py b/lc_classification_step/tests/integration/test_step_balto.py index d96f5bf5e..c493bd6fb 100644 --- a/lc_classification_step/tests/integration/test_step_balto.py +++ b/lc_classification_step/tests/integration/test_step_balto.py @@ -10,7 +10,6 @@ assert_command_is_correct, assert_elasticc_object_is_correct, ) -from fastavro.repository.base import SchemaRepositoryError @pytest.mark.elasticc @@ -32,12 +31,12 @@ def test_step_elasticc_result( }, ) - from settings import STEP_CONFIG + from settings import settings_creator kconsumer = kafka_consumer("balto") sconsumer = scribe_consumer() - step = LateClassifier(config=STEP_CONFIG) + step = LateClassifier(config=settings_creator()) step.start() for message in kconsumer.consume(): @@ -54,7 +53,7 @@ def test_step_elasticc_result( def test_step_schemaless( kafka_service, env_variables_elasticc, - kafka_consumer: Callable[[str], KafkaConsumer], + kafka_consumer: Callable[[str, str, dict], KafkaConsumer], scribe_consumer: Callable[[], KafkaConsumer], ): env_variables_elasticc( @@ -68,25 +67,16 @@ def test_step_schemaless( }, ) - from settings import STEP_CONFIG - - try: - kconsumer = kafka_consumer( - "balto_schemaless", - "apf.consumers.kafka.KafkaSchemalessConsumer", - {"SCHEMA_PATH": "schemas/output_elasticc.avsc"}, - ) - except SchemaRepositoryError: - kconsumer = kafka_consumer( - "balto_schemaless", - "apf.consumers.kafka.KafkaSchemalessConsumer", - { - "SCHEMA_PATH": "lc_classification_step/schemas/output_elasticc.avsc" - }, - ) + from settings import settings_creator + + kconsumer = kafka_consumer( + "balto_schemaless", + "apf.consumers.kafka.KafkaSchemalessConsumer", + {"SCHEMA_PATH": "schemas/output_elasticc.avsc"}, + ) sconsumer = scribe_consumer() - step = LateClassifier(config=STEP_CONFIG) + step = LateClassifier(config=settings_creator()) step.start() for message in kconsumer.consume(): diff --git a/lc_classification_step/tests/integration/test_step_messi.py b/lc_classification_step/tests/integration/test_step_messi.py index ec9e3c63b..19563ca9c 100644 --- a/lc_classification_step/tests/integration/test_step_messi.py +++ b/lc_classification_step/tests/integration/test_step_messi.py @@ -37,12 +37,12 @@ def test_step_elasticc_result( }, ) - from settings import STEP_CONFIG + from settings import settings_creator kconsumer = kafka_consumer("messi") sconsumer = scribe_consumer() - step = LateClassifier(config=STEP_CONFIG) + step = LateClassifier(config=settings_creator()) step.start() for message in kconsumer.consume(): diff --git a/lc_classification_step/tests/integration/test_step_mlp.py 
b/lc_classification_step/tests/integration/test_step_mlp.py index 49a73b6e1..7b34d5675 100644 --- a/lc_classification_step/tests/integration/test_step_mlp.py +++ b/lc_classification_step/tests/integration/test_step_mlp.py @@ -28,12 +28,12 @@ def test_step_elasticc_result_mlp( }, ) - from settings import STEP_CONFIG + from settings import settings_creator kconsumer = kafka_consumer("mlp") sconsumer = scribe_consumer() - step = LateClassifier(config=STEP_CONFIG) + step = LateClassifier(config=settings_creator()) step.start() for message in kconsumer.consume(): @@ -64,12 +64,12 @@ def test_step_elasticc_result_mlp_without_features( }, ) - from settings import STEP_CONFIG + from settings import settings_creator kconsumer = kafka_consumer("mlp") sconsumer = scribe_consumer() - step = LateClassifier(config=STEP_CONFIG) + step = LateClassifier(config=settings_creator()) step.start() for message in kconsumer.consume(): diff --git a/lc_classification_step/tests/integration/test_step_toretto.py b/lc_classification_step/tests/integration/test_step_toretto.py index 53f35771d..5542e9661 100644 --- a/lc_classification_step/tests/integration/test_step_toretto.py +++ b/lc_classification_step/tests/integration/test_step_toretto.py @@ -30,12 +30,12 @@ def test_step_elasticc_result( }, ) - from settings import STEP_CONFIG + from settings import settings_creator kconsumer = kafka_consumer("toretto") sconsumer = scribe_consumer() - step = LateClassifier(config=STEP_CONFIG) + step = LateClassifier(config=settings_creator()) step.start() for message in kconsumer.consume(): diff --git a/lc_classification_step/tests/integration/test_step_ztf.py b/lc_classification_step/tests/integration/test_step_ztf.py index 5ee6da220..fbb3abcdb 100644 --- a/lc_classification_step/tests/integration/test_step_ztf.py +++ b/lc_classification_step/tests/integration/test_step_ztf.py @@ -22,12 +22,12 @@ def test_step_ztf_result( produce_messages("features_ztf") env_variables_ztf() - from settings import STEP_CONFIG + from settings import settings_creator kconsumer = kafka_consumer("ztf") sconsumer = scribe_consumer() - step = LateClassifier(config=STEP_CONFIG) + step = LateClassifier(config=settings_creator()) step.start() for message in kconsumer.consume(): diff --git a/libs/apf/apf/consumers/generic.py b/libs/apf/apf/consumers/generic.py index a1f8791f9..2d24e1e6e 100644 --- a/libs/apf/apf/consumers/generic.py +++ b/libs/apf/apf/consumers/generic.py @@ -9,7 +9,7 @@ class GenericConsumer(ABC): Parameters are passed through *config* as a :py:class:`dict` of params. 
""" - def __init__(self, config=None): + def __init__(self, config: dict): self.logger = logging.getLogger(f"alerce.{self.__class__.__name__}") self.logger.info(f"Creating {self.__class__.__name__}") self.config = config diff --git a/libs/apf/apf/consumers/kafka.py b/libs/apf/apf/consumers/kafka.py index 3ca9cbd74..0390c8e6e 100644 --- a/libs/apf/apf/consumers/kafka.py +++ b/libs/apf/apf/consumers/kafka.py @@ -1,5 +1,6 @@ +import datetime from apf.consumers.generic import GenericConsumer -from confluent_kafka import Consumer, KafkaException +from confluent_kafka import OFFSET_END, Consumer, KafkaException import fastavro import io @@ -121,7 +122,7 @@ class KafkaConsumer(GenericConsumer): consumer: Consumer - def __init__(self, config): + def __init__(self, config: dict): super().__init__(config) # Disable auto commit self.config["PARAMS"]["enable.auto.commit"] = False @@ -136,7 +137,7 @@ def __init__(self, config): self.dynamic_topic = False if self.config.get("TOPICS"): self.logger.info(f'Subscribing to {self.config["TOPICS"]}') - self.consumer.subscribe(self.config["TOPICS"]) + self.consumer.subscribe(self.config["TOPICS"], on_assign=self._on_assign) elif self.config.get("TOPIC_STRATEGY"): self.dynamic_topic = True module_name, class_name = self.config["TOPIC_STRATEGY"]["CLASS"].rsplit( @@ -149,7 +150,7 @@ def __init__(self, config): self.topics = self.topic_strategy.get_topics() self.logger.info(f'Using {self.config["TOPIC_STRATEGY"]}') self.logger.info(f"Subscribing to {self.topics}") - self.consumer.subscribe(self.topics) + self.consumer.subscribe(self.topics, on_assign=self._on_assign) else: raise Exception("No topics o topic strategy set. ") @@ -180,7 +181,7 @@ def _subscribe_to_new_topics(self): self.topics = self.topic_strategy.get_topics() self.consumer.unsubscribe() self.logger.info(f"Suscribing to {self.topics}") - self.consumer.subscribe(self.topics) + self.consumer.subscribe(self.topics, on_assign=self._on_assign) def set_basic_config(self, num_messages, timeout): if "consume.messages" in self.config: @@ -194,6 +195,15 @@ def set_basic_config(self, num_messages, timeout): timeout = self.config["TIMEOUT"] return num_messages, timeout + def _on_assign(self, consumer, partitions): + if self.config.get("offsets", {}).get("start", False): + self.logger.info(f"Running with offsets {self.config['offsets']}") + for partition in partitions: + partition.offset = self.config["offsets"]["start"] + partitions = consumer.offsets_for_times(partitions) + consumer.assign(partitions) + self.logger.debug(f"Assigned partitions {partitions}") + def _post_process(self, parsed, original_message): parsed["timestamp"] = original_message.timestamp()[1] return parsed @@ -229,6 +239,7 @@ def consume(self, num_messages=1, timeout=60): continue deserialized = [] + do_break = False for message in messages: if message.error(): if message.error().name() == "_PARTITION_EOF": @@ -237,6 +248,16 @@ def consume(self, num_messages=1, timeout=60): self.logger.exception(f"Error in kafka stream: {message.error()}") continue else: + if self.config.get("offsets"): + if message.timestamp()[1] >= self.config["offsets"].get( + "end", datetime.datetime.now().timestamp() + 1 + ): + self.logger.info( + "Reached end offset %s with timestamp %s", + self.config["offsets"]["end"], + message.timestamp()[1], + ) + break ds_message = self._deserialize_message(message) ds_message = self._post_process(ds_message, message) deserialized.append(ds_message) diff --git a/libs/apf/tests/consumers/conftest.py 
b/libs/apf/tests/consumers/conftest.py new file mode 100644 index 000000000..437c5b8f9 --- /dev/null +++ b/libs/apf/tests/consumers/conftest.py @@ -0,0 +1,28 @@ +from confluent_kafka.admin import AdminClient, NewTopic +import pytest + + +def is_responsive_kafka(url): + client = AdminClient({"bootstrap.servers": url}) + topics = ["sorting-hat"] + new_topics = [NewTopic(topic, num_partitions=1) for topic in topics] + fs = client.create_topics(new_topics) + for topic, f in fs.items(): + try: + f.result() + except Exception as e: + print(f"Can't create topic {topic}") + print(e) + return False + return True + + +@pytest.fixture(scope="session") +def kafka_service(docker_ip, docker_services): + """Ensure that Kafka service is up and responsive.""" + port = docker_services.port_for("kafka", 9092) + server = "{}:{}".format(docker_ip, port) + docker_services.wait_until_responsive( + timeout=30.0, pause=0.1, check=lambda: is_responsive_kafka(server) + ) + return server diff --git a/libs/apf/tests/consumers/test_core.py b/libs/apf/tests/consumers/test_core.py index f95deda3b..d96f97a68 100644 --- a/libs/apf/tests/consumers/test_core.py +++ b/libs/apf/tests/consumers/test_core.py @@ -4,6 +4,9 @@ class Consumer(GenericConsumer): + def __init__(self, config, **kwargs): + super().__init__(config, **kwargs) + def consume(self): yield {} @@ -11,7 +14,7 @@ def consume(self): class GenericConsumerTest(unittest.TestCase): component: GenericConsumer - def test_consume(self, use: GenericConsumer = Consumer()): + def test_consume(self, use: GenericConsumer = Consumer({})): self.component = use for msj in self.component.consume(): self.assertIsInstance(msj, dict) diff --git a/libs/apf/tests/consumers/test_kafka.py b/libs/apf/tests/consumers/test_kafka.py index 96511b93a..a5dc5cbca 100644 --- a/libs/apf/tests/consumers/test_kafka.py +++ b/libs/apf/tests/consumers/test_kafka.py @@ -1,3 +1,8 @@ +import logging +from typing import Generator +import uuid + +from apf.producers.kafka import KafkaProducer from .test_core import GenericConsumerTest from apf.consumers.kafka import ( KafkaJsonConsumer, @@ -15,6 +20,7 @@ ) import datetime import os +import pytest def consume(num_messages=1): @@ -238,3 +244,59 @@ def test_schemaless_deserialize_bad_message(self): with self.assertRaises(Exception): consumer._deserialize_message(schemaless_avro) + + +@pytest.fixture +def consumer(): + def initialize_consumer(topic: str, extra_config: dict = {}): + params = { + "TOPICS": [topic], + "PARAMS": { + "bootstrap.servers": "localhost:9092", + "group.id": uuid.uuid4().hex, + "enable.partition.eof": True, + "auto.offset.reset": "earliest", + }, + } + params.update(extra_config) + return KafkaConsumer(params) + + yield initialize_consumer + + +def test_consumer_with_offests(consumer, kafka_service, caplog): + caplog.set_level(logging.DEBUG) + producer = KafkaProducer( + { + "PARAMS": { + "bootstrap.servers": "localhost:9092", + }, + "TOPIC": "offset_tests", + "SCHEMA": { + "namespace": "example.avro", + "type": "record", + "name": "test", + "fields": [ + {"name": "id", "type": "int"}, + ], + }, + } + ) + producer.produce({"id": 1}, timestamp=10) + producer.produce({"id": 3}, timestamp=15) + producer.produce({"id": 2}, timestamp=20) + producer.produce({"id": 4}, timestamp=30) + producer.producer.flush() + kconsumer = consumer("offset_tests", {"offsets": {"start": 10, "end": 20}}) + messages = list(kconsumer.consume()) + assert len(messages) == 2 + assert messages[0]["id"] == 1 + assert messages[1]["id"] == 3 + kconsumer = 
consumer("offset_tests", {"offsets": {"start": 20}}) + messages = list(kconsumer.consume()) + assert len(messages) == 2 + assert messages[0]["id"] == 2 + assert messages[1]["id"] == 4 + kconsumer = consumer("offset_tests") + messages = list(kconsumer.consume()) + assert len(messages) == 4 diff --git a/libs/apf/tests/docker-compose.yml b/libs/apf/tests/docker-compose.yml new file mode 100644 index 000000000..79304ff41 --- /dev/null +++ b/libs/apf/tests/docker-compose.yml @@ -0,0 +1,21 @@ +version: "3" +services: + zookeeper: + image: "bitnami/zookeeper:latest" + ports: + - "2181:2181" + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + + kafka: + image: "bitnami/kafka:3.3.1" + ports: + - "9092:9092" + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + depends_on: + - zookeeper