import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, StringType
from pyspark.sql.functions import from_json, col


def create_spark_session():
    """
    Create (or reuse) a SparkSession configured for Cassandra and Kafka.

    The Cassandra connector and the Kafka SQL source are pulled from Maven
    via ``spark.jars.packages``; the artifact versions must match the
    cluster's Spark/Scala build (Scala 2.12 / Spark 3.0.0 here).

    Returns:
        SparkSession: the active session returned by ``getOrCreate()``.

    Raises:
        Exception: re-raised if the session cannot be created, so callers
            never receive an undefined session object.
    """
    try:
        spark = (
            SparkSession.builder
            .appName("SparkStructuredStreaming")
            # Cassandra connector + Kafka source jars, resolved from Maven.
            .config(
                "spark.jars.packages",
                "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0,"
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0",
            )
            # NOTE(review): host/port/credentials are hard-coded to the
            # docker-compose service defaults — confirm before reuse.
            .config("spark.cassandra.connection.host", "cassandra")
            .config("spark.cassandra.connection.port", "9042")
            .config("spark.cassandra.auth.username", "cassandra")
            .config("spark.cassandra.auth.password", "cassandra")
            .getOrCreate()
        )
        # Keep driver output quiet; only surface real errors from Spark.
        spark.sparkContext.setLogLevel("ERROR")
        logging.info("Spark session created successfully")
    except Exception:
        # BUG FIX: the original logged here and then fell through to
        # `return spark`, raising NameError (unbound local) and masking the
        # real failure. Log with the traceback and propagate instead.
        logging.exception("Couldn't create the spark session")
        raise

    return spark
def create_initial_dataframe(spark_session):
    """
    Read the Kafka stream and return the raw (unparsed) streaming DataFrame.

    Subscribes to the ``random_names`` topic on the three-broker cluster
    and reads from the earliest available offsets, so a restarted job
    reprocesses the whole topic.

    Args:
        spark_session: an active SparkSession with the Kafka source jars
            on its classpath (see ``create_spark_session``).

    Returns:
        pyspark.sql.DataFrame: streaming frame with the standard Kafka
        source columns (``key``, ``value``, ``topic``, ...).

    Raises:
        Exception: re-raised if the stream reader cannot be created, so
            callers never receive an undefined dataframe.
    """
    try:
        df = (
            spark_session.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers",
                    "kafka1:19092,kafka2:19093,kafka3:19094")
            .option("subscribe", "random_names")
            # NOTE(review): "delimeter" is misspelled and is not a Kafka
            # source option — Spark ignores it. Kept for parity; confirm
            # it can be dropped.
            .option("delimeter", ",")
            .option("startingOffsets", "earliest")
            .load()
        )
        logging.info("Initial dataframe created successfully")
    except Exception as e:
        # BUG FIX: the original logged a warning and then executed
        # `return df`, raising NameError (unbound local) and hiding the
        # real failure. Escalate to error and propagate.
        logging.error(f"Initial dataframe couldn't be created due to exception: {e}")
        raise

    return df