Skip to content

Commit

Permalink
updated the flink-integration code
Browse files Browse the repository at this point in the history
  • Loading branch information
Ubuntu committed Nov 16, 2023
1 parent b311e91 commit 3873875
Show file tree
Hide file tree
Showing 5 changed files with 406 additions and 20 deletions.
83 changes: 63 additions & 20 deletions tests/rptest/integration-tests/flink_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,78 @@
from rptest.clients.rpk import RpkTool, RpkException
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.redpanda import ResourceSettings
from rptest.clients.types import TopicSpec
from rptest.services.kafka_cli_consumer import KafkaCliConsumer
from rptest.services.rpk_producer import RpkProducer

# from pyflink.common.serialization import SimpleStringSchema
# from pyflink.datastream import StreamExecutionEnvironment
# from pyflink.datastream.connectors.kafka import KafkaSource
# from pyflink.datastream.connectors.python import StreamingFileSink

# Import the Workload classes
from lib.workload import Workload, \
from workload import Workload, \
NumberIncrementalWorkload # RealtimeWordCountWorkload, StreamAggregationWorkload, GeospatialDataProcessingWorkload

# class FlinkTest(RedpandaTest):
# def __init__(self, test_context):
# super(FlinkTest,
# self).__init__(test_context=test_context,
# num_brokers=3,
# resource_settings=ResourceSettings(num_cpus=1),
# )
#
# def test_flink_integration(self):
# rpk = RpkTool(self.redpanda)
# rpk.create_topic("test_topic")

# def test_flink_integration():
# redpanda = RedpandaTest()
# rpk = RpkTool(redpanda)
# rpk.create_topic("test_topic")
#
# if __name__ == "__main__":
# t=FlinkTest()
class FlinkTest(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
self._ctx = test_ctx
self.producer = None
super(FlinkTest, self).__init__(
test_ctx,
num_brokers=3,
*args,
**kwargs)

def create_consumer(self,
topic,
group,
instance_name,
instance_id=None,
consumer_properties={}):
return KafkaCliConsumer(
self.test_context,
self.redpanda,
topic=topic,
group=group,
from_beginning=True,
instance_name=instance_name,
formatter_properties={
'print.value': 'false',
'print.key': 'false',
'print.partition': 'true',
'print.offset': 'true',
},
consumer_properties=FlinkTest.make_consumer_properties(
consumer_properties, instance_id))

def create_topic(self, p_cnt):
# create topic
self.topic_spec = TopicSpec(partition_count=p_cnt,
replication_factor=3)

self.client().create_topic(specs=self.topic_spec)

def start_producer(self, msg_cnt=5000):

# produce some messages to the topic
self.producer = RpkProducer(self._ctx, self.redpanda,
self.topic_spec.name, 128, msg_cnt, -1)
self.producer.start()

@cluster(num_nodes=3)
def test_flink_integration(self):
"""
Test validating that end to end flow of redpanda and flink together
"""
rpk = RpkTool(self.redpanda)
rpk.create_topic("test_topic")

redpanda = RedpandaTest()
redpanda.si_settings()

# below code will be uncommented gradually after debugging

'''
workload = NumberIncrementalWorkload() # Replace with the desired workload class
data = workload.generate_data(1000) # Generate 1000 records by default
Expand Down
1 change: 1 addition & 0 deletions tests/rptest/integration-tests/lib/workloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,4 @@ def read_events_and_store(consumer, topic, num_records):
prod.send_callback(num_events, workload.generate_data)

workload.verify_data(num_events)

53 changes: 53 additions & 0 deletions tests/rptest/integration-tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
apache-beam==2.48.0
apache-flink==1.18.0
apache-flink-libraries==1.18.0
avro-python3==1.10.2
beautifulsoup4==4.12.2
cachetools==5.3.2
certifi==2023.7.22
charset-normalizer==3.3.2
cloudpickle==2.2.1
confluent-kafka==2.3.0
crcmod==1.7
dill==0.3.1.1
dnspython==2.4.2
docopt==0.6.2
ducktape==0.11.4
fastavro==1.9.0
fasteners==0.19
find-libpython==0.3.1
google==3.0.0
google-api-core==2.13.0
google-auth==2.23.4
googleapis-common-protos==1.61.0
grpcio==1.59.2
hdfs==2.7.3
httplib2==0.22.0
idna==3.4
kafka-python==2.0.2
numpy==1.24.4
objsize==0.6.1
orjson==3.9.10
pandas==2.1.2
pemja==0.3.0
proto-plus==1.22.3
protobuf==4.23.4
py4j==0.10.9.7
pyarrow==11.0.0
pyasn1==0.5.0
pyasn1-modules==0.3.0
pydot==1.4.2
pyflink==1.0
pymongo==4.6.0
pyparsing==3.1.1
python-dateutil==2.8.2
pytz==2023.3.post1
regex==2023.10.3
requests==2.31.0
rsa==4.9
six==1.16.0
soupsieve==2.5
typing_extensions==4.8.0
tzdata==2023.3
urllib3==2.0.7
zstandard==0.22.0
Loading

0 comments on commit 3873875

Please sign in to comment.