diff --git a/tests/rptest/integration-tests/flink_test.py b/tests/rptest/integration-tests/flink_test.py
new file mode 100644
index 000000000000..d4f2d15c7feb
--- /dev/null
+++ b/tests/rptest/integration-tests/flink_test.py
@@ -0,0 +1,91 @@
+from rptest.services.cluster import cluster
+from rptest.clients.default import DefaultClient
+from rptest.clients.rpk import RpkTool, RpkException
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.services.redpanda import ResourceSettings
+
+# from pyflink.common.serialization import SimpleStringSchema
+# from pyflink.datastream import StreamExecutionEnvironment
+# from pyflink.datastream.connectors.kafka import KafkaSource
+# from pyflink.datastream.connectors.python import StreamingFileSink
+
+# Import the Workload classes
+from lib.workloads import Workload, \
+    NumberIncrementalWorkload  # RealtimeWordCountWorkload, StreamAggregationWorkload, GeospatialDataProcessingWorkload
+
+
+# class FlinkTest(RedpandaTest):
+#     def __init__(self, test_context):
+#         super(FlinkTest,
+#               self).__init__(test_context=test_context,
+#                              num_brokers=3,
+#                              resource_settings=ResourceSettings(num_cpus=1),
+#                              )
+#
+#     def test_flink_integration(self):
+#         rpk = RpkTool(self.redpanda)
+#         rpk.create_topic("test_topic")
+
+# def test_flink_integration():
+#     redpanda = RedpandaTest()
+#     rpk = RpkTool(redpanda)
+#     rpk.create_topic("test_topic")
+#
+# if __name__ == "__main__":
+#     t = FlinkTest()
+'''
+workload = NumberIncrementalWorkload()  # Replace with the desired workload class
+data = workload.generate_data(1000)  # Generate 1000 records by default
+
+# Create a StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# Create a KafkaSource to read data from Redpanda
+kafka_source = KafkaSource(
+    topics=["test-topic"],
+    brokers="localhost:9092",
+    value_deserializer=SimpleStringSchema()
+)
+
+# Create a StreamTransformation to process the data
+data_stream = env.add_source(kafka_source)
+
+if isinstance(workload, RealtimeWordCountWorkload):
+    # Process the data for real-time word count
+    word_counts = data_stream.flat_map(lambda sentence: sentence.split()) \
+        .map(lambda word: (word, 1)) \
+        .key_by(lambda word_count: word_count[0]) \
+        .reduce(lambda a, b: (a[0], a[1] + b[1]))
+
+    # Print the word counts to the console
+    word_counts.print()
+
+elif isinstance(workload, StreamAggregationWorkload):
+    # Process the data for stream aggregation
+    avg_value = data_stream.map(lambda value: value[1]) \
+        .reduce(lambda a, b: (a[0] + 1, a[1] + b))
+
+    # Print the average value to the console
+    avg_value.print()
+
+elif isinstance(workload, GeospatialDataProcessingWorkload):
+    # Process the data for geospatial data processing
+    avg_latitude, avg_longitude = data_stream.map(lambda point: (point[0], point[1])) \
+        .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
+        .map(lambda avg_values: (avg_values[0] / 2, avg_values[1] / 2))
+
+    # Print the average latitude and longitude to the console
+    avg_latitude.add_sink(StreamingFileSink.for_row_format("avg_latitude.txt", SimpleStringSchema()))
+    avg_longitude.add_sink(StreamingFileSink.for_row_format("avg_longitude.txt", SimpleStringSchema()))
+
+# Execute the Flink job
+env.execute("Workload Demo")
+'''
diff --git a/tests/rptest/integration-tests/lib/__init__.py b/tests/rptest/integration-tests/lib/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/rptest/integration-tests/lib/workloads.py b/tests/rptest/integration-tests/lib/workloads.py
new file mode 100644
index 000000000000..f6e5ee94a4ad
--- /dev/null
+++ b/tests/rptest/integration-tests/lib/workloads.py
@@ -0,0 +1,252 @@
+import datetime
+import json
+import random
+import sys
+import time
+import uuid
+from kafka import KafkaProducer, KafkaConsumer
+
+
+class TestProducer:
+    def __init__(self, topic="test-source-topic"):
+        """
+        Kafka producer for generating test data.
+
+        Parameters:
+        - topic (str): Kafka topic name.
+        """
+        self.topic = topic
+        self.producer = KafkaProducer(
+            bootstrap_servers="localhost:9092",
+            value_serializer=lambda m: json.dumps(m).encode('ascii')
+        )
+
+    def send_callback(self, num_records, callback):
+        """
+        Send a specified number of records to the Kafka topic using a callback function.
+
+        Parameters:
+        - num_records (int): Number of records to send.
+        - callback (function): Callback function to generate data.
+        """
+        for _ in range(num_records):
+            self.producer.send(self.topic, callback())
+        # send() is asynchronous; flush so all records are delivered before returning
+        self.producer.flush()
+
+
+class TestConsumer:
+    def __init__(self, group_id='test-group'):
+        """
+        Kafka consumer for consuming test data.
+
+        Parameters:
+        - group_id (str): Kafka consumer group ID.
+        """
+        conf = {
+            'bootstrap_servers': 'localhost:9092',
+            'group_id': group_id,
+            'auto_offset_reset': 'earliest',
+        }
+        self.consumer = KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('utf-8')), **conf)
+
+    def consume_fixed_number_of_messages(self, topics, num_messages_to_consume):
+        """
+        Consume a fixed number of messages from Kafka topics.
+
+        Parameters:
+        - topics (list): List of Kafka topics.
+        - num_messages_to_consume (int): Number of messages to consume.
+        """
+        messages_consumed = 0  # Counter for consumed messages
+        self.consumer.subscribe(topics)
+
+        try:
+            while messages_consumed < num_messages_to_consume:
+                # poll() returns a (possibly empty) dict of TopicPartition -> records
+                msg = self.consumer.poll(timeout_ms=5000)
+
+                if not msg:
+                    continue
+                for records in msg.values():
+                    for record in records:
+                        print(f"Message from {topics} : {record.value}")
+                        messages_consumed += 1  # Increment the message counter
+
+        finally:
+            # Close down consumer to commit final offsets.
+            self.consumer.close()
+
+    def consume_from_topic(self, topics):
+        """
+        Consume messages from Kafka topics.
+
+        Parameters:
+        - topics (list): List of Kafka topics.
+        """
+        try:
+            self.consumer.subscribe(topics)
+
+            while True:
+                msg = self.consumer.poll(timeout_ms=5000)
+                if not msg:
+                    continue
+                for records in msg.values():
+                    for record in records:
+                        print(f"Message from {topics} : {record.value}")
+        finally:
+            # Close down consumer to commit final offsets.
+            self.consumer.close()
+
+
+class Workload:
+    def __init__(self, name, description):
+        """
+        Base class for defining workloads.
+
+        Parameters:
+        - name (str): Workload name.
+        - description (str): Workload description.
+        """
+        self.name = name
+        self.description = description
+
+    def generate_data(self, num_records):
+        """
+        Generate test data.
+
+        Parameters:
+        - num_records (int): Number of records to generate.
+
+        Returns:
+        - data (list): List of generated data.
+        """
+        raise NotImplementedError
+
+    def verify_data(self, data):
+        """
+        Verify test data.
+
+        Parameters:
+        - data: Data to verify.
+
+        Returns:
+        - result: Result of the verification.
+        """
+        raise NotImplementedError
+
+
+class NumberIncrementalWorkload(Workload):
+    def __init__(self):
+        super().__init__(
+            name="Number Incremental Workload with Timestamp",
+            description="Generates a stream of numbers with incremental values and timestamps."
+        )
+
+    def generate_data(self):
+        num_a = random.randint(1, 1000)
+        num_b = random.randint(1, 1000)
+        return {
+            "id": str(uuid.uuid4()),
+            "ts": str(datetime.datetime.now()),
+            "num_a": num_a,
+            "num_b": num_b,
+            "source_sum": num_a + num_b
+        }
+
+    def verify_data(self, num_records):
+        """
+        Verify topics data.
+
+        Parameters:
+        - num_records (int): Number of records to verify.
+        """
+        print("Validate topics data...")
+
+        # Common Kafka consumer settings
+        kafka_settings = {
+            'bootstrap_servers': 'localhost:9092',
+            'group_id': 'test-group',
+            # Start from the earliest offset so records produced before the
+            # consumers were created are still read.
+            'auto_offset_reset': 'earliest',
+            'enable_auto_commit': True,
+            'value_deserializer': lambda x: x.decode('utf-8'),
+            'request_timeout_ms': 100000,
+            'max_poll_records': 100
+        }
+
+        # Create Kafka consumers for source and sink topics
+        source_consumer = KafkaConsumer('test-source-topic', **kafka_settings)
+        sink_consumer = KafkaConsumer('test-sink-topic', **kafka_settings)
+
+        print("Reading topics...")
+        source_topic_records = read_events_and_store(source_consumer, 'test-source-topic', num_records)
+        # Give the Flink job time to write the corresponding sink records.
+        time.sleep(5)
+        sink_topic_records = read_events_and_store(sink_consumer, 'test-sink-topic', num_records)
+        # print(f"source_topic_records : {source_topic_records}")
+        # print(f"sink_topic_records : {sink_topic_records}")
+
+        errors = []
+        if len(source_topic_records) == len(sink_topic_records):
+            for source_key, source_value in source_topic_records.items():
+                if source_key in sink_topic_records.keys():
+                    if str(source_value["source_sum"]) == str(sink_topic_records[source_key]["sink_sum"]):
+                        print(
+                            f"Event match: source_sum = {source_value['source_sum']}, sink_sum = {sink_topic_records[source_key]['sink_sum']}")
+                    else:
+                        msg = f"Event mismatch: {source_value['source_sum']}, {sink_topic_records[source_key]['sink_sum']}"
+                        errors.append(msg)
+        else:
+            errors.append(
+                f"Record count mismatch: {len(source_topic_records)} source records, {len(sink_topic_records)} sink records")
+        if errors:
+            raise Exception(errors)
+
+
+def read_events_and_store(consumer, topic, num_records):
+    """
+    Read events from a Kafka topic and store them.
+
+    Parameters:
+    - consumer: Kafka consumer.
+    - topic (str): Kafka topic name.
+    - num_records (int): Number of records to read.
+
+    Returns:
+    - result: Stored events.
+    """
+    result = {}  # To store the events
+
+    # Subscribe to the topic
+    consumer.subscribe([topic])
+
+    # Read messages from Kafka and store them keyed by event id
+    try:
+        for _ in range(num_records):
+            # poll() returns a (possibly empty) dict of TopicPartition -> records
+            msg = consumer.poll(timeout_ms=1000)
+            if not msg:
+                continue
+
+            for topic_partition, records in msg.items():
+                for record in records:
+                    # Access the 'value' attribute of the ConsumerRecord
+                    record_value = record.value
+
+                    # Parse the JSON string in the 'value' attribute
+                    record_data = json.loads(record_value)
+                    # print(record_data)
+
+                    result[record_data['id']] = record_data
+
+    except KeyboardInterrupt:
+        sys.stderr.write('%% Aborted by user\n')
+
+    finally:
+        # Close down consumer to commit final offsets.
+        consumer.close()
+
+    return result
+
+
+if __name__ == '__main__':
+    prod = TestProducer()
+
+    num_events = 8
+    workload = NumberIncrementalWorkload()  # Replace with the desired workload class
+
+    prod.send_callback(num_events, workload.generate_data)
+
+    workload.verify_data(num_events)
diff --git a/tests/rptest/services/flink.py b/tests/rptest/services/flink.py
new file mode 100644
index 000000000000..f8257561c644
--- /dev/null
+++ b/tests/rptest/services/flink.py
@@ -0,0 +1,99 @@
+import json
+import logging
+import sys
+
+from pyflink.common import SimpleStringSchema, WatermarkStrategy
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import DeliveryGuarantee
+from pyflink.datastream.connectors.kafka import KafkaSource, KafkaSink, KafkaOffsetsInitializer, \
+    KafkaRecordSerializationSchema
+from pyflink.datastream.formats.json import JsonRowSerializationSchema
+from pyflink.datastream.functions import FlatMapFunction
+from pyflink.common.typeinfo import Types
+from pyflink.common import Row
+
+
+# Get the absolute path to the JAR file
+# jar_path = "file://" + os.path.abspath("../integration-tests/flink-connector-kafka-3.0.1-1.18.jar")
+
+
+class SumNumbers(FlatMapFunction):
+    """Sum num_a and num_b from each source event and emit a Row for the sink topic."""
+
+    def flat_map(self, value):
+        events = json.loads(value)
+        print(f"Source events: {events}")
+
+        # Convert the numbers to integers
+        try:
+            num1 = int(events["num_a"])
+            num2 = int(events["num_b"])
+        except (KeyError, ValueError, TypeError):
+            # Skip malformed input
+            return
+
+        # Calculate the sum of the numbers
+        total = num1 + num2
+        logging.info(f"Sum: {total}")
+
+        # Return the sum as a string
+        yield Row(id=events["id"], num_a=str(num1), num_b=str(num2), sink_sum=str(total))
+
+
+def flink_job(env):
+    # Create a Redpanda source to read data from the input topic
+    redpanda_source = KafkaSource \
+        .builder() \
+        .set_bootstrap_servers("localhost:9092") \
+        .set_group_id('test_group') \
+        .set_topics("test-source-topic") \
+        .set_value_only_deserializer(SimpleStringSchema()) \
+        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
+        .build()
+
+    # Read the data as a stream of strings
+    data_stream = env.from_source(redpanda_source, WatermarkStrategy.no_watermarks(), "kafka source")
+
+    # Output type shared by the flat_map transformation and the sink serializer
+    output_type = Types.ROW_NAMED(field_names=["id", "num_a", "num_b", "sink_sum"],
+                                  field_types=[Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])
+
+    # Sum the numbers in each event
+    sum_stream = data_stream.flat_map(SumNumbers(), output_type=output_type)
+
+    # Set the value serialization schema for the sink
+    redpanda_sink = (
+        KafkaSink.builder()
+        .set_bootstrap_servers("localhost:9092")
+        .set_record_serializer(
+            KafkaRecordSerializationSchema.builder()
+            .set_topic("test-sink-topic")
+            .set_value_serialization_schema(
+                JsonRowSerializationSchema.builder()
+                .with_type_info(output_type)
+                .build()
+            )
+            .build()
+        )
+        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+        .build()
+    )
+
+    # Write the transformed data to the output topic
+    sum_stream.sink_to(redpanda_sink)
+
+    # Execute the Flink job
+    print("Flink job for Sum Numbers")
+    env.execute("Flink job for Sum Numbers")
+
+
+if __name__ == '__main__':
+    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+
+    # Create a StreamExecutionEnvironment
+    env = StreamExecutionEnvironment.get_execution_environment()
+
+    flink_job(env)
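
Note: a possible way to exercise these files by hand, assuming a local Redpanda or Kafka broker on localhost:9092, PyFlink installed, and the Kafka connector JAR referenced in the commented-out jar_path (flink-connector-kafka-3.0.1-1.18.jar) available to Flink. This sequence is a sketch for reviewers, not part of the diff:

    # Start the Flink job that reads test-source-topic, sums num_a + num_b,
    # and writes the result to test-sink-topic.
    python tests/rptest/services/flink.py

    # In a second shell, produce events and verify that source and sink sums match.
    python tests/rptest/integration-tests/lib/workloads.py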