Skip to content

Commit

Permalink
refactor: remove initial dataframe.
Browse files Browse the repository at this point in the history
>
>
Co-authored-by: Elisée TEGUE <[email protected]>                                                                            Co-authored-by: Mic E <[email protected]>
  • Loading branch information
el-tegy committed Nov 26, 2023
1 parent 0d3b914 commit 37b920a
Showing 1 changed file with 1 addition and 21 deletions.
22 changes: 1 addition & 21 deletions stream_persona/src/spark_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,4 @@ def create_spark_session():
except Exception:
logging.error("Couldn't create the spark session")

return spark

def create_initial_dataframe(spark_session):
    """
    Build the initial streaming dataframe from the Kafka topic ``random_names``.

    The reader is configured on ``spark_session.readStream`` with the
    ``kafka`` format, subscribes to ``random_names`` and starts from the
    earliest available offsets.

    Args:
        spark_session: Object exposing a ``readStream`` reader builder
            (presumably a ``pyspark.sql.SparkSession`` -- confirm with caller).

    Returns:
        Whatever ``.load()`` returns on success, or ``None`` when the reader
        could not be created (the failure is logged as a warning).
    """
    # Bind the result up front so the failure path returns None instead of
    # raising UnboundLocalError on the final ``return``.
    df = None
    try:
        # Gets the streaming data from topic random_names
        # NOTE(review): "delimeter" looks like a misspelled option and is
        # probably not read by the Kafka source -- confirm before removing it.
        df = (
            spark_session.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "kafka1:19092,kafka2:19093,kafka3:19094")
            .option("subscribe", "random_names")
            .option("delimeter", ",")
            .option("startingOffsets", "earliest")
            .load()
        )
        logging.info("Initial dataframe created successfully")
    except Exception as e:
        logging.warning(f"Initial dataframe couldn't be created due to exception: {e}")

    return df
return spark

0 comments on commit 37b920a

Please sign in to comment.