diff --git a/tests/rptest/integration-tests/flink_test.py b/tests/rptest/integration-tests/flink_test.py
index 9680978042f6a..7606a2fbbcca7 100644
--- a/tests/rptest/integration-tests/flink_test.py
+++ b/tests/rptest/integration-tests/flink_test.py
@@ -6,6 +6,9 @@
 from rptest.clients.rpk import RpkTool, RpkException
 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.services.redpanda import ResourceSettings
+from rptest.clients.types import TopicSpec
+from rptest.services.kafka_cli_consumer import KafkaCliConsumer
+from rptest.services.rpk_producer import RpkProducer
 
 # from pyflink.common.serialization import SimpleStringSchema
 # from pyflink.datastream import StreamExecutionEnvironment
@@ -13,28 +16,68 @@
 # from pyflink.datastream.connectors.python import StreamingFileSink
 
 # Import the Workload classes
-from lib.workload import Workload, \
+from workload import Workload, \
     NumberIncrementalWorkload
 # RealtimeWordCountWorkload, StreamAggregationWorkload, GeospatialDataProcessingWorkload
 
-# class FlinkTest(RedpandaTest):
-#     def __init__(self, test_context):
-#         super(FlinkTest,
-#               self).__init__(test_context=test_context,
-#                              num_brokers=3,
-#                              resource_settings=ResourceSettings(num_cpus=1),
-#                              )
-#
-#     def test_flink_integration(self):
-#         rpk = RpkTool(self.redpanda)
-#         rpk.create_topic("test_topic")
 
-# def test_flink_integration():
-#     redpanda = RedpandaTest()
-#     rpk = RpkTool(redpanda)
-#     rpk.create_topic("test_topic")
-#
-# if __name__ == "__main__":
-#     t=FlinkTest()
+class FlinkTest(RedpandaTest):
+    def __init__(self, test_ctx, *args, **kwargs):
+        self._ctx = test_ctx
+        self.producer = None
+        super(FlinkTest, self).__init__(
+            test_ctx,
+            num_brokers=3,
+            *args,
+            **kwargs)
+
+    def create_consumer(self,
+                        topic,
+                        group,
+                        instance_name,
+                        instance_id=None,
+                        consumer_properties={}):
+        return KafkaCliConsumer(
+            self.test_context,
+            self.redpanda,
+            topic=topic,
+            group=group,
+            from_beginning=True,
+            instance_name=instance_name,
+            formatter_properties={
+                'print.value': 'false',
+                'print.key': 'false',
+                'print.partition': 'true',
+                'print.offset': 'true',
+            },
+            consumer_properties=FlinkTest.make_consumer_properties(
+                consumer_properties, instance_id))
+
+    def create_topic(self, p_cnt):
+        # create topic
+        self.topic_spec = TopicSpec(partition_count=p_cnt,
+                                    replication_factor=3)
+
+        self.client().create_topic(specs=self.topic_spec)
+
+    def start_producer(self, msg_cnt=5000):
+
+        # produce some messages to the topic
+        self.producer = RpkProducer(self._ctx, self.redpanda,
+                                    self.topic_spec.name, 128, msg_cnt, -1)
+        self.producer.start()
+
+    @cluster(num_nodes=3)
+    def test_flink_integration(self):
+        """
+        Test validating the end-to-end flow of Redpanda and Flink together.
+        """
+        rpk = RpkTool(self.redpanda)
+        rpk.create_topic("test_topic")
+
+        redpanda = RedpandaTest()
+        redpanda.si_settings()
+
+        # The code below will be uncommented gradually after debugging.
+        '''
 workload = NumberIncrementalWorkload()  # Replace with the desired workload class
 data = workload.generate_data(1000)  # Generate 1000 records by default
diff --git a/tests/rptest/integration-tests/lib/workloads.py b/tests/rptest/integration-tests/lib/workloads.py
index aa3c17fc935b9..847cf4649480a 100644
--- a/tests/rptest/integration-tests/lib/workloads.py
+++ b/tests/rptest/integration-tests/lib/workloads.py
@@ -259,3 +259,4 @@ def read_events_and_store(consumer, topic, num_records):
     prod.send_callback(num_events, workload.generate_data)
 
     workload.verify_data(num_events)
+
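Reviewer note: the new FlinkTest class references a few names that do not appear in this hunk: the @cluster decorator, FlinkTest.make_consumer_properties, and RedpandaTest.si_settings are all assumed to be available from elsewhere in rptest. A minimal usage sketch of how the new helpers are presumably meant to be chained in a test body follows; the import path for cluster and the consumer start()/stop() calls are assumptions, not part of this diff.

from rptest.services.cluster import cluster  # assumed location of the decorator used above


class FlinkSmokeSketch(FlinkTest):
    @cluster(num_nodes=4)
    def test_produce_then_consume(self):
        # Create a 3-partition topic, produce 5000 messages, then read them back.
        self.create_topic(p_cnt=3)
        self.start_producer(msg_cnt=5000)
        consumer = self.create_consumer(topic=self.topic_spec.name,
                                        group="flink-verify",
                                        instance_name="flink-verify-0")
        consumer.start()      # assumption: KafkaCliConsumer is a ducktape service
        self.producer.wait()  # RpkProducer exits after msg_cnt messages are sent
        consumer.stop()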
diff --git a/tests/rptest/integration-tests/requirements.txt b/tests/rptest/integration-tests/requirements.txt
new file mode 100644
index 0000000000000..b28c795a5ea71
--- /dev/null
+++ b/tests/rptest/integration-tests/requirements.txt
@@ -0,0 +1,53 @@
+apache-beam==2.48.0
+apache-flink==1.18.0
+apache-flink-libraries==1.18.0
+avro-python3==1.10.2
+beautifulsoup4==4.12.2
+cachetools==5.3.2
+certifi==2023.7.22
+charset-normalizer==3.3.2
+cloudpickle==2.2.1
+confluent-kafka==2.3.0
+crcmod==1.7
+dill==0.3.1.1
+dnspython==2.4.2
+docopt==0.6.2
+ducktape==0.11.4
+fastavro==1.9.0
+fasteners==0.19
+find-libpython==0.3.1
+google==3.0.0
+google-api-core==2.13.0
+google-auth==2.23.4
+googleapis-common-protos==1.61.0
+grpcio==1.59.2
+hdfs==2.7.3
+httplib2==0.22.0
+idna==3.4
+kafka-python==2.0.2
+numpy==1.24.4
+objsize==0.6.1
+orjson==3.9.10
+pandas==2.1.2
+pemja==0.3.0
+proto-plus==1.22.3
+protobuf==4.23.4
+py4j==0.10.9.7
+pyarrow==11.0.0
+pyasn1==0.5.0
+pyasn1-modules==0.3.0
+pydot==1.4.2
+pyflink==1.0
+pymongo==4.6.0
+pyparsing==3.1.1
+python-dateutil==2.8.2
+pytz==2023.3.post1
+regex==2023.10.3
+requests==2.31.0
+rsa==4.9
+six==1.16.0
+soupsieve==2.5
+typing_extensions==4.8.0
+tzdata==2023.3
+urllib3==2.0.7
+zstandard==0.22.0
diff --git a/tests/rptest/integration-tests/workloads.py b/tests/rptest/integration-tests/workloads.py
new file mode 100644
index 0000000000000..847cf4649480a
--- /dev/null
+++ b/tests/rptest/integration-tests/workloads.py
@@ -0,0 +1,262 @@
+import datetime
+import json
+import random
+import sys
+import uuid
+from kafka import KafkaProducer, KafkaConsumer
+
+
+class TestProducer:
+    def __init__(self, topic="test-source-topic"):
+        """
+        Kafka producer for generating test data.
+
+        Parameters:
+        - topic (str): Kafka topic name.
+        """
+        self.topic = topic
+        self.producer = KafkaProducer(
+            bootstrap_servers="localhost:9092",
+            value_serializer=lambda m: json.dumps(m).encode('ascii'))
+
+    def send_callback(self, num_records, callback):
+        """
+        Send a specified number of records to the Kafka topic using a callback function.
+
+        Parameters:
+        - num_records (int): Number of records to send.
+        - callback (function): Callback function to generate data.
+        """
+        for _ in range(num_records):
+            self.producer.send(self.topic, callback())
+        return
+
+
+class TestConsumer:
+    def __init__(self, group_id='test-group'):
+        """
+        Kafka consumer for consuming test data.
+
+        Parameters:
+        - group_id (str): Kafka consumer group ID.
+        """
+        conf = {
+            'bootstrap_servers': 'localhost:9092',
+            'group_id': group_id,
+            'auto_offset_reset': 'earliest',
+        }
+        self.consumer = KafkaConsumer(
+            value_deserializer=lambda m: json.loads(m.decode('utf-8')), **conf)
+
+    def consume_fixed_number_of_messages(self, topics,
+                                         num_messages_to_consume):
+        """
+        Consume a fixed number of messages from Kafka topics.
+
+        Parameters:
+        - topics (list): List of Kafka topics.
+        - num_messages_to_consume (int): Number of messages to consume.
+        """
+        messages_consumed = 0  # Counter for consumed messages
+        self.consumer.subscribe(topics)
+
+        try:
+            while messages_consumed < num_messages_to_consume:
+                msg = self.consumer.poll(5.0)
+
+                if msg is None:
+                    continue
+                else:
+                    print(f"Message from {topics} : {msg}")
+                    messages_consumed += 1  # Increment the message counter
+
+        finally:
+            # Close down consumer to commit final offsets.
+            self.consumer.close()
+
+    def consume_from_topic(self, topics):
+        """
+        Consume messages from Kafka topics.
+
+        Parameters:
+        - topics (list): List of Kafka topics.
+ """ + try: + self.consumer.subscribe(topics) + + while True: + msg = self.consumer.poll(5.0) + if msg is None: + continue + else: + print(f"Message from {topics} : {msg}") + finally: + # Close down consumer to commit final offsets. + self.consumer.close() + + +class Workload: + def __init__(self, name, description): + """ + Base class for defining workloads. + + Parameters: + - name (str): Workload name. + - description (str): Workload description. + """ + self.name = name + self.description = description + + def generate_data(self, num_records): + """ + Generate test data. + + Parameters: + - num_records (int): Number of records to generate. + + Returns: + - data (list): List of generated data. + """ + raise NotImplementedError + + def verify_data(self, data): + """ + Verify test data. + + Parameters: + - data: Data to verify. + + Returns: + - result: Result of the verification. + """ + raise NotImplementedError + + +class NumberIncrementalWorkload(Workload): + def __init__(self): + super().__init__( + name="Number Incremental Workload with Timestamp", + description= + "Generates a stream of numbers with incremental values and timestamps." + ) + + def generate_data(self): + num_a = random.randint(1, 1000) + num_b = random.randint(1, 1000) + return { + "id": str(uuid.uuid4()), + "ts": str(datetime.datetime.now()), + "num_a": num_a, + "num_b": num_b, + "source_sum": num_a + num_b + } + + def verify_data(self, num_records): + """ + Verify topics data. + + Parameters: + - num_records (int): Number of records to verify. + """ + print("Validate topics data...") + + # Common Kafka consumer settings + kafka_settings = { + 'bootstrap_servers': 'localhost:9092', + 'group_id': 'test-group', + 'auto_offset_reset': 'latest', + 'enable_auto_commit': True, + 'value_deserializer': lambda x: x.decode('utf-8'), + 'request_timeout_ms': 100000, + 'max_poll_records': 100 + } + + # Create Kafka consumers for source and sink topics + source_consumer = KafkaConsumer('test-source-topic', **kafka_settings) + sink_consumer = KafkaConsumer('test-sink-topic', **kafka_settings) + + print("Reading topics...") + source_topic_records = read_events_and_store(source_consumer, + 'test-source-topic', + num_records) + import time + time.sleep(5) + sink_topic_records = read_events_and_store(sink_consumer, + 'test-sink-topic', + num_records) + # print(f"source_topic_records : {source_topic_records}") + # print(f"sink_topic_records : {sink_topic_records}") + + errors = [] + if len(source_topic_records) == len(sink_topic_records): + for source_key, source_value in source_topic_records.items(): + if source_key in sink_topic_records.keys(): + if str(source_value["source_sum"]) == str( + sink_topic_records[source_key]["sink_sum"]): + print( + f"Event match: source_sum = {source_value['source_sum']}, sink_sum = {sink_topic_records[source_key]['sink_sum']}" + ) + else: + + msg = f"Event mismatch: {source_value['source_sum']}, {sink_topic_records[source_key]['sink_sum']}" + errors.append(msg) + if errors: + raise Exception(errors) + + +def read_events_and_store(consumer, topic, num_records): + """ + Read events from a Kafka topic and store them. + + Parameters: + - consumer: Kafka consumer. + - topic (str): Kafka topic name. + - num_records (int): Number of records to read. + + Returns: + - result: Stored events. 
+ """ + result = {} # To store the events + + # Subscribe to the topic + consumer.subscribe([topic]) + + # Read messages from Kafka, print to stdout + try: + for _ in range(num_records): + msg = consumer.poll(100) + if msg is None: + continue + + for topic_partition, records in msg.items(): + for record in records: + # Access the 'value' attribute of the ConsumerRecord + record_value = record.value + + # Parse the JSON string in the 'value' attribute + record_data = json.loads(record_value) + # print(record_data) + + result[record_data['id']] = record_data + + except KeyboardInterrupt: + sys.stderr.write('%% Aborted by user\n') + + finally: + # Close down consumer to commit final offsets. + consumer.close() + + return result + + +if __name__ == '__main__': + prod = TestProducer() + + num_events = 8 + workload = NumberIncrementalWorkload( + ) # Replace with the desired workload class + + prod.send_callback(num_events, workload.generate_data) + + workload.verify_data(num_events) + diff --git a/tests/rptest/services/flink.py b/tests/rptest/services/flink.py index 7687363221cfa..539b7e2779a55 100644 --- a/tests/rptest/services/flink.py +++ b/tests/rptest/services/flink.py @@ -2,6 +2,11 @@ import logging import sys + +from ducktape.cluster.remoteaccount import RemoteCommandError +from ducktape.services.service import Service +from rptest.util import wait_until_result + from pyflink.common import SimpleStringSchema, WatermarkStrategy from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors import DeliveryGuarantee @@ -16,6 +21,12 @@ # jar_path = "file://" + os.path.abspath("../integration-tests/flink-connector-kafka-3.0.1-1.18.jar") +class FlinkError(Exception): + """Exception used by Flink services + """ + def __init__(self, error): + super(FlinkError, self).__init__(error) + # change the output type as specified class SumNumbers(FlatMapFunction): def flat_map(self, value): @@ -40,6 +51,22 @@ def flat_map(self, value): num_b=str(num2), sink_sum=str(sum)) +class FlinkServer(Service): + """ + Service used to start and interact with the Flink + """ + def __init__(self, context): + super(FlinkServer, self).__init__(context, num_nodes=1) + + def start_flink(self, node): + cmd = self._start_cmd() + self.logger.debug(f'Starting Flink Server with cmd "{cmd}"') + node.account.ssh(cmd, allow_fail=False) + + def stop_flink(self, node): + cmd = self._stop_cmd() + self.logger.debug(f'Stopping Flink server with cmd "{cmd}"') + node.account.ssh(cmd, allow_fail=True) def flink_job(env): # create a RedPanda Source to read data from the input topic