From 37b920a7978a6c221551e70fdccc503b7d6702a2 Mon Sep 17 00:00:00 2001 From: el-tegy <67525407+el-tegy@users.noreply.github.com> Date: Sun, 26 Nov 2023 18:47:25 +0000 Subject: [PATCH] =?UTF-8?q?refactor:=20remove=20initial=20dataframe.=20>=20?= =?UTF-8?q?>=20Co-authored-by:=20Elis=C3=A9e=20TEGUE=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20?= =?UTF-8?q?=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20?= =?UTF-8?q?=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20?= =?UTF-8?q?=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20Co-author?= =?UTF-8?q?ed-by:=20Mic=20E=20?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stream_persona/src/spark_streaming.py | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/stream_persona/src/spark_streaming.py b/stream_persona/src/spark_streaming.py index c49c4da..8009394 100644 --- a/stream_persona/src/spark_streaming.py +++ b/stream_persona/src/spark_streaming.py @@ -28,24 +28,4 @@ def create_spark_session(): except Exception: logging.error("Couldn't create the spark session") - return spark - -def create_initial_dataframe(spark_session): - """ - Reads the streaming data and creates the initial dataframe accordingly. - """ - try: - # Gets the streaming data from topic random_names - df = spark_session \ - .readStream \ - .format("kafka") \ - .option("kafka.bootstrap.servers", "kafka1:19092,kafka2:19093,kafka3:19094") \ - .option("subscribe", "random_names") \ - .option("delimeter",",") \ - .option("startingOffsets", "earliest") \ - .load() - logging.info("Initial dataframe created successfully") - except Exception as e: - logging.warning(f"Initial dataframe couldn't be created due to exception: {e}") - - return df \ No newline at end of file + return spark \ No newline at end of file